1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
import contextlib
import functools
import hashlib
import os
import posixpath
import random
import re
import shlex
import shutil
import signal
import socket
import string
import subprocess
import sys
import tempfile
import unittest
35
36import mock
37
38import adb
39
40
41def requires_root(func):
42    def wrapper(self, *args):
43        if self.device.get_prop('ro.debuggable') != '1':
44            raise unittest.SkipTest('requires rootable build')
45
46        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
47        if not was_root:
48            self.device.root()
49            self.device.wait()
50
51        try:
52            func(self, *args)
53        finally:
54            if not was_root:
55                self.device.unroot()
56                self.device.wait()
57
58    return wrapper
59
60
61def requires_non_root(func):
62    def wrapper(self, *args):
63        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
64        if was_root:
65            self.device.unroot()
66            self.device.wait()
67
68        try:
69            func(self, *args)
70        finally:
71            if was_root:
72                self.device.root()
73                self.device.wait()
74
75    return wrapper
76
77
class GetDeviceTest(unittest.TestCase):
    """Tests how adb.get_device() picks a serial.

    adb.device.get_devices is patched in every test, so the list of
    attached devices is whatever the test sets as the mock's return value.
    """

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so the caller's environment
        # cannot influence which device a test selects.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Put the environment back exactly as setUp found it.
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument is the one used."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """With no argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial, from the argument or the environment, raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like every other test here; a list
        # would only raise because it can never equal a serial.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With one device attached and no serial given, that device is used."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices attached and no serial given, it raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
129
130
class DeviceTest(unittest.TestCase):
    """Base class for the test cases in this file that use self.device."""

    def setUp(self):
        # No serial is passed, so device selection is left to
        # adb.get_device().
        self.device = adb.get_device()
134
135
class ForwardReverseTest(DeviceTest):
    """Exercises the forward/reverse port-mapping calls on self.device.

    Every test that creates mappings removes them again even when an
    assertion fails part-way, so a failing test cannot leave stale mappings
    behind that would trip the "list must be empty" precondition of the
    next test.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Checks the no-rebind variant of a mapping call.

        Args:
          description: Name of the direction, used in the precondition
            failure message.
          direction_list: Callable returning the current mappings as a
            string.
          direction: Callable taking two socket specs that creates or
            replaces a mapping.
          direction_no_rebind: Callable taking two socket specs that is
            expected to raise subprocess.CalledProcessError when the first
            spec is already mapped.
          direction_remove_all: Callable that removes every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Runs on success and on failure; the list was verified empty
            # above, so this only removes mappings created by this test.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def _test_add_remove(self, empty_msg, direction_list, direction,
                         direction_remove, direction_remove_all):
        """Checks adding two mappings, removing one, then removing all.

        Args:
          empty_msg: Failure message used when mappings already exist.
          direction_list: Callable returning the current mappings as a
            string.
          direction: Callable taking two socket specs that creates a
            mapping.
          direction_remove: Callable taking one socket spec that removes
            the mapping for it.
          direction_remove_all: Callable that removes every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(), empty_msg)

        try:
            direction('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            direction('tcp:7788', 'tcp:8877')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))

            direction_remove('tcp:5566')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on success and on failure; the list was verified empty
            # above, so this only removes mappings created by this test.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        self._test_add_remove(
            'Forwarding list must be empty to run this test.',
            self.device.forward_list, self.device.forward,
            self.device.forward_remove, self.device.forward_remove_all)

    def test_reverse(self):
        self._test_add_remove(
            'Reverse forwarding list must be empty to run this test.',
            self.device.reverse_list, self.device.reverse,
            self.device.reverse_remove, self.device.reverse_remove_all)

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Sockets carry bytes; a bytes literal and a binary
                        # file object behave the same on Python 2 and 3.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            # Nested so that a failure removing the reverse mapping cannot
            # skip removal of the forward mapping.
            try:
                if reverse_setup:
                    self.device.reverse_remove(forward_spec)
            finally:
                if forward_setup:
                    self.device.forward_remove(forward_spec)
