test_device.py revision e76b9f3dde800b44c4151ebee9ff469b6714d8aa
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import time
35import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorate a device test method so that it runs with adbd as root.

    The wrapped test is skipped when the device's ``ro.debuggable``
    property is not ``'1'``. If ``id -un`` on the device does not report
    ``root``, the device is switched to root before the test and switched
    back afterwards, even when the test raises.

    Args:
        func: Test method; its ``self`` must expose a ``device`` attribute.

    Returns:
        A wrapper that keeps ``func``'s name and docstring, forwards
        positional and keyword arguments, and returns ``func``'s result.
    """
    # Function-scope import so this helper does not depend on the
    # module's import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this wrapper itself changed.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a device test method so that it runs with adbd unrooted.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test and switched back to root afterwards, even when the
    test raises.

    Args:
        func: Test method; its ``self`` must expose a ``device`` attribute.

    Returns:
        A wrapper that keeps ``func``'s name and docstring, forwards
        positional and keyword arguments, and returns ``func``'s result.
    """
    # Function-scope import so this helper does not depend on the
    # module's import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this wrapper itself changed.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Tests for how adb.get_device() picks a serial.

    ``adb.device.get_devices`` is patched in every test, so the
    list of attached devices is whatever the test sets it to.
    """

    def setUp(self):
        # Run each test with ANDROID_SERIAL unset, remembering the
        # caller's value (None if it was not set) for tearDown.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Put the environment back exactly as setUp found it; tests may
        # have set ANDROID_SERIAL themselves.
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument is used."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """With no argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial wins over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises, whether from the argument or the env."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, the same way the other tests do;
        # a list would never match a device and would make this check
        # pass for the wrong reason.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With one device attached and no serial given, it is chosen."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial given, the call raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base class for tests that run against a device.

    Before each test, ``self.device`` is set to the result of
    ``adb.get_device()`` called with no arguments.
    """

    def setUp(self):
        device = adb.get_device()
        self.device = device
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for the device's forward/reverse port-binding calls.

    Tests that create bindings first check that the relevant list is
    empty, and remove what they created even when an assertion fails, so
    that one failure does not trip the empty-list precondition of the
    tests that run afterwards.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Shared body for the forward and reverse no-rebind tests.

        Args:
            description: Word used in the precondition failure message.
            direction_list: Callable returning the current binding list.
            direction: Callable(local, remote) that binds, replacing any
                existing binding.
            direction_no_rebind: Callable(local, remote) that binds and
                is expected to raise subprocess.CalledProcessError when
                the local spec is already bound.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Runs on failure too, so no binding outlives this test.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the no-rebind scenario against the forward calls."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the no-rebind scenario against the reverse calls."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forwards, remove one by spec, then remove the rest."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on failure too, so no forward outlives this test.
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward from tcp:0 and check the port that was printed is listed."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverses, remove one by spec, then remove the rest."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on failure too, so no reverse outlives this test.
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse from tcp:0 and check the port that was printed is listed."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which bindings were actually created so that cleanup only
        # removes those.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = 'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for running shell commands on the device."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """shell() raises adb.ShellError for a failing command."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() keeps the trailing line separator in its output."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() keeps the trailing line separator too."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code is not mistaken for one."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None leaves the child's stdin inherited from this
            # process.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # Popen.returncode stays None until poll()/wait() is called,
            # so poll() is needed for this check to be able to fail.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
        finally:
            # If a check above failed before the signal was delivered,
            # don't leave the local adb client running.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.wait()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            while true; do sleep 100; done
        """.format(path=log_path)

        # Collapse the script onto one line, one statement per original line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen(
            ["sh", "-c", "'{}'".format(script)], kill_atexit=False, stdout=subprocess.PIPE)

        # Wait for the script's first line so the traps are in place
        # before the signal is sent.
        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(0.25)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")
532
533
class ArgumentEscapingTest(DeviceTest):
    """Tests for how shell and install arguments are escaped."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
