#!/usr/bin/env python
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Unit tests for the contents of device_utils.py (mostly DeviceUtils).
These tests run against a real device, so at least one must be attached.
"""

import os
import sys
import tempfile
import unittest

if __name__ == '__main__':
  # When run directly as a script, put the directory two levels up on
  # sys.path so that the `devil` package imports below can be resolved.
  sys.path.append(
      os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', )))

from devil.android import device_utils
from devil.android.sdk import adb_wrapper
from devil.utils import cmd_helper

_OLD_CONTENTS = "foo"
_NEW_CONTENTS = "bar"
_DEVICE_DIR = "/data/local/tmp/device_utils_test"
_SUB_DIR = "sub"
_SUB_DIR1 = "sub1"
_SUB_DIR2 = "sub2"


class DeviceUtilsPushDeleteFilesTest(unittest.TestCase):
  """Exercises DeviceUtils.PushChangedFiles and RestartAdbd on a device."""

  def setUp(self):
    """Wrap the first device reported by adb in a DeviceUtils instance."""
    devices = adb_wrapper.AdbWrapper.Devices()
    # assertTrue rather than a bare assert so the check survives `python -O`.
    self.assertTrue(devices, 'A device must be attached')
    self.adb = devices[0]
    self.adb.WaitForDevice()
    self.device = device_utils.DeviceUtils(
        self.adb, default_timeout=10, default_retries=0)

  def _RegisterCleanup(self, host_path):
    """Schedule removal of a host path and of the device test directory.

    Registered through addCleanup so the removal also happens when an
    assertion in the test body fails.

    Args:
      host_path: host file or directory to remove once the test finishes.
    """
    self.addCleanup(cmd_helper.RunCmd, ['rm', '-rf', host_path])
    self.addCleanup(self.device.RunShellCommand, ['rm', '-rf', _DEVICE_DIR])

  @staticmethod
  def _MakeTempFile(contents):
    """Make a temporary file with the given contents.

    Args:
      contents: string to write to the temporary file.

    Returns:
      a tuple of (absolute path to the file, file name)
    """
    # dir=None is mkstemp's default, i.e. the system temporary directory.
    return DeviceUtilsPushDeleteFilesTest._MakeTempFileGivenDir(None, contents)

  @staticmethod
  def _MakeTempFileGivenDir(directory, contents):
    """Make a temporary file under the given directory
    with the given contents

    Args:
      directory: the temp directory to create the file
      contents: string to write to the temp file

    Returns:
      a tuple of (absolute path to the file, file name)
    """
    fi, path = tempfile.mkstemp(dir=directory, text=True)
    with os.fdopen(fi, 'w') as f:
      f.write(contents)
    file_name = os.path.basename(path)
    return (path, file_name)

  @staticmethod
  def _ChangeTempFile(path, contents):
    """Overwrite the file at |path| with |contents|.

    Args:
      path: path of an existing host file.
      contents: string that replaces the current file contents.
    """
    # The builtin open() is required here: os.open() takes integer flags and
    # returns a raw file descriptor, which cannot be used in a with statement.
    with open(path, 'w') as f:
      f.write(contents)

  @staticmethod
  def _DeleteTempFile(path):
    """Remove the host file at |path|."""
    os.remove(path)

  def _GetAdbdPid(self):
    """Return the adbd PID column from the device's `ps` output.

    The PID is taken from the second whitespace-separated column of the
    first line that mentions adbd. The test fails if no such line exists.
    """
    # Filter on the host instead of through a shell pipeline: arguments given
    # as a list are presumably quoted individually by RunShellCommand, so a
    # '|' element would not act as a pipe (confirm against RunShellCommand).
    for ps_line in self.device.RunShellCommand(['ps']):
      columns = ps_line.split()
      if 'adbd' in ps_line and len(columns) > 1:
        return columns[1]
    self.fail('Unable to find adbd in ps output')

  def testPushChangedFiles_noFileChange(self):
    """An unchanged host file leaves the device copy with the old contents."""
    (host_file_path, file_name) = self._MakeTempFile(_OLD_CONTENTS)
    self._RegisterCleanup(host_file_path)
    device_file_path = "%s/%s" % (_DEVICE_DIR, file_name)
    self.adb.Push(host_file_path, device_file_path)

    self.device.PushChangedFiles([(host_file_path, device_file_path)])
    result = self.device.RunShellCommand(['cat', device_file_path],
                                         single_line=True)
    self.assertEqual(_OLD_CONTENTS, result)

  def testPushChangedFiles_singleFileChange(self):
    """A modified host file is pushed again with its new contents."""
    (host_file_path, file_name) = self._MakeTempFile(_OLD_CONTENTS)
    self._RegisterCleanup(host_file_path)
    device_file_path = "%s/%s" % (_DEVICE_DIR, file_name)
    self.adb.Push(host_file_path, device_file_path)

    self._ChangeTempFile(host_file_path, _NEW_CONTENTS)
    self.device.PushChangedFiles([(host_file_path, device_file_path)])
    result = self.device.RunShellCommand(['cat', device_file_path],
                                         single_line=True)
    self.assertEqual(_NEW_CONTENTS, result)

  def testDeleteFiles(self):
    """A file removed on the host is deleted from the device as stale."""
    host_tmp_dir = tempfile.mkdtemp()
    self._RegisterCleanup(host_tmp_dir)
    (host_file_path, file_name) = self._MakeTempFileGivenDir(
        host_tmp_dir, _OLD_CONTENTS)

    device_file_path = "%s/%s" % (_DEVICE_DIR, file_name)
    self.adb.Push(host_file_path, device_file_path)

    self._DeleteTempFile(host_file_path)
    self.device.PushChangedFiles([(host_tmp_dir, _DEVICE_DIR)],
                                 delete_device_stale=True)
    result = self.device.RunShellCommand(['ls', _DEVICE_DIR], single_line=True)
    self.assertEqual('', result)

  def testPushAndDeleteFiles_noSubDir(self):
    """In a flat directory, one file is updated and one stale file removed."""
    host_tmp_dir = tempfile.mkdtemp()
    self._RegisterCleanup(host_tmp_dir)
    (host_file_path1, file_name1) = self._MakeTempFileGivenDir(
        host_tmp_dir, _OLD_CONTENTS)
    (host_file_path2, file_name2) = self._MakeTempFileGivenDir(
        host_tmp_dir, _OLD_CONTENTS)

    device_file_path1 = "%s/%s" % (_DEVICE_DIR, file_name1)
    device_file_path2 = "%s/%s" % (_DEVICE_DIR, file_name2)
    self.adb.Push(host_file_path1, device_file_path1)
    self.adb.Push(host_file_path2, device_file_path2)

    # File 1 changes on the host; file 2 disappears from the host.
    self._ChangeTempFile(host_file_path1, _NEW_CONTENTS)
    self._DeleteTempFile(host_file_path2)

    self.device.PushChangedFiles([(host_tmp_dir, _DEVICE_DIR)],
                                 delete_device_stale=True)
    result = self.device.RunShellCommand(['cat', device_file_path1],
                                         single_line=True)
    self.assertEqual(_NEW_CONTENTS, result)
    result = self.device.RunShellCommand(['ls', _DEVICE_DIR], single_line=True)
    self.assertEqual(file_name1, result)

  def testPushAndDeleteFiles_SubDir(self):
    """Same as the flat case, but with files inside nested directories."""
    host_tmp_dir = tempfile.mkdtemp()
    self._RegisterCleanup(host_tmp_dir)
    host_sub_dir1 = "%s/%s" % (host_tmp_dir, _SUB_DIR1)
    host_sub_dir2 = "%s/%s/%s" % (host_tmp_dir, _SUB_DIR, _SUB_DIR2)
    cmd_helper.RunCmd(['mkdir', '-p', host_sub_dir1])
    cmd_helper.RunCmd(['mkdir', '-p', host_sub_dir2])

    (host_file_path1, file_name1) = self._MakeTempFileGivenDir(
        host_tmp_dir, _OLD_CONTENTS)
    (host_file_path2, file_name2) = self._MakeTempFileGivenDir(
        host_tmp_dir, _OLD_CONTENTS)
    (host_file_path3, file_name3) = self._MakeTempFileGivenDir(
        host_sub_dir1, _OLD_CONTENTS)
    (host_file_path4, file_name4) = self._MakeTempFileGivenDir(
        host_sub_dir2, _OLD_CONTENTS)

    device_file_path1 = "%s/%s" % (_DEVICE_DIR, file_name1)
    device_file_path2 = "%s/%s" % (_DEVICE_DIR, file_name2)
    device_file_path3 = "%s/%s/%s" % (_DEVICE_DIR, _SUB_DIR1, file_name3)
    device_file_path4 = "%s/%s/%s/%s" % (_DEVICE_DIR, _SUB_DIR,
                                         _SUB_DIR2, file_name4)

    self.adb.Push(host_file_path1, device_file_path1)
    self.adb.Push(host_file_path2, device_file_path2)
    self.adb.Push(host_file_path3, device_file_path3)
    self.adb.Push(host_file_path4, device_file_path4)

    # File 1 changes; files 2 and 4 disappear from the host; file 3 is
    # left untouched.
    self._ChangeTempFile(host_file_path1, _NEW_CONTENTS)
    self._DeleteTempFile(host_file_path2)
    self._DeleteTempFile(host_file_path4)

    self.device.PushChangedFiles([(host_tmp_dir, _DEVICE_DIR)],
                                 delete_device_stale=True)
    result = self.device.RunShellCommand(['cat', device_file_path1],
                                         single_line=True)
    self.assertEqual(_NEW_CONTENTS, result)

    # Only file 1 and the two top-level sub directories should remain.
    result = self.device.RunShellCommand(['ls', _DEVICE_DIR])
    self.assertIn(file_name1, result)
    self.assertIn(_SUB_DIR1, result)
    self.assertIn(_SUB_DIR, result)
    self.assertEqual(3, len(result))

    result = self.device.RunShellCommand(['cat', device_file_path3],
                                         single_line=True)
    self.assertEqual(_OLD_CONTENTS, result)

    result = self.device.RunShellCommand(["ls", "%s/%s/%s"
                                          % (_DEVICE_DIR, _SUB_DIR, _SUB_DIR2)],
                                         single_line=True)
    self.assertEqual('', result)

  def testRestartAdbd(self):
    """Restarting adbd gives the daemon a different PID."""
    old_adbd_pid = self._GetAdbdPid()
    self.device.RestartAdbd()
    new_adbd_pid = self._GetAdbdPid()
    self.assertNotEqual(old_adbd_pid, new_adbd_pid)


if __name__ == '__main__':
  unittest.main()