#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import base64
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import site_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
from autotest_lib.site_utils import pubsub_utils
from autotest_lib.tko import models

# Autotest requires the psutil module from site-packages, so it must be imported
# after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = site_utils.metrics_mock
    ts_mon_config = site_utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for the process; the higher the number, the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

# Hosts sub-directory that contains cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

LOG_LOCATION = '/usr/local/autotest/logs/'
LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d jobs.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

# According to https://cloud.google.com/storage/docs/bucket-naming#objectnames
INVALID_GS_CHARS = ['[', ']', '*', '?', '#']
INVALID_GS_CHAR_RANGE = [(0x00, 0x1F), (0x7F, 0x84), (0x86, 0xFF)]

# Maximum number of files allowed in a result folder before its
# subfolders are compressed.
MAX_FILE_COUNT = 500
FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
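# Note: as a glob pattern, TIMESTAMP_PATTERN matches CTS/GTS result directory
# names of the form '2016.04.28_01.41.44' (illustrative name only).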
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
# Google Storage bucket URIs to store CTS/GTS results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')

_PUBSUB_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_enabled', type=bool, default=False)
_PUBSUB_TOPIC = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_topic', default=None)


# Test upload pubsub notification attributes
NOTIFICATION_ATTR_VERSION = 'version'
NOTIFICATION_ATTR_GCS_URI = 'gcs_uri'
NOTIFICATION_ATTR_MOBLAB_MAC = 'moblab_mac_address'
NOTIFICATION_ATTR_MOBLAB_ID = 'moblab_id'
NOTIFICATION_VERSION = '1'


# The message data for a new test result notification.
NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'


class TimeoutException(Exception):
    """Exception raised by the timeout_handler."""
    pass


def timeout_handler(_signum, _frame):
    """Handler for SIGALRM when the offloading process times out.

    @param _signum: Signal number of the signal that was just caught.
                    14 for SIGALRM.
    @param _frame: Current stack frame.

    @raise TimeoutException: Automatically raises so that the time out
                             is caught by the try/except surrounding the
                             Popen call.
    """
    raise TimeoutException('Process Timed Out')


def get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd
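# Illustrative example (hypothetical paths; rsync disabled, multiprocessing on):
#   get_cmd_list(True, '118-fubar', 'gs://some-bucket/118-fubar')
#   -> ['gsutil', '-m', 'cp', '-eR', '118-fubar', 'gs://some-bucket/118-fubar']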


def get_directory_size_kibibytes_cmd_list(directory):
    """Returns command to get a directory's total size."""
    # Having this in its own method makes it easier to mock in
    # unittests.
    return ['du', '-sk', directory]


def get_directory_size_kibibytes(directory):
    """Calculate the total size of a directory with all its contents.

    @param directory: Path to the directory

    @return Size of the directory in kibibytes.
    """
    cmd = get_directory_size_kibibytes_cmd_list(directory)
    process = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout_data, stderr_data = process.communicate()

    if process.returncode != 0:
        # This function is used for statistics only, if it fails,
        # nothing else should crash.
        logging.warning('Getting size of %s failed. Stderr:', directory)
        logging.warning(stderr_data)
        return 0

    return int(stdout_data.split('\t', 1)[0])
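# Illustrative example (hypothetical directory): `du -sk` prints a line such as
# '4096\t/usr/local/autotest/results/118-fubar', from which this function
# returns 4096; on any failure it returns 0 instead of raising.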


def get_sanitized_name(name):
    """Get a copy of the name with all invalid characters replaced.

    @param name: Name to be processed.

    @return A copy of `name` with each character that is invalid in
             Google Storage object names replaced by its percent-encoded
             value.
    """
    match_pattern = ''.join([re.escape(c) for c in INVALID_GS_CHARS])
    match_pattern += ''.join([r'\x%02x-\x%02x' % (r[0], r[1])
                              for r in INVALID_GS_CHAR_RANGE])
    invalid = re.compile('[%s]' % match_pattern)
    return invalid.sub(lambda x: '%%%02x' % ord(x.group(0)), name)
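# Illustrative example (hypothetical name):
#   get_sanitized_name('cheets_CTS#results[1]') -> 'cheets_CTS%23results%5b1%5d'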


def sanitize_dir(dir_entry):
    """Replace all invalid characters in folder and file names with valid ones.

    FIFOs are converted to regular files to prevent gsutil hangs (see crbug/684122).
    Symlinks are converted to regular files that store the link destination
    (crbug/692788).

    @param dir_entry: Directory entry to be sanitized.
    """
    if not os.path.exists(dir_entry):
        return
    renames = []
    fifos = []
    symlinks = []
    for root, dirs, files in os.walk(dir_entry):
        sanitized_root = get_sanitized_name(root)
        for name in dirs + files:
            sanitized_name = get_sanitized_name(name)
            sanitized_path = os.path.join(sanitized_root, sanitized_name)
            if name != sanitized_name:
                orig_path = os.path.join(sanitized_root, name)
                renames.append((orig_path, sanitized_path))
            current_path = os.path.join(root, name)
            file_stat = os.lstat(current_path)
            if stat.S_ISFIFO(file_stat.st_mode):
                # Replace fifos with markers
                fifos.append(sanitized_path)
            elif stat.S_ISLNK(file_stat.st_mode):
                # Replace symlinks with markers
                destination = os.readlink(current_path)
                symlinks.append((sanitized_path, destination))
    for src, dest in renames:
        logging.warning('Invalid character found. Renaming %s to %s.', src,
                        dest)
        shutil.move(src, dest)
    for fifo in fifos:
        logging.debug('Removing fifo %s', fifo)
        os.remove(fifo)
        logging.debug('Creating marker %s', fifo)
        with open(fifo, 'a') as marker:
            marker.write('<FIFO>')
    for link, destination in symlinks:
        logging.debug('Removing symlink %s', link)
        os.remove(link)
        logging.debug('Creating marker %s', link)
        with open(link, 'w') as marker:
            marker.write('<symlink to %s>' % destination)
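# Illustrative example (hypothetical entries): a FIFO 'monitor.fifo' becomes a
# regular file containing '<FIFO>', and a symlink 'latest' pointing at
# '118-fubar' becomes a regular file containing '<symlink to 118-fubar>'.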


def _get_zippable_folders(dir_entry):
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                not folder in FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    count = utils.run('find "%s" | wc -l' % dir_entry,
                      ignore_status=True).stdout.strip()
    try:
        count = int(count)
    except (ValueError, TypeError):
        logging.warning('Failed to get the file count in folder %s.', dir_entry)
        return
    if count < MAX_FILE_COUNT:
        return

    # For test jobs, zip folders at the second level, e.g. 123-debug/host1,
    # so that the autoserv debug folder remains directly accessible.
    # Special tasks do not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        try:
            zip_name = '%s.tgz' % folder
            utils.run('tar -cz -C "%s" -f "%s" "%s"' %
                      (os.path.dirname(folder), zip_name,
                       os.path.basename(folder)))
        except error.CmdError as e:
            logging.error('Failed to compress folder %s. Error: %s',
                          folder, e)
            continue
        shutil.rmtree(folder)
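# Illustrative example (hypothetical job): for a regular test job '123-debug'
# holding more than MAX_FILE_COUNT files, the second-level folder
# '123-debug/host1' is replaced by the archive '123-debug/host1.tgz'; for a
# special task directory, its first-level folders are archived instead.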


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. Ownership must be changed to the user running the
    autoserv process so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', str(os.getuid()), dir_entry])
        subprocess.check_call(
                ['sudo', '-n', 'chgrp', '-R', str(os.getgid()), dir_entry])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def upload_testresult_files(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_RESULT_PATTERN),
                            (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    _upload_files(host, path, result_pattern, multiprocessing)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether a valid result.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS/GTS result but the suite is not one of the
    # whitelisted suites below (e.g. 'arc-cts*' or 'test_that_wrapper').
    whitelisted_suites = ['arc-cts', 'arc-cts-dev', 'arc-cts-beta',
                          'arc-cts-stable', 'arc-cts-perbuild', 'arc-gts',
                          'arc-gts-perbuild', 'test_that_wrapper']
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and suite not in whitelisted_suites:
        return False

    return True
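# Illustrative examples (hypothetical build strings):
#   _is_valid_result('lumpy-release/R65-1.0.0', CTS_RESULT_PATTERN, 'arc-cts')
#       -> True
#   _is_valid_result('trybot-lumpy-release/R65-1.0.0', CTS_RESULT_PATTERN,
#                    'arc-cts') -> False  (trybot builds are excluded)
#   _is_valid_result('lumpy-release/R65-1.0.0', CTS_RESULT_PATTERN, 'bvt')
#       -> False  (suite is not whitelisted)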


def _upload_files(host, path, result_pattern, multiprocessing):
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload current folder, return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    folders = path.split(os.sep)
    job_id = folders[-6]
    package = folders[-4]
    timestamp = folders[-1]

    # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
    # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
    cts_apfe_gs_path = os.path.join(
            DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
            package, job_id + '_' + timestamp) + '/'

    # Path: bucket/cheets_CTS.*/job_id_timestamp/
    # or bucket/cheets_GTS.*/job_id_timestamp/
    test_result_gs_path = os.path.join(
            DEFAULT_CTS_RESULTS_GSURI, package,
            job_id + '_' + timestamp) + '/'

    for zip_file in glob.glob('%s.zip' % path):
        utils.run(' '.join(get_cmd_list(
                multiprocessing, zip_file, cts_apfe_gs_path)))
        logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)

    for test_result_file in glob.glob(os.path.join(path, result_pattern)):
        # gzip the test result file (testResult.xml / test_result.xml).
        test_result_file_gz = '%s.gz' % test_result_file
        with open(test_result_file, 'r') as f_in, (
                gzip.open(test_result_file_gz, 'w')) as f_out:
            shutil.copyfileobj(f_in, f_out)
        utils.run(' '.join(get_cmd_list(
                multiprocessing, test_result_file_gz, test_result_gs_path)))
        logging.debug('Zip and upload %s to %s',
                      test_result_file_gz, test_result_gs_path)
        # Remove test_result_file_gz (testResult.xml.gz / test_result.xml.gz).
        os.remove(test_result_file_gz)


def _create_test_result_notification(gs_path, dir_entry):
    """Construct a test result notification.

    @param gs_path: The test result Google Cloud Storage URI.
    @param dir_entry: The local offload directory name.

    @returns The notification message.
    """
    gcs_uri = os.path.join(gs_path, os.path.basename(dir_entry))
    logging.info('Notification on gcs_uri %s', gcs_uri)
    data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
    msg_payload = {'data': data}
    msg_attributes = {}
    msg_attributes[NOTIFICATION_ATTR_GCS_URI] = gcs_uri
    msg_attributes[NOTIFICATION_ATTR_VERSION] = NOTIFICATION_VERSION
    msg_attributes[NOTIFICATION_ATTR_MOBLAB_MAC] = \
        site_utils.get_default_interface_mac_address()
    msg_attributes[NOTIFICATION_ATTR_MOBLAB_ID] = site_utils.get_moblab_id()
    msg_payload['attributes'] = msg_attributes

    return msg_payload
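# Illustrative payload shape (hypothetical attribute values):
#   {'data': base64.b64encode(NEW_TEST_RESULT_MESSAGE),
#    'attributes': {'gcs_uri': 'gs://some-bucket/118-fubar',
#                   'version': '1',
#                   'moblab_mac_address': '00:11:22:33:44:55',
#                   'moblab_id': 'some-moblab-id'}}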


def get_offload_dir_func(gs_uri, multiprocessing, delete_age, pubsub_topic=None):
    """Returns the offload directory function for the given gs_uri.

    @param gs_uri: Google storage bucket uri to offload to.
    @param multiprocessing: True to turn on -m option for gsutil.
    @param delete_age: Minimum job age in days before a result directory may
          be deleted from local storage after a successful offload.
    @param pubsub_topic: The pubsub topic to publish notifications to. If None,
          pubsub is not enabled.

    @return offload_dir function to perform the offload.
    """
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def offload_dir(dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.

        """
        error = False
        stdout_file = None
        stderr_file = None
        try:
            upload_signal_filename = '%s/%s/.GS_UPLOADED' % (
                    RESULTS_DIR, dir_entry)
            if not os.path.isfile(upload_signal_filename):
                sanitize_dir(dir_entry)
                if DEFAULT_CTS_RESULTS_GSURI:
                    upload_testresult_files(dir_entry, multiprocessing)

                if LIMIT_FILE_COUNT:
                    limit_file_count(dir_entry)

                stdout_file = tempfile.TemporaryFile('w+')
                stderr_file = tempfile.TemporaryFile('w+')
                process = None
                signal.alarm(OFFLOAD_TIMEOUT_SECS)
                gs_path = '%s%s' % (gs_uri, dest_path)
                process = subprocess.Popen(
                        get_cmd_list(multiprocessing, dir_entry, gs_path),
                        stdout=stdout_file, stderr=stderr_file)
                process.wait()
                signal.alarm(0)

                if process.returncode == 0:
                    dir_size = get_directory_size_kibibytes(dir_entry)

                    m_offload_count = (
                            'chromeos/autotest/gs_offloader/jobs_offloaded')
                    metrics.Counter(m_offload_count).increment()
                    m_offload_size = ('chromeos/autotest/gs_offloader/'
                                      'kilobytes_transferred')
                    metrics.Counter(m_offload_size).increment_by(dir_size)

                    if pubsub_topic:
                        message = _create_test_result_notification(
                                gs_path, dir_entry)
                        msg_ids = pubsub_utils.publish_notifications(
                                pubsub_topic, [message])
                        if not msg_ids:
                            error = True

                    if not error:
                        open(upload_signal_filename, 'a').close()
                else:
                    error = True
            if os.path.isfile(upload_signal_filename):
                if job_directories.is_job_expired(delete_age, job_complete_time):
                    shutil.rmtree(dir_entry)

        except TimeoutException:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment()
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            error = True
        except OSError as e:
            # Incorrect file permissions can cause the call to
            # `shutil.rmtree(dir_entry)` to raise OSError with the message
            # 'Permission denied'. Details can be found in
            # crbug.com/536151
            if e.errno == errno.EACCES:
                correct_results_folder_permission(dir_entry)
            m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                                  'wrong_permissions_count')
            metrics.Counter(m_permission_error).increment()
        finally:
            signal.alarm(0)
            if error:
                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)
                # Some result files may have wrong file permissions. Try
                # to correct such errors so that a later attempt can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            if stdout_file:
                stdout_file.close()
            if stderr_file:
                stderr_file.close()
    return offload_dir
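# Illustrative usage sketch (hypothetical bucket, directory and age limit):
#   offload = get_offload_dir_func('gs://some-bucket/', False, 10)
#   offload('118-fubar', '118-fubar', job_complete_time)
# where job_complete_time is the job's completion time as recorded in the AFE
# database.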


def delete_files(dir_entry, dest_path, job_complete_time):
    """Simply deletes the dir_entry from the filesystem.

    Uses the same arguments as offload_dir so that it can be used in
    place of offload_dir on systems that only want to delete files
    instead of offloading them.

    @param dir_entry: Directory entry to offload.
    @param dest_path: NOT USED.
    @param job_complete_time: NOT USED.
    """
    shutil.rmtree(dir_entry)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.get_failure_time())
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.get_failure_count(),
            job.get_job_directory())
    return FAILED_OFFLOADS_LINE_FORMAT % data
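# Illustrative output line (hypothetical job):
#   '2012-01-01 12:00:00      3  118-fubar\n'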


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            logging.debug('Unable to offload to %s, sleeping.', gs_uri)
            time.sleep(120)


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _offload_func:  Function to call for each attempt to offload
        a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which an offloaded
        job may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._pubsub_topic = None
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._offload_func = delete_files
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to:%r', multiprocessing)
            if options.pubsub_topic_for_job_upload:
                self._pubsub_topic = options.pubsub_topic_for_job_upload
            elif _PUBSUB_ENABLED:
                self._pubsub_topic = _PUBSUB_TOPIC
            self._offload_func = get_offload_dir_func(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    self._pubsub_topic)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if job.is_offloaded():
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d completed jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _update_offload_results(self):
        """Check and report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        Any jobs that have been successfully offloaded are removed.

        Any jobs with reportable errors are counted in monarch and
        recorded in the local failed-offloads file.

        """
        self._remove_offloaded_jobs()
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.get_failure_time()]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures to monarch and to the local failed-offloads file.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._offload_func, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                job.enqueue_offload(queue, self._upload_age_limit)
        self._update_offload_results()


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Updates a local file listing all the failed jobs.

        The dropped file can be used by developers to see which jobs we
        have failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-t', '--pubsub_topic_for_job_upload',
                      dest='pubsub_topic_for_job_upload',
                      help='The pubsub topic to send notifications for '
                      'new job uploads',
                      action='store', type='string', default=None)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage',
                      type='int', default=None)

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts '
               'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options
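# Illustrative invocation (hypothetical values): run a single offload pass with
# four parallel workers for results at least one day old:
#   ./gs_offloader.py -i -p 4 --days_old 1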


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    log_timestamp = time.strftime(LOG_TIMESTAMP_FORMAT)
    if options.log_size > 0:
        log_timestamp = ''
    log_basename = LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    log_filename = os.path.join(LOG_LOCATION, log_basename)
    log_formatter = logging.Formatter(LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keeps
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    signal.signal(signal.SIGALRM, timeout_handler)

    with ts_mon_config.SetupTsMonGlobalState('gs_offloader', indirect=True,
                                             short_lived=False):
        offloader = Offloader(options)
        if not options.delete_only:
            wait_for_gs_write_access(offloader.gs_uri)
        while True:
            offloader.offload_once()
            if options.offload_once:
                break
            time.sleep(SLEEP_TIME_SECS)


if __name__ == '__main__':
    main()