582
583
class RootUnrootTest(DeviceTest):
    """Checks that root() and unroot() change the user the shell runs as."""

    def _test_root(self):
        """Switch to root and expect `id -un` to print 'root'.

        Returns without checking anything when the root() message says
        adbd cannot run as root in production builds.
        """
        production_msg = 'adbd cannot run as root in production builds'
        if production_msg not in self.device.root():
            self.device.wait()
            user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', user)

    def _test_unroot(self):
        """Switch away from root and expect `id -un` to print 'shell'."""
        self.device.unroot()
        self.device.wait()
        user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()

        # Leave the current state first, then come back to it. Any other
        # starting user runs no steps at all.
        steps_by_user = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in steps_by_user.get(original_user, ()):
                step()
        finally:
            # Put the device back the way it was found, whatever the
            # steps above did.
            if original_user == 'root':
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
616
617
class TcpIpTest(DeviceTest):
    """Tests for the device's tcpip() call."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty string, then a non-numeric one; both must be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
628
629
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties on the device."""

    def test_get_prop(self):
        """get_prop() returns 'running' for init.svc.adbd."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop() is what getprop prints."""
        prop_name = 'foo.bar'
        # Start from an empty value so a stale 'qux' cannot pass the check.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        getprop_out = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_out.strip(), 'qux')
642
643
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
648
649
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Runs `md5sum /proc/uptime` on the device and returns 'md5sum' if
    that succeeds, or 'md5' if it raises adb.ShellError.
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    return prog
657
658
class HostFile(object):
    """A file on the host, described by its handle and checksum.

    `full_path` is the handle's `name`; `base_name` is the final
    component of that path.
    """

    def __init__(self, handle, checksum):
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        self.base_name = os.path.basename(path)
665
666
class DeviceFile(object):
    """A file on the device, described by its checksum and path.

    `base_name` is the final component of `full_path`, split with
    posixpath.
    """

    def __init__(self, checksum, full_path):
        self.full_path = full_path
        self.base_name = posixpath.basename(full_path)
        self.checksum = checksum
