18d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# -*- coding: utf-8 -*- 28d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Copyright 2010 Google Inc. All Rights Reserved. 38d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 48d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Licensed under the Apache License, Version 2.0 (the "License"); 58d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# you may not use this file except in compliance with the License. 68d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# You may obtain a copy of the License at 78d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 88d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# http://www.apache.org/licenses/LICENSE-2.0 98d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Unless required by applicable law or agreed to in writing, software 118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# distributed under the License is distributed on an "AS IS" BASIS, 128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# See the License for the specific language governing permissions and 148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# limitations under the License. 158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi"""Base class for gsutil commands. 168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiIn addition to base class code, this file contains helpers that depend on base 188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass state (such as GetAndPrintAcl) In general, functions that depend on 198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass state and that are used by multiple commands belong in this file. 
208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiFunctions that don't depend on class state belong in util.py, and non-shared 218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoihelpers belong in individual subclasses. 228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi""" 238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom __future__ import absolute_import 258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport codecs 278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom collections import namedtuple 288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport copy 298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport getopt 308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport logging 318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport multiprocessing 328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport os 338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport Queue 348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport signal 358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport sys 368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport textwrap 378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport threading 388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport traceback 398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport boto 418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom boto.storage_uri import StorageUri 428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport gslib 438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import AccessDeniedException 448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import ArgumentException 458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import ServiceException 
from gslib.cloud_api_delegator import CloudApiDelegator
from gslib.cs_api_map import ApiSelector
from gslib.cs_api_map import GsutilApiMapFactory
from gslib.exception import CommandException
from gslib.help_provider import HelpProvider
from gslib.name_expansion import NameExpansionIterator
from gslib.name_expansion import NameExpansionResult
from gslib.parallelism_framework_util import AtomicDict
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.translation_helper import AclTranslation
from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import GetConfigFilePath
from gslib.util import GsutilStreamHandler
from gslib.util import HaveFileUrls
from gslib.util import HaveProviderUrls
from gslib.util import IS_WINDOWS
from gslib.util import NO_MAX
from gslib.util import UrlsAreForSingleProvider
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator

OFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5

if IS_WINDOWS:
  import ctypes  # pylint: disable=g-import-not-at-top


def _DefaultExceptionHandler(cls, e):
  """Reports exception e by passing it to cls.logger.exception.

  Args:
    cls: Object exposing a 'logger' attribute.
    e: The exception to report.
  """
  cls.logger.exception(e)


def CreateGsutilLogger(command_name):
  """Creates a logger that resembles 'print' output.

  This logger abides by gsutil -d/-D/-DD/-q options.

  By default (if none of the above options is specified) the logger will display
  all messages logged with level INFO or above. Log propagation is disabled.

  Args:
    command_name: Command name to create logger for.

  Returns:
    A logger object.
  """
  log = logging.getLogger(command_name)
  log.propagate = False
  # Mirror whatever level the root logger is currently configured with.
  log.setLevel(logging.root.level)
  log_handler = GsutilStreamHandler()
  log_handler.setFormatter(logging.Formatter('%(message)s'))
  # Commands that call other commands (like mv) would cause log handlers to be
  # added more than once, so avoid adding if one is already present.
  if not log.handlers:
    log.addHandler(log_handler)
  return log


def _UrlArgChecker(command_instance, url):
  """Decides whether a URL argument should be processed.

  Args:
    command_instance: Object exposing 'exclude_symlinks' and 'logger'.
    url: Object exposing 'expanded_storage_url'.

  Returns:
    False (after logging an informational message) if
    command_instance.exclude_symlinks is set and the expanded URL is a file URL
    whose object_name is a symbolic link; True otherwise.
  """
  if not command_instance.exclude_symlinks:
    return True
  exp_src_url = url.expanded_storage_url
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True


def DummyArgChecker(*unused_args):
  """Argument checker that accepts everything; always returns True."""
  return True


def SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
  """Module-level wrapper that forwards to cls.SetAclFunc.

  Args:
    cls: Object exposing a SetAclFunc method.
    name_expansion_result: Passed through to cls.SetAclFunc unchanged.
    thread_state: Passed through to cls.SetAclFunc as a keyword argument.

  Returns:
    Whatever cls.SetAclFunc returns.
  """
  return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)


def SetAclExceptionHandler(cls, e):
  """Exception handler that maintains state about post-completion status.

  Args:
    cls: Object exposing 'logger'; its 'everything_set_okay' attribute is set
        to False.
    e: The exception to report at error level.
  """
  cls.logger.error(str(e))
  cls.everything_set_okay = False

# We will keep this list of all thread- or process-safe queues ever created by
# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
# we encounter a Python bug in which empty queues block forever on join (which
# is called as part of the Python exit function cleanup) under the impression
# that they are non-empty.
# However, this also lets us shut down somewhat more cleanly when interrupted.
queues = []


def _NewMultiprocessingQueue():
  """Creates a multiprocessing.Queue bounded by MAX_QUEUE_SIZE.

  The new queue is also appended to the module-level 'queues' list.

  Returns:
    The newly created queue.
  """
  queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
  queues.append(queue)
  return queue


def _NewThreadsafeQueue():
  """Creates a Queue.Queue bounded by MAX_QUEUE_SIZE.

  The new queue is also appended to the module-level 'queues' list.

  Returns:
    The newly created queue.
  """
  queue = Queue.Queue(MAX_QUEUE_SIZE)
  queues.append(queue)
  return queue

# The maximum size of a process- or thread-safe queue. Imposing this limit
# prevents us from needing to hold an arbitrary amount of data in memory.
# However, setting this number too high (e.g., >= 32768 on OS X) can cause
# problems on some operating systems.
MAX_QUEUE_SIZE = 32500

# The maximum depth of the tree of recursive calls to command.Apply. This is
# an arbitrary limit put in place to prevent developers from accidentally
# causing problems with infinite recursion, and it can be increased if needed.
MAX_RECURSIVE_DEPTH = 5

ZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')

# Map from deprecated aliases to the current command and subcommands that
# provide the same behavior.
# TODO: Remove this map and deprecate old commands on 9/9/14.
OLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
                 'getacl': ['acl', 'get'],
                 'setacl': ['acl', 'set'],
                 'getcors': ['cors', 'get'],
                 'setcors': ['cors', 'set'],
                 'chdefacl': ['defacl', 'ch'],
                 'getdefacl': ['defacl', 'get'],
                 'setdefacl': ['defacl', 'set'],
                 'disablelogging': ['logging', 'set', 'off'],
                 'enablelogging': ['logging', 'set', 'on'],
                 'getlogging': ['logging', 'get'],
                 'getversioning': ['versioning', 'get'],
                 'setversioning': ['versioning', 'set'],
                 'getwebcfg': ['web', 'get'],
                 'setwebcfg': ['web', 'set']}


# Declare all of the module level variables - see
# InitializeMultiprocessingVariables for an explanation of why this is
# necessary.
# pylint: disable=global-at-module-level
global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
global total_tasks, call_completed_map, global_return_values_map
global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
global current_max_recursive_level, shared_vars_map, shared_vars_list_map
global class_map, worker_checking_level_lock, failure_count


def InitializeMultiprocessingVariables():
  """Initializes module-level variables that will be inherited by subprocesses.

  On Windows, a multiprocessing.Manager object should only
  be created within an "if __name__ == '__main__':" block. This function
  must be called, otherwise every command that calls Command.Apply will fail.
  """
  # This list of global variables must exactly match the above list of
  # declarations.
  # pylint: disable=global-variable-undefined
  global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
  global total_tasks, call_completed_map, global_return_values_map
  global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
  global current_max_recursive_level, shared_vars_map, shared_vars_list_map
  global class_map, worker_checking_level_lock, failure_count

  manager = multiprocessing.Manager()

  consumer_pools = []

  # List of all existing task queues - used by all pools to find the queue
  # that's appropriate for the given recursive_apply_level.
  task_queues = []

  # Used to assign a globally unique caller ID to each Apply call.
  caller_id_lock = manager.Lock()
  caller_id_counter = multiprocessing.Value('i', 0)

  # Map from caller_id to total number of tasks to be completed for that ID.
  total_tasks = AtomicDict(manager=manager)

  # Map from caller_id to a boolean which is True iff all its tasks are
  # finished.
  call_completed_map = AtomicDict(manager=manager)

  # Used to keep track of the set of return values for each caller ID.
  global_return_values_map = AtomicDict(manager=manager)

  # Condition used to notify any waiting threads that a task has finished or
  # that a call to Apply needs a new set of consumer processes.
  need_pool_or_done_cond = manager.Condition()

  # Lock used to prevent multiple worker processes from asking the main thread
  # to create a new consumer pool for the same level.
  worker_checking_level_lock = manager.Lock()

  # Map from caller_id to the current number of completed tasks for that ID.
  caller_id_finished_count = AtomicDict(manager=manager)

  # Used as a way for the main thread to distinguish between being woken up
  # by another call finishing and being woken up by a call that needs a new set
  # of consumer processes.
  new_pool_needed = multiprocessing.Value('i', 0)

  current_max_recursive_level = multiprocessing.Value('i', 0)

  # Map from (caller_id, name) to the value of that shared variable.
  shared_vars_map = AtomicDict(manager=manager)
  shared_vars_list_map = AtomicDict(manager=manager)

  # Map from caller_id to calling class.
  class_map = manager.dict()

  # Number of tasks that resulted in an exception in calls to Apply().
  failure_count = multiprocessing.Value('i', 0)


