1#!/usr/bin/python 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Script to archive old Autotest results to Google Storage. 8 9Uses gsutil to archive files to the configured Google Storage bucket. 10Upon successful copy, the local results directory is deleted. 11""" 12 13import base64 14import datetime 15import errno 16import glob 17import gzip 18import logging 19import logging.handlers 20import os 21import re 22import shutil 23import signal 24import stat 25import subprocess 26import sys 27import tempfile 28import time 29 30from optparse import OptionParser 31 32import common 33from autotest_lib.client.common_lib import error 34from autotest_lib.client.common_lib import global_config 35from autotest_lib.client.common_lib import site_utils 36from autotest_lib.client.common_lib import utils 37from autotest_lib.site_utils import job_directories 38from autotest_lib.site_utils import pubsub_utils 39from autotest_lib.tko import models 40 41# Autotest requires the psutil module from site-packages, so it must be imported 42# after "import common". 43try: 44 # Does not exist, nor is needed, on moblab. 45 import psutil 46except ImportError: 47 psutil = None 48 49from chromite.lib import parallel 50try: 51 from chromite.lib import metrics 52 from chromite.lib import ts_mon_config 53except ImportError: 54 metrics = site_utils.metrics_mock 55 ts_mon_config = site_utils.metrics_mock 56 57 58GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value( 59 'CROS', 'gs_offloading_enabled', type=bool, default=True) 60 61# Nice setting for process, the higher the number the lower the priority. 62NICENESS = 10 63 64# Maximum number of seconds to allow for offloading a single 65# directory. 66OFFLOAD_TIMEOUT_SECS = 60 * 60 67 68# Sleep time per loop. 69SLEEP_TIME_SECS = 5 70 71# Minimum number of seconds between e-mail reports. 72REPORT_INTERVAL_SECS = 60 * 60 73 74# Location of Autotest results on disk. 75RESULTS_DIR = '/usr/local/autotest/results' 76FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS') 77 78# Hosts sub-directory that contains cleanup, verify and repair jobs. 79HOSTS_SUB_DIR = 'hosts' 80 81LOG_LOCATION = '/usr/local/autotest/logs/' 82LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt' 83LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S' 84LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' 85 86FAILED_OFFLOADS_FILE_HEADER = ''' 87This is the list of gs_offloader failed jobs. 88Last offloader attempt at %s failed to offload %d files. 89Check http://go/cros-triage-gsoffloader to triage the issue 90 91 92First failure Count Directory name 93=================== ====== ============================== 94''' 95# --+----1----+---- ----+ ----+----1----+----2----+----3 96 97FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n' 98FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' 99 100USE_RSYNC_ENABLED = global_config.global_config.get_config_value( 101 'CROS', 'gs_offloader_use_rsync', type=bool, default=False) 102 103# According to https://cloud.google.com/storage/docs/bucket-naming#objectnames 104INVALID_GS_CHARS = ['[', ']', '*', '?', '#'] 105INVALID_GS_CHAR_RANGE = [(0x00, 0x1F), (0x7F, 0x84), (0x86, 0xFF)] 106 107# Maximum number of files in the folder. 108MAX_FILE_COUNT = 500 109FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs'] 110LIMIT_FILE_COUNT = global_config.global_config.get_config_value( 111 'CROS', 'gs_offloader_limit_file_count', type=bool, default=False) 112 113# Use multiprocessing for gsutil uploading. 114GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value( 115 'CROS', 'gs_offloader_multiprocessing', type=bool, default=False) 116 117D = '[0-9][0-9]' 118TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D) 119CTS_RESULT_PATTERN = 'testResult.xml' 120CTS_V2_RESULT_PATTERN = 'test_result.xml' 121# Google Storage bucket URI to store results in. 122DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value( 123 'CROS', 'cts_results_server', default='') 124DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value( 125 'CROS', 'cts_apfe_server', default='') 126 127_PUBSUB_ENABLED = global_config.global_config.get_config_value( 128 'CROS', 'cloud_notification_enabled', type=bool, default=False) 129_PUBSUB_TOPIC = global_config.global_config.get_config_value( 130 'CROS', 'cloud_notification_topic', default=None) 131 132 133# Test upload pubsub notification attributes 134NOTIFICATION_ATTR_VERSION = 'version' 135NOTIFICATION_ATTR_GCS_URI = 'gcs_uri' 136NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address' 137NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id' 138NOTIFICATION_VERSION = '1' 139 140 141# the message data for new test result notification. 142NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT' 143 144 145class TimeoutException(Exception): 146 """Exception raised by the timeout_handler.""" 147 pass 148 149 150def timeout_handler(_signum, _frame): 151 """Handler for SIGALRM when the offloading process times out. 152 153 @param _signum: Signal number of the signal that was just caught. 154 14 for SIGALRM. 155 @param _frame: Current stack frame. 156 157 @raise TimeoutException: Automatically raises so that the time out 158 is caught by the try/except surrounding the 159 Popen call. 160 """ 161 raise TimeoutException('Process Timed Out') 162 163 164def get_cmd_list(multiprocessing, dir_entry, gs_path): 165 """Return the command to offload a specified directory. 166 167 @param multiprocessing: True to turn on -m option for gsutil. 168 @param dir_entry: Directory entry/path that which we need a cmd_list 169 to offload. 170 @param gs_path: Location in google storage where we will 171 offload the directory. 172 173 @return A command list to be executed by Popen. 174 """ 175 cmd = ['gsutil'] 176 if multiprocessing: 177 cmd.append('-m') 178 if USE_RSYNC_ENABLED: 179 cmd.append('rsync') 180 target = os.path.join(gs_path, os.path.basename(dir_entry)) 181 else: 182 cmd.append('cp') 183 target = gs_path 184 cmd += ['-eR', dir_entry, target] 185 return cmd 186 187 188def get_directory_size_kibibytes_cmd_list(directory): 189 """Returns command to get a directory's total size.""" 190 # Having this in its own method makes it easier to mock in 191 # unittests. 192 return ['du', '-sk', directory] 193 194 195def get_directory_size_kibibytes(directory): 196 """Calculate the total size of a directory with all its contents. 197 198 @param directory: Path to the directory 199 200 @return Size of the directory in kibibytes. 201 """ 202 cmd = get_directory_size_kibibytes_cmd_list(directory) 203 process = subprocess.Popen(cmd, 204 stdout=subprocess.PIPE, 205 stderr=subprocess.PIPE) 206 stdout_data, stderr_data = process.communicate() 207 208 if process.returncode != 0: 209 # This function is used for statistics only, if it fails, 210 # nothing else should crash. 211 logging.warning('Getting size of %s failed. Stderr:', directory) 212 logging.warning(stderr_data) 213 return 0 214 215 return int(stdout_data.split('\t', 1)[0]) 216 217 218def get_sanitized_name(name): 219 """Get a string with all invalid characters in the name being replaced. 220 221 @param name: Name to be processed. 222 223 @return A string with all invalid characters in the name being 224 replaced. 225 """ 226 match_pattern = ''.join([re.escape(c) for c in INVALID_GS_CHARS]) 227 match_pattern += ''.join([r'\x%02x-\x%02x' % (r[0], r[1]) 228 for r in INVALID_GS_CHAR_RANGE]) 229 invalid = re.compile('[%s]' % match_pattern) 230 return invalid.sub(lambda x: '%%%02x' % ord(x.group(0)), name) 231 232 233def sanitize_dir(dir_entry): 234 """Replace all invalid characters in folder and file names with valid ones. 235 236 FIFOs are converted to regular files to prevent gsutil hangs (see crbug/684122). 237 Symlinks are converted to regular files that store the link destination 238 (crbug/692788). 239 240 @param dir_entry: Directory entry to be sanitized. 241 """ 242 if not os.path.exists(dir_entry): 243 return 244 renames = [] 245 fifos = [] 246 symlinks = [] 247 for root, dirs, files in os.walk(dir_entry): 248 sanitized_root = get_sanitized_name(root) 249 for name in dirs + files: 250 sanitized_name = get_sanitized_name(name) 251 sanitized_path = os.path.join(sanitized_root, sanitized_name) 252 if name != sanitized_name: 253 orig_path = os.path.join(sanitized_root, name) 254 renames.append((orig_path, sanitized_path)) 255 current_path = os.path.join(root, name) 256 file_stat = os.lstat(current_path) 257 if stat.S_ISFIFO(file_stat.st_mode): 258 # Replace fifos with markers 259 fifos.append(sanitized_path) 260 elif stat.S_ISLNK(file_stat.st_mode): 261 # Replace symlinks with markers 262 destination = os.readlink(current_path) 263 symlinks.append((sanitized_path, destination)) 264 for src, dest in renames: 265 logging.warning('Invalid character found. Renaming %s to %s.', src, 266 dest) 267 shutil.move(src, dest) 268 for fifo in fifos: 269 logging.debug('Removing fifo %s', fifo) 270 os.remove(fifo) 271 logging.debug('Creating marker %s', fifo) 272 with open(fifo, 'a') as marker: 273 marker.write('<FIFO>') 274 for link, destination in symlinks: 275 logging.debug('Removing symlink %s', link) 276 os.remove(link) 277 logging.debug('Creating marker %s', link) 278 with open(link, 'w') as marker: 279 marker.write('<symlink to %s>' % destination) 280 281 282def _get_zippable_folders(dir_entry): 283 folders_list = [] 284 for folder in os.listdir(dir_entry): 285 folder_path = os.path.join(dir_entry, folder) 286 if (not os.path.isfile(folder_path) and 287 not folder in FOLDERS_NEVER_ZIP): 288 folders_list.append(folder_path) 289 return folders_list 290 291 292def limit_file_count(dir_entry): 293 """Limit the number of files in given directory. 294 295 The method checks the total number of files in the given directory. 296 If the number is greater than MAX_FILE_COUNT, the method will 297 compress each folder in the given directory, except folders in 298 FOLDERS_NEVER_ZIP. 299 300 @param dir_entry: Directory entry to be checked. 301 """ 302 count = utils.run('find "%s" | wc -l' % dir_entry, 303 ignore_status=True).stdout.strip() 304 try: 305 count = int(count) 306 except (ValueError, TypeError): 307 logging.warning('Fail to get the file count in folder %s.', dir_entry) 308 return 309 if count < MAX_FILE_COUNT: 310 return 311 312 # For test job, zip folders in a second level, e.g. 123-debug/host1. 313 # This is to allow autoserv debug folder still be accessible. 314 # For special task, it does not need to dig one level deeper. 315 is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN, 316 dir_entry) 317 318 folders = _get_zippable_folders(dir_entry) 319 if not is_special_task: 320 subfolders = [] 321 for folder in folders: 322 subfolders.extend(_get_zippable_folders(folder)) 323 folders = subfolders 324 325 for folder in folders: 326 try: 327 zip_name = '%s.tgz' % folder 328 utils.run('tar -cz -C "%s" -f "%s" "%s"' % 329 (os.path.dirname(folder), zip_name, 330 os.path.basename(folder))) 331 except error.CmdError as e: 332 logging.error('Fail to compress folder %s. Error: %s', 333 folder, e) 334 continue 335 shutil.rmtree(folder) 336 337 338def correct_results_folder_permission(dir_entry): 339 """Make sure the results folder has the right permission settings. 340 341 For tests running with server-side packaging, the results folder has 342 the owner of root. This must be changed to the user running the 343 autoserv process, so parsing job can access the results folder. 344 345 @param dir_entry: Path to the results folder. 346 """ 347 if not dir_entry: 348 return 349 350 logging.info('Trying to correct file permission of %s.', dir_entry) 351 try: 352 subprocess.check_call( 353 ['sudo', '-n', 'chown', '-R', str(os.getuid()), dir_entry]) 354 subprocess.check_call( 355 ['sudo', '-n', 'chgrp', '-R', str(os.getgid()), dir_entry]) 356 except subprocess.CalledProcessError as e: 357 logging.error('Failed to modify permission for %s: %s', 358 dir_entry, e) 359 360 361def upload_testresult_files(dir_entry, multiprocessing): 362 """Upload test results to separate gs buckets. 363 364 Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket. 365 Upload timestamp.zip to cts_apfe_bucket. 366 367 @param dir_entry: Path to the results folder. 368 @param multiprocessing: True to turn on -m option for gsutil. 369 """ 370 for host in glob.glob(os.path.join(dir_entry, '*')): 371 cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*', 372 TIMESTAMP_PATTERN) 373 cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*', 374 TIMESTAMP_PATTERN) 375 gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*', 376 TIMESTAMP_PATTERN) 377 for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN), 378 (cts_v2_path, CTS_V2_RESULT_PATTERN), 379 (gts_v2_path, CTS_V2_RESULT_PATTERN)]: 380 for path in glob.glob(result_path): 381 try: 382 _upload_files(host, path, result_pattern, multiprocessing) 383 except Exception as e: 384 logging.error('ERROR uploading test results %s to GS: %s', 385 path, e) 386 387 388def _is_valid_result(build, result_pattern, suite): 389 """Check if the result should be uploaded to CTS/GTS buckets. 390 391 @param build: Builder name. 392 @param result_pattern: XML result file pattern. 393 @param suite: Test suite name. 394 395 @returns: Bool flag indicating whether a valid result. 396 """ 397 if build is None or suite is None: 398 return False 399 400 # Not valid if it's not a release build. 401 if not re.match(r'(?!trybot-).*-release/.*', build): 402 return False 403 404 # Not valid if it's cts result but not 'arc-cts*' or 'test_that_wrapper' 405 # suite. 406 whitelisted_suites = ['arc-cts', 'arc-cts-dev', 'arc-cts-beta', 407 'arc-cts-stable', 'arc-cts-perbuild', 'arc-gts', 408 'arc-gts-perbuild', 'test_that_wrapper'] 409 result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN] 410 if result_pattern in result_patterns and suite not in whitelisted_suites: 411 return False 412 413 return True 414 415 416def _upload_files(host, path, result_pattern, multiprocessing): 417 keyval = models.test.parse_job_keyval(host) 418 build = keyval.get('build') 419 suite = keyval.get('suite') 420 421 if not _is_valid_result(build, result_pattern, suite): 422 # No need to upload current folder, return. 423 return 424 425 parent_job_id = str(keyval['parent_job_id']) 426 427 folders = path.split(os.sep) 428 job_id = folders[-6] 429 package = folders[-4] 430 timestamp = folders[-1] 431 432 # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/ 433 # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/ 434 cts_apfe_gs_path = os.path.join( 435 DEFAULT_CTS_APFE_GSURI, build, parent_job_id, 436 package, job_id + '_' + timestamp) + '/' 437 438 # Path: bucket/cheets_CTS.*/job_id_timestamp/ 439 # or bucket/cheets_GTS.*/job_id_timestamp/ 440 test_result_gs_path = os.path.join( 441 DEFAULT_CTS_RESULTS_GSURI, package, 442 job_id + '_' + timestamp) + '/' 443 444 for zip_file in glob.glob(os.path.join('%s.zip' % path)): 445 utils.run(' '.join(get_cmd_list( 446 multiprocessing, zip_file, cts_apfe_gs_path))) 447 logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path) 448 449 for test_result_file in glob.glob(os.path.join(path, result_pattern)): 450 # gzip test_result_file(testResult.xml/test_result.xml) 451 452 test_result_file_gz = '%s.gz' % test_result_file 453 with open(test_result_file, 'r') as f_in, ( 454 gzip.open(test_result_file_gz, 'w')) as f_out: 455 shutil.copyfileobj(f_in, f_out) 456 utils.run(' '.join(get_cmd_list( 457 multiprocessing, test_result_file_gz, test_result_gs_path))) 458 logging.debug('Zip and upload %s to %s', 459 test_result_file_gz, test_result_gs_path) 460 # Remove test_result_file_gz(testResult.xml.gz/test_result.xml.gz) 461 os.remove(test_result_file_gz) 462 463 464def _create_test_result_notification(gs_path, dir_entry): 465 """Construct a test result notification. 466 467 @param gs_path: The test result Google Cloud Storage URI. 468 @param dir_entry: The local offload directory name. 469 470 @returns The notification message. 471 """ 472 gcs_uri = os.path.join(gs_path, os.path.basename(dir_entry)) 473 logging.info('Notification on gcs_uri %s', gcs_uri) 474 data = base64.b64encode(NEW_TEST_RESULT_MESSAGE) 475 msg_payload = {'data': data} 476 msg_attributes = {} 477 msg_attributes[NOTIFICATION_ATTR_GCS_URI] = gcs_uri 478 msg_attributes[NOTIFICATION_ATTR_VERSION] = NOTIFICATION_VERSION 479 msg_attributes[NOTIFICATION_ATTR_MOBLAB_MAC] = \ 480 site_utils.get_default_interface_mac_address() 481 msg_attributes[NOTIFICATION_ATTR_MOBLAB_ID] = site_utils.get_moblab_id() 482 msg_payload['attributes'] = msg_attributes 483 484 return msg_payload 485 486 487def get_offload_dir_func(gs_uri, multiprocessing, delete_age, pubsub_topic=None): 488 """Returns the offload directory function for the given gs_uri 489 490 @param gs_uri: Google storage bucket uri to offload to. 491 @param multiprocessing: True to turn on -m option for gsutil. 492 @param pubsub_topic: The pubsub topic to publish notificaitons. If None, 493 pubsub is not enabled. 494 495 @return offload_dir function to perform the offload. 496 """ 497 @metrics.SecondsTimerDecorator( 498 'chromeos/autotest/gs_offloader/job_offload_duration') 499 def offload_dir(dir_entry, dest_path, job_complete_time): 500 """Offload the specified directory entry to Google storage. 501 502 @param dir_entry: Directory entry to offload. 503 @param dest_path: Location in google storage where we will 504 offload the directory. 505 @param job_complete_time: The complete time of the job from the AFE 506 database. 507 508 """ 509 error = False 510 stdout_file = None 511 stderr_file = None 512 try: 513 upload_signal_filename = '%s/%s/.GS_UPLOADED' % ( 514 RESULTS_DIR, dir_entry) 515 if not os.path.isfile(upload_signal_filename): 516 sanitize_dir(dir_entry) 517 if DEFAULT_CTS_RESULTS_GSURI: 518 upload_testresult_files(dir_entry, multiprocessing) 519 520 if LIMIT_FILE_COUNT: 521 limit_file_count(dir_entry) 522 523 stdout_file = tempfile.TemporaryFile('w+') 524 stderr_file = tempfile.TemporaryFile('w+') 525 process = None 526 signal.alarm(OFFLOAD_TIMEOUT_SECS) 527 gs_path = '%s%s' % (gs_uri, dest_path) 528 process = subprocess.Popen( 529 get_cmd_list(multiprocessing, dir_entry, gs_path), 530 stdout=stdout_file, stderr=stderr_file) 531 process.wait() 532 signal.alarm(0) 533 534 if process.returncode == 0: 535 dir_size = get_directory_size_kibibytes(dir_entry) 536 537 m_offload_count = ( 538 'chromeos/autotest/gs_offloader/jobs_offloaded') 539 metrics.Counter(m_offload_count).increment() 540 m_offload_size = ('chromeos/autotest/gs_offloader/' 541 'kilobytes_transferred') 542 metrics.Counter(m_offload_size).increment_by(dir_size) 543 544 if pubsub_topic: 545 message = _create_test_result_notification( 546 gs_path, dir_entry) 547 msg_ids = pubsub_utils.publish_notifications( 548 pubsub_topic, [message]) 549 if not msg_ids: 550 error = True 551 552 if not error: 553 open(upload_signal_filename, 'a').close() 554 else: 555 error = True 556 if os.path.isfile(upload_signal_filename): 557 if job_directories.is_job_expired(delete_age, job_complete_time): 558 shutil.rmtree(dir_entry) 559 560 except TimeoutException: 561 m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count' 562 metrics.Counter(m_timeout).increment() 563 # If we finished the call to Popen(), we may need to 564 # terminate the child process. We don't bother calling 565 # process.poll(); that inherently races because the child 566 # can die any time it wants. 567 if process: 568 try: 569 process.terminate() 570 except OSError: 571 # We don't expect any error other than "No such 572 # process". 573 pass 574 logging.error('Offloading %s timed out after waiting %d ' 575 'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS) 576 error = True 577 except OSError as e: 578 # The wrong file permission can lead call 579 # `shutil.rmtree(dir_entry)` to raise OSError with message 580 # 'Permission denied'. Details can be found in 581 # crbug.com/536151 582 if e.errno == errno.EACCES: 583 correct_results_folder_permission(dir_entry) 584 m_permission_error = ('chromeos/autotest/errors/gs_offloader/' 585 'wrong_permissions_count') 586 metrics.Counter(m_permission_error).increment() 587 finally: 588 signal.alarm(0) 589 if error: 590 # Rewind the log files for stdout and stderr and log 591 # their contents. 592 stdout_file.seek(0) 593 stderr_file.seek(0) 594 stderr_content = stderr_file.read() 595 logging.warning('Error occurred when offloading %s:', dir_entry) 596 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(), 597 stderr_content) 598 # Some result files may have wrong file permission. Try 599 # to correct such error so later try can success. 600 # TODO(dshi): The code is added to correct result files 601 # with wrong file permission caused by bug 511778. After 602 # this code is pushed to lab and run for a while to 603 # clean up these files, following code and function 604 # correct_results_folder_permission can be deleted. 605 if 'CommandException: Error opening file' in stderr_content: 606 correct_results_folder_permission(dir_entry) 607 if stdout_file: 608 stdout_file.close() 609 if stderr_file: 610 stderr_file.close() 611 return offload_dir 612 613 614def delete_files(dir_entry, dest_path, job_complete_time): 615 """Simply deletes the dir_entry from the filesystem. 616 617 Uses same arguments as offload_dir so that it can be used in replace 618 of it on systems that only want to delete files instead of 619 offloading them. 620 621 @param dir_entry: Directory entry to offload. 622 @param dest_path: NOT USED. 623 @param job_complete_time: NOT USED. 624 """ 625 shutil.rmtree(dir_entry) 626 627 628def _format_job_for_failure_reporting(job): 629 """Formats a _JobDirectory for reporting / logging. 630 631 @param job: The _JobDirectory to format. 632 """ 633 d = datetime.datetime.fromtimestamp(job.get_failure_time()) 634 data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT), 635 job.get_failure_count(), 636 job.get_job_directory()) 637 return FAILED_OFFLOADS_LINE_FORMAT % data 638 639 640def wait_for_gs_write_access(gs_uri): 641 """Verify and wait until we have write access to Google Storage. 642 643 @param gs_uri: The Google Storage URI we are trying to offload to. 644 """ 645 # TODO (sbasi) Try to use the gsutil command to check write access. 646 # Ensure we have write access to gs_uri. 647 dummy_file = tempfile.NamedTemporaryFile() 648 test_cmd = get_cmd_list(False, dummy_file.name, gs_uri) 649 while True: 650 try: 651 subprocess.check_call(test_cmd) 652 subprocess.check_call( 653 ['gsutil', 'rm', 654 os.path.join(gs_uri, 655 os.path.basename(dummy_file.name))]) 656 break 657 except subprocess.CalledProcessError: 658 logging.debug('Unable to offload to %s, sleeping.', gs_uri) 659 time.sleep(120) 660 661 662class Offloader(object): 663 """State of the offload process. 664 665 Contains the following member fields: 666 * _offload_func: Function to call for each attempt to offload 667 a job directory. 668 * _jobdir_classes: List of classes of job directory to be 669 offloaded. 670 * _processes: Maximum number of outstanding offload processes 671 to allow during an offload cycle. 672 * _age_limit: Minimum age in days at which a job may be 673 offloaded. 674 * _open_jobs: a dictionary mapping directory paths to Job 675 objects. 676 """ 677 678 def __init__(self, options): 679 self._pubsub_topic = None 680 self._upload_age_limit = options.age_to_upload 681 self._delete_age_limit = options.age_to_delete 682 if options.delete_only: 683 self._offload_func = delete_files 684 else: 685 self.gs_uri = utils.get_offload_gsuri() 686 logging.debug('Offloading to: %s', self.gs_uri) 687 multiprocessing = False 688 if options.multiprocessing: 689 multiprocessing = True 690 elif options.multiprocessing is None: 691 multiprocessing = GS_OFFLOADER_MULTIPROCESSING 692 logging.info( 693 'Offloader multiprocessing is set to:%r', multiprocessing) 694 if options.pubsub_topic_for_job_upload: 695 self._pubsub_topic = options.pubsub_topic_for_job_upload 696 elif _PUBSUB_ENABLED: 697 self._pubsub_topic = _PUBSUB_TOPIC 698 self._offload_func = get_offload_dir_func( 699 self.gs_uri, multiprocessing, self._delete_age_limit, 700 self._pubsub_topic) 701 classlist = [] 702 if options.process_hosts_only or options.process_all: 703 classlist.append(job_directories.SpecialJobDirectory) 704 if not options.process_hosts_only: 705 classlist.append(job_directories.RegularJobDirectory) 706 self._jobdir_classes = classlist 707 assert self._jobdir_classes 708 self._processes = options.parallelism 709 self._open_jobs = {} 710 self._pusub_topic = None 711 712 713 def _add_new_jobs(self): 714 """Find new job directories that need offloading. 715 716 Go through the file system looking for valid job directories 717 that are currently not in `self._open_jobs`, and add them in. 718 719 """ 720 new_job_count = 0 721 for cls in self._jobdir_classes: 722 for resultsdir in cls.get_job_directories(): 723 if resultsdir in self._open_jobs: 724 continue 725 self._open_jobs[resultsdir] = cls(resultsdir) 726 new_job_count += 1 727 logging.debug('Start of offload cycle - found %d new jobs', 728 new_job_count) 729 730 731 def _remove_offloaded_jobs(self): 732 """Removed offloaded jobs from `self._open_jobs`.""" 733 removed_job_count = 0 734 for jobkey, job in self._open_jobs.items(): 735 if job.is_offloaded(): 736 del self._open_jobs[jobkey] 737 removed_job_count += 1 738 logging.debug('End of offload cycle - cleared %d new jobs, ' 739 'carrying %d open jobs', 740 removed_job_count, len(self._open_jobs)) 741 742 743 def _update_offload_results(self): 744 """Check and report status after attempting offload. 745 746 This function processes all jobs in `self._open_jobs`, assuming 747 an attempt has just been made to offload all of them. 748 749 Any jobs that have been successfully offloaded are removed. 750 751 If any jobs have reportable errors, and we haven't generated 752 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds, 753 send new e-mail describing the failures. 754 755 """ 756 self._remove_offloaded_jobs() 757 failed_jobs = [j for j in self._open_jobs.values() if 758 j.get_failure_time()] 759 self._report_failed_jobs_count(failed_jobs) 760 self._log_failed_jobs_locally(failed_jobs) 761 762 763 def offload_once(self): 764 """Perform one offload cycle. 765 766 Find all job directories for new jobs that we haven't seen 767 before. Then, attempt to offload the directories for any 768 jobs that have finished running. Offload of multiple jobs 769 is done in parallel, up to `self._processes` at a time. 770 771 After we've tried uploading all directories, go through the list 772 checking the status of all uploaded directories. If necessary, 773 report failures via e-mail. 774 775 """ 776 self._add_new_jobs() 777 self._report_current_jobs_count() 778 with parallel.BackgroundTaskRunner( 779 self._offload_func, processes=self._processes) as queue: 780 for job in self._open_jobs.values(): 781 job.enqueue_offload(queue, self._upload_age_limit) 782 self._update_offload_results() 783 784 785 def _log_failed_jobs_locally(self, failed_jobs, 786 log_file=FAILED_OFFLOADS_FILE): 787 """Updates a local file listing all the failed jobs. 788 789 The dropped file can be used by the developers to list jobs that we have 790 failed to upload. 791 792 @param failed_jobs: A list of failed _JobDirectory objects. 793 @param log_file: The file to log the failed jobs to. 794 """ 795 now = datetime.datetime.now() 796 now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT) 797 formatted_jobs = [_format_job_for_failure_reporting(job) 798 for job in failed_jobs] 799 formatted_jobs.sort() 800 801 with open(log_file, 'w') as logfile: 802 logfile.write(FAILED_OFFLOADS_FILE_HEADER % 803 (now_str, len(failed_jobs))) 804 logfile.writelines(formatted_jobs) 805 806 807 def _report_current_jobs_count(self): 808 """Report the number of outstanding jobs to monarch.""" 809 metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set( 810 len(self._open_jobs)) 811 812 813 def _report_failed_jobs_count(self, failed_jobs): 814 """Report the number of outstanding failed offload jobs to monarch. 815 816 @param: List of failed jobs. 817 """ 818 metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set( 819 len(failed_jobs)) 820 821 822def parse_options(): 823 """Parse the args passed into gs_offloader.""" 824 defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % ( 825 utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR) 826 usage = 'usage: %prog [options]\n' + defaults 827 parser = OptionParser(usage) 828 parser.add_option('-a', '--all', dest='process_all', 829 action='store_true', 830 help='Offload all files in the results directory.') 831 parser.add_option('-s', '--hosts', dest='process_hosts_only', 832 action='store_true', 833 help='Offload only the special tasks result files ' 834 'located in the results/hosts subdirectory') 835 parser.add_option('-p', '--parallelism', dest='parallelism', 836 type='int', default=1, 837 help='Number of parallel workers to use.') 838 parser.add_option('-o', '--delete_only', dest='delete_only', 839 action='store_true', 840 help='GS Offloader will only the delete the ' 841 'directories and will not offload them to google ' 842 'storage. NOTE: If global_config variable ' 843 'CROS.gs_offloading_enabled is False, --delete_only ' 844 'is automatically True.', 845 default=not GS_OFFLOADING_ENABLED) 846 parser.add_option('-d', '--days_old', dest='days_old', 847 help='Minimum job age in days before a result can be ' 848 'offloaded.', type='int', default=0) 849 parser.add_option('-t', '--pubsub_topic_for_job_upload', 850 dest='pubsub_topic_for_job_upload', 851 help='The pubsub topic to send notifciations for ' 852 'new job upload', 853 action='store', type='string', default=None) 854 parser.add_option('-l', '--log_size', dest='log_size', 855 help='Limit the offloader logs to a specified ' 856 'number of Mega Bytes.', type='int', default=0) 857 parser.add_option('-m', dest='multiprocessing', action='store_true', 858 help='Turn on -m option for gsutil. If not set, the ' 859 'global config setting gs_offloader_multiprocessing ' 860 'under CROS section is applied.') 861 parser.add_option('-i', '--offload_once', dest='offload_once', 862 action='store_true', 863 help='Upload all available results and then exit.') 864 parser.add_option('-y', '--normal_priority', dest='normal_priority', 865 action='store_true', 866 help='Upload using normal process priority.') 867 parser.add_option('-u', '--age_to_upload', dest='age_to_upload', 868 help='Minimum job age in days before a result can be ' 869 'offloaded, but not removed from local storage', 870 type='int', default=None) 871 parser.add_option('-n', '--age_to_delete', dest='age_to_delete', 872 help='Minimum job age in days before a result can be ' 873 'removed from local storage', 874 type='int', default=None) 875 876 options = parser.parse_args()[0] 877 if options.process_all and options.process_hosts_only: 878 parser.print_help() 879 print ('Cannot process all files and only the hosts ' 880 'subdirectory. Please remove an argument.') 881 sys.exit(1) 882 883 if options.days_old and (options.age_to_upload or options.age_to_delete): 884 parser.print_help() 885 print('Use the days_old option or the age_to_* options but not both') 886 sys.exit(1) 887 888 if options.age_to_upload == None: 889 options.age_to_upload = options.days_old 890 if options.age_to_delete == None: 891 options.age_to_delete = options.days_old 892 893 return options 894 895 896def main(): 897 """Main method of gs_offloader.""" 898 options = parse_options() 899 900 if options.process_all: 901 offloader_type = 'all' 902 elif options.process_hosts_only: 903 offloader_type = 'hosts' 904 else: 905 offloader_type = 'jobs' 906 907 log_timestamp = time.strftime(LOG_TIMESTAMP_FORMAT) 908 if options.log_size > 0: 909 log_timestamp = '' 910 log_basename = LOG_FILENAME_FORMAT % (offloader_type, log_timestamp) 911 log_filename = os.path.join(LOG_LOCATION, log_basename) 912 log_formatter = logging.Formatter(LOGGING_FORMAT) 913 # Replace the default logging handler with a RotatingFileHandler. If 914 # options.log_size is 0, the file size will not be limited. Keeps 915 # one backup just in case. 916 handler = logging.handlers.RotatingFileHandler( 917 log_filename, maxBytes=1024 * options.log_size, backupCount=1) 918 handler.setFormatter(log_formatter) 919 logger = logging.getLogger() 920 logger.setLevel(logging.DEBUG) 921 logger.addHandler(handler) 922 923 # Nice our process (carried to subprocesses) so we don't overload 924 # the system. 925 if not options.normal_priority: 926 logging.debug('Set process to nice value: %d', NICENESS) 927 os.nice(NICENESS) 928 if psutil: 929 proc = psutil.Process() 930 logging.debug('Set process to ionice IDLE') 931 proc.ionice(psutil.IOPRIO_CLASS_IDLE) 932 933 # os.listdir returns relative paths, so change to where we need to 934 # be to avoid an os.path.join on each loop. 935 logging.debug('Offloading Autotest results in %s', RESULTS_DIR) 936 os.chdir(RESULTS_DIR) 937 938 signal.signal(signal.SIGALRM, timeout_handler) 939 940 with ts_mon_config.SetupTsMonGlobalState('gs_offloader', indirect=True, 941 short_lived=False): 942 offloader = Offloader(options) 943 if not options.delete_only: 944 wait_for_gs_write_access(offloader.gs_uri) 945 while True: 946 offloader.offload_once() 947 if options.offload_once: 948 break 949 time.sleep(SLEEP_TIME_SECS) 950 951 952if __name__ == '__main__': 953 main() 954