1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import time
35import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorator: run a device test method with the device shell user as root.

    The wrapped method must live on an object exposing ``self.device``.
    If `id -un` on the device does not report 'root', the wrapper calls
    ``device.root()`` and ``device.wait()`` before invoking the method, and
    ``device.unroot()`` / ``device.wait()`` afterwards, even if the method
    raises.

    Args:
        func: The test method to wrap.

    Returns:
        The wrapping function; it forwards positional and keyword arguments
        and returns whatever ``func`` returns.

    Raises:
        unittest.SkipTest: (from the wrapper) when the device property
            'ro.debuggable' is not '1'.
    """
    # Function-scope import so this decorator does not depend on a change to
    # the file-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this wrapper changed.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorator: run a device test method with a non-root device shell user.

    The wrapped method must live on an object exposing ``self.device``.
    If `id -un` on the device reports 'root', the wrapper calls
    ``device.unroot()`` and ``device.wait()`` before invoking the method, and
    ``device.root()`` / ``device.wait()`` afterwards, even if the method
    raises.

    Args:
        func: The test method to wrap.

    Returns:
        The wrapping function; it forwards positional and keyword arguments
        and returns whatever ``func`` returns.
    """
    # Function-scope import so this decorator does not depend on a change to
    # the file-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this wrapper changed.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Tests how adb.get_device() picks a serial.

    Each test patches 'adb.device.get_devices' to return a fixed list of
    serials and checks the interaction between the explicit argument and the
    ANDROID_SERIAL environment variable.
    """

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so the caller's environment
        # cannot influence the tests; tearDown() puts it back.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            # Drop any value a test set when there was none to begin with.
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """An explicit serial argument selects that device."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """With no argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """The explicit argument wins over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises DeviceNotFoundError (argument or env)."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like every other test in this class
        # (the extra assertRaises arguments are forwarded to the callable).
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With a single device and no serial given, that device is used."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, NoUniqueDeviceError is raised."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base class for tests that operate on ``self.device``."""

    def setUp(self):
        # No serial is passed; device selection is left to adb.get_device().
        device = adb.get_device()
        self.device = device
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for the device's forward/reverse port mapping calls.

    Tests that create fixed mappings first assert that the relevant list is
    empty, and always remove the mappings they created, even when an
    assertion in the middle of the test fails.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Shared body of the forward/reverse no-rebind tests.

        Args:
            description: Name of the direction, used in the precondition
                failure message.
            direction_list: Callable returning the current mapping list text.
            direction: Callable(local, remote) that creates a mapping.
            direction_no_rebind: Like `direction`, but expected to raise
                subprocess.CalledProcessError if the port is already bound.
            direction_remove_all: Callable removing every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Runs on failure too, so later tests still see an empty list.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def _check_add_remove(self, empty_msg, list_fn, add_fn, remove_fn,
                          remove_all_fn):
        """Shared body of test_forward and test_reverse.

        Adds two mappings, removes the first, then removes all, checking the
        list text after each step.
        """
        msg = list_fn()
        self.assertEqual('', msg.strip(), empty_msg)

        try:
            add_fn('tcp:5566', 'tcp:6655')
            msg = list_fn()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            add_fn('tcp:7788', 'tcp:8877')
            msg = list_fn()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            remove_fn('tcp:5566')
            msg = list_fn()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on failure too, so later tests still see an empty list.
            remove_all_fn()

        msg = list_fn()
        self.assertEqual('', msg.strip())

    def _check_tcp_port_0(self, empty_msg, skip_msg, list_fn, add_fn,
                          remove_all_fn):
        """Shared body of the forward/reverse tcp:0 tests.

        Raises:
            unittest.SkipTest: `add_fn` printed no port number.
        """
        self.assertEqual('', list_fn().strip(), empty_msg)

        try:
            # If resolving TCP port 0 is supported, the command will print
            # the actual port number.
            port = add_fn('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest(skip_msg)

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      list_fn()))
        finally:
            remove_all_fn()

    def test_forward_no_rebind(self):
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward,
                             self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse,
                             self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        self._check_add_remove(
            'Forwarding list must be empty to run this test.',
            self.device.forward_list, self.device.forward,
            self.device.forward_remove, self.device.forward_remove_all)

    def test_forward_tcp_port_0(self):
        self._check_tcp_port_0(
            'Forwarding list must be empty to run this test.',
            'Forwarding tcp:0 is not available.',
            self.device.forward_list, self.device.forward,
            self.device.forward_remove_all)

    def test_reverse(self):
        self._check_add_remove(
            'Reverse forwarding list must be empty to run this test.',
            self.device.reverse_list, self.device.reverse,
            self.device.reverse_remove, self.device.reverse_remove_all)

    def test_reverse_tcp_port_0(self):
        self._check_tcp_port_0(
            'Reverse list must be empty to run this test.',
            'Reversing tcp:0 is not available.',
            self.device.reverse_list, self.device.reverse,
            self.device.reverse_remove_all)

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Bytes on both ends: sockets carry bytes, and a
                        # bytes literal equals a str literal on Python 2.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTY allocation, signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None makes the child inherit this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # Popen.returncode stays None until poll()/wait() is called, so
            # poll() is required for this check to be able to fail.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
        finally:
            # If a check above failed before the signal was sent, don't leave
            # the local adb process running for the rest of the suite.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        # `input_data` rather than `input`, to avoid shadowing the builtin.
        for input_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(input_data)
            self.assertEqual(input_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen([script], kill_atexit=False,
                                          stdin=subprocess.PIPE,
                                          stdout=subprocess.PIPE)

        try:
            self.assertEqual("Waiting\n", process.stdout.readline())
            process.send_signal(signal.SIGINT)
            process.wait()
        finally:
            # Release the pipe file descriptors opened for this process.
            process.stdout.close()
            process.stdin.close()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")
544
545
class ArgumentEscapingTest(DeviceTest):
    """Tests how arguments to `adb shell` and `adb install` are escaped."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            # delete=False plus an immediate close(): only the (empty) file
            # on disk and its name are needed.
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
594
595
class RootUnrootTest(DeviceTest):
    """Exercises device.root() / device.unroot() and checks `id -un`."""

    def _test_root(self):
        """Switch to root and verify, unless the build refuses to."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Switch away from root and verify the user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        debuggable = self.device.get_prop('ro.debuggable')
        if debuggable != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        started_as_root = original_user == 'root'
        try:
            # Pick the order so that the first step changes the current user.
            if started_as_root:
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back the way it was found.
            restore = self.device.root if started_as_root else self.device.unroot
            restore()
            self.device.wait()
628
629
class TcpIpTest(DeviceTest):
    """Tests for device.tcpip() argument validation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty string and a non-numeric string must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
640
641
class SystemPropertiesTest(DeviceTest):
    """Tests for device.get_prop() and device.set_prop()."""

    def test_get_prop(self):
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        prop_name = 'foo.bar'
        # Reset the property through the shell before setting it via the API.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        getprop_stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_stdout.strip(), 'qux')
654
655
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
660
661
def get_md5_prog(device):
    """Return the name of the MD5 command to use on the device.

    Probes by running `md5sum /proc/uptime`; if that raises adb.ShellError,
    falls back to 'md5' (the name used on pre-L platforms).
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    return prog
669
670
class HostFile(object):
    """A file on the host together with its expected checksum."""

    def __init__(self, handle, checksum):
        # `handle` only needs a `name` attribute holding the file's path.
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
677
678
class DeviceFile(object):
    """A file on the device together with its expected checksum."""

    def __init__(self, checksum, full_path):
        self.checksum, self.full_path = checksum, full_path
        # posixpath is used for the path regardless of the host OS.
        self.base_name = posixpath.basename(full_path)
684
685
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per created file, each carrying the
        MD5 checksum of the bytes written. The files are not deleted on
        close; removing them is up to the caller.
    """
    # Sizes are whole multiples of 1 KiB, from 1 KiB up to (not including)
    # 16 KiB.
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() rather than xrange(): xrange does not exist on Python 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block closes (and thereby flushes) the handle even if the
        # write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
703
704
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Each file is written on the device with `dd if=/dev/urandom` and then
    checksummed on the device.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects, one per created file.
    """
    # Sizes are whole multiples of 1 KiB, from 1 KiB up to (not including)
    # 16 KiB.
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The checksum program name does not change between files, so probe the
    # device once instead of once per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() rather than xrange(): xrange does not exist on Python 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # Output is split into exactly two fields; the first is the digest.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
722
723
724class FileOperationsTest(DeviceTest):
725    SCRATCH_DIR = '/data/local/tmp'
726    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
727    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
728
729    def _verify_remote(self, checksum, remote_path):
730        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
731                                        remote_path])[0].split()
732        self.assertEqual(checksum, dev_md5)
733
734    def _verify_local(self, checksum, local_path):
735        with open(local_path, 'rb') as host_file:
736            host_md5 = compute_md5(host_file.read())
737            self.assertEqual(host_md5, checksum)
738
739    def test_push(self):
740        """Push a randomly generated file to specified device."""
741        kbytes = 512
742        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
743        rand_str = os.urandom(1024 * kbytes)
744        tmp.write(rand_str)
745        tmp.close()
746
747        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
748        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
749
750        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
751        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
752
753        os.remove(tmp.name)
754
755    def test_push_dir(self):
756        """Push a randomly generated directory of files to the device."""
757        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
758        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
759
760        try:
761            host_dir = tempfile.mkdtemp()
762
763            # Make sure the temp directory isn't setuid, or else adb will complain.
764            os.chmod(host_dir, 0o700)
765
766            # Create 32 random files.
767            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
768            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
769
770            for temp_file in temp_files:
771                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
772                                             os.path.basename(host_dir),
773                                             temp_file.base_name)
774                self._verify_remote(temp_file.checksum, remote_path)
775            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
776        finally:
777            if host_dir is not None:
778                shutil.rmtree(host_dir)
779
780    @unittest.expectedFailure # b/25566053
781    def test_push_empty(self):
782        """Push a directory containing an empty directory to the device."""
783        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
784        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
785
786        try:
787            host_dir = tempfile.mkdtemp()
788
789            # Make sure the temp directory isn't setuid, or else adb will complain.
790            os.chmod(host_dir, 0o700)
791
792            # Create an empty directory.
793            os.mkdir(os.path.join(host_dir, 'empty'))
794
795            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
796
797            test_empty_cmd = ['[', '-d',
798                              os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
799            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
800            self.assertEqual(rc, 0)
801            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
802        finally:
803            if host_dir is not None:
804                shutil.rmtree(host_dir)
805
806    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
807    def test_push_symlink(self):
808        """Push a symlink.
809
810        Bug: http://b/31491920
811        """
812        try:
813            host_dir = tempfile.mkdtemp()
814
815            # Make sure the temp directory isn't setuid, or else adb will
816            # complain.
817            os.chmod(host_dir, 0o700)
818
819            with open(os.path.join(host_dir, 'foo'), 'w') as f:
820                f.write('foo')
821
822            symlink_path = os.path.join(host_dir, 'symlink')
823            os.symlink('foo', symlink_path)
824
825            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
826            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
827            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
828            rc, out, _ = self.device.shell_nocheck(
829                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
830            self.assertEqual(0, rc)
831            self.assertEqual(out.strip(), 'foo')
832        finally:
833            if host_dir is not None:
834                shutil.rmtree(host_dir)
835
836    def test_multiple_push(self):
837        """Push multiple files to the device in one adb push command.
838
839        Bug: http://b/25324823
840        """
841
842        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
843        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
844
845        try:
846            host_dir = tempfile.mkdtemp()
847
848            # Create some random files and a subdirectory containing more files.
849            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
850
851            subdir = os.path.join(host_dir, 'subdir')
852            os.mkdir(subdir)
853            subdir_temp_files = make_random_host_files(in_dir=subdir,
854                                                       num_files=4)
855
856            paths = map(lambda temp_file: temp_file.full_path, temp_files)
857            paths.append(subdir)
858            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
859
860            for temp_file in temp_files:
861                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
862                                             temp_file.base_name)
863                self._verify_remote(temp_file.checksum, remote_path)
864
865            for subdir_temp_file in subdir_temp_files:
866                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
867                                             # BROKEN: http://b/25394682
868                                             # 'subdir';
869                                             temp_file.base_name)
870                self._verify_remote(temp_file.checksum, remote_path)
871
872
873            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
874        finally:
875            if host_dir is not None:
876                shutil.rmtree(host_dir)
877
878    @requires_non_root
879    def test_push_error_reporting(self):
880        """Make sure that errors that occur while pushing a file get reported
881
882        Bug: http://b/26816782
883        """
884        with tempfile.NamedTemporaryFile() as tmp_file:
885            tmp_file.write('\0' * 1024 * 1024)
886            tmp_file.flush()
887            try:
888                self.device.push(local=tmp_file.name, remote='/system/')
889                self.fail('push should not have succeeded')
890            except subprocess.CalledProcessError as e:
891                output = e.output
892
893            self.assertTrue('Permission denied' in output or
894                            'Read-only file system' in output)
895
896    def _test_pull(self, remote_file, checksum):
897        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
898        tmp_write.close()
899        self.device.pull(remote=remote_file, local=tmp_write.name)
900        with open(tmp_write.name, 'rb') as tmp_read:
901            host_contents = tmp_read.read()
902            host_md5 = compute_md5(host_contents)
903        self.assertEqual(checksum, host_md5)
904        os.remove(tmp_write.name)
905
906    @requires_non_root
907    def test_pull_error_reporting(self):
908        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
909        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
910
911        try:
912            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
913        except subprocess.CalledProcessError as e:
914            output = e.output
915
916        self.assertIn('Permission denied', output)
917
918        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
919
920    def test_pull(self):
921        """Pull a randomly generated file from specified device."""
922        kbytes = 512
923        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
924        cmd = ['dd', 'if=/dev/urandom',
925               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
926               'count={}'.format(kbytes)]
927        self.device.shell(cmd)
928        dev_md5, _ = self.device.shell(
929            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
930        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
931        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
932
933    def test_pull_dir(self):
934        """Pull a randomly generated directory of files from the device."""
935        try:
936            host_dir = tempfile.mkdtemp()
937
938            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
939            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
940
941            # Populate device directory with random files.
942            temp_files = make_random_device_files(
943                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
944
945            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
946
947            for temp_file in temp_files:
948                host_path = os.path.join(
949                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
950                    temp_file.base_name)
951                self._verify_local(temp_file.checksum, host_path)
952
953            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
954        finally:
955            if host_dir is not None:
956                shutil.rmtree(host_dir)
957
958    def test_pull_dir_symlink(self):
959        """Pull a directory into a symlink to a directory.
960
961        Bug: http://b/27362811
962        """
963        if os.name != 'posix':
964            raise unittest.SkipTest('requires POSIX')
965
966        try:
967            host_dir = tempfile.mkdtemp()
968            real_dir = os.path.join(host_dir, 'dir')
969            symlink = os.path.join(host_dir, 'symlink')
970            os.mkdir(real_dir)
971            os.symlink(real_dir, symlink)
972
973            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
974            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
975
976            # Populate device directory with random files.
977            temp_files = make_random_device_files(
978                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
979
980            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
981
982            for temp_file in temp_files:
983                host_path = os.path.join(
984                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
985                    temp_file.base_name)
986                self._verify_local(temp_file.checksum, host_path)
987
988            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
989        finally:
990            if host_dir is not None:
991                shutil.rmtree(host_dir)
992
993    def test_pull_dir_symlink_collision(self):
994        """Pull a directory into a colliding symlink to directory."""
995        if os.name != 'posix':
996            raise unittest.SkipTest('requires POSIX')
997
998        try:
999            host_dir = tempfile.mkdtemp()
1000            real_dir = os.path.join(host_dir, 'real')
1001            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1002            symlink = os.path.join(host_dir, tmp_dirname)
1003            os.mkdir(real_dir)
1004            os.symlink(real_dir, symlink)
1005
1006            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1007            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1008
1009            # Populate device directory with random files.
1010            temp_files = make_random_device_files(
1011                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1012
1013            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1014
1015            for temp_file in temp_files:
1016                host_path = os.path.join(real_dir, temp_file.base_name)
1017                self._verify_local(temp_file.checksum, host_path)
1018
1019            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1020        finally:
1021            if host_dir is not None:
1022                shutil.rmtree(host_dir)
1023
1024    def test_pull_dir_nonexistent(self):
1025        """Pull a directory of files from the device to a nonexistent path."""
1026        try:
1027            host_dir = tempfile.mkdtemp()
1028            dest_dir = os.path.join(host_dir, 'dest')
1029
1030            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1031            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1032
1033            # Populate device directory with random files.
1034            temp_files = make_random_device_files(
1035                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1036
1037            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1038
1039            for temp_file in temp_files:
1040                host_path = os.path.join(dest_dir, temp_file.base_name)
1041                self._verify_local(temp_file.checksum, host_path)
1042
1043            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1044        finally:
1045            if host_dir is not None:
1046                shutil.rmtree(host_dir)
1047
1048    def test_pull_symlink_dir(self):
1049        """Pull a symlink to a directory of symlinks to files."""
1050        try:
1051            host_dir = tempfile.mkdtemp()
1052
1053            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1054            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1055            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1056
1057            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1058            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1059            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1060
1061            # Populate device directory with random files.
1062            temp_files = make_random_device_files(
1063                self.device, in_dir=remote_dir, num_files=32)
1064
1065            for temp_file in temp_files:
1066                self.device.shell(
1067                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1068                     posixpath.join(remote_links, temp_file.base_name)])
1069
1070            self.device.pull(remote=remote_symlink, local=host_dir)
1071
1072            for temp_file in temp_files:
1073                host_path = os.path.join(
1074                    host_dir, 'symlink', temp_file.base_name)
1075                self._verify_local(temp_file.checksum, host_path)
1076
1077            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1078        finally:
1079            if host_dir is not None:
1080                shutil.rmtree(host_dir)
1081
1082    def test_pull_empty(self):
1083        """Pull a directory containing an empty directory from the device."""
1084        try:
1085            host_dir = tempfile.mkdtemp()
1086
1087            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1088            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1089            self.device.shell(['mkdir', '-p', remote_empty_path])
1090
1091            self.device.pull(remote=remote_empty_path, local=host_dir)
1092            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1093        finally:
1094            if host_dir is not None:
1095                shutil.rmtree(host_dir)
1096
1097    def test_multiple_pull(self):
1098        """Pull a randomly generated directory of files from the device."""
1099
1100        try:
1101            host_dir = tempfile.mkdtemp()
1102
1103            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1104            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1105            self.device.shell(['mkdir', '-p', subdir])
1106
1107            # Create some random files and a subdirectory containing more files.
1108            temp_files = make_random_device_files(
1109                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1110
1111            subdir_temp_files = make_random_device_files(
1112                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1113
1114            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1115            paths.append(subdir)
1116            self.device._simple_call(['pull'] + paths + [host_dir])
1117
1118            for temp_file in temp_files:
1119                local_path = os.path.join(host_dir, temp_file.base_name)
1120                self._verify_local(temp_file.checksum, local_path)
1121
1122            for subdir_temp_file in subdir_temp_files:
1123                local_path = os.path.join(host_dir,
1124                                          'subdir',
1125                                          subdir_temp_file.base_name)
1126                self._verify_local(subdir_temp_file.checksum, local_path)
1127
1128            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1129        finally:
1130            if host_dir is not None:
1131                shutil.rmtree(host_dir)
1132
1133    def test_sync(self):
1134        """Sync a randomly generated directory of files to specified device."""
1135
1136        try:
1137            base_dir = tempfile.mkdtemp()
1138
1139            # Create mirror device directory hierarchy within base_dir.
1140            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1141            os.makedirs(full_dir_path)
1142
1143            # Create 32 random files within the host mirror.
1144            temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
1145
1146            # Clean up any trash on the device.
1147            device = adb.get_device(product=base_dir)
1148            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1149
1150            device.sync('data')
1151
1152            # Confirm that every file on the device mirrors that on the host.
1153            for temp_file in temp_files:
1154                device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
1155                                                  temp_file.base_name)
1156                dev_md5, _ = device.shell(
1157                    [get_md5_prog(self.device), device_full_path])[0].split()
1158                self.assertEqual(temp_file.checksum, dev_md5)
1159
1160            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1161        finally:
1162            if base_dir is not None:
1163                shutil.rmtree(base_dir)
1164
1165    def test_unicode_paths(self):
1166        """Ensure that we can support non-ASCII paths, even on Windows."""
1167        name = u'로보카 폴리'
1168
1169        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1170        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1171
1172        ## push.
1173        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1174        tf.close()
1175        self.device.push(tf.name, remote_path)
1176        os.remove(tf.name)
1177        self.assertFalse(os.path.exists(tf.name))
1178
1179        # Verify that the device ended up with the expected UTF-8 path
1180        output = self.device.shell(
1181                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1182        self.assertEqual(remote_path.encode('utf-8'), output)
1183
1184        # pull.
1185        self.device.pull(remote_path, tf.name)
1186        self.assertTrue(os.path.exists(tf.name))
1187        os.remove(tf.name)
1188        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1189
1190
def main():
    """Run every test in this module against the attached device(s).

    Returns:
        0 if the suite ran and was successful, 1 if any test failed or no
        device is attached. The value is meant to be used as the process
        exit status.
    """
    # Seed the PRNG with a fixed value so that runs are repeatable.
    random.seed(0)

    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so that callers and CI can detect failures.
    sys.exit(main())
1202