def InitializeThreadingVariables():
  """Initializes module-level variables used when running multi-threaded.

  When multiprocessing is not available (or on Windows where only 1 process
  is used), thread-safe analogs to the multiprocessing global variables
  must be initialized. This function is the thread-safe analog to
  InitializeMultiprocessingVariables.
  """
  # Only a subset of the module-level variables is set here; manager,
  # consumer_pools, new_pool_needed, current_max_recursive_level and
  # worker_checking_level_lock are not assigned by this function.
  # pylint: disable=global-variable-undefined
  global global_return_values_map, shared_vars_map, failure_count
  global caller_id_finished_count, shared_vars_list_map, total_tasks
  global need_pool_or_done_cond, call_completed_map, class_map
  global task_queues, caller_id_lock, caller_id_counter
  # Unlike InitializeMultiprocessingVariables, the two counters below are plain
  # ints rather than multiprocessing.Value objects.
  caller_id_counter = 0
  caller_id_finished_count = AtomicDict()
  caller_id_lock = threading.Lock()
  call_completed_map = AtomicDict()
  class_map = AtomicDict()
  failure_count = 0
  global_return_values_map = AtomicDict()
  need_pool_or_done_cond = threading.Condition()
  shared_vars_list_map = AtomicDict()
  shared_vars_map = AtomicDict()
  task_queues = []
  total_tasks = AtomicDict()