266
267
class ShellTest(DeviceTest):
    """Tests of shell command execution through self.device and raw adb."""

    def _interactive_shell(self, shell_args, input):
        """Feeds input to an interactive `adb shell` and waits for it to end.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String to send to the shell's stdin; '; exit' and a
            newline are appended to it.

        Returns:
          The return code of the adb process, which callers treat as the
          remote exit code.

        Raises:
          unittest.SkipTest: The device lacks the shell protocol feature.
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('exit codes are unavailable on this device')

        adb_shell = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # A PTY shell does not exit when host-side stdin is closed, so the
        # session is ended from the device side with an explicit exit
        # command; the newline completes the interactive command line.
        adb_shell.communicate(input + '; exit\n')
        return adb_shell.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        fields = self.device.shell(['cat', '/proc/uptime'])[0].strip().split()
        self.assertEqual(len(fields), 2)

        # Both values (uptime, then idle time) must parse as positive floats.
        for field in fields:
            self.assertGreater(float(field), 0.0)

    def test_throws_on_failure(self):
        """A failing command makes shell() raise adb.ShellError."""
        with self.assertRaises(adb.ShellError):
            self.device.shell(['false'])

    def test_output_not_stripped(self):
        """shell() returns stdout with its trailing line separator intact."""
        stdout = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(stdout, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero code instead of raising."""
        exit_code, stdout, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(exit_code, 0)
        self.assertEqual(stdout, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() also leaves the trailing line separator intact."""
        exit_code, stdout, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(exit_code, 0)
        self.assertEqual(stdout, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code is not mistaken for one."""
        # A naive `adb shell <cmd>; echo $?` implementation could not tell
        # the output of `echo -n 1` apart from the appended result.
        exit_code, stdout, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(exit_code, 0)
        self.assertEqual(stdout, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        stdout = self.device.shell(['uname'])[0]
        self.assertEqual(stdout, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def remote_stdin_is_tty(command, stdin):
            """Runs command with the given stdin; True if it exits with 0."""
            proc = subprocess.Popen(command, stdin=stdin,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            proc.communicate()
            return proc.returncode == 0

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Runs `[ -t 0 ]` remotely, first with the inherited (terminal)
            stdin and then with a pipe as stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
            with_terminal = remote_stdin_is_tty(test_cmd, None)
            without_terminal = remote_stdin_is_tty(test_cmd, subprocess.PIPE)
            return (with_terminal, without_terminal)

        expectations = (
            # -T: never allocate PTY.
            (['-T'], (False, False)),
            # No args: PTY only if stdin is a terminal and shell is
            # interactive, which is difficult to reliably test from a script.
            ([], (False, False)),
            # -t: PTY if stdin is a terminal.
            (['-t'], (True, False)),
            # -t -t: always allocate PTY.
            (['-t', '-t'], (True, True)),
        )
        for pty_args, expected in expectations:
            self.assertEqual(expected, check_pty(pty_args))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        exit_code, stdout, stderr = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, exit_code)
        self.assertEqual('foo' + self.device.linesep, stdout)
        self.assertEqual('bar' + self.device.linesep, stderr)

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        exit_code, stdout, stderr = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, exit_code)
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), stdout)
        self.assertEqual('', stderr)

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process that first prints its remote PID.
        sleep_cmd = (self.device.adb_cmd +
                     shlex.split('shell echo $$; sleep 60'))
        sleep_proc = subprocess.Popen(sleep_cmd,
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # The query must succeed while the process is alive ...
        self.device.shell(proc_query)
        # ... and report 1 once the host-side adb has been interrupted.
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()
        query_status = self.device.shell_nocheck(proc_query)[0]
        self.assertEqual(1, query_status, 'subprocess failed to terminate')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs; the large one is a line of 100
        # repeated characters for every ASCII letter and digit.
        charset = string.ascii_letters + string.digits
        payloads = ('foo', '\n'.join(c * 100 for c in charset))

        for payload in payloads:
            cat_proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
            stdout, stderr = cat_proc.communicate(payload)
            self.assertEqual(payload.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)
465
466
class ArgumentEscapingTest(DeviceTest):
    """Tests how arguments with shell metacharacters reach the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the install raises
                # something unexpected or the assertion fails.
                os.remove(tf.name)
515
516
class RootUnrootTest(DeviceTest):
    """Tests switching the device between the root and shell users."""

    def _test_root(self):
        """Calls root() and, unless it was refused, checks `id -un` is root."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Calls unroot() and checks that `id -un` reports shell."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        started_as_root = original_user == 'root'
        try:
            # Leave the starting state first and come back to it second.
            if started_as_root:
                self._test_unroot()
                self._test_root()
            elif original_user == 'shell':
                self._test_root()
                self._test_unroot()
        finally:
            # Restore the starting state whatever happened above.
            if started_as_root:
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
549
550
class TcpIpTest(DeviceTest):
    """Tests for self.device.tcpip()."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty port and a non-numeric port must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
561
562
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties on the device."""

    def test_get_prop(self):
        """get_prop() reports 'running' for init.svc.adbd."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop() is read back by getprop."""
        prop_name = 'foo.bar'
        # Reset the property through the shell before setting it.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stored_value = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(stored_value, 'qux')
575
576
def compute_md5(string):
    """Returns the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
581
582
def get_md5_prog(device):
    """Returns the name of the MD5 program to run on the device.

    Older platforms (pre-L) had the name md5 rather than md5sum, so md5sum
    is tried on /proc/uptime first and 'md5' is returned when that raises
    adb.ShellError.
    """
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    return 'md5sum'
590
591
class HostFile(object):
    """Describes a file on the host by its handle, checksum and path."""

    def __init__(self, handle, checksum):
        """Stores the handle and checksum and derives the path fields.

        Args:
            handle: Object with a `name` attribute holding the file's path.
            checksum: Checksum to associate with the file.
        """
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        # Path exactly as reported by the handle.
        self.full_path = path
        # Final component of that path, using host path rules.
        self.base_name = os.path.split(path)[1]
598
599
class DeviceFile(object):
    """Describes a file on the device by its checksum and path."""

    def __init__(self, checksum, full_path):
        """Stores the checksum and path and derives the base name.

        Args:
            checksum: Checksum to associate with the file.
            full_path: Path of the file; it is split with POSIX rules.
        """
        self.checksum = checksum
        self.full_path = full_path
        # Final component of the path, using POSIX path rules.
        self.base_name = posixpath.split(full_path)[1]
605
606
def make_random_host_files(in_dir, num_files):
    """Creates files filled with random bytes on the host.

    Each file is created with tempfile.NamedTemporaryFile(delete=False) and
    is therefore left on disk for the caller to remove. Sizes are random
    multiples of 1 KiB from 1 KiB up to, but not including, 16 KiB.

    Args:
        in_dir: Directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per file, each carrying the (closed)
        file handle and the MD5 hex digest of the data written.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() rather than xrange() so the helper also runs on Python 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
624
625
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Creates files filled with random bytes on the device.

    Each file is written by running dd from /dev/urandom through
    device.shell(), and its checksum is read back by running the device's
    MD5 program on it. Sizes are random multiples of 1 KiB from 1 KiB up
    to, but not including, 16 KiB.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file's index is appended to it.

    Returns:
        A list of DeviceFile objects, one per file.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The MD5 program name does not change between files, so probe for it
    # once instead of running an extra shell command per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() rather than xrange() so the helper also runs on Python 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The program's output is split into the checksum and one other
        # field.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
643
644
645class FileOperationsTest(DeviceTest):
646    SCRATCH_DIR = '/data/local/tmp'
647    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
648    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
649
650    def _verify_remote(self, checksum, remote_path):
651        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
652                                        remote_path])[0].split()
653        self.assertEqual(checksum, dev_md5)
654
655    def _verify_local(self, checksum, local_path):
656        with open(local_path, 'rb') as host_file:
657            host_md5 = compute_md5(host_file.read())
658            self.assertEqual(host_md5, checksum)
659
660    def test_push(self):
661        """Push a randomly generated file to specified device."""
662        kbytes = 512
663        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
664        rand_str = os.urandom(1024 * kbytes)
665        tmp.write(rand_str)
666        tmp.close()
667
668        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
669        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
670
671        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
672        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
673
674        os.remove(tmp.name)
675
676    def test_push_dir(self):
677        """Push a randomly generated directory of files to the device."""
678        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
679        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
680
681        try:
682            host_dir = tempfile.mkdtemp()
683
684            # Make sure the temp directory isn't setuid, or else adb will complain.
685            os.chmod(host_dir, 0o700)
686
687            # Create 32 random files.
688            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
689            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
690
691            for temp_file in temp_files:
692                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
693                                             os.path.basename(host_dir),
694                                             temp_file.base_name)
695                self._verify_remote(temp_file.checksum, remote_path)
696            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
697        finally:
698            if host_dir is not None:
699                shutil.rmtree(host_dir)
700
701    @unittest.expectedFailure # b/25566053
702    def test_push_empty(self):
703        """Push a directory containing an empty directory to the device."""
704        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
705        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
706
707        try:
708            host_dir = tempfile.mkdtemp()
709
710            # Make sure the temp directory isn't setuid, or else adb will complain.
711            os.chmod(host_dir, 0o700)
712
713            # Create an empty directory.
714            os.mkdir(os.path.join(host_dir, 'empty'))
715
716            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
717
718            test_empty_cmd = ['[', '-d',
719                              os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
720            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
721            self.assertEqual(rc, 0)
722            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
723        finally:
724            if host_dir is not None:
725                shutil.rmtree(host_dir)
726
727    def test_multiple_push(self):
728        """Push multiple files to the device in one adb push command.
729
730        Bug: http://b/25324823
731        """
732
733        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
734        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
735
736        try:
737            host_dir = tempfile.mkdtemp()
738
739            # Create some random files and a subdirectory containing more files.
740            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
741
742            subdir = os.path.join(host_dir, 'subdir')
743            os.mkdir(subdir)
744            subdir_temp_files = make_random_host_files(in_dir=subdir,
745                                                       num_files=4)
746
747            paths = map(lambda temp_file: temp_file.full_path, temp_files)
748            paths.append(subdir)
749            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
750
751            for temp_file in temp_files:
752                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
753                                             temp_file.base_name)
754                self._verify_remote(temp_file.checksum, remote_path)
755
756            for subdir_temp_file in subdir_temp_files:
757                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
758                                             # BROKEN: http://b/25394682
759                                             # 'subdir';
760                                             temp_file.base_name)
761                self._verify_remote(temp_file.checksum, remote_path)
762
763
764            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
765        finally:
766            if host_dir is not None:
767                shutil.rmtree(host_dir)
768
769    @requires_non_root
770    def test_push_error_reporting(self):
771        """Make sure that errors that occur while pushing a file get reported
772
773        Bug: http://b/26816782
774        """
775        with tempfile.NamedTemporaryFile() as tmp_file:
776            tmp_file.write('\0' * 1024 * 1024)
777            tmp_file.flush()
778            try:
779                self.device.push(local=tmp_file.name, remote='/system/')
780                self.fail('push should not have succeeded')
781            except subprocess.CalledProcessError as e:
782                output = e.output
783
784            self.assertIn('Permission denied', output)
785
786    def _test_pull(self, remote_file, checksum):
787        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
788        tmp_write.close()
789        self.device.pull(remote=remote_file, local=tmp_write.name)
790        with open(tmp_write.name, 'rb') as tmp_read:
791            host_contents = tmp_read.read()
792            host_md5 = compute_md5(host_contents)
793        self.assertEqual(checksum, host_md5)
794        os.remove(tmp_write.name)
795
796    @requires_non_root
797    def test_pull_error_reporting(self):
798        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
799        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
800
801        try:
802            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
803        except subprocess.CalledProcessError as e:
804            output = e.output
805
806        self.assertIn('Permission denied', output)
807
808        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
809
810    def test_pull(self):
811        """Pull a randomly generated file from specified device."""
812        kbytes = 512
813        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
814        cmd = ['dd', 'if=/dev/urandom',
815               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
816               'count={}'.format(kbytes)]
817        self.device.shell(cmd)
818        dev_md5, _ = self.device.shell(
819            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
820        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
821        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
822
823    def test_pull_dir(self):
824        """Pull a randomly generated directory of files from the device."""
825        try:
826            host_dir = tempfile.mkdtemp()
827
828            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
829            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
830
831            # Populate device directory with random files.
832            temp_files = make_random_device_files(
833                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
834
835            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
836
837            for temp_file in temp_files:
838                host_path = os.path.join(
839                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
840                    temp_file.base_name)
841                self._verify_local(temp_file.checksum, host_path)
842
843            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
844        finally:
845            if host_dir is not None:
846                shutil.rmtree(host_dir)
847
848    def test_pull_dir_symlink(self):
849        """Pull a directory into a symlink to a directory.
850
851        Bug: http://b/27362811
852        """
853        if os.name != 'posix':
854            raise unittest.SkipTest('requires POSIX')
855
856        try:
857            host_dir = tempfile.mkdtemp()
858            real_dir = os.path.join(host_dir, 'dir')
859            symlink = os.path.join(host_dir, 'symlink')
860            os.mkdir(real_dir)
861            os.symlink(real_dir, symlink)
862
863            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
864            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
865
866            # Populate device directory with random files.
867            temp_files = make_random_device_files(
868                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
869
870            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
871
872            for temp_file in temp_files:
873                host_path = os.path.join(
874                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
875                    temp_file.base_name)
876                self._verify_local(temp_file.checksum, host_path)
877
878            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
879        finally:
880            if host_dir is not None:
881                shutil.rmtree(host_dir)
882
883    def test_pull_dir_symlink_collision(self):
884        """Pull a directory into a colliding symlink to directory."""
885        if os.name != 'posix':
886            raise unittest.SkipTest('requires POSIX')
887
888        try:
889            host_dir = tempfile.mkdtemp()
890            real_dir = os.path.join(host_dir, 'real')
891            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
892            symlink = os.path.join(host_dir, tmp_dirname)
893            os.mkdir(real_dir)
894            os.symlink(real_dir, symlink)
895
896            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
897            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
898
899            # Populate device directory with random files.
900            temp_files = make_random_device_files(
901                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
902
903            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
904
905            for temp_file in temp_files:
906                host_path = os.path.join(real_dir, temp_file.base_name)
907                self._verify_local(temp_file.checksum, host_path)
908
909            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
910        finally:
911            if host_dir is not None:
912                shutil.rmtree(host_dir)
913
914    def test_pull_dir_nonexistent(self):
915        """Pull a directory of files from the device to a nonexistent path."""
916        try:
917            host_dir = tempfile.mkdtemp()
918            dest_dir = os.path.join(host_dir, 'dest')
919
920            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
921            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
922
923            # Populate device directory with random files.
924            temp_files = make_random_device_files(
925                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
926
927            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
928
929            for temp_file in temp_files:
930                host_path = os.path.join(dest_dir, temp_file.base_name)
931                self._verify_local(temp_file.checksum, host_path)
932
933            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
934        finally:
935            if host_dir is not None:
936                shutil.rmtree(host_dir)
937
938    def test_pull_symlink_dir(self):
939        """Pull a symlink to a directory of symlinks to files."""
940        try:
941            host_dir = tempfile.mkdtemp()
942
943            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
944            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
945            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
946
947            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
948            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
949            self.device.shell(['ln', '-s', remote_links, remote_symlink])
950
951            # Populate device directory with random files.
952            temp_files = make_random_device_files(
953                self.device, in_dir=remote_dir, num_files=32)
954
955            for temp_file in temp_files:
956                self.device.shell(
957                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
958                     posixpath.join(remote_links, temp_file.base_name)])
959
960            self.device.pull(remote=remote_symlink, local=host_dir)
961
962            for temp_file in temp_files:
963                host_path = os.path.join(
964                    host_dir, 'symlink', temp_file.base_name)
965                self._verify_local(temp_file.checksum, host_path)
966
967            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
968        finally:
969            if host_dir is not None:
970                shutil.rmtree(host_dir)
971
972    def test_pull_empty(self):
973        """Pull a directory containing an empty directory from the device."""
974        try:
975            host_dir = tempfile.mkdtemp()
976
977            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
978            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
979            self.device.shell(['mkdir', '-p', remote_empty_path])
980
981            self.device.pull(remote=remote_empty_path, local=host_dir)
982            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
983        finally:
984            if host_dir is not None:
985                shutil.rmtree(host_dir)
986
987    def test_multiple_pull(self):
988        """Pull a randomly generated directory of files from the device."""
989
990        try:
991            host_dir = tempfile.mkdtemp()
992
993            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
994            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
995            self.device.shell(['mkdir', '-p', subdir])
996
997            # Create some random files and a subdirectory containing more files.
998            temp_files = make_random_device_files(
999                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1000
1001            subdir_temp_files = make_random_device_files(
1002                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1003
1004            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1005            paths.append(subdir)
1006            self.device._simple_call(['pull'] + paths + [host_dir])
1007
1008            for temp_file in temp_files:
1009                local_path = os.path.join(host_dir, temp_file.base_name)
1010                self._verify_local(temp_file.checksum, local_path)
1011
1012            for subdir_temp_file in subdir_temp_files:
1013                local_path = os.path.join(host_dir,
1014                                          'subdir',
1015                                          subdir_temp_file.base_name)
1016                self._verify_local(subdir_temp_file.checksum, local_path)
1017
1018            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1019        finally:
1020            if host_dir is not None:
1021                shutil.rmtree(host_dir)
1022
1023    def test_sync(self):
1024        """Sync a randomly generated directory of files to specified device."""
1025
1026        try:
1027            base_dir = tempfile.mkdtemp()
1028
1029            # Create mirror device directory hierarchy within base_dir.
1030            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1031            os.makedirs(full_dir_path)
1032
1033            # Create 32 random files within the host mirror.
1034            temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
1035
1036            # Clean up any trash on the device.
1037            device = adb.get_device(product=base_dir)
1038            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1039
1040            device.sync('data')
1041
1042            # Confirm that every file on the device mirrors that on the host.
1043            for temp_file in temp_files:
1044                device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
1045                                                  temp_file.base_name)
1046                dev_md5, _ = device.shell(
1047                    [get_md5_prog(self.device), device_full_path])[0].split()
1048                self.assertEqual(temp_file.checksum, dev_md5)
1049
1050            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1051        finally:
1052            if base_dir is not None:
1053                shutil.rmtree(base_dir)
1054
1055    def test_unicode_paths(self):
1056        """Ensure that we can support non-ASCII paths, even on Windows."""
1057        name = u'로보카 폴리'
1058
1059        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1060        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1061
1062        ## push.
1063        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1064        tf.close()
1065        self.device.push(tf.name, remote_path)
1066        os.remove(tf.name)
1067        self.assertFalse(os.path.exists(tf.name))
1068
1069        # Verify that the device ended up with the expected UTF-8 path
1070        output = self.device.shell(
1071                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1072        self.assertEqual(remote_path.encode('utf-8'), output)
1073
1074        # pull.
1075        self.device.pull(remote_path, tf.name)
1076        self.assertTrue(os.path.exists(tf.name))
1077        os.remove(tf.name)
1078        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1079
1080
def main():
    """Run every test in this module against the attached device(s).

    Returns:
        0 when the suite ran and every test passed; 1 when any test failed
        or errored, or when no device is attached. The value is used as the
        process exit status so callers can detect failures.
    """
    # Constant seed; presumably keeps the randomly generated test data
    # identical from run to run.
    random.seed(0)

    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    sys.exit(main())
1092