test_device.py revision e76b9f3dde800b44c4151ebee9ff469b6714d8aa
1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import time 35import unittest 36 37import mock 38 39import adb 40 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class GetDeviceTest(unittest.TestCase): 80 def setUp(self): 81 self.android_serial = os.getenv('ANDROID_SERIAL') 82 if 'ANDROID_SERIAL' in os.environ: 83 del os.environ['ANDROID_SERIAL'] 84 85 def tearDown(self): 86 if self.android_serial is not None: 87 os.environ['ANDROID_SERIAL'] = self.android_serial 88 else: 89 if 'ANDROID_SERIAL' in os.environ: 90 del os.environ['ANDROID_SERIAL'] 91 92 @mock.patch('adb.device.get_devices') 93 def test_explicit(self, mock_get_devices): 94 mock_get_devices.return_value = ['foo', 'bar'] 95 device = adb.get_device('foo') 96 self.assertEqual(device.serial, 'foo') 97 98 @mock.patch('adb.device.get_devices') 99 def test_from_env(self, mock_get_devices): 100 mock_get_devices.return_value = ['foo', 'bar'] 101 os.environ['ANDROID_SERIAL'] = 'foo' 102 device = adb.get_device() 103 self.assertEqual(device.serial, 'foo') 104 105 @mock.patch('adb.device.get_devices') 106 def test_arg_beats_env(self, mock_get_devices): 107 mock_get_devices.return_value = ['foo', 'bar'] 108 os.environ['ANDROID_SERIAL'] = 'bar' 109 device = adb.get_device('foo') 110 self.assertEqual(device.serial, 'foo') 111 112 @mock.patch('adb.device.get_devices') 113 def test_no_such_device(self, mock_get_devices): 114 mock_get_devices.return_value = ['foo', 'bar'] 115 self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) 116 117 os.environ['ANDROID_SERIAL'] = 'baz' 118 self.assertRaises(adb.DeviceNotFoundError, adb.get_device) 119 120 @mock.patch('adb.device.get_devices') 121 def test_unique_device(self, mock_get_devices): 122 mock_get_devices.return_value = ['foo'] 123 device = adb.get_device() 124 self.assertEqual(device.serial, 'foo') 125 126 @mock.patch('adb.device.get_devices') 127 def test_no_unique_device(self, mock_get_devices): 128 mock_get_devices.return_value = ['foo', 'bar'] 129 self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) 130 131 132class DeviceTest(unittest.TestCase): 133 def setUp(self): 134 self.device = adb.get_device() 135 136 137class ForwardReverseTest(DeviceTest): 138 def _test_no_rebind(self, description, direction_list, direction, 139 direction_no_rebind, direction_remove_all): 140 msg = direction_list() 141 self.assertEqual('', msg.strip(), 142 description + ' list must be empty to run this test.') 143 144 # Use --no-rebind with no existing binding 145 direction_no_rebind('tcp:5566', 'tcp:6655') 146 msg = direction_list() 147 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 148 149 # Use --no-rebind with existing binding 150 with self.assertRaises(subprocess.CalledProcessError): 151 direction_no_rebind('tcp:5566', 'tcp:6677') 152 msg = direction_list() 153 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 154 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 155 156 # Use the absence of --no-rebind with existing binding 157 direction('tcp:5566', 'tcp:6677') 158 msg = direction_list() 159 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 160 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 161 162 direction_remove_all() 163 msg = direction_list() 164 self.assertEqual('', msg.strip()) 165 166 def test_forward_no_rebind(self): 167 self._test_no_rebind('forward', self.device.forward_list, 168 self.device.forward, self.device.forward_no_rebind, 169 self.device.forward_remove_all) 170 171 def test_reverse_no_rebind(self): 172 self._test_no_rebind('reverse', self.device.reverse_list, 173 self.device.reverse, self.device.reverse_no_rebind, 174 self.device.reverse_remove_all) 175 176 def test_forward(self): 177 msg = self.device.forward_list() 178 self.assertEqual('', msg.strip(), 179 'Forwarding list must be empty to run this test.') 180 self.device.forward('tcp:5566', 'tcp:6655') 181 msg = self.device.forward_list() 182 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 183 self.device.forward('tcp:7788', 'tcp:8877') 184 msg = self.device.forward_list() 185 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 186 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 187 self.device.forward_remove('tcp:5566') 188 msg = self.device.forward_list() 189 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 190 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 191 self.device.forward_remove_all() 192 msg = self.device.forward_list() 193 self.assertEqual('', msg.strip()) 194 195 def test_forward_tcp_port_0(self): 196 self.assertEqual('', self.device.forward_list().strip(), 197 'Forwarding list must be empty to run this test.') 198 199 try: 200 # If resolving TCP port 0 is supported, `adb forward` will print 201 # the actual port number. 202 port = self.device.forward('tcp:0', 'tcp:8888').strip() 203 if not port: 204 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 205 206 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 207 self.device.forward_list())) 208 finally: 209 self.device.forward_remove_all() 210 211 def test_reverse(self): 212 msg = self.device.reverse_list() 213 self.assertEqual('', msg.strip(), 214 'Reverse forwarding list must be empty to run this test.') 215 self.device.reverse('tcp:5566', 'tcp:6655') 216 msg = self.device.reverse_list() 217 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 218 self.device.reverse('tcp:7788', 'tcp:8877') 219 msg = self.device.reverse_list() 220 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 221 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 222 self.device.reverse_remove('tcp:5566') 223 msg = self.device.reverse_list() 224 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 225 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 226 self.device.reverse_remove_all() 227 msg = self.device.reverse_list() 228 self.assertEqual('', msg.strip()) 229 230 def test_reverse_tcp_port_0(self): 231 self.assertEqual('', self.device.reverse_list().strip(), 232 'Reverse list must be empty to run this test.') 233 234 try: 235 # If resolving TCP port 0 is supported, `adb reverse` will print 236 # the actual port number. 237 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 238 if not port: 239 raise unittest.SkipTest('Reversing tcp:0 is not available.') 240 241 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 242 self.device.reverse_list())) 243 finally: 244 self.device.reverse_remove_all() 245 246 # Note: If you run this test when adb connect'd to a physical device over 247 # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821 248 def test_forward_reverse_echo(self): 249 """Send data through adb forward and read it back via adb reverse""" 250 forward_port = 12345 251 reverse_port = forward_port + 1 252 forward_spec = 'tcp:' + str(forward_port) 253 reverse_spec = 'tcp:' + str(reverse_port) 254 forward_setup = False 255 reverse_setup = False 256 257 try: 258 # listen on localhost:forward_port, connect to remote:forward_port 259 self.device.forward(forward_spec, forward_spec) 260 forward_setup = True 261 # listen on remote:forward_port, connect to localhost:reverse_port 262 self.device.reverse(forward_spec, reverse_spec) 263 reverse_setup = True 264 265 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 266 with contextlib.closing(listener): 267 # Use SO_REUSEADDR so that subsequent runs of the test can grab 268 # the port even if it is in TIME_WAIT. 269 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 270 271 # Listen on localhost:reverse_port before connecting to 272 # localhost:forward_port because that will cause adb to connect 273 # back to localhost:reverse_port. 274 listener.bind(('127.0.0.1', reverse_port)) 275 listener.listen(4) 276 277 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 278 with contextlib.closing(client): 279 # Connect to the listener. 280 client.connect(('127.0.0.1', forward_port)) 281 282 # Accept the client connection. 283 accepted_connection, addr = listener.accept() 284 with contextlib.closing(accepted_connection) as server: 285 data = 'hello' 286 287 # Send data into the port setup by adb forward. 288 client.sendall(data) 289 # Explicitly close() so that server gets EOF. 290 client.close() 291 292 # Verify that the data came back via adb reverse. 293 self.assertEqual(data, server.makefile().read()) 294 finally: 295 if reverse_setup: 296 self.device.reverse_remove(forward_spec) 297 if forward_setup: 298 self.device.forward_remove(forward_spec) 299 300 301class ShellTest(DeviceTest): 302 def _interactive_shell(self, shell_args, input): 303 """Runs an interactive adb shell. 304 305 Args: 306 shell_args: List of string arguments to `adb shell`. 307 input: String input to send to the interactive shell. 308 309 Returns: 310 The remote exit code. 311 312 Raises: 313 unittest.SkipTest: The device doesn't support exit codes. 314 """ 315 if not self.device.has_shell_protocol(): 316 raise unittest.SkipTest('exit codes are unavailable on this device') 317 318 proc = subprocess.Popen( 319 self.device.adb_cmd + ['shell'] + shell_args, 320 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 321 stderr=subprocess.PIPE) 322 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 323 # to explicitly add an exit command to close the session from the device 324 # side, plus the necessary newline to complete the interactive command. 325 proc.communicate(input + '; exit\n') 326 return proc.returncode 327 328 def test_cat(self): 329 """Check that we can at least cat a file.""" 330 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 331 elements = out.split() 332 self.assertEqual(len(elements), 2) 333 334 uptime, idle = elements 335 self.assertGreater(float(uptime), 0.0) 336 self.assertGreater(float(idle), 0.0) 337 338 def test_throws_on_failure(self): 339 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 340 341 def test_output_not_stripped(self): 342 out = self.device.shell(['echo', 'foo'])[0] 343 self.assertEqual(out, 'foo' + self.device.linesep) 344 345 def test_shell_nocheck_failure(self): 346 rc, out, _ = self.device.shell_nocheck(['false']) 347 self.assertNotEqual(rc, 0) 348 self.assertEqual(out, '') 349 350 def test_shell_nocheck_output_not_stripped(self): 351 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 352 self.assertEqual(rc, 0) 353 self.assertEqual(out, 'foo' + self.device.linesep) 354 355 def test_can_distinguish_tricky_results(self): 356 # If result checking on ADB shell is naively implemented as 357 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 358 # output from the result for a cmd of `echo -n 1`. 359 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 360 self.assertEqual(rc, 0) 361 self.assertEqual(out, '1') 362 363 def test_line_endings(self): 364 """Ensure that line ending translation is not happening in the pty. 365 366 Bug: http://b/19735063 367 """ 368 output = self.device.shell(['uname'])[0] 369 self.assertEqual(output, 'Linux' + self.device.linesep) 370 371 def test_pty_logic(self): 372 """Tests that a PTY is allocated when it should be. 373 374 PTY allocation behavior should match ssh; some behavior requires 375 a terminal stdin to test so this test will be skipped if stdin 376 is not a terminal. 377 """ 378 if not self.device.has_shell_protocol(): 379 raise unittest.SkipTest('PTY arguments unsupported on this device') 380 if not os.isatty(sys.stdin.fileno()): 381 raise unittest.SkipTest('PTY tests require stdin terminal') 382 383 def check_pty(args): 384 """Checks adb shell PTY allocation. 385 386 Tests |args| for terminal and non-terminal stdin. 387 388 Args: 389 args: -Tt args in a list (e.g. ['-t', '-t']). 390 391 Returns: 392 A tuple (<terminal>, <non-terminal>). True indicates 393 the corresponding shell allocated a remote PTY. 394 """ 395 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 396 397 terminal = subprocess.Popen( 398 test_cmd, stdin=None, 399 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 400 terminal.communicate() 401 402 non_terminal = subprocess.Popen( 403 test_cmd, stdin=subprocess.PIPE, 404 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 405 non_terminal.communicate() 406 407 return (terminal.returncode == 0, non_terminal.returncode == 0) 408 409 # -T: never allocate PTY. 410 self.assertEqual((False, False), check_pty(['-T'])) 411 412 # No args: PTY only if stdin is a terminal and shell is interactive, 413 # which is difficult to reliably test from a script. 414 self.assertEqual((False, False), check_pty([])) 415 416 # -t: PTY if stdin is a terminal. 417 self.assertEqual((True, False), check_pty(['-t'])) 418 419 # -t -t: always allocate PTY. 420 self.assertEqual((True, True), check_pty(['-t', '-t'])) 421 422 def test_shell_protocol(self): 423 """Tests the shell protocol on the device. 424 425 If the device supports shell protocol, this gives us the ability 426 to separate stdout/stderr and return the exit code directly. 427 428 Bug: http://b/19734861 429 """ 430 if not self.device.has_shell_protocol(): 431 raise unittest.SkipTest('shell protocol unsupported on this device') 432 433 # Shell protocol should be used by default. 434 result = self.device.shell_nocheck( 435 shlex.split('echo foo; echo bar >&2; exit 17')) 436 self.assertEqual(17, result[0]) 437 self.assertEqual('foo' + self.device.linesep, result[1]) 438 self.assertEqual('bar' + self.device.linesep, result[2]) 439 440 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 441 442 # -x flag should disable shell protocol. 443 result = self.device.shell_nocheck( 444 shlex.split('-x echo foo; echo bar >&2; exit 17')) 445 self.assertEqual(0, result[0]) 446 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 447 self.assertEqual('', result[2]) 448 449 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 450 451 def test_non_interactive_sigint(self): 452 """Tests that SIGINT in a non-interactive shell kills the process. 453 454 This requires the shell protocol in order to detect the broken 455 pipe; raw data transfer mode will only see the break once the 456 subprocess tries to read or write. 457 458 Bug: http://b/23825725 459 """ 460 if not self.device.has_shell_protocol(): 461 raise unittest.SkipTest('shell protocol unsupported on this device') 462 463 # Start a long-running process. 464 sleep_proc = subprocess.Popen( 465 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 466 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 467 stderr=subprocess.STDOUT) 468 remote_pid = sleep_proc.stdout.readline().strip() 469 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 470 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 471 472 # Verify that the process is running, send signal, verify it stopped. 473 self.device.shell(proc_query) 474 os.kill(sleep_proc.pid, signal.SIGINT) 475 sleep_proc.communicate() 476 477 # It can take some time for the process to receive the signal and die. 478 end_time = time.time() + 3 479 while self.device.shell_nocheck(proc_query)[0] != 1: 480 self.assertFalse(time.time() > end_time, 481 'subprocess failed to terminate in time') 482 483 def test_non_interactive_stdin(self): 484 """Tests that non-interactive shells send stdin.""" 485 if not self.device.has_shell_protocol(): 486 raise unittest.SkipTest('non-interactive stdin unsupported ' 487 'on this device') 488 489 # Test both small and large inputs. 490 small_input = 'foo' 491 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 492 string.digits)) 493 494 for input in (small_input, large_input): 495 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 496 stdin=subprocess.PIPE, 497 stdout=subprocess.PIPE, 498 stderr=subprocess.PIPE) 499 stdout, stderr = proc.communicate(input) 500 self.assertEqual(input.splitlines(), stdout.splitlines()) 501 self.assertEqual('', stderr) 502 503 def test_sighup(self): 504 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 505 log_path = "/data/local/tmp/adb_signal_test.log" 506 507 # Clear the output file. 508 self.device.shell_nocheck(["echo", ">", log_path]) 509 510 script = """ 511 trap "echo SIGINT > {path}; exit 0" SIGINT 512 trap "echo SIGHUP > {path}; exit 0" SIGHUP 513 echo Waiting 514 while true; do sleep 100; done 515 """.format(path=log_path) 516 517 script = ";".join([x.strip() for x in script.strip().splitlines()]) 518 519 process = self.device.shell_popen( 520 ["sh", "-c", "'{}'".format(script)], kill_atexit=False, stdout=subprocess.PIPE) 521 522 self.assertEqual("Waiting\n", process.stdout.readline()) 523 process.send_signal(signal.SIGINT) 524 process.wait() 525 526 # Waiting for the local adb to finish is insufficient, since it hangs 527 # up immediately. 528 time.sleep(0.25) 529 530 stdout, _ = self.device.shell(["cat", log_path]) 531 self.assertEqual(stdout.strip(), "SIGHUP") 532 533 534class ArgumentEscapingTest(DeviceTest): 535 def test_shell_escaping(self): 536 """Make sure that argument escaping is somewhat sane.""" 537 538 # http://b/19734868 539 # Note that this actually matches ssh(1)'s behavior --- it's 540 # converted to `sh -c echo hello; echo world` which sh interprets 541 # as `sh -c echo` (with an argument to that shell of "hello"), 542 # and then `echo world` back in the first shell. 543 result = self.device.shell( 544 shlex.split("sh -c 'echo hello; echo world'"))[0] 545 result = result.splitlines() 546 self.assertEqual(['', 'world'], result) 547 # If you really wanted "hello" and "world", here's what you'd do: 548 result = self.device.shell( 549 shlex.split(r'echo hello\;echo world'))[0].splitlines() 550 self.assertEqual(['hello', 'world'], result) 551 552 # http://b/15479704 553 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 554 self.assertEqual('t', result) 555 result = self.device.shell( 556 shlex.split("sh -c 'true && echo t'"))[0].strip() 557 self.assertEqual('t', result) 558 559 # http://b/20564385 560 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 561 self.assertEqual('t', result) 562 result = self.device.shell( 563 shlex.split(r'echo -n 123\;uname'))[0].strip() 564 self.assertEqual('123Linux', result) 565 566 def test_install_argument_escaping(self): 567 """Make sure that install argument escaping works.""" 568 # http://b/20323053, http://b/3090932. 569 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 570 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 571 delete=False) 572 tf.close() 573 574 # Installing bogus .apks fails if the device supports exit codes. 575 try: 576 output = self.device.install(tf.name) 577 except subprocess.CalledProcessError as e: 578 output = e.output 579 580 self.assertIn(file_suffix, output) 581 os.remove(tf.name) 582 583 584class RootUnrootTest(DeviceTest): 585 def _test_root(self): 586 message = self.device.root() 587 if 'adbd cannot run as root in production builds' in message: 588 return 589 self.device.wait() 590 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 591 592 def _test_unroot(self): 593 self.device.unroot() 594 self.device.wait() 595 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 596 597 def test_root_unroot(self): 598 """Make sure that adb root and adb unroot work, using id(1).""" 599 if self.device.get_prop('ro.debuggable') != '1': 600 raise unittest.SkipTest('requires rootable build') 601 602 original_user = self.device.shell(['id', '-un'])[0].strip() 603 try: 604 if original_user == 'root': 605 self._test_unroot() 606 self._test_root() 607 elif original_user == 'shell': 608 self._test_root() 609 self._test_unroot() 610 finally: 611 if original_user == 'root': 612 self.device.root() 613 else: 614 self.device.unroot() 615 self.device.wait() 616 617 618class TcpIpTest(DeviceTest): 619 def test_tcpip_failure_raises(self): 620 """adb tcpip requires a port. 621 622 Bug: http://b/22636927 623 """ 624 self.assertRaises( 625 subprocess.CalledProcessError, self.device.tcpip, '') 626 self.assertRaises( 627 subprocess.CalledProcessError, self.device.tcpip, 'foo') 628 629 630class SystemPropertiesTest(DeviceTest): 631 def test_get_prop(self): 632 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 633 634 @requires_root 635 def test_set_prop(self): 636 prop_name = 'foo.bar' 637 self.device.shell(['setprop', prop_name, '""']) 638 639 self.device.set_prop(prop_name, 'qux') 640 self.assertEqual( 641 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 642 643 644def compute_md5(string): 645 hsh = hashlib.md5() 646 hsh.update(string) 647 return hsh.hexdigest() 648 649 650def get_md5_prog(device): 651 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 652 try: 653 device.shell(['md5sum', '/proc/uptime']) 654 return 'md5sum' 655 except adb.ShellError: 656 return 'md5' 657 658 659class HostFile(object): 660 def __init__(self, handle, checksum): 661 self.handle = handle 662 self.checksum = checksum 663 self.full_path = handle.name 664 self.base_name = os.path.basename(self.full_path) 665 666 667class DeviceFile(object): 668 def __init__(self, checksum, full_path): 669 self.checksum = checksum 670 self.full_path = full_path 671 self.base_name = posixpath.basename(self.full_path) 672 673 674def make_random_host_files(in_dir, num_files): 675 min_size = 1 * (1 << 10) 676 max_size = 16 * (1 << 10) 677 678 files = [] 679 for _ in xrange(num_files): 680 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 681 682 size = random.randrange(min_size, max_size, 1024) 683 rand_str = os.urandom(size) 684 file_handle.write(rand_str) 685 file_handle.flush() 686 file_handle.close() 687 688 md5 = compute_md5(rand_str) 689 files.append(HostFile(file_handle, md5)) 690 return files 691 692 693def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 694 min_size = 1 * (1 << 10) 695 max_size = 16 * (1 << 10) 696 697 files = [] 698 for file_num in xrange(num_files): 699 size = random.randrange(min_size, max_size, 1024) 700 701 base_name = prefix + str(file_num) 702 full_path = posixpath.join(in_dir, base_name) 703 704 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 705 'bs={}'.format(size), 'count=1']) 706 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 707 708 files.append(DeviceFile(dev_md5, full_path)) 709 return files 710 711 712class FileOperationsTest(DeviceTest): 713 SCRATCH_DIR = '/data/local/tmp' 714 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 715 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 716 717 def _verify_remote(self, checksum, remote_path): 718 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 719 remote_path])[0].split() 720 self.assertEqual(checksum, dev_md5) 721 722 def _verify_local(self, checksum, local_path): 723 with open(local_path, 'rb') as host_file: 724 host_md5 = compute_md5(host_file.read()) 725 self.assertEqual(host_md5, checksum) 726 727 def test_push(self): 728 """Push a randomly generated file to specified device.""" 729 kbytes = 512 730 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 731 rand_str = os.urandom(1024 * kbytes) 732 tmp.write(rand_str) 733 tmp.close() 734 735 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 736 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 737 738 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 739 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 740 741 os.remove(tmp.name) 742 743 def test_push_dir(self): 744 """Push a randomly generated directory of files to the device.""" 745 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 746 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 747 748 try: 749 host_dir = tempfile.mkdtemp() 750 751 # Make sure the temp directory isn't setuid, or else adb will complain. 752 os.chmod(host_dir, 0o700) 753 754 # Create 32 random files. 755 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 756 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 757 758 for temp_file in temp_files: 759 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 760 os.path.basename(host_dir), 761 temp_file.base_name) 762 self._verify_remote(temp_file.checksum, remote_path) 763 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 764 finally: 765 if host_dir is not None: 766 shutil.rmtree(host_dir) 767 768 @unittest.expectedFailure # b/25566053 769 def test_push_empty(self): 770 """Push a directory containing an empty directory to the device.""" 771 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 772 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 773 774 try: 775 host_dir = tempfile.mkdtemp() 776 777 # Make sure the temp directory isn't setuid, or else adb will complain. 778 os.chmod(host_dir, 0o700) 779 780 # Create an empty directory. 781 os.mkdir(os.path.join(host_dir, 'empty')) 782 783 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 784 785 test_empty_cmd = ['[', '-d', 786 os.path.join(self.DEVICE_TEMP_DIR, 'empty')] 787 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 788 self.assertEqual(rc, 0) 789 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 790 finally: 791 if host_dir is not None: 792 shutil.rmtree(host_dir) 793 794 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 795 def test_push_symlink(self): 796 """Push a symlink. 797 798 Bug: http://b/31491920 799 """ 800 try: 801 host_dir = tempfile.mkdtemp() 802 803 # Make sure the temp directory isn't setuid, or else adb will 804 # complain. 805 os.chmod(host_dir, 0o700) 806 807 with open(os.path.join(host_dir, 'foo'), 'w') as f: 808 f.write('foo') 809 810 symlink_path = os.path.join(host_dir, 'symlink') 811 os.symlink('foo', symlink_path) 812 813 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 814 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 815 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 816 rc, out, _ = self.device.shell_nocheck( 817 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 818 self.assertEqual(0, rc) 819 self.assertEqual(out.strip(), 'foo') 820 finally: 821 if host_dir is not None: 822 shutil.rmtree(host_dir) 823 824 def test_multiple_push(self): 825 """Push multiple files to the device in one adb push command. 826 827 Bug: http://b/25324823 828 """ 829 830 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 831 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 832 833 try: 834 host_dir = tempfile.mkdtemp() 835 836 # Create some random files and a subdirectory containing more files. 837 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 838 839 subdir = os.path.join(host_dir, 'subdir') 840 os.mkdir(subdir) 841 subdir_temp_files = make_random_host_files(in_dir=subdir, 842 num_files=4) 843 844 paths = map(lambda temp_file: temp_file.full_path, temp_files) 845 paths.append(subdir) 846 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 847 848 for temp_file in temp_files: 849 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 850 temp_file.base_name) 851 self._verify_remote(temp_file.checksum, remote_path) 852 853 for subdir_temp_file in subdir_temp_files: 854 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 855 # BROKEN: http://b/25394682 856 # 'subdir'; 857 temp_file.base_name) 858 self._verify_remote(temp_file.checksum, remote_path) 859 860 861 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 862 finally: 863 if host_dir is not None: 864 shutil.rmtree(host_dir) 865 866 @requires_non_root 867 def test_push_error_reporting(self): 868 """Make sure that errors that occur while pushing a file get reported 869 870 Bug: http://b/26816782 871 """ 872 with tempfile.NamedTemporaryFile() as tmp_file: 873 tmp_file.write('\0' * 1024 * 1024) 874 tmp_file.flush() 875 try: 876 self.device.push(local=tmp_file.name, remote='/system/') 877 self.fail('push should not have succeeded') 878 except subprocess.CalledProcessError as e: 879 output = e.output 880 881 self.assertIn('Permission denied', output) 882 883 def _test_pull(self, remote_file, checksum): 884 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 885 tmp_write.close() 886 self.device.pull(remote=remote_file, local=tmp_write.name) 887 with open(tmp_write.name, 'rb') as tmp_read: 888 host_contents = tmp_read.read() 889 host_md5 = compute_md5(host_contents) 890 self.assertEqual(checksum, host_md5) 891 os.remove(tmp_write.name) 892 893 @requires_non_root 894 def test_pull_error_reporting(self): 895 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 896 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 897 898 try: 899 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 900 except subprocess.CalledProcessError as e: 901 output = e.output 902 903 self.assertIn('Permission denied', output) 904 905 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 906 907 def test_pull(self): 908 """Pull a randomly generated file from specified device.""" 909 kbytes = 512 910 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 911 cmd = ['dd', 'if=/dev/urandom', 912 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 913 'count={}'.format(kbytes)] 914 self.device.shell(cmd) 915 dev_md5, _ = self.device.shell( 916 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 917 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 918 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 919 920 def test_pull_dir(self): 921 """Pull a randomly generated directory of files from the device.""" 922 try: 923 host_dir = tempfile.mkdtemp() 924 925 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 926 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 927 928 # Populate device directory with random files. 929 temp_files = make_random_device_files( 930 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 931 932 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 933 934 for temp_file in temp_files: 935 host_path = os.path.join( 936 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 937 temp_file.base_name) 938 self._verify_local(temp_file.checksum, host_path) 939 940 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 941 finally: 942 if host_dir is not None: 943 shutil.rmtree(host_dir) 944 945 def test_pull_dir_symlink(self): 946 """Pull a directory into a symlink to a directory. 947 948 Bug: http://b/27362811 949 """ 950 if os.name != 'posix': 951 raise unittest.SkipTest('requires POSIX') 952 953 try: 954 host_dir = tempfile.mkdtemp() 955 real_dir = os.path.join(host_dir, 'dir') 956 symlink = os.path.join(host_dir, 'symlink') 957 os.mkdir(real_dir) 958 os.symlink(real_dir, symlink) 959 960 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 961 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 962 963 # Populate device directory with random files. 964 temp_files = make_random_device_files( 965 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 966 967 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 968 969 for temp_file in temp_files: 970 host_path = os.path.join( 971 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 972 temp_file.base_name) 973 self._verify_local(temp_file.checksum, host_path) 974 975 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 976 finally: 977 if host_dir is not None: 978 shutil.rmtree(host_dir) 979 980 def test_pull_dir_symlink_collision(self): 981 """Pull a directory into a colliding symlink to directory.""" 982 if os.name != 'posix': 983 raise unittest.SkipTest('requires POSIX') 984 985 try: 986 host_dir = tempfile.mkdtemp() 987 real_dir = os.path.join(host_dir, 'real') 988 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 989 symlink = os.path.join(host_dir, tmp_dirname) 990 os.mkdir(real_dir) 991 os.symlink(real_dir, symlink) 992 993 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 994 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 995 996 # Populate device directory with random files. 997 temp_files = make_random_device_files( 998 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 999 1000 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1001 1002 for temp_file in temp_files: 1003 host_path = os.path.join(real_dir, temp_file.base_name) 1004 self._verify_local(temp_file.checksum, host_path) 1005 1006 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1007 finally: 1008 if host_dir is not None: 1009 shutil.rmtree(host_dir) 1010 1011 def test_pull_dir_nonexistent(self): 1012 """Pull a directory of files from the device to a nonexistent path.""" 1013 try: 1014 host_dir = tempfile.mkdtemp() 1015 dest_dir = os.path.join(host_dir, 'dest') 1016 1017 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1018 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1019 1020 # Populate device directory with random files. 1021 temp_files = make_random_device_files( 1022 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1023 1024 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1025 1026 for temp_file in temp_files: 1027 host_path = os.path.join(dest_dir, temp_file.base_name) 1028 self._verify_local(temp_file.checksum, host_path) 1029 1030 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1031 finally: 1032 if host_dir is not None: 1033 shutil.rmtree(host_dir) 1034 1035 def test_pull_symlink_dir(self): 1036 """Pull a symlink to a directory of symlinks to files.""" 1037 try: 1038 host_dir = tempfile.mkdtemp() 1039 1040 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1041 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1042 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1043 1044 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1045 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1046 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1047 1048 # Populate device directory with random files. 1049 temp_files = make_random_device_files( 1050 self.device, in_dir=remote_dir, num_files=32) 1051 1052 for temp_file in temp_files: 1053 self.device.shell( 1054 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1055 posixpath.join(remote_links, temp_file.base_name)]) 1056 1057 self.device.pull(remote=remote_symlink, local=host_dir) 1058 1059 for temp_file in temp_files: 1060 host_path = os.path.join( 1061 host_dir, 'symlink', temp_file.base_name) 1062 self._verify_local(temp_file.checksum, host_path) 1063 1064 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1065 finally: 1066 if host_dir is not None: 1067 shutil.rmtree(host_dir) 1068 1069 def test_pull_empty(self): 1070 """Pull a directory containing an empty directory from the device.""" 1071 try: 1072 host_dir = tempfile.mkdtemp() 1073 1074 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1075 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1076 self.device.shell(['mkdir', '-p', remote_empty_path]) 1077 1078 self.device.pull(remote=remote_empty_path, local=host_dir) 1079 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1080 finally: 1081 if host_dir is not None: 1082 shutil.rmtree(host_dir) 1083 1084 def test_multiple_pull(self): 1085 """Pull a randomly generated directory of files from the device.""" 1086 1087 try: 1088 host_dir = tempfile.mkdtemp() 1089 1090 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1091 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1092 self.device.shell(['mkdir', '-p', subdir]) 1093 1094 # Create some random files and a subdirectory containing more files. 1095 temp_files = make_random_device_files( 1096 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1097 1098 subdir_temp_files = make_random_device_files( 1099 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1100 1101 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1102 paths.append(subdir) 1103 self.device._simple_call(['pull'] + paths + [host_dir]) 1104 1105 for temp_file in temp_files: 1106 local_path = os.path.join(host_dir, temp_file.base_name) 1107 self._verify_local(temp_file.checksum, local_path) 1108 1109 for subdir_temp_file in subdir_temp_files: 1110 local_path = os.path.join(host_dir, 1111 'subdir', 1112 subdir_temp_file.base_name) 1113 self._verify_local(subdir_temp_file.checksum, local_path) 1114 1115 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1116 finally: 1117 if host_dir is not None: 1118 shutil.rmtree(host_dir) 1119 1120 def test_sync(self): 1121 """Sync a randomly generated directory of files to specified device.""" 1122 1123 try: 1124 base_dir = tempfile.mkdtemp() 1125 1126 # Create mirror device directory hierarchy within base_dir. 1127 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1128 os.makedirs(full_dir_path) 1129 1130 # Create 32 random files within the host mirror. 1131 temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) 1132 1133 # Clean up any trash on the device. 1134 device = adb.get_device(product=base_dir) 1135 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1136 1137 device.sync('data') 1138 1139 # Confirm that every file on the device mirrors that on the host. 1140 for temp_file in temp_files: 1141 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, 1142 temp_file.base_name) 1143 dev_md5, _ = device.shell( 1144 [get_md5_prog(self.device), device_full_path])[0].split() 1145 self.assertEqual(temp_file.checksum, dev_md5) 1146 1147 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1148 finally: 1149 if base_dir is not None: 1150 shutil.rmtree(base_dir) 1151 1152 def test_unicode_paths(self): 1153 """Ensure that we can support non-ASCII paths, even on Windows.""" 1154 name = u'로보카 폴리' 1155 1156 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1157 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1158 1159 ## push. 1160 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1161 tf.close() 1162 self.device.push(tf.name, remote_path) 1163 os.remove(tf.name) 1164 self.assertFalse(os.path.exists(tf.name)) 1165 1166 # Verify that the device ended up with the expected UTF-8 path 1167 output = self.device.shell( 1168 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1169 self.assertEqual(remote_path.encode('utf-8'), output) 1170 1171 # pull. 1172 self.device.pull(remote_path, tf.name) 1173 self.assertTrue(os.path.exists(tf.name)) 1174 os.remove(tf.name) 1175 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1176 1177 1178def main(): 1179 random.seed(0) 1180 if len(adb.get_devices()) > 0: 1181 suite = unittest.TestLoader().loadTestsFromName(__name__) 1182 unittest.TextTestRunner(verbosity=3).run(suite) 1183 else: 1184 print('Test suite must be run with attached devices') 1185 1186 1187if __name__ == '__main__': 1188 main() 1189