# Each subclass of Command must define a property named 'command_spec' that is
# an instance of the following class.
2908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiCommandSpec = namedtuple('CommandSpec', [ 2918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Name of command. 2928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'command_name', 2938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Usage synopsis. 2948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'usage_synopsis', 2958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # List of command name aliases. 2968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'command_name_aliases', 2978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Min number of args required by this command. 2988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'min_args', 2998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Max number of args required by this command, or NO_MAX. 3008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'max_args', 3018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Getopt-style string specifying acceptable sub args. 3028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'supported_sub_args', 3038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # True if file URLs are acceptable for this command. 3048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'file_url_ok', 3058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # True if provider-only URLs are acceptable for this command. 3068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'provider_url_ok', 3078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Index in args of first URL arg. 
3088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'urls_start_arg', 3098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # List of supported APIs 3108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'gs_api_support', 3118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Default API to use for this command 3128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'gs_default_api', 3138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Private arguments (for internal testing) 3148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'supported_private_args', 3158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'argparse_arguments', 3168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi]) 3178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass Command(HelpProvider): 3208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Base class for all gsutil commands.""" 3218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Each subclass must override this with an instance of CommandSpec. 3238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_spec = None 3248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web', 3268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'notification'] 3278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # This keeps track of the recursive depth of the current call to Apply. 
3298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi recursive_apply_level = 0 3308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If the multiprocessing module isn't available, we'll use this to keep track 3328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # of the caller_id. 3338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi sequential_caller_id = -1 3348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi @staticmethod 3368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def CreateCommandSpec(command_name, usage_synopsis=None, 3378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_name_aliases=None, min_args=0, 3388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi max_args=NO_MAX, supported_sub_args='', 3398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi file_url_ok=False, provider_url_ok=False, 3408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi urls_start_arg=0, gs_api_support=None, 3418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gs_default_api=None, supported_private_args=None, 3428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi argparse_arguments=None): 3438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Creates an instance of CommandSpec, with defaults.""" 3448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return CommandSpec( 3458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_name=command_name, 3468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi usage_synopsis=usage_synopsis, 3478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_name_aliases=command_name_aliases or [], 3488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi min_args=min_args, 3498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi max_args=max_args, 3508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi supported_sub_args=supported_sub_args, 3518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi 
Kandoi file_url_ok=file_url_ok, 3528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi provider_url_ok=provider_url_ok, 3538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi urls_start_arg=urls_start_arg, 3548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gs_api_support=gs_api_support or [ApiSelector.XML], 3558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gs_default_api=gs_default_api or ApiSelector.XML, 3568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi supported_private_args=supported_private_args, 3578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi argparse_arguments=argparse_arguments or []) 3588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Define a convenience property for command name, since it's used many places. 3608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _GetDefaultCommandName(self): 3618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return self.command_spec.command_name 3628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_name = property(_GetDefaultCommandName) 3638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _CalculateUrlsStartArg(self): 3658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Calculate the index in args of the first URL arg. 3668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Returns: 3688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Index of the first URL arg (according to the command spec). 
3698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 3708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return self.command_spec.urls_start_arg 3718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _TranslateDeprecatedAliases(self, args): 3738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Map deprecated aliases to the corresponding new command, and warn.""" 3748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None) 3758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if new_command_args: 3768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Prepend any subcommands for the new command. The command name itself 3778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # is not part of the args, so leave it out. 3788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args = new_command_args[1:] + args 3798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.warn('\n'.join(textwrap.wrap( 3808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi ('You are using a deprecated alias, "%(used_alias)s", for the ' 3818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi '"%(command_name)s" command. This will stop working on 9/9/2014. ' 3828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Please use "%(command_name)s" with the appropriate sub-command in ' 3838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'the future. 
See "gsutil help %(command_name)s" for details.') % 3848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi {'used_alias': self.command_alias_used, 3858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'command_name': self.command_name}))) 3868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return args 3878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 388cef7893435aa41160dd1255c43cb8498279738ccChris Craik def __init__(self, command_runner, args, headers, debug, trace_token, 389cef7893435aa41160dd1255c43cb8498279738ccChris Craik parallel_operations, bucket_storage_uri_class, 390cef7893435aa41160dd1255c43cb8498279738ccChris Craik gsutil_api_class_map_factory, logging_filters=None, 3918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_alias_used=None): 3928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Instantiates a Command. 3938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 3948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 3958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_runner: CommandRunner (for commands built atop other commands). 3968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args: Command-line args (arg0 = actual arg, not command name ala bash). 3978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi headers: Dictionary containing optional HTTP headers to pass to boto. 3988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi debug: Debug level to pass in to boto connection (range 0..3). 399cef7893435aa41160dd1255c43cb8498279738ccChris Craik trace_token: Trace token to pass to the API implementation. 4008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi parallel_operations: Should command operations be executed in parallel? 4018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class: Class to instantiate for cloud StorageUris. 4028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Settable for testing/mocking. 
4038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api_class_map_factory: Creates map of cloud storage interfaces. 4048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Settable for testing/mocking. 405cef7893435aa41160dd1255c43cb8498279738ccChris Craik logging_filters: Optional list of logging. Filters to apply to this 4068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command's logger. 4078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command_alias_used: The alias that was actually used when running this 4088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi command (as opposed to the "official" command name, 4098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi which will always correspond to the file name). 4108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Implementation note: subclasses shouldn't need to define an __init__ 4128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi method, and instead depend on the shared initialization that happens 4138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi here. If you do define an __init__ method in a subclass you'll need to 4148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi explicitly call super().__init__(). But you're encouraged not to do this, 4158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi because it will make changing the __init__ interface more painful. 4168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 4178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Save class values from constructor params. 
4188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_runner = command_runner 4198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.unparsed_args = args 4208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.headers = headers 4218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.debug = debug 422cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.trace_token = trace_token 4238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.parallel_operations = parallel_operations 4248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.bucket_storage_uri_class = bucket_storage_uri_class 4258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_api_class_map_factory = gsutil_api_class_map_factory 4268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.exclude_symlinks = False 4278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.recursion_requested = False 4288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.all_versions = False 4298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_alias_used = command_alias_used 4308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Global instance of a threaded logger object. 4328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger = CreateGsutilLogger(self.command_name) 4338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if logging_filters: 4348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for log_filter in logging_filters: 4358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.addFilter(log_filter) 4368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.command_spec is None: 4388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException('"%s" command implementation is missing a ' 4398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'command_spec definition.' 
% self.command_name) 4408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Parse and validate args. 4428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.args = self._TranslateDeprecatedAliases(args) 4438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.ParseSubOpts() 4448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Named tuple public functions start with _ 4468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=protected-access 4478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_spec = self.command_spec._replace( 4488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi urls_start_arg=self._CalculateUrlsStartArg()) 4498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if (len(self.args) < self.command_spec.min_args 4518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi or len(self.args) > self.command_spec.max_args): 4528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.RaiseWrongNumberOfArgumentsException() 4538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.command_name not in self._commands_with_subcommands_and_subopts: 4558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.CheckArguments() 4568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Build the support and default maps from the command spec. 
4588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi support_map = { 4598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'gs': self.command_spec.gs_api_support, 4608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 's3': [ApiSelector.XML] 4618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi } 4628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi default_map = { 4638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'gs': self.command_spec.gs_default_api, 4648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 's3': ApiSelector.XML 4658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi } 4668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_api_map = GsutilApiMapFactory.GetApiMap( 4678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_api_class_map_factory, support_map, default_map) 4688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.project_id = None 4708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_api = CloudApiDelegator( 4718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class, self.gsutil_api_map, 472cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.logger, debug=self.debug, trace_token=self.trace_token) 4738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Cross-platform path to run gsutil binary. 4758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_cmd = '' 4768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If running on Windows, invoke python interpreter explicitly. 4778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if gslib.util.IS_WINDOWS: 4788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_cmd += 'python ' 4798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Add full path to gsutil to make sure we test the correct version. 
4808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_path = gslib.GSUTIL_PATH 4818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.gsutil_cmd += self.gsutil_path 4828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We're treating recursion_requested like it's used by all commands, but 4848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # only some of the commands accept the -R option. 4858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.sub_opts: 4868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for o, unused_a in self.sub_opts: 4878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if o == '-r' or o == '-R': 4888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.recursion_requested = True 4898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 4908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 491cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.multiprocessing_is_available = ( 492cef7893435aa41160dd1255c43cb8498279738ccChris Craik CheckMultiprocessingAvailableAndInit().is_available) 4938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 4948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def RaiseWrongNumberOfArgumentsException(self): 4958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Raises exception for wrong number of arguments supplied to command.""" 4968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if len(self.args) < self.command_spec.min_args: 4978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi tail_str = 's' if self.command_spec.min_args > 1 else '' 4988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi message = ('The %s command requires at least %d argument%s.' 
% 4998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (self.command_name, self.command_spec.min_args, tail_str)) 5008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 5018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi message = ('The %s command accepts at most %d arguments.' % 5028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (self.command_name, self.command_spec.max_args)) 5038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi message += ' Usage:\n%s\nFor additional help run:\n gsutil help %s' % ( 5048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_spec.usage_synopsis, self.command_name) 5058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException(message) 5068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def RaiseInvalidArgumentException(self): 5088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Raises exception for specifying an invalid argument to command.""" 5098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi message = ('Incorrect option(s) specified. Usage:\n%s\n' 5108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'For additional help run:\n gsutil help %s' % ( 5118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_spec.usage_synopsis, self.command_name)) 5128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException(message) 5138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def ParseSubOpts(self, check_args=False): 5158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Parses sub-opt args. 5168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 5188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi check_args: True to have CheckArguments() called after parsing. 
5198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Populates: 5218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (self.sub_opts, self.args) from parsing. 5228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Raises: RaiseInvalidArgumentException if invalid args specified. 5248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 5258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 5268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.sub_opts, self.args = getopt.getopt( 5278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.args, self.command_spec.supported_sub_args, 5288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_spec.supported_private_args or []) 5298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except getopt.GetoptError: 5308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.RaiseInvalidArgumentException() 5318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if check_args: 5328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.CheckArguments() 5338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def CheckArguments(self): 5358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Checks that command line arguments match the command_spec. 5368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Any commands in self._commands_with_subcommands_and_subopts are responsible 5388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for calling this method after handling initial parsing of their arguments. 5398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi This prevents commands with sub-commands as well as options from breaking 5408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi the parsing of getopt. 
5418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi TODO: Provide a function to parse commands and sub-commands more 5438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi intelligently once we stop allowing the deprecated command versions. 5448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Raises: 5468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi CommandException if the arguments don't match. 5478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 5488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if (not self.command_spec.file_url_ok 5508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])): 5518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException('"%s" command does not support "file://" URLs. ' 5528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Did you mean to use a gs:// URL?' % 5538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_name) 5548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if (not self.command_spec.provider_url_ok 5558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])): 5568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException('"%s" command does not support provider-only ' 5578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'URLs.' % self.command_name) 5588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def WildcardIterator(self, url_string, all_versions=False): 5608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Helper to instantiate gslib.WildcardIterator. 
5618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args are same as gslib.WildcardIterator interface, but this method fills in 5638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi most of the values from instance state. 5648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 5668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url_string: URL string naming wildcard objects to iterate. 5678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi all_versions: If true, the iterator yields all versions of objects 5688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi matching the wildcard. If false, yields just the live 5698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi object version. 5708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Returns: 5728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi WildcardIterator for use by caller. 5738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 5748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return CreateWildcardIterator( 5758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url_string, self.gsutil_api, all_versions=all_versions, 5768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi debug=self.debug, project_id=self.project_id) 5778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def RunCommand(self): 5798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Abstract function in base class. Subclasses must implement this. 
5808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi The return value of this function will be used as the exit status of the 5828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process, so subclass commands should return an integer exit code (0 for 5838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi success, a value in [1,255] for failure). 5848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 5858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException('Command %s is missing its RunCommand() ' 5868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'implementation' % self.command_name) 5878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi ############################################################ 5898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Shared helper functions that depend on base class state. # 5908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi ############################################################ 5918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs): 5938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Sets the standard or default object ACL depending on self.command_name. 5948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 5958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 5968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi acl_func: ACL function to be passed to Apply. 5978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi acl_excep_handler: ACL exception handler to be passed to Apply. 5988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url_strs: URL strings on which to set ACL. 
5998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Raises: 6018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi CommandException if an ACL could not be set. 6028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 6038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multi_threaded_url_args = [] 6048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Handle bucket ACL setting operations single-threaded, because 6058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # our threading machinery currently assumes it's working with objects 6068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # (name_expansion_iterator), and normally we wouldn't expect users to need 6078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # to set ACLs on huge numbers of buckets at once anyway. 6088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for url_str in url_strs: 6098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url = StorageUrlFromString(url_str) 6108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if url.IsCloudUrl() and url.IsBucket(): 6118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.recursion_requested: 6128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If user specified -R option, convert any bucket args to bucket 6138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # wildcards (e.g., gs://bucket/*), to prevent the operation from 6148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # being applied to the buckets themselves. 
6158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url.object_name = '*' 6168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multi_threaded_url_args.append(url.url_string) 6178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 6188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Convert to a NameExpansionResult so we can re-use the threaded 6198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # function for the single-threaded implementation. RefType is unused. 6208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for blr in self.WildcardIterator(url.url_string).IterBuckets( 6218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_fields=['id']): 6228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi name_expansion_for_url = NameExpansionResult( 6238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url, False, False, blr.storage_url) 6248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi acl_func(self, name_expansion_for_url) 6258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 6268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multi_threaded_url_args.append(url_str) 6278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if len(multi_threaded_url_args) >= 1: 6298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi name_expansion_iterator = NameExpansionIterator( 6308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.command_name, self.debug, 6318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger, self.gsutil_api, 6328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multi_threaded_url_args, self.recursion_requested, 6338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi all_versions=self.all_versions, 6348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi continue_on_error=self.continue_on_error or self.parallel_operations) 6358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # 
Perform requests in parallel (-m) mode, if requested, using 6378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # configured number of parallel processes and threads. Otherwise, 6388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # perform requests with sequential function calls in current process. 6398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.Apply(acl_func, name_expansion_iterator, acl_excep_handler, 6408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error=not self.continue_on_error) 6418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not self.everything_set_okay and not self.continue_on_error: 6438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise CommandException('ACLs for some objects could not be set.') 6448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def SetAclFunc(self, name_expansion_result, thread_state=None): 6468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Sets the object ACL for the name_expansion_result provided. 6478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 6498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi name_expansion_result: NameExpansionResult describing the target object. 6508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi thread_state: If present, use this gsutil Cloud API instance for the set. 
6518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 6528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if thread_state: 6538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi assert not self.def_acl 6548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api = thread_state 6558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 6568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api = self.gsutil_api 6578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi op_string = 'default object ACL' if self.def_acl else 'ACL' 6588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url = name_expansion_result.expanded_storage_url 6598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.info('Setting %s on %s...', op_string, url) 6608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML 6618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and url.scheme != 'gs'): 6628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If we are called with a non-google ACL model, we need to use the XML 6638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # passthrough. acl_arg should either be a canned ACL or an XML ACL. 6648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._SetAclXmlPassthrough(url, gsutil_api) 6658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 6668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL. 6678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._SetAclGsutilApi(url, gsutil_api) 6688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _SetAclXmlPassthrough(self, url, gsutil_api): 6708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Sets the ACL for the URL provided using the XML passthrough functions. 
6718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi This function assumes that self.def_acl, self.canned, 6738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and self.continue_on_error are initialized, and that self.acl_arg is 6748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi either an XML string or a canned ACL string. 6758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 6778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url: CloudURL to set the ACL on. 6788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML 6798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi passthrough functions. 6808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 6818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 6828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi orig_prefer_api = gsutil_api.prefer_api 6838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api.prefer_api = ApiSelector.XML 6848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api.XmlPassThroughSetAcl( 6858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.acl_arg, url, canned=self.canned, 6868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def_obj_acl=self.def_acl, provider=url.scheme) 6878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except ServiceException as e: 6888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.continue_on_error: 6898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.everything_set_okay = False 6908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.error(e) 6918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 6928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise 6938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi finally: 6948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 
gsutil_api.prefer_api = orig_prefer_api 6958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _SetAclGsutilApi(self, url, gsutil_api): 6978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Sets the ACL for the URL provided using the gsutil Cloud API. 6988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 6998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi This function assumes that self.def_acl, self.canned, 7008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and self.continue_on_error are initialized, and that self.acl_arg is 7018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi either a JSON string or a canned ACL string. 7028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 7038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 7048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi url: CloudURL to set the ACL on. 7058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api: gsutil Cloud API to use for the ACL set. 
def _SetAclGsutilApi(self, url, gsutil_api):
    """Applies self.acl_arg to a bucket or object via the gsutil Cloud API.

    Relies on self.def_acl, self.canned and self.continue_on_error being
    initialized, and on self.acl_arg being either a JSON string or a canned
    ACL string.

    Args:
      url: CloudURL to set the ACL on.
      gsutil_api: gsutil Cloud API to use for the ACL set.

    Raises:
      ArgumentException: always re-raised unchanged.
      ServiceException: re-raised unless self.continue_on_error is set; in
          that case the error is logged and self.everything_set_okay is
          set to False.
    """
    try:
        # Canned ACLs travel as an extra keyword argument next to an empty
        # metadata message; JSON ACLs travel inside the metadata message.
        canned_kwargs = {}
        if url.IsBucket():
            if self.def_acl:
                if self.canned:
                    bucket_metadata = apitools_messages.Bucket()
                    canned_kwargs['canned_def_acl'] = self.acl_arg
                else:
                    default_entries = AclTranslation.JsonToMessage(
                        self.acl_arg, apitools_messages.ObjectAccessControl)
                    if not default_entries:
                        # A private (no entries) default object ACL is
                        # signalled with a sentinel value.
                        default_entries.append(PRIVATE_DEFAULT_OBJ_ACL)
                    bucket_metadata = apitools_messages.Bucket(
                        defaultObjectAcl=default_entries)
            elif self.canned:
                bucket_metadata = apitools_messages.Bucket()
                canned_kwargs['canned_acl'] = self.acl_arg
            else:
                bucket_entries = AclTranslation.JsonToMessage(
                    self.acl_arg, apitools_messages.BucketAccessControl)
                bucket_metadata = apitools_messages.Bucket(acl=bucket_entries)
            gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
                                   provider=url.scheme, fields=['id'],
                                   **canned_kwargs)
        else:  # url.IsObject()
            if self.canned:
                object_metadata = apitools_messages.Object()
                canned_kwargs['canned_acl'] = self.acl_arg
            else:
                object_entries = AclTranslation.JsonToMessage(
                    self.acl_arg, apitools_messages.ObjectAccessControl)
                object_metadata = apitools_messages.Object(acl=object_entries)
            gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
                                           object_metadata,
                                           provider=url.scheme,
                                           generation=url.generation,
                                           **canned_kwargs)
    except ArgumentException:
        raise
    except ServiceException as e:
        if not self.continue_on_error:
            raise
        self.everything_set_okay = False
        self.logger.error(e)
def SetAclCommandHelper(self, acl_func, acl_excep_handler):
    """Sets ACLs on the URLs in self.args using the passed-in ACL function.

    self.args[0] is either the path of an existing file holding ACL text or
    the name of a canned ACL; the remaining arguments are the URLs to
    update. Sets self.canned, self.acl_arg and self.everything_set_okay
    before delegating to self.ApplyAclFunc.

    Args:
      acl_func: ACL function, forwarded to self.ApplyAclFunc.
      acl_excep_handler: ACL exception handler, forwarded to
          self.ApplyAclFunc.

    Raises:
      CommandException: if the URLs span providers, if the canned ACL name
          is not recognized, or if some ACLs could not be set while
          self.continue_on_error is false.
    """
    acl_arg, url_args = self.args[0], self.args[1:]

    # Multi-provider requests are rejected because the providers' ACL
    # models differ.
    if not UrlsAreForSingleProvider(url_args):
        raise CommandException(
            '"%s" command spanning providers not allowed.' %
            self.command_name)

    # An existing file means acl_arg is a path to ACL text; anything else
    # has to be the name of a canned ACL.
    acl_is_file = os.path.isfile(acl_arg)
    if acl_is_file:
        with codecs.open(acl_arg, 'r', UTF8) as acl_file:
            acl_arg = acl_file.read()
    else:
        # validate=False because we allow wildcard urls.
        probe_uri = boto.storage_uri(
            url_args[0], debug=self.debug, validate=False,
            bucket_storage_uri_class=self.bucket_storage_uri_class)
        if acl_arg not in probe_uri.canned_acls():
            raise CommandException('Invalid canned ACL "%s".' % acl_arg)
    self.canned = not acl_is_file

    # Used to track if any ACLs failed to be set.
    self.everything_set_okay = True
    self.acl_arg = acl_arg

    self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
    if not (self.everything_set_okay or self.continue_on_error):
        raise CommandException('ACLs for some objects could not be set.')
def _WarnServiceAccounts(self):
    """Logs a hint for service account users after an AccessDenied error.

    When one of the metadata-related commands fails due to AccessDenied, the
    user must ensure that they are listed as an Owner in the API console.
    The hint is logged at warning level, and only when IS_SERVICE_ACCOUNT
    (from gcs_oauth2_boto_plugin.oauth2_plugin) is true; otherwise this
    method does nothing.
    """
    # Import this here so that the value will be set first in
    # gcs_oauth2_boto_plugin.
    # pylint: disable=g-import-not-at-top
    from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT

    if not IS_SERVICE_ACCOUNT:
        return

    # The literal holding the apostrophe is double-quoted on purpose:
    # spelling it as 'account''s' inside single quotes is two adjacent
    # literals, which silently drops the apostrophe from the message.
    hint = (
        'It appears that your service account has been denied access while '
        'attempting to perform a metadata operation. If you believe that you '
        'should have access to this metadata (i.e., if it is associated with '
        "your account), please make sure that your service account's email "
        'address is listed as an Owner in the Team tab of the API console. '
        'See "gsutil help creds" for further information.\n')
    self.logger.warning('\n'.join(textwrap.wrap(hint)))
def GetAndPrintAcl(self, url_str):
    """Prints the standard or default object ACL for a URL.

    Which ACL is printed depends on self.command_name: 'defacl' selects the
    default object ACL, anything else the regular ACL. When the XML API is
    selected for a scheme other than 'gs', the ACL is fetched through the
    XML passthrough and printed as XML; otherwise it is printed as JSON.

    Args:
      url_str: URL string to get ACL for.

    Raises:
      AccessDeniedException: re-raised from the XML passthrough, or raised
          here when the regular ACL comes back empty.
    """
    listing_ref = self.GetAclCommandBucketListingReference(url_str)
    storage_url = StorageUrlFromString(url_str)
    uses_xml_api = (
        self.gsutil_api.GetApiSelector(storage_url.scheme) == ApiSelector.XML)

    if uses_xml_api and storage_url.scheme != 'gs':
        # Need to use XML passthrough.
        try:
            xml_acl = self.gsutil_api.XmlPassThroughGetAcl(
                storage_url, def_obj_acl=self.def_acl,
                provider=storage_url.scheme)
            print(xml_acl.to_xml())
        except AccessDeniedException:
            self._WarnServiceAccounts()
            raise
        return

    if self.command_name == 'defacl':
        acl = listing_ref.root_object.defaultObjectAcl
        if not acl:
            # An empty default object ACL is only warned about; it is still
            # printed below.
            self.logger.warn(
                'No default object ACL present for %s. This could occur if '
                'the default object ACL is private, in which case objects '
                'created in this bucket will be readable only by their '
                'creators. It could also mean you do not have OWNER permission '
                'on %s and therefore do not have permission to read the '
                'default object ACL.', url_str, url_str)
    else:
        acl = listing_ref.root_object.acl
        if not acl:
            self._WarnServiceAccounts()
            raise AccessDeniedException(
                'Access denied. Please ensure you have '
                'OWNER permission on %s.' % url_str)
    print(AclTranslation.JsonFromMessage(acl))
def GetAclCommandBucketListingReference(self, url_str):
    """Resolves a URL string to exactly one bucket listing reference.

    Object URLs are expanded with the 'acl' listing field. Any other URL is
    expanded through IterBuckets, requesting 'defaultObjectAcl' when
    self.command_name is 'defacl' and 'acl' otherwise.

    Args:
      url_str: URL string to get the bucket listing reference for.

    Returns:
      BucketListingReference for the URL string.

    Raises:
      CommandException if string did not result in exactly one reference.
    """
    # We're guaranteed by caller that we have the appropriate type of url
    # string for the call (ex. we will never be called with an object string
    # by getdefacl)
    wildcard_url = StorageUrlFromString(url_str)
    if wildcard_url.IsObject():
        matches = self.WildcardIterator(url_str).IterObjects(
            bucket_listing_fields=['acl'])
    else:
        # Bucket or provider. IterBuckets is called explicitly here to
        # ensure that the root object is populated with the acl.
        wanted_field = (
            'defaultObjectAcl' if self.command_name == 'defacl' else 'acl')
        matches = self.WildcardIterator(url_str).IterBuckets(
            bucket_fields=[wanted_field])
    plurality_iter = PluralityCheckableIterator(matches)

    if plurality_iter.IsEmpty():
        raise CommandException('No URLs matched')
    if plurality_iter.HasPlurality():
        too_many = (
            '%s matched more than one URL, which is not allowed by the %s '
            'command' % (url_str, self.command_name))
        raise CommandException(too_many)
    references = list(plurality_iter)
    return references[0]
def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
    """Signal handler for a multi-process/multi-thread request.

    Shuts gsutil down and then kills the current process.

    Args:
      signal_num: Number of the received signal; only SIGINT additionally
          writes the "Caught ^C" notice to stderr.
      unused_cur_stack_frame: Current stack frame.
    """
    # Note: This only works under Linux/MacOS. See
    # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
    # about why making it work correctly across OS's is harder and still open.
    ShutDownGsutil()
    interrupted_by_user = (signal_num == signal.SIGINT)
    if interrupted_by_user:
        sys.stderr.write('Caught ^C - exiting\n')
    # Simply calling sys.exit(1) doesn't work - see above bug for details.
    own_pid = os.getpid()
    KillProcess(own_pid)
def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
    """Resolves a command argument to exactly one bucket.

    Args:
      arg: String argument to get bucket URL for.
      bucket_fields: Fields to populate for the bucket.

    Returns:
      (StorageUrl referring to a single bucket, Bucket metadata).

    Raises:
      CommandException if args did not match exactly one bucket.
    """
    bucket_iter = self.GetBucketUrlIterFromArg(
        arg, bucket_fields=bucket_fields)
    if bucket_iter.HasPlurality():
        too_many = (
            '%s matched more than one URL, which is not\n'
            'allowed by the %s command' % (arg, self.command_name))
        raise CommandException(too_many)
    only_match = list(bucket_iter)[0]
    bucket_url = StorageUrlFromString(only_match.url_string)
    return bucket_url, only_match.root_object
def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
    """Gets an iterator over the buckets matched by a command argument.

    Args:
      arg: String argument to iterate over.
      bucket_fields: Fields to populate for the bucket.

    Returns:
      PluralityCheckableIterator over buckets.

    Raises:
      CommandException if the argument is not a cloud URL, names an object,
      or matched no buckets.
    """
    parsed_url = StorageUrlFromString(arg)
    is_bucket_level = parsed_url.IsCloudUrl() and not parsed_url.IsObject()
    if not is_bucket_level:
        raise CommandException(
            '"%s" command must specify a bucket' % self.command_name)

    listing = self.WildcardIterator(arg).IterBuckets(
        bucket_fields=bucket_fields)
    buckets = PluralityCheckableIterator(listing)
    if buckets.IsEmpty():
        raise CommandException('No URLs matched')
    return buckets

######################
# Private functions. #
######################

def _ResetConnectionPool(self):
    """Closes every connection held in StorageUri.provider_pool."""
    # Each OS process needs to establish its own set of connections to
    # the server to avoid writes from different OS processes interleaving
    # onto the same socket (and garbling the underlying SSL session).
    # Closing all connections in the storage provider connection pool
    # ensures each process gets its own set.
    pool = StorageUri.provider_pool
    if not pool:
        return
    for provider_key in pool:
        pool[provider_key].connection.close()
def _GetProcessAndThreadCount(self, process_count, thread_count,
                              parallel_operations_override):
    """Determines the process and thread counts for parallel operations.

    When neither self.parallel_operations nor the override is set, the
    supplied values are ignored and process_count = thread_count = 1.

    Args:
      process_count: A positive integer or None. In the latter case, we read
                     the value from the .boto config file.
      thread_count: A positive integer or None. In the latter case, we read
                    the value from the .boto config file.
      parallel_operations_override: Used to override
                                    self.parallel_operations. This allows
                                    the caller to safely override the
                                    top-level flag for a single call.

    Returns:
      (process_count, thread_count): The number of processes and threads to
                                     use, respectively.

    Raises:
      CommandException: if either count is below 1, or if more than one
          process is requested while IS_WINDOWS is true.
    """
    run_in_parallel = self.parallel_operations or parallel_operations_override
    if not run_in_parallel:
        # If -m not specified, then assume 1 OS process and 1 Python thread.
        process_count = thread_count = 1
    else:
        # A falsy count means "not given": fall back to the boto config,
        # which in turn falls back to the gsutil defaults.
        process_count = process_count or boto.config.getint(
            'GSUtil', 'parallel_process_count',
            gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
        if process_count < 1:
            raise CommandException(
                'Invalid parallel_process_count "%d".' % process_count)
        thread_count = thread_count or boto.config.getint(
            'GSUtil', 'parallel_thread_count',
            gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
        if thread_count < 1:
            raise CommandException(
                'Invalid parallel_thread_count "%d".' % thread_count)

    if IS_WINDOWS and process_count > 1:
        windows_message = (
            'It is not possible to set process_count > 1 on Windows. Please '
            'update your config file (located at %s) and set '
            '"parallel_process_count = 1".') % GetConfigFilePath()
        raise CommandException('\n'.join(textwrap.wrap(windows_message)))

    self.logger.debug('process count: %d', process_count)
    self.logger.debug('thread count: %d', thread_count)
    return (process_count, thread_count)
def _SetUpPerCallerState(self):
    """Set up the state for a caller id, corresponding to one Apply call.

    Returns:
      The newly allocated caller id.
    """
    # pylint: disable=global-variable-undefined,global-variable-not-assigned
    # These variables are initialized in InitializeMultiprocessingVariables or
    # InitializeThreadingVariables
    global global_return_values_map, shared_vars_map, failure_count
    global caller_id_finished_count, shared_vars_list_map, total_tasks
    global need_pool_or_done_cond, call_completed_map, class_map
    global task_queues, caller_id_lock, caller_id_counter

    # Get a new caller ID. The counter is either a plain int or an object
    # exposing the count through `.value`.
    with caller_id_lock:
        if isinstance(caller_id_counter, int):
            caller_id = caller_id_counter + 1
            caller_id_counter = caller_id
        else:
            caller_id_counter.value += 1
            caller_id = caller_id_counter.value

    # Work on a copy of self with an incremented recursive level, so the
    # class reports its level correctly if the function called from it also
    # needs to call Apply.
    caller_copy = copy.copy(self)
    caller_copy.recursive_apply_level += 1

    # Thread-safe loggers can't be pickled, so the logger is removed here
    # and recreated later in the WorkerThread. This is not a problem since
    # any logger with the same name will be treated as a singleton.
    caller_copy.logger = None

    # Likewise, the default API connection can't be pickled, but it is
    # unused anyway as each thread gets its own API delegator.
    caller_copy.gsutil_api = None

    initial_state = (
        (class_map, caller_copy),
        (total_tasks, -1),  # -1 => the producer hasn't finished yet.
        (call_completed_map, False),
        (caller_id_finished_count, 0),
        (global_return_values_map, []),
    )
    for state_map, initial_value in initial_state:
        state_map[caller_id] = initial_value
    return caller_id
def _CreateNewConsumerPool(self, num_processes, num_threads):
    """Create a new pool of processes that call _ApplyThreads.

    Args:
      num_processes: Number of daemon processes to start for the pool.
      num_threads: Thread count forwarded to each _ApplyThreads call.

    Raises:
      CommandException: if the incremented recursion level exceeds
          MAX_RECURSIVE_DEPTH.
    """
    pool_queue = _NewMultiprocessingQueue()
    task_queues.append(pool_queue)

    current_max_recursive_level.value += 1
    if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
        raise CommandException('Recursion depth of Apply calls is too great.')

    workers = []
    for _ in range(num_processes):
        # The level of the pool being built equals the number of pools
        # registered so far.
        worker_args = (num_threads, num_processes, len(consumer_pools))
        worker = multiprocessing.Process(
            target=self._ApplyThreads, args=worker_args)
        worker.daemon = True
        workers.append(worker)
        worker.start()
    consumer_pools.append(_ConsumerPool(workers, pool_queue))
of Apply calls is too great.') 10798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for _ in range(num_processes): 10808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi recursive_apply_level = len(consumer_pools) 10818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi p = multiprocessing.Process( 10828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi target=self._ApplyThreads, 10838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args=(num_threads, num_processes, recursive_apply_level)) 10848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi p.daemon = True 10858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi processes.append(p) 10868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi p.start() 10878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi consumer_pool = _ConsumerPool(processes, task_queue) 10888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi consumer_pools.append(consumer_pool) 10898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 10908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def Apply(self, func, args_iterator, exception_handler, 10918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi shared_attrs=None, arg_checker=_UrlArgChecker, 10928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi parallel_operations_override=False, process_count=None, 10938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi thread_count=None, should_return_results=False, 10948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error=False): 10958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Calls _Parallel/SequentialApply based on multiprocessing availability. 10968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 10978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 10988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi func: Function to call to process each argument. 
10998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args_iterator: Iterable collection of arguments to be put into the 11008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi work queue. 11018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi exception_handler: Exception handler for WorkerThread class. 11028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi shared_attrs: List of attributes to manage across sub-processes. 11038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker: Used to determine whether we should process the current 11048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi argument or simply skip it. Also handles any logging that 11058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is specific to a particular type of argument. 11068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi parallel_operations_override: Used to override self.parallel_operations. 11078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi This allows the caller to safely override 11088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi the top-level flag for a single call. 11098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count: The number of processes to use. If not specified, then 11108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi the configured default will be used. 11118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi thread_count: The number of threads per process. If not speficied, then 11128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi the configured default will be used.. 11138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results: If true, then return the results of all successful 11148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi calls to func in a list. 11158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error: If true, then raise any exceptions encountered when 11168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi executing func. 
This is only applicable in the case of 11178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count == thread_count == 1. 11188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Returns: 11208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Results from spawned threads. 11218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 11228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if shared_attrs: 11238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi original_shared_vars_values = {} # We'll add these back in at the end. 11248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for name in shared_attrs: 11258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi original_shared_vars_values[name] = getattr(self, name) 11268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # By setting this to 0, we simplify the logic for computing deltas. 11278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We'll add it back after all of the tasks have been performed. 11288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi setattr(self, name, 0) 11298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (process_count, thread_count) = self._GetProcessAndThreadCount( 11318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count, thread_count, parallel_operations_override) 11328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_main_thread = (self.recursive_apply_level == 0 11348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and self.sequential_caller_id == -1) 11358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We don't honor the fail_on_error flag in the case of multiple threads 11378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # or processes. 
11388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error = fail_on_error and (process_count * thread_count == 1) 11398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Only check this from the first call in the main thread. Apart from the 11418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # fact that it's wasteful to try this multiple times in general, it also 11428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # will never work when called from a subprocess since we use daemon 11438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # processes, and daemons can't create other processes. 1144cef7893435aa41160dd1255c43cb8498279738ccChris Craik if (is_main_thread and not self.multiprocessing_is_available and 1145cef7893435aa41160dd1255c43cb8498279738ccChris Craik process_count > 1): 1146cef7893435aa41160dd1255c43cb8498279738ccChris Craik # Run the check again and log the appropriate warnings. This was run 1147cef7893435aa41160dd1255c43cb8498279738ccChris Craik # before, when the Command object was created, in order to calculate 1148cef7893435aa41160dd1255c43cb8498279738ccChris Craik # self.multiprocessing_is_available, but we don't want to print the 1149cef7893435aa41160dd1255c43cb8498279738ccChris Craik # warning until we're sure the user actually tried to use multiple 1150cef7893435aa41160dd1255c43cb8498279738ccChris Craik # threads or processes. 
1151cef7893435aa41160dd1255c43cb8498279738ccChris Craik CheckMultiprocessingAvailableAndInit(logger=self.logger) 1152cef7893435aa41160dd1255c43cb8498279738ccChris Craik 1153cef7893435aa41160dd1255c43cb8498279738ccChris Craik caller_id = self._SetUpPerCallerState() 11548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If any shared attributes passed by caller, create a dictionary of 11568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # shared memory variables for every element in the list of shared 11578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # attributes. 11588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if shared_attrs: 11598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi shared_vars_list_map[caller_id] = shared_attrs 11608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for name in shared_attrs: 1161cef7893435aa41160dd1255c43cb8498279738ccChris Craik shared_vars_map[(caller_id, name)] = 0 11628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Make all of the requested function calls. 
1164cef7893435aa41160dd1255c43cb8498279738ccChris Craik usable_processes_count = (process_count if self.multiprocessing_is_available 1165cef7893435aa41160dd1255c43cb8498279738ccChris Craik else 1) 1166cef7893435aa41160dd1255c43cb8498279738ccChris Craik if thread_count * usable_processes_count > 1: 11678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._ParallelApply(func, args_iterator, exception_handler, caller_id, 1168cef7893435aa41160dd1255c43cb8498279738ccChris Craik arg_checker, usable_processes_count, thread_count, 11698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results, fail_on_error) 11708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 11718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._SequentialApply(func, args_iterator, exception_handler, caller_id, 11728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker, should_return_results, fail_on_error) 11738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if shared_attrs: 11758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for name in shared_attrs: 11768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # This allows us to retain the original value of the shared variable, 11778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # and simply apply the delta after what was done during the call to 11788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # apply. 
11798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi final_value = (original_shared_vars_values[name] + 1180cef7893435aa41160dd1255c43cb8498279738ccChris Craik shared_vars_map.get((caller_id, name))) 11818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi setattr(self, name, final_value) 11828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if should_return_results: 1184cef7893435aa41160dd1255c43cb8498279738ccChris Craik return global_return_values_map.get(caller_id) 11858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _MaybeSuggestGsutilDashM(self): 11878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Outputs a sugestion to the user to use gsutil -m.""" 11888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and 11898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1): 11908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.info('\n' + textwrap.fill( 11918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi '==> NOTE: You are performing a sequence of gsutil operations that ' 11928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'may run significantly faster if you instead use gsutil -m %s ...\n' 11938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Please see the -m section under "gsutil help options" for further ' 11948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'information about when gsutil -m can be advantageous.' 
11958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi % sys.argv[1]) + '\n') 11968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 11978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=g-doc-args 11988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _SequentialApply(self, func, args_iterator, exception_handler, caller_id, 11998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker, should_return_results, fail_on_error): 12008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Performs all function calls sequentially in the current thread. 12018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi No other threads or processes will be spawned. This degraded functionality 12038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is used when the multiprocessing module is not available or the user 12048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi requests only one thread and one process. 12058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 12068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Create a WorkerThread to handle all of the logic needed to actually call 12078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the function. Note that this thread will never be started, and all work 12088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # is done in the current thread. 12098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_thread = WorkerThread(None, False) 12108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args_iterator = iter(args_iterator) 12118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Count of sequential calls that have been made. Used for producing 12128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # suggestion to use gsutil -m. 
12138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi sequential_call_count = 0 12148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 12158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Try to get the next argument, handling any exceptions that arise. 12178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 12188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args = args_iterator.next() 12198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except StopIteration, e: 12208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 12218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, e: # pylint: disable=broad-except 12228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _IncrementFailureCount() 12238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if fail_on_error: 12248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise 12258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 12268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 12278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi exception_handler(self, e) 12288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, _: # pylint: disable=broad-except 12298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.logger.debug( 12308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Caught exception while handling exception for %s:\n%s', 12318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi func, traceback.format_exc()) 12328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi continue 12338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi sequential_call_count += 1 12358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD: 12368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Output suggestion near beginning of run, so user sees it 
early and can 12378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # ^C and try gsutil -m. 12388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._MaybeSuggestGsutilDashM() 12398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if arg_checker(self, args): 12408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Now that we actually have the next argument, perform the task. 12418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task = Task(func, args, caller_id, exception_handler, 12428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results, arg_checker, fail_on_error) 12438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_thread.PerformTask(task, self) 12448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if sequential_call_count >= gslib.util.GetTermLines(): 12458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Output suggestion at end of long run, in case user missed it at the 12468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # start and it scrolled off-screen. 12478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._MaybeSuggestGsutilDashM() 12488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1249cef7893435aa41160dd1255c43cb8498279738ccChris Craik # If the final iterated argument results in an exception, and that 1250cef7893435aa41160dd1255c43cb8498279738ccChris Craik # exception modifies shared_attrs, we need to publish the results. 
1251cef7893435aa41160dd1255c43cb8498279738ccChris Craik worker_thread.shared_vars_updater.Update(caller_id, self) 1252cef7893435aa41160dd1255c43cb8498279738ccChris Craik 12538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=g-doc-args 12548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _ParallelApply(self, func, args_iterator, exception_handler, caller_id, 12558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker, process_count, thread_count, 12568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results, fail_on_error): 12578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Dispatches input arguments across a thread/process pool. 12588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Pools are composed of parallel OS processes and/or Python threads, 12608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi based on options (-m or not) and settings in the user's config file. 12618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi If only one OS process is requested/available, dispatch requests across 12638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi threads in the current OS process. 12648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi In the multi-process case, we will create one pool of worker processes for 12668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi each level of the tree of recursive calls to Apply. E.g., if A calls 12678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we 12688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi will only create two sets of worker processes - B will execute in the first, 12698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi and C and D will execute in the second. 
If C is then changed to call 12708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Apply(E) and D is changed to call Apply(F), then we will automatically 12718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi create a third set of processes (lazily, when needed) that will be used to 12728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi execute calls to E and F. This might look something like: 12738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Pool1 Executes: B 12758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi / \ 12768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Pool2 Executes: C D 12778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi / \ 12788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Pool3 Executes: E F 12798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Apply's parallelism is generally broken up into 4 cases: 12818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi - If process_count == thread_count == 1, then all tasks will be executed 12828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi by _SequentialApply. 12838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi - If process_count > 1 and thread_count == 1, then the main thread will 12848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi create a new pool of processes (if they don't already exist) and each of 12858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi those processes will execute the tasks in a single thread. 12868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi - If process_count == 1 and thread_count > 1, then this process will create 12878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi a new pool of threads to execute the tasks. 
12888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi - If process_count > 1 and thread_count > 1, then the main thread will 12898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi create a new pool of processes (if they don't already exist) and each of 12908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi those processes will, upon creation, create a pool of threads to 12918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi execute the tasks. 12928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 12948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi caller_id: The caller ID unique to this call to command.Apply. 12958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi See command.Apply for description of other arguments. 12968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 12978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_main_thread = self.recursive_apply_level == 0 12988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 12998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before 13008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # exiting. 13018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not IS_WINDOWS and is_main_thread: 13028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Register as a final signal handler because this handler kills the 13038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # main gsutil process (so it must run last). 
13048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs, 13058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_final_handler=True) 13068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs, 13078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_final_handler=True) 13088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not task_queues: 13108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # The process we create will need to access the next recursive level 13118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # of task queues if it makes a call to Apply, so we always keep around 13128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # one more queue than we know we need. OTOH, if we don't create a new 13138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # process, the existing process still needs a task queue to use. 1314cef7893435aa41160dd1255c43cb8498279738ccChris Craik if process_count > 1: 1315cef7893435aa41160dd1255c43cb8498279738ccChris Craik task_queues.append(_NewMultiprocessingQueue()) 1316cef7893435aa41160dd1255c43cb8498279738ccChris Craik else: 1317cef7893435aa41160dd1255c43cb8498279738ccChris Craik task_queues.append(_NewThreadsafeQueue()) 13188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if process_count > 1: # Handle process pool creation. 13208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Check whether this call will need a new set of workers. 
13218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Each worker must acquire a shared lock before notifying the main thread 13238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # that it needs a new worker pool, so that at most one worker asks for 13248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # a new worker pool at once. 13258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 13268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not is_main_thread: 13278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_checking_level_lock.acquire() 13288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.recursive_apply_level >= current_max_recursive_level.value: 13298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi with need_pool_or_done_cond: 13308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Only the main thread is allowed to create new processes - 13318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # otherwise, we will run into some Python bugs. 13328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if is_main_thread: 13338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._CreateNewConsumerPool(process_count, thread_count) 13348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 13358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Notify the main thread that we need a new consumer pool. 13368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi new_pool_needed.value = 1 13378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi need_pool_or_done_cond.notify_all() 13388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # The main thread will notify us when it finishes. 
13398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi need_pool_or_done_cond.wait() 13408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi finally: 13418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not is_main_thread: 13428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_checking_level_lock.release() 13438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If we're running in this process, create a separate task queue. Otherwise, 13458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # if Apply has already been called with process_count > 1, then there will 13468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # be consumer pools trying to use our processes. 13478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if process_count > 1: 13488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task_queue = task_queues[self.recursive_apply_level] 1349cef7893435aa41160dd1255c43cb8498279738ccChris Craik elif self.multiprocessing_is_available: 13508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task_queue = _NewMultiprocessingQueue() 1351cef7893435aa41160dd1255c43cb8498279738ccChris Craik else: 1352cef7893435aa41160dd1255c43cb8498279738ccChris Craik task_queue = _NewThreadsafeQueue() 13538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Kick off a producer thread to throw tasks in the global task queue. 
We 13558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # do this asynchronously so that the main thread can be free to create new 13568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # consumer pools when needed (otherwise, any thread with a task that needs 13578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # a new consumer pool must block until we're completely done producing; in 13588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the worst case, every worker blocks on such a call and the producer fills 13598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # up the task queue before it finishes, so we block forever). 13608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id, 13618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi func, task_queue, should_return_results, 13628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi exception_handler, arg_checker, 13638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error) 13648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if process_count > 1: 13668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Wait here until either: 13678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # 1. We're the main thread and someone needs a new consumer pool - in 13688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # which case we create one and continue waiting. 13698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # 2. Someone notifies us that all of the work we requested is done, in 13708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # which case we retrieve the results (if applicable) and stop 13718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # waiting. 
13728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 13738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi with need_pool_or_done_cond: 13748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Either our call is done, or someone needs a new level of consumer 13758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pools, or we the wakeup call was meant for someone else. It's 13768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # impossible for both conditions to be true, since the main thread is 13778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # blocked on any other ongoing calls to Apply, and a thread would not 13788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # ask for a new consumer pool unless it had more work to do. 13798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if call_completed_map[caller_id]: 13808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 13818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi elif is_main_thread and new_pool_needed.value: 13828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi new_pool_needed.value = 0 13838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._CreateNewConsumerPool(process_count, thread_count) 13848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi need_pool_or_done_cond.notify_all() 13858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Note that we must check the above conditions before the wait() call; 13878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # otherwise, the notification can happen before we start waiting, in 13888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # which case we'll block forever. 13898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi need_pool_or_done_cond.wait() 13908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: # Using a single process. 
13918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._ApplyThreads(thread_count, process_count, 13928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.recursive_apply_level, 13938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_blocking_call=True, task_queue=task_queue) 13948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 13958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We encountered an exception from the producer thread before any arguments 13968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # were enqueued, but it wouldn't have been propagated, so we'll now 13978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # explicitly raise it here. 13988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if producer_thread.unknown_exception: 13998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=raising-bad-type 14008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise producer_thread.unknown_exception 14018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We encountered an exception from the producer thread while iterating over 14038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the arguments, so raise it here if we're meant to fail on error. 
14048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if producer_thread.iterator_exception and fail_on_error: 14058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=raising-bad-type 14068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise producer_thread.iterator_exception 14078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _ApplyThreads(self, thread_count, process_count, recursive_apply_level, 14098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_blocking_call=False, task_queue=None): 14108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Assigns the work from the multi-process global task queue. 14118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Work is assigned to an individual process for later consumption either by 14138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi the WorkerThreads or (if thread_count == 1) this thread. 14148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 14168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi thread_count: The number of threads used to perform the work. If 1, then 14178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi perform all work in this thread. 14188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count: The number of processes used to perform the work. 14198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi recursive_apply_level: The depth in the tree of recursive calls to Apply 14208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi of this thread. 
14218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is_blocking_call: True iff the call to Apply is blocked on this call 14228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (which is true iff process_count == 1), implying that 14238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _ApplyThreads must behave as a blocking call. 14248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 14258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._ResetConnectionPool() 14268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.recursive_apply_level = recursive_apply_level 14278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task_queue = task_queue or task_queues[recursive_apply_level] 14298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1430cef7893435aa41160dd1255c43cb8498279738ccChris Craik # Ensure fairness across processes by filling our WorkerPool only with 1431cef7893435aa41160dd1255c43cb8498279738ccChris Craik # as many tasks as it has WorkerThreads. This semaphore is acquired each 1432cef7893435aa41160dd1255c43cb8498279738ccChris Craik # time that a task is retrieved from the queue and released each time 1433cef7893435aa41160dd1255c43cb8498279738ccChris Craik # a task is completed by a WorkerThread. 1434cef7893435aa41160dd1255c43cb8498279738ccChris Craik worker_semaphore = threading.BoundedSemaphore(thread_count) 1435cef7893435aa41160dd1255c43cb8498279738ccChris Craik 14368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi assert thread_count * process_count > 1, ( 14378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Invalid state, calling command._ApplyThreads with only one thread ' 14388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'and process.') 1439cef7893435aa41160dd1255c43cb8498279738ccChris Craik # TODO: Presently, this pool gets recreated with each call to Apply. 
We 1440cef7893435aa41160dd1255c43cb8498279738ccChris Craik # should be able to do it just once, at process creation time. 14418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_pool = WorkerPool( 1442cef7893435aa41160dd1255c43cb8498279738ccChris Craik thread_count, self.logger, worker_semaphore, 14438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class=self.bucket_storage_uri_class, 14448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api_map=self.gsutil_api_map, debug=self.debug) 14458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi num_enqueued = 0 14478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 1448cef7893435aa41160dd1255c43cb8498279738ccChris Craik worker_semaphore.acquire() 14498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task = task_queue.get() 14508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if task.args != ZERO_TASKS_TO_DO_ARGUMENT: 14518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # If we have no tasks to do and we're performing a blocking call, we 14528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # need a special signal to tell us to stop - otherwise, we block on 14538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the call to task_queue.get() forever. 14548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_pool.AddTask(task) 14558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi num_enqueued += 1 1456cef7893435aa41160dd1255c43cb8498279738ccChris Craik else: 1457cef7893435aa41160dd1255c43cb8498279738ccChris Craik # No tasks remain; don't block the semaphore on WorkerThread completion. 
1458cef7893435aa41160dd1255c43cb8498279738ccChris Craik worker_semaphore.release() 14598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if is_blocking_call: 14618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi num_to_do = total_tasks[task.caller_id] 14628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # The producer thread won't enqueue the last task until after it has 14638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # updated total_tasks[caller_id], so we know that num_to_do < 0 implies 14648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # we will do this check again. 14658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if num_to_do >= 0 and num_enqueued == num_to_do: 14668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if thread_count == 1: 14678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return 14688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 14698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 14708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi with need_pool_or_done_cond: 14718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if call_completed_map[task.caller_id]: 14728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We need to check this first, in case the condition was 14738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # notified before we grabbed the lock. 14748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return 14758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi need_pool_or_done_cond.wait() 14768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Below here lie classes and functions related to controlling the flow of tasks 14798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# between various threads and processes. 
14808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _ConsumerPool(object): 14838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def __init__(self, processes, task_queue): 14858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.processes = processes 14868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue = task_queue 14878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def ShutDown(self): 14898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for process in self.processes: 14908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi KillProcess(process.pid) 14918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef KillProcess(pid): 14948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Make best effort to kill the given process. 14958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi We ignore all exceptions so a caller looping through a list of processes will 14978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi continue attempting to kill each, even if one encounters a problem. 14988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 14998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 15008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi pid: The process ID. 15018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 15028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 15038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2. 
15048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or 15058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (3, 0) <= sys.version_info[:3] < (3, 2)): 15068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi kernel32 = ctypes.windll.kernel32 15078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi handle = kernel32.OpenProcess(1, 0, pid) 15088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi kernel32.TerminateProcess(handle, 0) 15098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 15108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi os.kill(pid, signal.SIGKILL) 15118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except: # pylint: disable=bare-except 15128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi pass 15138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass Task(namedtuple('Task', ( 15168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'func args caller_id exception_handler should_return_results arg_checker ' 15178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'fail_on_error'))): 15188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Task class representing work to be completed. 15198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 15218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi func: The function to be executed. 15228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args: The arguments to func. 15238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi caller_id: The globally-unique caller ID corresponding to the Apply call. 15248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi exception_handler: The exception handler to use if the call to func fails. 
15258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results: True iff the results of this function should be 15268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi returned from the Apply call. 15278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker: Used to determine whether we should process the current 15288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi argument or simply skip it. Also handles any logging that 15298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is specific to a particular type of argument. 15308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error: If true, then raise any exceptions encountered when 15318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi executing func. This is only applicable in the case of 15328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count == thread_count == 1. 15338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 15348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi pass 15358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass ProducerThread(threading.Thread): 15388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Thread used to enqueue work for other processes and threads.""" 15398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def __init__(self, cls, args_iterator, caller_id, func, task_queue, 15418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results, exception_handler, arg_checker, 15428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error): 15438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Initializes the producer thread. 
15448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 15468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cls: Instance of Command for which this ProducerThread was created. 15478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args_iterator: Iterable collection of arguments to be put into the 15488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi work queue. 15498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi caller_id: Globally-unique caller ID corresponding to this call to Apply. 15508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi func: The function to be called on each element of args_iterator. 15518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task_queue: The queue into which tasks will be put, to later be consumed 15528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi by Command._ApplyThreads. 15538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi should_return_results: True iff the results for this call to command.Apply 15548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi were requested. 15558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi exception_handler: The exception handler to use when errors are 15568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi encountered during calls to func. 15578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi arg_checker: Used to determine whether we should process the current 15588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi argument or simply skip it. Also handles any logging that 15598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi is specific to a particular type of argument. 15608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi fail_on_error: If true, then raise any exceptions encountered when 15618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi executing func. This is only applicable in the case of 15628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi process_count == thread_count == 1. 
15638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 15648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi super(ProducerThread, self).__init__() 15658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.func = func 15668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.cls = cls 15678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.args_iterator = args_iterator 15688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.caller_id = caller_id 15698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue = task_queue 15708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.arg_checker = arg_checker 15718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.exception_handler = exception_handler 15728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.should_return_results = should_return_results 15738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.fail_on_error = fail_on_error 15748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.shared_variables_updater = _SharedVariablesUpdater() 15758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.daemon = True 15768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.unknown_exception = None 15778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.iterator_exception = None 15788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.start() 15798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 15808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def run(self): 15818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi num_tasks = 0 15828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cur_task = None 15838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi last_task = None 15848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 15858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args_iterator = iter(self.args_iterator) 15868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 
15878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 15888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args = args_iterator.next() 15898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except StopIteration, e: 15908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 15918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, e: # pylint: disable=broad-except 15928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _IncrementFailureCount() 15938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.fail_on_error: 15948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.iterator_exception = e 15958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise 15968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 15978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 15988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.exception_handler(self.cls, e) 15998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, _: # pylint: disable=broad-except 16008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.cls.logger.debug( 16018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Caught exception while handling exception for %s:\n%s', 16028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.func, traceback.format_exc()) 16038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.shared_variables_updater.Update(self.caller_id, self.cls) 16048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi continue 16058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.arg_checker(self.cls, args): 16078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi num_tasks += 1 16088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi last_task = cur_task 16098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cur_task = Task(self.func, args, self.caller_id, 16108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.exception_handler, 
self.should_return_results, 16118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.arg_checker, self.fail_on_error) 16128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if last_task: 16138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue.put(last_task) 16148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, e: # pylint: disable=broad-except 16158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # This will also catch any exception raised due to an error in the 16168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # iterator when fail_on_error is set, so check that we failed for some 16178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # other reason before claiming that we had an unknown exception. 16188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not self.iterator_exception: 16198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.unknown_exception = e 16208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi finally: 16218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We need to make sure to update total_tasks[caller_id] before we enqueue 16228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the last task. Otherwise, a worker can retrieve the last task and 16238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # complete it, then check total_tasks and determine that we're not done 16248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # producing all before we update total_tasks. This approach forces workers 16258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # to wait on the last task until after we've updated total_tasks. 16268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi total_tasks[self.caller_id] = num_tasks 16278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not cur_task: 16288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # This happens if there were zero arguments to be put in the queue. 
16298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id, 16308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi None, None, None, None) 16318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue.put(cur_task) 16328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # It's possible that the workers finished before we updated total_tasks, 16348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # so we need to check here as well. 16358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _NotifyIfDone(self.caller_id, 1636cef7893435aa41160dd1255c43cb8498279738ccChris Craik caller_id_finished_count.get(self.caller_id)) 16378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass WorkerPool(object): 16408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Pool of worker threads to which tasks can be added.""" 16418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1642cef7893435aa41160dd1255c43cb8498279738ccChris Craik def __init__(self, thread_count, logger, worker_semaphore, 1643cef7893435aa41160dd1255c43cb8498279738ccChris Craik bucket_storage_uri_class=None, gsutil_api_map=None, debug=0): 16448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue = _NewThreadsafeQueue() 16458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.threads = [] 16468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for _ in range(thread_count): 16478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_thread = WorkerThread( 1648cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.task_queue, logger, worker_semaphore=worker_semaphore, 16498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class=bucket_storage_uri_class, 16508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 
gsutil_api_map=gsutil_api_map, debug=debug) 16518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.threads.append(worker_thread) 16528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_thread.start() 16538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def AddTask(self, task): 16558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue.put(task) 16568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass WorkerThread(threading.Thread): 16598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Thread where all the work will be performed. 16608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi This makes the function calls for Apply and takes care of all error handling, 16628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return value propagation, and shared_vars. 16638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Note that this thread is NOT started upon instantiation because the function- 16658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi calling logic is also used in the single-threaded case. 16668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 16678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1668cef7893435aa41160dd1255c43cb8498279738ccChris Craik def __init__(self, task_queue, logger, worker_semaphore=None, 1669cef7893435aa41160dd1255c43cb8498279738ccChris Craik bucket_storage_uri_class=None, gsutil_api_map=None, debug=0): 16708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Initializes the worker thread. 
16718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 16738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task_queue: The thread-safe queue from which this thread should obtain 16748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi its work. 16758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi logger: Logger to use for this thread. 1676cef7893435aa41160dd1255c43cb8498279738ccChris Craik worker_semaphore: threading.BoundedSemaphore to be released each time a 1677cef7893435aa41160dd1255c43cb8498279738ccChris Craik task is completed, or None for single-threaded execution. 16788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class: Class to instantiate for cloud StorageUris. 16798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Settable for testing/mocking. 16808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi gsutil_api_map: Map of providers and API selector tuples to api classes 16818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi which can be used to communicate with those providers. 16828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Used for the instantiating CloudApiDelegator class. 16838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi debug: debug level for the CloudApiDelegator class. 
16848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 16858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi super(WorkerThread, self).__init__() 16868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.task_queue = task_queue 1687cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.worker_semaphore = worker_semaphore 16888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.daemon = True 16898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.cached_classes = {} 16908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.shared_vars_updater = _SharedVariablesUpdater() 16918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.thread_gsutil_api = None 16938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if bucket_storage_uri_class and gsutil_api_map: 16948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.thread_gsutil_api = CloudApiDelegator( 16958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi bucket_storage_uri_class, gsutil_api_map, logger, debug=debug) 16968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 16978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def PerformTask(self, task, cls): 16988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """Makes the function call for a task. 16998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 17008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Args: 17018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task: The Task to perform. 17028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cls: The instance of a class which gives context to the functions called 17038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi by the Task's function. E.g., see SetAclFuncWrapper. 
17048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi """ 17058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi caller_id = task.caller_id 17068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 17078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi results = task.func(cls, task.args, thread_state=self.thread_gsutil_api) 17088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if task.should_return_results: 1709cef7893435aa41160dd1255c43cb8498279738ccChris Craik global_return_values_map.Increment(caller_id, [results], 1710cef7893435aa41160dd1255c43cb8498279738ccChris Craik default_value=[]) 17118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, e: # pylint: disable=broad-except 17128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _IncrementFailureCount() 17138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if task.fail_on_error: 17148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise # Only happens for single thread and process case. 17158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 17168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 17178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task.exception_handler(cls, e) 17188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception, _: # pylint: disable=broad-except 17198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Don't allow callers to raise exceptions here and kill the worker 17208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # threads. 
17218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi cls.logger.debug( 17228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 'Caught exception while handling exception for %s:\n%s', 17238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task, traceback.format_exc()) 17248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi finally: 1725cef7893435aa41160dd1255c43cb8498279738ccChris Craik if self.worker_semaphore: 1726cef7893435aa41160dd1255c43cb8498279738ccChris Craik self.worker_semaphore.release() 17278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.shared_vars_updater.Update(caller_id, cls) 17288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 17298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Even if we encounter an exception, we still need to claim that that 17308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # the function finished executing. Otherwise, we won't know when to 17318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # stop waiting and return results. 1732cef7893435aa41160dd1255c43cb8498279738ccChris Craik num_done = caller_id_finished_count.Increment(caller_id, 1) 1733cef7893435aa41160dd1255c43cb8498279738ccChris Craik _NotifyIfDone(caller_id, num_done) 17348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 17358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def run(self): 17368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 17378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi task = self.task_queue.get() 17388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi caller_id = task.caller_id 17398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 17408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Get the instance of the command with the appropriate context. 
class _SharedVariablesUpdater(object):
  """Used to update shared variables for a class in the global map.

  Note that each thread will have its own instance of the calling class for
  context, and it will also have its own instance of a
  _SharedVariablesUpdater.  This is used in the following way:

  1. Before any tasks are performed, each thread will get a copy of the
     calling class, and the globally-consistent value of this shared variable
     will be initialized to whatever it was before the call to Apply began.

  2. After each time a thread performs a task, it will look at the current
     values of the shared variables in its instance of the calling class.

     2.A. For each such variable, it computes the delta of this variable
          between the last known value for this class (which is stored in
          a dict local to this class) and the current value of the variable
          in the class.

     2.B. Using this delta, we update the last known value locally as well
          as the globally-consistent value shared across all classes (the
          globally consistent value is simply increased by the computed
          delta).
  """

  def __init__(self):
    # Maps (caller_id, variable name) -> value seen at the previous Update().
    self.last_shared_var_values = {}

  def Update(self, caller_id, cls):
    """Update any shared variables with their deltas.

    Args:
      caller_id: Key used to look up the shared variable names in
                 shared_vars_list_map.
      cls: Object whose attributes hold the current values of those
           variables.
    """
    shared_vars = shared_vars_list_map.get(caller_id, None)
    if not shared_vars:
      return

    for name in shared_vars:
      key = (caller_id, name)
      last_value = self.last_shared_var_values.get(key, 0)
      # Compute the change made since the last time we updated here. This is
      # calculated by simply subtracting the last known value from the current
      # value in the class instance.
      delta = getattr(cls, name) - last_value
      self.last_shared_var_values[key] = delta + last_value

      # Update the globally-consistent value by simply increasing it by the
      # computed delta.
      shared_vars_map.Increment(key, delta)


