1#!/usr/bin/env python 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import os 23import posixpath 24import random 25import re 26import shlex 27import shutil 28import signal 29import socket 30import string 31import subprocess 32import sys 33import tempfile 34import time 35import unittest 36 37import mock 38 39import adb 40 41 42def requires_root(func): 43 def wrapper(self, *args): 44 if self.device.get_prop('ro.debuggable') != '1': 45 raise unittest.SkipTest('requires rootable build') 46 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if not was_root: 49 self.device.root() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if not was_root: 56 self.device.unroot() 57 self.device.wait() 58 59 return wrapper 60 61 62def requires_non_root(func): 63 def wrapper(self, *args): 64 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 65 if was_root: 66 self.device.unroot() 67 self.device.wait() 68 69 try: 70 func(self, *args) 71 finally: 72 if was_root: 73 self.device.root() 74 self.device.wait() 75 76 return wrapper 77 78 79class GetDeviceTest(unittest.TestCase): 80 def setUp(self): 81 self.android_serial = os.getenv('ANDROID_SERIAL') 82 if 'ANDROID_SERIAL' in os.environ: 83 del os.environ['ANDROID_SERIAL'] 84 85 def tearDown(self): 86 if self.android_serial is not None: 87 os.environ['ANDROID_SERIAL'] = self.android_serial 88 else: 89 if 'ANDROID_SERIAL' in os.environ: 90 del os.environ['ANDROID_SERIAL'] 91 92 @mock.patch('adb.device.get_devices') 93 def test_explicit(self, mock_get_devices): 94 mock_get_devices.return_value = ['foo', 'bar'] 95 device = adb.get_device('foo') 96 self.assertEqual(device.serial, 'foo') 97 98 @mock.patch('adb.device.get_devices') 99 def test_from_env(self, mock_get_devices): 100 mock_get_devices.return_value = ['foo', 'bar'] 101 os.environ['ANDROID_SERIAL'] = 'foo' 102 device = adb.get_device() 103 self.assertEqual(device.serial, 'foo') 104 105 @mock.patch('adb.device.get_devices') 106 def test_arg_beats_env(self, mock_get_devices): 107 mock_get_devices.return_value = ['foo', 'bar'] 108 os.environ['ANDROID_SERIAL'] = 'bar' 109 device = adb.get_device('foo') 110 self.assertEqual(device.serial, 'foo') 111 112 @mock.patch('adb.device.get_devices') 113 def test_no_such_device(self, mock_get_devices): 114 mock_get_devices.return_value = ['foo', 'bar'] 115 self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) 116 117 os.environ['ANDROID_SERIAL'] = 'baz' 118 self.assertRaises(adb.DeviceNotFoundError, adb.get_device) 119 120 @mock.patch('adb.device.get_devices') 121 def test_unique_device(self, mock_get_devices): 122 mock_get_devices.return_value = ['foo'] 123 device = adb.get_device() 124 self.assertEqual(device.serial, 'foo') 125 126 @mock.patch('adb.device.get_devices') 127 def test_no_unique_device(self, mock_get_devices): 128 mock_get_devices.return_value = ['foo', 'bar'] 129 self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) 130 131 132class DeviceTest(unittest.TestCase): 133 def setUp(self): 134 self.device = adb.get_device() 135 136 137class ForwardReverseTest(DeviceTest): 138 def _test_no_rebind(self, description, direction_list, direction, 139 direction_no_rebind, direction_remove_all): 140 msg = direction_list() 141 self.assertEqual('', msg.strip(), 142 description + ' list must be empty to run this test.') 143 144 # Use --no-rebind with no existing binding 145 direction_no_rebind('tcp:5566', 'tcp:6655') 146 msg = direction_list() 147 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 148 149 # Use --no-rebind with existing binding 150 with self.assertRaises(subprocess.CalledProcessError): 151 direction_no_rebind('tcp:5566', 'tcp:6677') 152 msg = direction_list() 153 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 154 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 155 156 # Use the absence of --no-rebind with existing binding 157 direction('tcp:5566', 'tcp:6677') 158 msg = direction_list() 159 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 160 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 161 162 direction_remove_all() 163 msg = direction_list() 164 self.assertEqual('', msg.strip()) 165 166 def test_forward_no_rebind(self): 167 self._test_no_rebind('forward', self.device.forward_list, 168 self.device.forward, self.device.forward_no_rebind, 169 self.device.forward_remove_all) 170 171 def test_reverse_no_rebind(self): 172 self._test_no_rebind('reverse', self.device.reverse_list, 173 self.device.reverse, self.device.reverse_no_rebind, 174 self.device.reverse_remove_all) 175 176 def test_forward(self): 177 msg = self.device.forward_list() 178 self.assertEqual('', msg.strip(), 179 'Forwarding list must be empty to run this test.') 180 self.device.forward('tcp:5566', 'tcp:6655') 181 msg = self.device.forward_list() 182 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 183 self.device.forward('tcp:7788', 'tcp:8877') 184 msg = self.device.forward_list() 185 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 186 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 187 self.device.forward_remove('tcp:5566') 188 msg = self.device.forward_list() 189 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 190 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 191 self.device.forward_remove_all() 192 msg = self.device.forward_list() 193 self.assertEqual('', msg.strip()) 194 195 def test_forward_tcp_port_0(self): 196 self.assertEqual('', self.device.forward_list().strip(), 197 'Forwarding list must be empty to run this test.') 198 199 try: 200 # If resolving TCP port 0 is supported, `adb forward` will print 201 # the actual port number. 202 port = self.device.forward('tcp:0', 'tcp:8888').strip() 203 if not port: 204 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 205 206 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 207 self.device.forward_list())) 208 finally: 209 self.device.forward_remove_all() 210 211 def test_reverse(self): 212 msg = self.device.reverse_list() 213 self.assertEqual('', msg.strip(), 214 'Reverse forwarding list must be empty to run this test.') 215 self.device.reverse('tcp:5566', 'tcp:6655') 216 msg = self.device.reverse_list() 217 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 218 self.device.reverse('tcp:7788', 'tcp:8877') 219 msg = self.device.reverse_list() 220 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 221 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 222 self.device.reverse_remove('tcp:5566') 223 msg = self.device.reverse_list() 224 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 225 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 226 self.device.reverse_remove_all() 227 msg = self.device.reverse_list() 228 self.assertEqual('', msg.strip()) 229 230 def test_reverse_tcp_port_0(self): 231 self.assertEqual('', self.device.reverse_list().strip(), 232 'Reverse list must be empty to run this test.') 233 234 try: 235 # If resolving TCP port 0 is supported, `adb reverse` will print 236 # the actual port number. 237 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 238 if not port: 239 raise unittest.SkipTest('Reversing tcp:0 is not available.') 240 241 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 242 self.device.reverse_list())) 243 finally: 244 self.device.reverse_remove_all() 245 246 # Note: If you run this test when adb connect'd to a physical device over 247 # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821 248 def test_forward_reverse_echo(self): 249 """Send data through adb forward and read it back via adb reverse""" 250 forward_port = 12345 251 reverse_port = forward_port + 1 252 forward_spec = 'tcp:' + str(forward_port) 253 reverse_spec = 'tcp:' + str(reverse_port) 254 forward_setup = False 255 reverse_setup = False 256 257 try: 258 # listen on localhost:forward_port, connect to remote:forward_port 259 self.device.forward(forward_spec, forward_spec) 260 forward_setup = True 261 # listen on remote:forward_port, connect to localhost:reverse_port 262 self.device.reverse(forward_spec, reverse_spec) 263 reverse_setup = True 264 265 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 266 with contextlib.closing(listener): 267 # Use SO_REUSEADDR so that subsequent runs of the test can grab 268 # the port even if it is in TIME_WAIT. 269 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 270 271 # Listen on localhost:reverse_port before connecting to 272 # localhost:forward_port because that will cause adb to connect 273 # back to localhost:reverse_port. 274 listener.bind(('127.0.0.1', reverse_port)) 275 listener.listen(4) 276 277 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 278 with contextlib.closing(client): 279 # Connect to the listener. 280 client.connect(('127.0.0.1', forward_port)) 281 282 # Accept the client connection. 283 accepted_connection, addr = listener.accept() 284 with contextlib.closing(accepted_connection) as server: 285 data = 'hello' 286 287 # Send data into the port setup by adb forward. 288 client.sendall(data) 289 # Explicitly close() so that server gets EOF. 290 client.close() 291 292 # Verify that the data came back via adb reverse. 293 self.assertEqual(data, server.makefile().read()) 294 finally: 295 if reverse_setup: 296 self.device.reverse_remove(forward_spec) 297 if forward_setup: 298 self.device.forward_remove(forward_spec) 299 300 301class ShellTest(DeviceTest): 302 def _interactive_shell(self, shell_args, input): 303 """Runs an interactive adb shell. 304 305 Args: 306 shell_args: List of string arguments to `adb shell`. 307 input: String input to send to the interactive shell. 308 309 Returns: 310 The remote exit code. 311 312 Raises: 313 unittest.SkipTest: The device doesn't support exit codes. 314 """ 315 if not self.device.has_shell_protocol(): 316 raise unittest.SkipTest('exit codes are unavailable on this device') 317 318 proc = subprocess.Popen( 319 self.device.adb_cmd + ['shell'] + shell_args, 320 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 321 stderr=subprocess.PIPE) 322 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 323 # to explicitly add an exit command to close the session from the device 324 # side, plus the necessary newline to complete the interactive command. 325 proc.communicate(input + '; exit\n') 326 return proc.returncode 327 328 def test_cat(self): 329 """Check that we can at least cat a file.""" 330 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 331 elements = out.split() 332 self.assertEqual(len(elements), 2) 333 334 uptime, idle = elements 335 self.assertGreater(float(uptime), 0.0) 336 self.assertGreater(float(idle), 0.0) 337 338 def test_throws_on_failure(self): 339 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 340 341 def test_output_not_stripped(self): 342 out = self.device.shell(['echo', 'foo'])[0] 343 self.assertEqual(out, 'foo' + self.device.linesep) 344 345 def test_shell_nocheck_failure(self): 346 rc, out, _ = self.device.shell_nocheck(['false']) 347 self.assertNotEqual(rc, 0) 348 self.assertEqual(out, '') 349 350 def test_shell_nocheck_output_not_stripped(self): 351 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 352 self.assertEqual(rc, 0) 353 self.assertEqual(out, 'foo' + self.device.linesep) 354 355 def test_can_distinguish_tricky_results(self): 356 # If result checking on ADB shell is naively implemented as 357 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 358 # output from the result for a cmd of `echo -n 1`. 359 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 360 self.assertEqual(rc, 0) 361 self.assertEqual(out, '1') 362 363 def test_line_endings(self): 364 """Ensure that line ending translation is not happening in the pty. 365 366 Bug: http://b/19735063 367 """ 368 output = self.device.shell(['uname'])[0] 369 self.assertEqual(output, 'Linux' + self.device.linesep) 370 371 def test_pty_logic(self): 372 """Tests that a PTY is allocated when it should be. 373 374 PTY allocation behavior should match ssh. 375 """ 376 def check_pty(args): 377 """Checks adb shell PTY allocation. 378 379 Tests |args| for terminal and non-terminal stdin. 380 381 Args: 382 args: -Tt args in a list (e.g. ['-t', '-t']). 383 384 Returns: 385 A tuple (<terminal>, <non-terminal>). True indicates 386 the corresponding shell allocated a remote PTY. 387 """ 388 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 389 390 terminal = subprocess.Popen( 391 test_cmd, stdin=None, 392 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 393 terminal.communicate() 394 395 non_terminal = subprocess.Popen( 396 test_cmd, stdin=subprocess.PIPE, 397 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 398 non_terminal.communicate() 399 400 return (terminal.returncode == 0, non_terminal.returncode == 0) 401 402 # -T: never allocate PTY. 403 self.assertEqual((False, False), check_pty(['-T'])) 404 405 # These tests require a new device. 406 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 407 # No args: PTY only if stdin is a terminal and shell is interactive, 408 # which is difficult to reliably test from a script. 409 self.assertEqual((False, False), check_pty([])) 410 411 # -t: PTY if stdin is a terminal. 412 self.assertEqual((True, False), check_pty(['-t'])) 413 414 # -t -t: always allocate PTY. 415 self.assertEqual((True, True), check_pty(['-t', '-t'])) 416 417 # -tt: always allocate PTY, POSIX style (http://b/32216152). 418 self.assertEqual((True, True), check_pty(['-tt'])) 419 420 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 421 # we follow the man page instead. 422 self.assertEqual((True, True), check_pty(['-ttt'])) 423 424 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 425 self.assertEqual((True, True), check_pty(['-ttx'])) 426 427 # -Ttt: -tt cancels out -T. 428 self.assertEqual((True, True), check_pty(['-Ttt'])) 429 430 # -ttT: -T cancels out -tt. 431 self.assertEqual((False, False), check_pty(['-ttT'])) 432 433 def test_shell_protocol(self): 434 """Tests the shell protocol on the device. 435 436 If the device supports shell protocol, this gives us the ability 437 to separate stdout/stderr and return the exit code directly. 438 439 Bug: http://b/19734861 440 """ 441 if not self.device.has_shell_protocol(): 442 raise unittest.SkipTest('shell protocol unsupported on this device') 443 444 # Shell protocol should be used by default. 445 result = self.device.shell_nocheck( 446 shlex.split('echo foo; echo bar >&2; exit 17')) 447 self.assertEqual(17, result[0]) 448 self.assertEqual('foo' + self.device.linesep, result[1]) 449 self.assertEqual('bar' + self.device.linesep, result[2]) 450 451 self.assertEqual(17, self._interactive_shell([], 'exit 17')) 452 453 # -x flag should disable shell protocol. 454 result = self.device.shell_nocheck( 455 shlex.split('-x echo foo; echo bar >&2; exit 17')) 456 self.assertEqual(0, result[0]) 457 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 458 self.assertEqual('', result[2]) 459 460 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17')) 461 462 def test_non_interactive_sigint(self): 463 """Tests that SIGINT in a non-interactive shell kills the process. 464 465 This requires the shell protocol in order to detect the broken 466 pipe; raw data transfer mode will only see the break once the 467 subprocess tries to read or write. 468 469 Bug: http://b/23825725 470 """ 471 if not self.device.has_shell_protocol(): 472 raise unittest.SkipTest('shell protocol unsupported on this device') 473 474 # Start a long-running process. 475 sleep_proc = subprocess.Popen( 476 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 477 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 478 stderr=subprocess.STDOUT) 479 remote_pid = sleep_proc.stdout.readline().strip() 480 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 481 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 482 483 # Verify that the process is running, send signal, verify it stopped. 484 self.device.shell(proc_query) 485 os.kill(sleep_proc.pid, signal.SIGINT) 486 sleep_proc.communicate() 487 488 # It can take some time for the process to receive the signal and die. 489 end_time = time.time() + 3 490 while self.device.shell_nocheck(proc_query)[0] != 1: 491 self.assertFalse(time.time() > end_time, 492 'subprocess failed to terminate in time') 493 494 def test_non_interactive_stdin(self): 495 """Tests that non-interactive shells send stdin.""" 496 if not self.device.has_shell_protocol(): 497 raise unittest.SkipTest('non-interactive stdin unsupported ' 498 'on this device') 499 500 # Test both small and large inputs. 501 small_input = 'foo' 502 large_input = '\n'.join(c * 100 for c in (string.ascii_letters + 503 string.digits)) 504 505 for input in (small_input, large_input): 506 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 507 stdin=subprocess.PIPE, 508 stdout=subprocess.PIPE, 509 stderr=subprocess.PIPE) 510 stdout, stderr = proc.communicate(input) 511 self.assertEqual(input.splitlines(), stdout.splitlines()) 512 self.assertEqual('', stderr) 513 514 def test_sighup(self): 515 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 516 log_path = "/data/local/tmp/adb_signal_test.log" 517 518 # Clear the output file. 519 self.device.shell_nocheck(["echo", ">", log_path]) 520 521 script = """ 522 trap "echo SIGINT > {path}; exit 0" SIGINT 523 trap "echo SIGHUP > {path}; exit 0" SIGHUP 524 echo Waiting 525 read 526 """.format(path=log_path) 527 528 script = ";".join([x.strip() for x in script.strip().splitlines()]) 529 530 process = self.device.shell_popen([script], kill_atexit=False, 531 stdin=subprocess.PIPE, 532 stdout=subprocess.PIPE) 533 534 self.assertEqual("Waiting\n", process.stdout.readline()) 535 process.send_signal(signal.SIGINT) 536 process.wait() 537 538 # Waiting for the local adb to finish is insufficient, since it hangs 539 # up immediately. 540 time.sleep(1) 541 542 stdout, _ = self.device.shell(["cat", log_path]) 543 self.assertEqual(stdout.strip(), "SIGHUP") 544 545 546class ArgumentEscapingTest(DeviceTest): 547 def test_shell_escaping(self): 548 """Make sure that argument escaping is somewhat sane.""" 549 550 # http://b/19734868 551 # Note that this actually matches ssh(1)'s behavior --- it's 552 # converted to `sh -c echo hello; echo world` which sh interprets 553 # as `sh -c echo` (with an argument to that shell of "hello"), 554 # and then `echo world` back in the first shell. 555 result = self.device.shell( 556 shlex.split("sh -c 'echo hello; echo world'"))[0] 557 result = result.splitlines() 558 self.assertEqual(['', 'world'], result) 559 # If you really wanted "hello" and "world", here's what you'd do: 560 result = self.device.shell( 561 shlex.split(r'echo hello\;echo world'))[0].splitlines() 562 self.assertEqual(['hello', 'world'], result) 563 564 # http://b/15479704 565 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 566 self.assertEqual('t', result) 567 result = self.device.shell( 568 shlex.split("sh -c 'true && echo t'"))[0].strip() 569 self.assertEqual('t', result) 570 571 # http://b/20564385 572 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 573 self.assertEqual('t', result) 574 result = self.device.shell( 575 shlex.split(r'echo -n 123\;uname'))[0].strip() 576 self.assertEqual('123Linux', result) 577 578 def test_install_argument_escaping(self): 579 """Make sure that install argument escaping works.""" 580 # http://b/20323053, http://b/3090932. 581 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"): 582 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 583 delete=False) 584 tf.close() 585 586 # Installing bogus .apks fails if the device supports exit codes. 587 try: 588 output = self.device.install(tf.name) 589 except subprocess.CalledProcessError as e: 590 output = e.output 591 592 self.assertIn(file_suffix, output) 593 os.remove(tf.name) 594 595 596class RootUnrootTest(DeviceTest): 597 def _test_root(self): 598 message = self.device.root() 599 if 'adbd cannot run as root in production builds' in message: 600 return 601 self.device.wait() 602 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 603 604 def _test_unroot(self): 605 self.device.unroot() 606 self.device.wait() 607 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 608 609 def test_root_unroot(self): 610 """Make sure that adb root and adb unroot work, using id(1).""" 611 if self.device.get_prop('ro.debuggable') != '1': 612 raise unittest.SkipTest('requires rootable build') 613 614 original_user = self.device.shell(['id', '-un'])[0].strip() 615 try: 616 if original_user == 'root': 617 self._test_unroot() 618 self._test_root() 619 elif original_user == 'shell': 620 self._test_root() 621 self._test_unroot() 622 finally: 623 if original_user == 'root': 624 self.device.root() 625 else: 626 self.device.unroot() 627 self.device.wait() 628 629 630class TcpIpTest(DeviceTest): 631 def test_tcpip_failure_raises(self): 632 """adb tcpip requires a port. 633 634 Bug: http://b/22636927 635 """ 636 self.assertRaises( 637 subprocess.CalledProcessError, self.device.tcpip, '') 638 self.assertRaises( 639 subprocess.CalledProcessError, self.device.tcpip, 'foo') 640 641 642class SystemPropertiesTest(DeviceTest): 643 def test_get_prop(self): 644 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 645 646 @requires_root 647 def test_set_prop(self): 648 prop_name = 'foo.bar' 649 self.device.shell(['setprop', prop_name, '""']) 650 651 self.device.set_prop(prop_name, 'qux') 652 self.assertEqual( 653 self.device.shell(['getprop', prop_name])[0].strip(), 'qux') 654 655 656def compute_md5(string): 657 hsh = hashlib.md5() 658 hsh.update(string) 659 return hsh.hexdigest() 660 661 662def get_md5_prog(device): 663 """Older platforms (pre-L) had the name md5 rather than md5sum.""" 664 try: 665 device.shell(['md5sum', '/proc/uptime']) 666 return 'md5sum' 667 except adb.ShellError: 668 return 'md5' 669 670 671class HostFile(object): 672 def __init__(self, handle, checksum): 673 self.handle = handle 674 self.checksum = checksum 675 self.full_path = handle.name 676 self.base_name = os.path.basename(self.full_path) 677 678 679class DeviceFile(object): 680 def __init__(self, checksum, full_path): 681 self.checksum = checksum 682 self.full_path = full_path 683 self.base_name = posixpath.basename(self.full_path) 684 685 686def make_random_host_files(in_dir, num_files): 687 min_size = 1 * (1 << 10) 688 max_size = 16 * (1 << 10) 689 690 files = [] 691 for _ in xrange(num_files): 692 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 693 694 size = random.randrange(min_size, max_size, 1024) 695 rand_str = os.urandom(size) 696 file_handle.write(rand_str) 697 file_handle.flush() 698 file_handle.close() 699 700 md5 = compute_md5(rand_str) 701 files.append(HostFile(file_handle, md5)) 702 return files 703 704 705def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 706 min_size = 1 * (1 << 10) 707 max_size = 16 * (1 << 10) 708 709 files = [] 710 for file_num in xrange(num_files): 711 size = random.randrange(min_size, max_size, 1024) 712 713 base_name = prefix + str(file_num) 714 full_path = posixpath.join(in_dir, base_name) 715 716 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 717 'bs={}'.format(size), 'count=1']) 718 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() 719 720 files.append(DeviceFile(dev_md5, full_path)) 721 return files 722 723 724class FileOperationsTest(DeviceTest): 725 SCRATCH_DIR = '/data/local/tmp' 726 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 727 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 728 729 def _verify_remote(self, checksum, remote_path): 730 dev_md5, _ = self.device.shell([get_md5_prog(self.device), 731 remote_path])[0].split() 732 self.assertEqual(checksum, dev_md5) 733 734 def _verify_local(self, checksum, local_path): 735 with open(local_path, 'rb') as host_file: 736 host_md5 = compute_md5(host_file.read()) 737 self.assertEqual(host_md5, checksum) 738 739 def test_push(self): 740 """Push a randomly generated file to specified device.""" 741 kbytes = 512 742 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 743 rand_str = os.urandom(1024 * kbytes) 744 tmp.write(rand_str) 745 tmp.close() 746 747 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 748 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 749 750 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 751 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 752 753 os.remove(tmp.name) 754 755 def test_push_dir(self): 756 """Push a randomly generated directory of files to the device.""" 757 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 758 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 759 760 try: 761 host_dir = tempfile.mkdtemp() 762 763 # Make sure the temp directory isn't setuid, or else adb will complain. 764 os.chmod(host_dir, 0o700) 765 766 # Create 32 random files. 767 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 768 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 769 770 for temp_file in temp_files: 771 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 772 os.path.basename(host_dir), 773 temp_file.base_name) 774 self._verify_remote(temp_file.checksum, remote_path) 775 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 776 finally: 777 if host_dir is not None: 778 shutil.rmtree(host_dir) 779 780 @unittest.expectedFailure # b/25566053 781 def test_push_empty(self): 782 """Push a directory containing an empty directory to the device.""" 783 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 784 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 785 786 try: 787 host_dir = tempfile.mkdtemp() 788 789 # Make sure the temp directory isn't setuid, or else adb will complain. 790 os.chmod(host_dir, 0o700) 791 792 # Create an empty directory. 793 os.mkdir(os.path.join(host_dir, 'empty')) 794 795 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 796 797 test_empty_cmd = ['[', '-d', 798 os.path.join(self.DEVICE_TEMP_DIR, 'empty')] 799 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 800 self.assertEqual(rc, 0) 801 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 802 finally: 803 if host_dir is not None: 804 shutil.rmtree(host_dir) 805 806 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 807 def test_push_symlink(self): 808 """Push a symlink. 809 810 Bug: http://b/31491920 811 """ 812 try: 813 host_dir = tempfile.mkdtemp() 814 815 # Make sure the temp directory isn't setuid, or else adb will 816 # complain. 817 os.chmod(host_dir, 0o700) 818 819 with open(os.path.join(host_dir, 'foo'), 'w') as f: 820 f.write('foo') 821 822 symlink_path = os.path.join(host_dir, 'symlink') 823 os.symlink('foo', symlink_path) 824 825 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 826 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 827 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 828 rc, out, _ = self.device.shell_nocheck( 829 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 830 self.assertEqual(0, rc) 831 self.assertEqual(out.strip(), 'foo') 832 finally: 833 if host_dir is not None: 834 shutil.rmtree(host_dir) 835 836 def test_multiple_push(self): 837 """Push multiple files to the device in one adb push command. 838 839 Bug: http://b/25324823 840 """ 841 842 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 843 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 844 845 try: 846 host_dir = tempfile.mkdtemp() 847 848 # Create some random files and a subdirectory containing more files. 849 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 850 851 subdir = os.path.join(host_dir, 'subdir') 852 os.mkdir(subdir) 853 subdir_temp_files = make_random_host_files(in_dir=subdir, 854 num_files=4) 855 856 paths = map(lambda temp_file: temp_file.full_path, temp_files) 857 paths.append(subdir) 858 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 859 860 for temp_file in temp_files: 861 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 862 temp_file.base_name) 863 self._verify_remote(temp_file.checksum, remote_path) 864 865 for subdir_temp_file in subdir_temp_files: 866 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 867 # BROKEN: http://b/25394682 868 # 'subdir'; 869 temp_file.base_name) 870 self._verify_remote(temp_file.checksum, remote_path) 871 872 873 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 874 finally: 875 if host_dir is not None: 876 shutil.rmtree(host_dir) 877 878 @requires_non_root 879 def test_push_error_reporting(self): 880 """Make sure that errors that occur while pushing a file get reported 881 882 Bug: http://b/26816782 883 """ 884 with tempfile.NamedTemporaryFile() as tmp_file: 885 tmp_file.write('\0' * 1024 * 1024) 886 tmp_file.flush() 887 try: 888 self.device.push(local=tmp_file.name, remote='/system/') 889 self.fail('push should not have succeeded') 890 except subprocess.CalledProcessError as e: 891 output = e.output 892 893 self.assertTrue('Permission denied' in output or 894 'Read-only file system' in output) 895 896 def _test_pull(self, remote_file, checksum): 897 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 898 tmp_write.close() 899 self.device.pull(remote=remote_file, local=tmp_write.name) 900 with open(tmp_write.name, 'rb') as tmp_read: 901 host_contents = tmp_read.read() 902 host_md5 = compute_md5(host_contents) 903 self.assertEqual(checksum, host_md5) 904 os.remove(tmp_write.name) 905 906 @requires_non_root 907 def test_pull_error_reporting(self): 908 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 909 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 910 911 try: 912 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 913 except subprocess.CalledProcessError as e: 914 output = e.output 915 916 self.assertIn('Permission denied', output) 917 918 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 919 920 def test_pull(self): 921 """Pull a randomly generated file from specified device.""" 922 kbytes = 512 923 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 924 cmd = ['dd', 'if=/dev/urandom', 925 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 926 'count={}'.format(kbytes)] 927 self.device.shell(cmd) 928 dev_md5, _ = self.device.shell( 929 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() 930 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 931 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 932 933 def test_pull_dir(self): 934 """Pull a randomly generated directory of files from the device.""" 935 try: 936 host_dir = tempfile.mkdtemp() 937 938 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 939 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 940 941 # Populate device directory with random files. 942 temp_files = make_random_device_files( 943 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 944 945 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 946 947 for temp_file in temp_files: 948 host_path = os.path.join( 949 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 950 temp_file.base_name) 951 self._verify_local(temp_file.checksum, host_path) 952 953 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 954 finally: 955 if host_dir is not None: 956 shutil.rmtree(host_dir) 957 958 def test_pull_dir_symlink(self): 959 """Pull a directory into a symlink to a directory. 960 961 Bug: http://b/27362811 962 """ 963 if os.name != 'posix': 964 raise unittest.SkipTest('requires POSIX') 965 966 try: 967 host_dir = tempfile.mkdtemp() 968 real_dir = os.path.join(host_dir, 'dir') 969 symlink = os.path.join(host_dir, 'symlink') 970 os.mkdir(real_dir) 971 os.symlink(real_dir, symlink) 972 973 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 974 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 975 976 # Populate device directory with random files. 977 temp_files = make_random_device_files( 978 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 979 980 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 981 982 for temp_file in temp_files: 983 host_path = os.path.join( 984 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 985 temp_file.base_name) 986 self._verify_local(temp_file.checksum, host_path) 987 988 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 989 finally: 990 if host_dir is not None: 991 shutil.rmtree(host_dir) 992 993 def test_pull_dir_symlink_collision(self): 994 """Pull a directory into a colliding symlink to directory.""" 995 if os.name != 'posix': 996 raise unittest.SkipTest('requires POSIX') 997 998 try: 999 host_dir = tempfile.mkdtemp() 1000 real_dir = os.path.join(host_dir, 'real') 1001 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1002 symlink = os.path.join(host_dir, tmp_dirname) 1003 os.mkdir(real_dir) 1004 os.symlink(real_dir, symlink) 1005 1006 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1007 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1008 1009 # Populate device directory with random files. 1010 temp_files = make_random_device_files( 1011 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1012 1013 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1014 1015 for temp_file in temp_files: 1016 host_path = os.path.join(real_dir, temp_file.base_name) 1017 self._verify_local(temp_file.checksum, host_path) 1018 1019 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1020 finally: 1021 if host_dir is not None: 1022 shutil.rmtree(host_dir) 1023 1024 def test_pull_dir_nonexistent(self): 1025 """Pull a directory of files from the device to a nonexistent path.""" 1026 try: 1027 host_dir = tempfile.mkdtemp() 1028 dest_dir = os.path.join(host_dir, 'dest') 1029 1030 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1031 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1032 1033 # Populate device directory with random files. 1034 temp_files = make_random_device_files( 1035 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1036 1037 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1038 1039 for temp_file in temp_files: 1040 host_path = os.path.join(dest_dir, temp_file.base_name) 1041 self._verify_local(temp_file.checksum, host_path) 1042 1043 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1044 finally: 1045 if host_dir is not None: 1046 shutil.rmtree(host_dir) 1047 1048 def test_pull_symlink_dir(self): 1049 """Pull a symlink to a directory of symlinks to files.""" 1050 try: 1051 host_dir = tempfile.mkdtemp() 1052 1053 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1054 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1055 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1056 1057 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1058 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1059 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1060 1061 # Populate device directory with random files. 1062 temp_files = make_random_device_files( 1063 self.device, in_dir=remote_dir, num_files=32) 1064 1065 for temp_file in temp_files: 1066 self.device.shell( 1067 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1068 posixpath.join(remote_links, temp_file.base_name)]) 1069 1070 self.device.pull(remote=remote_symlink, local=host_dir) 1071 1072 for temp_file in temp_files: 1073 host_path = os.path.join( 1074 host_dir, 'symlink', temp_file.base_name) 1075 self._verify_local(temp_file.checksum, host_path) 1076 1077 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1078 finally: 1079 if host_dir is not None: 1080 shutil.rmtree(host_dir) 1081 1082 def test_pull_empty(self): 1083 """Pull a directory containing an empty directory from the device.""" 1084 try: 1085 host_dir = tempfile.mkdtemp() 1086 1087 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1088 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1089 self.device.shell(['mkdir', '-p', remote_empty_path]) 1090 1091 self.device.pull(remote=remote_empty_path, local=host_dir) 1092 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1093 finally: 1094 if host_dir is not None: 1095 shutil.rmtree(host_dir) 1096 1097 def test_multiple_pull(self): 1098 """Pull a randomly generated directory of files from the device.""" 1099 1100 try: 1101 host_dir = tempfile.mkdtemp() 1102 1103 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1104 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1105 self.device.shell(['mkdir', '-p', subdir]) 1106 1107 # Create some random files and a subdirectory containing more files. 1108 temp_files = make_random_device_files( 1109 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1110 1111 subdir_temp_files = make_random_device_files( 1112 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1113 1114 paths = map(lambda temp_file: temp_file.full_path, temp_files) 1115 paths.append(subdir) 1116 self.device._simple_call(['pull'] + paths + [host_dir]) 1117 1118 for temp_file in temp_files: 1119 local_path = os.path.join(host_dir, temp_file.base_name) 1120 self._verify_local(temp_file.checksum, local_path) 1121 1122 for subdir_temp_file in subdir_temp_files: 1123 local_path = os.path.join(host_dir, 1124 'subdir', 1125 subdir_temp_file.base_name) 1126 self._verify_local(subdir_temp_file.checksum, local_path) 1127 1128 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1129 finally: 1130 if host_dir is not None: 1131 shutil.rmtree(host_dir) 1132 1133 def test_sync(self): 1134 """Sync a randomly generated directory of files to specified device.""" 1135 1136 try: 1137 base_dir = tempfile.mkdtemp() 1138 1139 # Create mirror device directory hierarchy within base_dir. 1140 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1141 os.makedirs(full_dir_path) 1142 1143 # Create 32 random files within the host mirror. 1144 temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) 1145 1146 # Clean up any trash on the device. 1147 device = adb.get_device(product=base_dir) 1148 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1149 1150 device.sync('data') 1151 1152 # Confirm that every file on the device mirrors that on the host. 1153 for temp_file in temp_files: 1154 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, 1155 temp_file.base_name) 1156 dev_md5, _ = device.shell( 1157 [get_md5_prog(self.device), device_full_path])[0].split() 1158 self.assertEqual(temp_file.checksum, dev_md5) 1159 1160 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1161 finally: 1162 if base_dir is not None: 1163 shutil.rmtree(base_dir) 1164 1165 def test_unicode_paths(self): 1166 """Ensure that we can support non-ASCII paths, even on Windows.""" 1167 name = u'로보카 폴리' 1168 1169 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1170 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1171 1172 ## push. 1173 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1174 tf.close() 1175 self.device.push(tf.name, remote_path) 1176 os.remove(tf.name) 1177 self.assertFalse(os.path.exists(tf.name)) 1178 1179 # Verify that the device ended up with the expected UTF-8 path 1180 output = self.device.shell( 1181 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1182 self.assertEqual(remote_path.encode('utf-8'), output) 1183 1184 # pull. 1185 self.device.pull(remote_path, tf.name) 1186 self.assertTrue(os.path.exists(tf.name)) 1187 os.remove(tf.name) 1188 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1189 1190 1191def main(): 1192 random.seed(0) 1193 if len(adb.get_devices()) > 0: 1194 suite = unittest.TestLoader().loadTestsFromName(__name__) 1195 unittest.TextTestRunner(verbosity=3).run(suite) 1196 else: 1197 print('Test suite must be run with attached devices') 1198 1199 1200if __name__ == '__main__': 1201 main() 1202