672
673
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Each file's size is drawn with random.randrange(1 KiB, 16 KiB, 1024).
    The files are created with delete=False, so removing them is left to
    the caller.

    Args:
        in_dir: Host directory in which to create the files.
        num_files: How many files to create.

    Returns:
        A list of HostFile objects, one per file, each carrying the
        (closed) file handle and the MD5 of the bytes written.
    """
    kib = 1 << 10
    smallest, largest = 1 * kib, 16 * kib

    created = []
    for _ in xrange(num_files):
        handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)

        payload = os.urandom(random.randrange(smallest, largest, 1024))
        handle.write(payload)
        handle.flush()
        handle.close()

        created.append(HostFile(handle, compute_md5(payload)))
    return created
691
692
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Each file is written on the device with a single `dd` block read
    from /dev/urandom; the block size is drawn with
    random.randrange(1 KiB, 16 KiB, 1024). The file's checksum is then
    read back from the device.

    Args:
        device: Device object providing `shell()`.
        in_dir: Device directory in which to create the files.
        num_files: How many files to create.
        prefix: File name prefix; the file's index is appended to it.

    Returns:
        A list of DeviceFile objects, one per file.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Which checksum program the device has does not change between
    # files, and finding out costs a device shell command, so ask once.
    md5_prog = get_md5_prog(device)

    files = []
    for file_num in xrange(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The output is split into exactly two fields; the first is the
        # digest.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
710
711
712class FileOperationsTest(DeviceTest):
713    SCRATCH_DIR = '/data/local/tmp'
714    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
715    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
716
717    def _verify_remote(self, checksum, remote_path):
718        dev_md5, _ = self.device.shell([get_md5_prog(self.device),
719                                        remote_path])[0].split()
720        self.assertEqual(checksum, dev_md5)
721
722    def _verify_local(self, checksum, local_path):
723        with open(local_path, 'rb') as host_file:
724            host_md5 = compute_md5(host_file.read())
725            self.assertEqual(host_md5, checksum)
726
727    def test_push(self):
728        """Push a randomly generated file to specified device."""
729        kbytes = 512
730        tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
731        rand_str = os.urandom(1024 * kbytes)
732        tmp.write(rand_str)
733        tmp.close()
734
735        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
736        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
737
738        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
739        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
740
741        os.remove(tmp.name)
742
743    def test_push_dir(self):
744        """Push a randomly generated directory of files to the device."""
745        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
746        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
747
748        try:
749            host_dir = tempfile.mkdtemp()
750
751            # Make sure the temp directory isn't setuid, or else adb will complain.
752            os.chmod(host_dir, 0o700)
753
754            # Create 32 random files.
755            temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
756            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
757
758            for temp_file in temp_files:
759                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
760                                             os.path.basename(host_dir),
761                                             temp_file.base_name)
762                self._verify_remote(temp_file.checksum, remote_path)
763            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
764        finally:
765            if host_dir is not None:
766                shutil.rmtree(host_dir)
767
768    @unittest.expectedFailure # b/25566053
769    def test_push_empty(self):
770        """Push a directory containing an empty directory to the device."""
771        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
772        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
773
774        try:
775            host_dir = tempfile.mkdtemp()
776
777            # Make sure the temp directory isn't setuid, or else adb will complain.
778            os.chmod(host_dir, 0o700)
779
780            # Create an empty directory.
781            os.mkdir(os.path.join(host_dir, 'empty'))
782
783            self.device.push(host_dir, self.DEVICE_TEMP_DIR)
784
785            test_empty_cmd = ['[', '-d',
786                              os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
787            rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
788            self.assertEqual(rc, 0)
789            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
790        finally:
791            if host_dir is not None:
792                shutil.rmtree(host_dir)
793
794    @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
795    def test_push_symlink(self):
796        """Push a symlink.
797
798        Bug: http://b/31491920
799        """
800        try:
801            host_dir = tempfile.mkdtemp()
802
803            # Make sure the temp directory isn't setuid, or else adb will
804            # complain.
805            os.chmod(host_dir, 0o700)
806
807            with open(os.path.join(host_dir, 'foo'), 'w') as f:
808                f.write('foo')
809
810            symlink_path = os.path.join(host_dir, 'symlink')
811            os.symlink('foo', symlink_path)
812
813            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
814            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
815            self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
816            rc, out, _ = self.device.shell_nocheck(
817                ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
818            self.assertEqual(0, rc)
819            self.assertEqual(out.strip(), 'foo')
820        finally:
821            if host_dir is not None:
822                shutil.rmtree(host_dir)
823
824    def test_multiple_push(self):
825        """Push multiple files to the device in one adb push command.
826
827        Bug: http://b/25324823
828        """
829
830        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
831        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
832
833        try:
834            host_dir = tempfile.mkdtemp()
835
836            # Create some random files and a subdirectory containing more files.
837            temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
838
839            subdir = os.path.join(host_dir, 'subdir')
840            os.mkdir(subdir)
841            subdir_temp_files = make_random_host_files(in_dir=subdir,
842                                                       num_files=4)
843
844            paths = map(lambda temp_file: temp_file.full_path, temp_files)
845            paths.append(subdir)
846            self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
847
848            for temp_file in temp_files:
849                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
850                                             temp_file.base_name)
851                self._verify_remote(temp_file.checksum, remote_path)
852
853            for subdir_temp_file in subdir_temp_files:
854                remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
855                                             # BROKEN: http://b/25394682
856                                             # 'subdir';
857                                             temp_file.base_name)
858                self._verify_remote(temp_file.checksum, remote_path)
859
860
861            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
862        finally:
863            if host_dir is not None:
864                shutil.rmtree(host_dir)
865
866    @requires_non_root
867    def test_push_error_reporting(self):
868        """Make sure that errors that occur while pushing a file get reported
869
870        Bug: http://b/26816782
871        """
872        with tempfile.NamedTemporaryFile() as tmp_file:
873            tmp_file.write('\0' * 1024 * 1024)
874            tmp_file.flush()
875            try:
876                self.device.push(local=tmp_file.name, remote='/system/')
877                self.fail('push should not have succeeded')
878            except subprocess.CalledProcessError as e:
879                output = e.output
880
881            self.assertIn('Permission denied', output)
882
883    def _test_pull(self, remote_file, checksum):
884        tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
885        tmp_write.close()
886        self.device.pull(remote=remote_file, local=tmp_write.name)
887        with open(tmp_write.name, 'rb') as tmp_read:
888            host_contents = tmp_read.read()
889            host_md5 = compute_md5(host_contents)
890        self.assertEqual(checksum, host_md5)
891        os.remove(tmp_write.name)
892
893    @requires_non_root
894    def test_pull_error_reporting(self):
895        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
896        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
897
898        try:
899            output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
900        except subprocess.CalledProcessError as e:
901            output = e.output
902
903        self.assertIn('Permission denied', output)
904
905        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
906
907    def test_pull(self):
908        """Pull a randomly generated file from specified device."""
909        kbytes = 512
910        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
911        cmd = ['dd', 'if=/dev/urandom',
912               'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
913               'count={}'.format(kbytes)]
914        self.device.shell(cmd)
915        dev_md5, _ = self.device.shell(
916            [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
917        self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
918        self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
919
920    def test_pull_dir(self):
921        """Pull a randomly generated directory of files from the device."""
922        try:
923            host_dir = tempfile.mkdtemp()
924
925            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
926            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
927
928            # Populate device directory with random files.
929            temp_files = make_random_device_files(
930                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
931
932            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
933
934            for temp_file in temp_files:
935                host_path = os.path.join(
936                    host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
937                    temp_file.base_name)
938                self._verify_local(temp_file.checksum, host_path)
939
940            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
941        finally:
942            if host_dir is not None:
943                shutil.rmtree(host_dir)
944
945    def test_pull_dir_symlink(self):
946        """Pull a directory into a symlink to a directory.
947
948        Bug: http://b/27362811
949        """
950        if os.name != 'posix':
951            raise unittest.SkipTest('requires POSIX')
952
953        try:
954            host_dir = tempfile.mkdtemp()
955            real_dir = os.path.join(host_dir, 'dir')
956            symlink = os.path.join(host_dir, 'symlink')
957            os.mkdir(real_dir)
958            os.symlink(real_dir, symlink)
959
960            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
961            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
962
963            # Populate device directory with random files.
964            temp_files = make_random_device_files(
965                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
966
967            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
968
969            for temp_file in temp_files:
970                host_path = os.path.join(
971                    real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
972                    temp_file.base_name)
973                self._verify_local(temp_file.checksum, host_path)
974
975            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
976        finally:
977            if host_dir is not None:
978                shutil.rmtree(host_dir)
979
980    def test_pull_dir_symlink_collision(self):
981        """Pull a directory into a colliding symlink to directory."""
982        if os.name != 'posix':
983            raise unittest.SkipTest('requires POSIX')
984
985        try:
986            host_dir = tempfile.mkdtemp()
987            real_dir = os.path.join(host_dir, 'real')
988            tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
989            symlink = os.path.join(host_dir, tmp_dirname)
990            os.mkdir(real_dir)
991            os.symlink(real_dir, symlink)
992
993            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
994            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
995
996            # Populate device directory with random files.
997            temp_files = make_random_device_files(
998                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
999
1000            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1001
1002            for temp_file in temp_files:
1003                host_path = os.path.join(real_dir, temp_file.base_name)
1004                self._verify_local(temp_file.checksum, host_path)
1005
1006            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1007        finally:
1008            if host_dir is not None:
1009                shutil.rmtree(host_dir)
1010
1011    def test_pull_dir_nonexistent(self):
1012        """Pull a directory of files from the device to a nonexistent path."""
1013        try:
1014            host_dir = tempfile.mkdtemp()
1015            dest_dir = os.path.join(host_dir, 'dest')
1016
1017            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1018            self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1019
1020            # Populate device directory with random files.
1021            temp_files = make_random_device_files(
1022                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1023
1024            self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1025
1026            for temp_file in temp_files:
1027                host_path = os.path.join(dest_dir, temp_file.base_name)
1028                self._verify_local(temp_file.checksum, host_path)
1029
1030            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1031        finally:
1032            if host_dir is not None:
1033                shutil.rmtree(host_dir)
1034
1035    def test_pull_symlink_dir(self):
1036        """Pull a symlink to a directory of symlinks to files."""
1037        try:
1038            host_dir = tempfile.mkdtemp()
1039
1040            remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1041            remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1042            remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1043
1044            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1045            self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1046            self.device.shell(['ln', '-s', remote_links, remote_symlink])
1047
1048            # Populate device directory with random files.
1049            temp_files = make_random_device_files(
1050                self.device, in_dir=remote_dir, num_files=32)
1051
1052            for temp_file in temp_files:
1053                self.device.shell(
1054                    ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1055                     posixpath.join(remote_links, temp_file.base_name)])
1056
1057            self.device.pull(remote=remote_symlink, local=host_dir)
1058
1059            for temp_file in temp_files:
1060                host_path = os.path.join(
1061                    host_dir, 'symlink', temp_file.base_name)
1062                self._verify_local(temp_file.checksum, host_path)
1063
1064            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1065        finally:
1066            if host_dir is not None:
1067                shutil.rmtree(host_dir)
1068
1069    def test_pull_empty(self):
1070        """Pull a directory containing an empty directory from the device."""
1071        try:
1072            host_dir = tempfile.mkdtemp()
1073
1074            remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1075            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1076            self.device.shell(['mkdir', '-p', remote_empty_path])
1077
1078            self.device.pull(remote=remote_empty_path, local=host_dir)
1079            self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1080        finally:
1081            if host_dir is not None:
1082                shutil.rmtree(host_dir)
1083
1084    def test_multiple_pull(self):
1085        """Pull a randomly generated directory of files from the device."""
1086
1087        try:
1088            host_dir = tempfile.mkdtemp()
1089
1090            subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1091            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1092            self.device.shell(['mkdir', '-p', subdir])
1093
1094            # Create some random files and a subdirectory containing more files.
1095            temp_files = make_random_device_files(
1096                self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1097
1098            subdir_temp_files = make_random_device_files(
1099                self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1100
1101            paths = map(lambda temp_file: temp_file.full_path, temp_files)
1102            paths.append(subdir)
1103            self.device._simple_call(['pull'] + paths + [host_dir])
1104
1105            for temp_file in temp_files:
1106                local_path = os.path.join(host_dir, temp_file.base_name)
1107                self._verify_local(temp_file.checksum, local_path)
1108
1109            for subdir_temp_file in subdir_temp_files:
1110                local_path = os.path.join(host_dir,
1111                                          'subdir',
1112                                          subdir_temp_file.base_name)
1113                self._verify_local(subdir_temp_file.checksum, local_path)
1114
1115            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1116        finally:
1117            if host_dir is not None:
1118                shutil.rmtree(host_dir)
1119
1120    def test_sync(self):
1121        """Sync a randomly generated directory of files to specified device."""
1122
1123        try:
1124            base_dir = tempfile.mkdtemp()
1125
1126            # Create mirror device directory hierarchy within base_dir.
1127            full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1128            os.makedirs(full_dir_path)
1129
1130            # Create 32 random files within the host mirror.
1131            temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
1132
1133            # Clean up any trash on the device.
1134            device = adb.get_device(product=base_dir)
1135            device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1136
1137            device.sync('data')
1138
1139            # Confirm that every file on the device mirrors that on the host.
1140            for temp_file in temp_files:
1141                device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
1142                                                  temp_file.base_name)
1143                dev_md5, _ = device.shell(
1144                    [get_md5_prog(self.device), device_full_path])[0].split()
1145                self.assertEqual(temp_file.checksum, dev_md5)
1146
1147            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1148        finally:
1149            if base_dir is not None:
1150                shutil.rmtree(base_dir)
1151
1152    def test_unicode_paths(self):
1153        """Ensure that we can support non-ASCII paths, even on Windows."""
1154        name = u'로보카 폴리'
1155
1156        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1157        remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1158
1159        ## push.
1160        tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1161        tf.close()
1162        self.device.push(tf.name, remote_path)
1163        os.remove(tf.name)
1164        self.assertFalse(os.path.exists(tf.name))
1165
1166        # Verify that the device ended up with the expected UTF-8 path
1167        output = self.device.shell(
1168                ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1169        self.assertEqual(remote_path.encode('utf-8'), output)
1170
1171        # pull.
1172        self.device.pull(remote_path, tf.name)
1173        self.assertTrue(os.path.exists(tf.name))
1174        os.remove(tf.name)
1175        self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1176
1177
def main():
    """Run every test in this module against the attached device(s).

    The random module is seeded with a fixed value first. When adb reports
    no devices, a notice is printed and no tests are run.
    """
    random.seed(0)

    # Guard clause: nothing can be tested without a device.
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)
1185
1186
# Run the suite only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
1189