def _NotifyIfDone(caller_id, num_done):
  """Notify any threads waiting for results that something has finished.

  Each waiting thread will then need to check the call_completed_map to see if
  its work is done.

  Note that num_done could be calculated here, but it is passed in as an
  optimization so that we have one less call to a globally-locked data
  structure.

  Args:
    caller_id: The caller_id of the function whose progress we're checking.
    num_done: The number of tasks currently completed for that caller_id.
  """
  num_to_do = total_tasks[caller_id]
  # A negative total never counts as finished, even if num_done matches it.
  if num_to_do == num_done and num_to_do >= 0:
    # Notify the Apply call that's sleeping that it's ready to return.
    with need_pool_or_done_cond:
      call_completed_map[caller_id] = True
      need_pool_or_done_cond.notify_all()


def ShutDownGsutil():
  """Shut down all processes in consumer pools in preparation for exiting."""
  for q in queues:
    # Best effort: a queue that cannot cancel its join thread must not stop
    # the remaining queues and pools from being shut down.
    try:
      q.cancel_join_thread()
    except:  # pylint: disable=bare-except
      pass
  for consumer_pool in consumer_pools:
    consumer_pool.ShutDown()


# pylint: disable=global-variable-undefined
def _IncrementFailureCount():
  """Adds one to the failure_count global.

  failure_count is either a plain int or a synchronized
  multiprocessing.Value() of type 'i'.

  Raises:
    NameError: if failure_count has not been initialized.
  """
  global failure_count
  if isinstance(failure_count, int):
    failure_count += 1
  else:  # Otherwise it's a multiprocessing.Value() of type 'i'.
    # "value += 1" is a separate read and write, so it must be done while
    # holding the Value's lock or concurrent increments can be lost.
    with failure_count.get_lock():
      failure_count.value += 1


# pylint: disable=global-variable-undefined
def GetFailureCount():
  """Returns the number of failures processed during calls to Apply()."""
  try:
    if isinstance(failure_count, int):
      return failure_count
    else:  # It's a multiprocessing.Value() of type 'i'.
      return failure_count.value
  except NameError:  # If it wasn't initialized, Apply() wasn't called.
    return 0


# pylint: disable=global-variable-undefined
def ResetFailureCount():
  """Resets the failure_count variable to 0 - useful if error is expected."""
  global failure_count
  try:
    if isinstance(failure_count, int):
      failure_count = 0
    else:  # It's a multiprocessing.Value() of type 'i'.
      # Reset the existing object in place rather than rebinding the global
      # to a new Value: anything still holding the old object would otherwise
      # keep counting into a counter that is no longer read.
      with failure_count.get_lock():
        failure_count.value = 0
  except NameError:  # If it wasn't initialized, Apply() wasn't called.
    pass