18d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# -*- coding: utf-8 -*-
28d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Copyright 2010 Google Inc. All Rights Reserved.
38d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi#
48d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Licensed under the Apache License, Version 2.0 (the "License");
58d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# you may not use this file except in compliance with the License.
68d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# You may obtain a copy of the License at
78d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi#
88d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi#     http://www.apache.org/licenses/LICENSE-2.0
98d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi#
108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Unless required by applicable law or agreed to in writing, software
118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# distributed under the License is distributed on an "AS IS" BASIS,
128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# See the License for the specific language governing permissions and
148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# limitations under the License.
158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi"""Base class for gsutil commands.
168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiIn addition to base class code, this file contains helpers that depend on base
188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass state (such as GetAndPrintAcl) In general, functions that depend on
198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass state and that are used by multiple commands belong in this file.
208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiFunctions that don't depend on class state belong in util.py, and non-shared
218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoihelpers belong in individual subclasses.
228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi"""
238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom __future__ import absolute_import
258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport codecs
278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom collections import namedtuple
288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport copy
298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport getopt
308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport logging
318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport multiprocessing
328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport os
338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport Queue
348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport signal
358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport sys
368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport textwrap
378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport threading
388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport traceback
398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport boto
418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom boto.storage_uri import StorageUri
428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport gslib
438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import AccessDeniedException
448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import ArgumentException
458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api import ServiceException
468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cloud_api_delegator import CloudApiDelegator
478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cs_api_map import ApiSelector
488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.cs_api_map import GsutilApiMapFactory
498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.exception import CommandException
508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.help_provider import HelpProvider
518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.name_expansion import NameExpansionIterator
528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.name_expansion import NameExpansionResult
53cef7893435aa41160dd1255c43cb8498279738ccChris Craikfrom gslib.parallelism_framework_util import AtomicDict
548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.plurality_checkable_iterator import PluralityCheckableIterator
558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.sig_handling import RegisterSignalHandler
568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.storage_url import StorageUrlFromString
578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.translation_helper import AclTranslation
59cef7893435aa41160dd1255c43cb8498279738ccChris Craikfrom gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
60cef7893435aa41160dd1255c43cb8498279738ccChris Craikfrom gslib.util import CheckMultiprocessingAvailableAndInit
618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import GetConfigFilePath
628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import GsutilStreamHandler
638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import HaveFileUrls
648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import HaveProviderUrls
658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import IS_WINDOWS
668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import NO_MAX
678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import UrlsAreForSingleProvider
688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.util import UTF8
698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom gslib.wildcard_iterator import CreateWildcardIterator
708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiOFFER_GSUTIL_M_SUGGESTION_THRESHOLD = 5
728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiif IS_WINDOWS:
748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  import ctypes  # pylint: disable=g-import-not-at-top
758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _DefaultExceptionHandler(cls, e):
788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  cls.logger.exception(e)
798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef CreateGsutilLogger(command_name):
828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Creates a logger that resembles 'print' output.
838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  This logger abides by gsutil -d/-D/-DD/-q options.
858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  By default (if none of the above options is specified) the logger will display
878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  all messages logged with level INFO or above. Log propagation is disabled.
888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Args:
908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    command_name: Command name to create logger for.
918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Returns:
938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    A logger object.
948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  log = logging.getLogger(command_name)
968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  log.propagate = False
978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  log.setLevel(logging.root.level)
988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  log_handler = GsutilStreamHandler()
998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  log_handler.setFormatter(logging.Formatter('%(message)s'))
1008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Commands that call other commands (like mv) would cause log handlers to be
1018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # added more than once, so avoid adding if one is already present.
1028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  if not log.handlers:
1038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    log.addHandler(log_handler)
1048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return log
1058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _UrlArgChecker(command_instance, url):
1088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  if not command_instance.exclude_symlinks:
1098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return True
1108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  exp_src_url = url.expanded_storage_url
1118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
1128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
1138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return False
1148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return True
1158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef DummyArgChecker(*unused_args):
1188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return True
1198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef SetAclFuncWrapper(cls, name_expansion_result, thread_state=None):
1228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return cls.SetAclFunc(name_expansion_result, thread_state=thread_state)
1238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef SetAclExceptionHandler(cls, e):
1268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Exception handler that maintains state about post-completion status."""
1278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  cls.logger.error(str(e))
1288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  cls.everything_set_okay = False
1298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# We will keep this list of all thread- or process-safe queues ever created by
1318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# the main thread so that we can forcefully kill them upon shutdown. Otherwise,
1328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# we encounter a Python bug in which empty queues block forever on join (which
1338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# is called as part of the Python exit function cleanup) under the impression
1348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# that they are non-empty.
1358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# However, this also lets us shut down somewhat more cleanly when interrupted.
1368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiqueues = []
1378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _NewMultiprocessingQueue():
1408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
1418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  queues.append(queue)
1428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return queue
1438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _NewThreadsafeQueue():
1468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  queue = Queue.Queue(MAX_QUEUE_SIZE)
1478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  queues.append(queue)
1488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return queue
1498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# The maximum size of a process- or thread-safe queue. Imposing this limit
1518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# prevents us from needing to hold an arbitrary amount of data in memory.
1528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# However, setting this number too high (e.g., >= 32768 on OS X) can cause
1538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# problems on some operating systems.
1548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiMAX_QUEUE_SIZE = 32500
1558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# That maximum depth of the tree of recursive calls to command.Apply. This is
1578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# an arbitrary limit put in place to prevent developers from accidentally
1588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# causing problems with infinite recursion, and it can be increased if needed.
1598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiMAX_RECURSIVE_DEPTH = 5
1608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiZERO_TASKS_TO_DO_ARGUMENT = ('There were no', 'tasks to do')
1628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Map from deprecated aliases to the current command and subcommands that
1648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# provide the same behavior.
1658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# TODO: Remove this map and deprecate old commands on 9/9/14.
1668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiOLD_ALIAS_MAP = {'chacl': ['acl', 'ch'],
1678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getacl': ['acl', 'get'],
1688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'setacl': ['acl', 'set'],
1698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getcors': ['cors', 'get'],
1708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'setcors': ['cors', 'set'],
1718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'chdefacl': ['defacl', 'ch'],
1728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getdefacl': ['defacl', 'get'],
1738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'setdefacl': ['defacl', 'set'],
1748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'disablelogging': ['logging', 'set', 'off'],
1758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'enablelogging': ['logging', 'set', 'on'],
1768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getlogging': ['logging', 'get'],
1778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getversioning': ['versioning', 'get'],
1788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'setversioning': ['versioning', 'set'],
1798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'getwebcfg': ['web', 'get'],
1808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 'setwebcfg': ['web', 'set']}
1818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Declare all of the module level variables - see
1848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# InitializeMultiprocessingVariables for an explanation of why this is
1858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# necessary.
1868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# pylint: disable=global-at-module-level
1878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiglobal manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
1888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiglobal total_tasks, call_completed_map, global_return_values_map
1898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiglobal need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
1908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiglobal current_max_recursive_level, shared_vars_map, shared_vars_list_map
1918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiglobal class_map, worker_checking_level_lock, failure_count
1928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef InitializeMultiprocessingVariables():
1958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Initializes module-level variables that will be inherited by subprocesses.
1968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  On Windows, a multiprocessing.Manager object should only
1988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  be created within an "if __name__ == '__main__':" block. This function
1998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  must be called, otherwise every command that calls Command.Apply will fail.
2008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
2018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # This list of global variables must exactly match the above list of
2028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # declarations.
2038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # pylint: disable=global-variable-undefined
2048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global manager, consumer_pools, task_queues, caller_id_lock, caller_id_counter
2058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global total_tasks, call_completed_map, global_return_values_map
2068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global need_pool_or_done_cond, caller_id_finished_count, new_pool_needed
2078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global current_max_recursive_level, shared_vars_map, shared_vars_list_map
2088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global class_map, worker_checking_level_lock, failure_count
2098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  manager = multiprocessing.Manager()
2118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  consumer_pools = []
2138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # List of all existing task queues - used by all pools to find the queue
2158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # that's appropriate for the given recursive_apply_level.
2168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  task_queues = []
2178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Used to assign a globally unique caller ID to each Apply call.
2198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  caller_id_lock = manager.Lock()
2208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  caller_id_counter = multiprocessing.Value('i', 0)
2218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Map from caller_id to total number of tasks to be completed for that ID.
223cef7893435aa41160dd1255c43cb8498279738ccChris Craik  total_tasks = AtomicDict(manager=manager)
2248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Map from caller_id to a boolean which is True iff all its tasks are
2268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # finished.
227cef7893435aa41160dd1255c43cb8498279738ccChris Craik  call_completed_map = AtomicDict(manager=manager)
2288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Used to keep track of the set of return values for each caller ID.
230cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global_return_values_map = AtomicDict(manager=manager)
2318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Condition used to notify any waiting threads that a task has finished or
2338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # that a call to Apply needs a new set of consumer processes.
2348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  need_pool_or_done_cond = manager.Condition()
2358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Lock used to prevent multiple worker processes from asking the main thread
2378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # to create a new consumer pool for the same level.
2388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  worker_checking_level_lock = manager.Lock()
2398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Map from caller_id to the current number of completed tasks for that ID.
241cef7893435aa41160dd1255c43cb8498279738ccChris Craik  caller_id_finished_count = AtomicDict(manager=manager)
2428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Used as a way for the main thread to distinguish between being woken up
2448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # by another call finishing and being woken up by a call that needs a new set
2458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # of consumer processes.
2468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  new_pool_needed = multiprocessing.Value('i', 0)
2478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  current_max_recursive_level = multiprocessing.Value('i', 0)
2498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Map from (caller_id, name) to the value of that shared variable.
251cef7893435aa41160dd1255c43cb8498279738ccChris Craik  shared_vars_map = AtomicDict(manager=manager)
252cef7893435aa41160dd1255c43cb8498279738ccChris Craik  shared_vars_list_map = AtomicDict(manager=manager)
2538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Map from caller_id to calling class.
2558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  class_map = manager.dict()
2568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Number of tasks that resulted in an exception in calls to Apply().
2588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  failure_count = multiprocessing.Value('i', 0)
2598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
2608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
261cef7893435aa41160dd1255c43cb8498279738ccChris Craikdef InitializeThreadingVariables():
262cef7893435aa41160dd1255c43cb8498279738ccChris Craik  """Initializes module-level variables used when running multi-threaded.
263cef7893435aa41160dd1255c43cb8498279738ccChris Craik
264cef7893435aa41160dd1255c43cb8498279738ccChris Craik  When multiprocessing is not available (or on Windows where only 1 process
265cef7893435aa41160dd1255c43cb8498279738ccChris Craik  is used), thread-safe analogs to the multiprocessing global variables
266cef7893435aa41160dd1255c43cb8498279738ccChris Craik  must be initialized. This function is the thread-safe analog to
267cef7893435aa41160dd1255c43cb8498279738ccChris Craik  InitializeMultiprocessingVariables.
268cef7893435aa41160dd1255c43cb8498279738ccChris Craik  """
269cef7893435aa41160dd1255c43cb8498279738ccChris Craik  # pylint: disable=global-variable-undefined
270cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global global_return_values_map, shared_vars_map, failure_count
271cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global caller_id_finished_count, shared_vars_list_map, total_tasks
272cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global need_pool_or_done_cond, call_completed_map, class_map
273cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global task_queues, caller_id_lock, caller_id_counter
274cef7893435aa41160dd1255c43cb8498279738ccChris Craik  caller_id_counter = 0
275cef7893435aa41160dd1255c43cb8498279738ccChris Craik  caller_id_finished_count = AtomicDict()
276cef7893435aa41160dd1255c43cb8498279738ccChris Craik  caller_id_lock = threading.Lock()
277cef7893435aa41160dd1255c43cb8498279738ccChris Craik  call_completed_map = AtomicDict()
278cef7893435aa41160dd1255c43cb8498279738ccChris Craik  class_map = AtomicDict()
279cef7893435aa41160dd1255c43cb8498279738ccChris Craik  failure_count = 0
280cef7893435aa41160dd1255c43cb8498279738ccChris Craik  global_return_values_map = AtomicDict()
281cef7893435aa41160dd1255c43cb8498279738ccChris Craik  need_pool_or_done_cond = threading.Condition()
282cef7893435aa41160dd1255c43cb8498279738ccChris Craik  shared_vars_list_map = AtomicDict()
283cef7893435aa41160dd1255c43cb8498279738ccChris Craik  shared_vars_map = AtomicDict()
284cef7893435aa41160dd1255c43cb8498279738ccChris Craik  task_queues = []
285cef7893435aa41160dd1255c43cb8498279738ccChris Craik  total_tasks = AtomicDict()
286cef7893435aa41160dd1255c43cb8498279738ccChris Craik
287cef7893435aa41160dd1255c43cb8498279738ccChris Craik
2888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Each subclass of Command must define a property named 'command_spec' that is
2898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# an instance of the following class.
2908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi KandoiCommandSpec = namedtuple('CommandSpec', [
2918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Name of command.
2928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'command_name',
2938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Usage synopsis.
2948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'usage_synopsis',
2958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # List of command name aliases.
2968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'command_name_aliases',
2978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Min number of args required by this command.
2988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'min_args',
2998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Max number of args required by this command, or NO_MAX.
3008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'max_args',
3018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Getopt-style string specifying acceptable sub args.
3028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'supported_sub_args',
3038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # True if file URLs are acceptable for this command.
3048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'file_url_ok',
3058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # True if provider-only URLs are acceptable for this command.
3068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'provider_url_ok',
3078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Index in args of first URL arg.
3088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'urls_start_arg',
3098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # List of supported APIs
3108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'gs_api_support',
3118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Default API to use for this command
3128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'gs_default_api',
3138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Private arguments (for internal testing)
3148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'supported_private_args',
3158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'argparse_arguments',
3168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi])
3178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass Command(HelpProvider):
3208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Base class for all gsutil commands."""
3218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Each subclass must override this with an instance of CommandSpec.
3238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  command_spec = None
3248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  _commands_with_subcommands_and_subopts = ['acl', 'defacl', 'logging', 'web',
3268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                            'notification']
3278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # This keeps track of the recursive depth of the current call to Apply.
3298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  recursive_apply_level = 0
3308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # If the multiprocessing module isn't available, we'll use this to keep track
3328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # of the caller_id.
3338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  sequential_caller_id = -1
3348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  @staticmethod
3368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def CreateCommandSpec(command_name, usage_synopsis=None,
3378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        command_name_aliases=None, min_args=0,
3388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        max_args=NO_MAX, supported_sub_args='',
3398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        file_url_ok=False, provider_url_ok=False,
3408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        urls_start_arg=0, gs_api_support=None,
3418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        gs_default_api=None, supported_private_args=None,
3428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        argparse_arguments=None):
3438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Creates an instance of CommandSpec, with defaults."""
3448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return CommandSpec(
3458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        command_name=command_name,
3468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        usage_synopsis=usage_synopsis,
3478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        command_name_aliases=command_name_aliases or [],
3488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        min_args=min_args,
3498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        max_args=max_args,
3508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        supported_sub_args=supported_sub_args,
3518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        file_url_ok=file_url_ok,
3528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        provider_url_ok=provider_url_ok,
3538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        urls_start_arg=urls_start_arg,
3548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        gs_api_support=gs_api_support or [ApiSelector.XML],
3558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        gs_default_api=gs_default_api or ApiSelector.XML,
3568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        supported_private_args=supported_private_args,
3578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        argparse_arguments=argparse_arguments or [])
3588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Define a convenience property for command name, since it's used many places.
3608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _GetDefaultCommandName(self):
3618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return self.command_spec.command_name
3628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  command_name = property(_GetDefaultCommandName)
3638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _CalculateUrlsStartArg(self):
3658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Calculate the index in args of the first URL arg.
3668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
3688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      Index of the first URL arg (according to the command spec).
3698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
3708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return self.command_spec.urls_start_arg
3718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _TranslateDeprecatedAliases(self, args):
3738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Map deprecated aliases to the corresponding new command, and warn."""
3748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    new_command_args = OLD_ALIAS_MAP.get(self.command_alias_used, None)
3758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if new_command_args:
3768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Prepend any subcommands for the new command. The command name itself
3778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # is not part of the args, so leave it out.
3788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      args = new_command_args[1:] + args
3798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.logger.warn('\n'.join(textwrap.wrap(
3808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          ('You are using a deprecated alias, "%(used_alias)s", for the '
3818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           '"%(command_name)s" command. This will stop working on 9/9/2014. '
3828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           'Please use "%(command_name)s" with the appropriate sub-command in '
3838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           'the future. See "gsutil help %(command_name)s" for details.') %
3848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          {'used_alias': self.command_alias_used,
3858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           'command_name': self.command_name})))
3868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return args
3878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
388cef7893435aa41160dd1255c43cb8498279738ccChris Craik  def __init__(self, command_runner, args, headers, debug, trace_token,
389cef7893435aa41160dd1255c43cb8498279738ccChris Craik               parallel_operations, bucket_storage_uri_class,
390cef7893435aa41160dd1255c43cb8498279738ccChris Craik               gsutil_api_class_map_factory, logging_filters=None,
3918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi               command_alias_used=None):
3928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Instantiates a Command.
3938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
3948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
3958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      command_runner: CommandRunner (for commands built atop other commands).
3968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      args: Command-line args (arg0 = actual arg, not command name ala bash).
3978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      headers: Dictionary containing optional HTTP headers to pass to boto.
3988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      debug: Debug level to pass in to boto connection (range 0..3).
399cef7893435aa41160dd1255c43cb8498279738ccChris Craik      trace_token: Trace token to pass to the API implementation.
4008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      parallel_operations: Should command operations be executed in parallel?
4018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
4028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                Settable for testing/mocking.
4038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
4048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                    Settable for testing/mocking.
405cef7893435aa41160dd1255c43cb8498279738ccChris Craik      logging_filters: Optional list of logging. Filters to apply to this
4068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                       command's logger.
4078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      command_alias_used: The alias that was actually used when running this
4088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                          command (as opposed to the "official" command name,
4098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                          which will always correspond to the file name).
4108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Implementation note: subclasses shouldn't need to define an __init__
4128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    method, and instead depend on the shared initialization that happens
4138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    here. If you do define an __init__ method in a subclass you'll need to
4148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    explicitly call super().__init__(). But you're encouraged not to do this,
4158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    because it will make changing the __init__ interface more painful.
4168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
4178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Save class values from constructor params.
4188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.command_runner = command_runner
4198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.unparsed_args = args
4208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.headers = headers
4218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.debug = debug
422cef7893435aa41160dd1255c43cb8498279738ccChris Craik    self.trace_token = trace_token
4238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.parallel_operations = parallel_operations
4248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.bucket_storage_uri_class = bucket_storage_uri_class
4258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
4268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.exclude_symlinks = False
4278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.recursion_requested = False
4288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.all_versions = False
4298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.command_alias_used = command_alias_used
4308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Global instance of a threaded logger object.
4328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.logger = CreateGsutilLogger(self.command_name)
4338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if logging_filters:
4348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for log_filter in logging_filters:
4358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.logger.addFilter(log_filter)
4368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if self.command_spec is None:
4388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('"%s" command implementation is missing a '
4398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             'command_spec definition.' % self.command_name)
4408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Parse and validate args.
4428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.args = self._TranslateDeprecatedAliases(args)
4438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.ParseSubOpts()
4448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Named tuple public functions start with _
4468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # pylint: disable=protected-access
4478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.command_spec = self.command_spec._replace(
4488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        urls_start_arg=self._CalculateUrlsStartArg())
4498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if (len(self.args) < self.command_spec.min_args
4518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        or len(self.args) > self.command_spec.max_args):
4528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.RaiseWrongNumberOfArgumentsException()
4538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if self.command_name not in self._commands_with_subcommands_and_subopts:
4558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.CheckArguments()
4568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Build the support and default maps from the command spec.
4588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    support_map = {
4598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        'gs': self.command_spec.gs_api_support,
4608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        's3': [ApiSelector.XML]
4618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    }
4628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    default_map = {
4638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        'gs': self.command_spec.gs_default_api,
4648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        's3': ApiSelector.XML
4658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    }
4668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_api_map = GsutilApiMapFactory.GetApiMap(
4678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.gsutil_api_class_map_factory, support_map, default_map)
4688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.project_id = None
4708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_api = CloudApiDelegator(
4718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        bucket_storage_uri_class, self.gsutil_api_map,
472cef7893435aa41160dd1255c43cb8498279738ccChris Craik        self.logger, debug=self.debug, trace_token=self.trace_token)
4738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Cross-platform path to run gsutil binary.
4758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_cmd = ''
4768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # If running on Windows, invoke python interpreter explicitly.
4778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if gslib.util.IS_WINDOWS:
4788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.gsutil_cmd += 'python '
4798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Add full path to gsutil to make sure we test the correct version.
4808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_path = gslib.GSUTIL_PATH
4818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.gsutil_cmd += self.gsutil_path
4828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We're treating recursion_requested like it's used by all commands, but
4848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # only some of the commands accept the -R option.
4858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if self.sub_opts:
4868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for o, unused_a in self.sub_opts:
4878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if o == '-r' or o == '-R':
4888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.recursion_requested = True
4898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          break
4908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
491cef7893435aa41160dd1255c43cb8498279738ccChris Craik    self.multiprocessing_is_available = (
492cef7893435aa41160dd1255c43cb8498279738ccChris Craik        CheckMultiprocessingAvailableAndInit().is_available)
4938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
4948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def RaiseWrongNumberOfArgumentsException(self):
4958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Raises exception for wrong number of arguments supplied to command."""
4968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if len(self.args) < self.command_spec.min_args:
4978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      tail_str = 's' if self.command_spec.min_args > 1 else ''
4988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      message = ('The %s command requires at least %d argument%s.' %
4998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 (self.command_name, self.command_spec.min_args, tail_str))
5008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
5018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      message = ('The %s command accepts at most %d arguments.' %
5028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 (self.command_name, self.command_spec.max_args))
5038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    message += ' Usage:\n%s\nFor additional help run:\n  gsutil help %s' % (
5048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.command_spec.usage_synopsis, self.command_name)
5058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    raise CommandException(message)
5068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def RaiseInvalidArgumentException(self):
5088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Raises exception for specifying an invalid argument to command."""
5098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    message = ('Incorrect option(s) specified. Usage:\n%s\n'
5108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi               'For additional help run:\n  gsutil help %s' % (
5118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   self.command_spec.usage_synopsis, self.command_name))
5128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    raise CommandException(message)
5138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def ParseSubOpts(self, check_args=False):
5158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Parses sub-opt args.
5168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
5188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      check_args: True to have CheckArguments() called after parsing.
5198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Populates:
5218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      (self.sub_opts, self.args) from parsing.
5228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises: RaiseInvalidArgumentException if invalid args specified.
5248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
5258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
5268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.sub_opts, self.args = getopt.getopt(
5278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.args, self.command_spec.supported_sub_args,
5288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.command_spec.supported_private_args or [])
5298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except getopt.GetoptError:
5308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.RaiseInvalidArgumentException()
5318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if check_args:
5328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.CheckArguments()
5338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def CheckArguments(self):
5358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Checks that command line arguments match the command_spec.
5368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Any commands in self._commands_with_subcommands_and_subopts are responsible
5388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    for calling this method after handling initial parsing of their arguments.
5398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    This prevents commands with sub-commands as well as options from breaking
5408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    the parsing of getopt.
5418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    TODO: Provide a function to parse commands and sub-commands more
5438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    intelligently once we stop allowing the deprecated command versions.
5448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises:
5468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      CommandException if the arguments don't match.
5478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
5488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if (not self.command_spec.file_url_ok
5508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        and HaveFileUrls(self.args[self.command_spec.urls_start_arg:])):
5518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('"%s" command does not support "file://" URLs. '
5528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             'Did you mean to use a gs:// URL?' %
5538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             self.command_name)
5548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if (not self.command_spec.provider_url_ok
5558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        and HaveProviderUrls(self.args[self.command_spec.urls_start_arg:])):
5568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('"%s" command does not support provider-only '
5578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             'URLs.' % self.command_name)
5588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def WildcardIterator(self, url_string, all_versions=False):
5608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Helper to instantiate gslib.WildcardIterator.
5618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args are same as gslib.WildcardIterator interface, but this method fills in
5638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    most of the values from instance state.
5648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
5668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url_string: URL string naming wildcard objects to iterate.
5678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      all_versions: If true, the iterator yields all versions of objects
5688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    matching the wildcard.  If false, yields just the live
5698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    object version.
5708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
5728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      WildcardIterator for use by caller.
5738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
5748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return CreateWildcardIterator(
5758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        url_string, self.gsutil_api, all_versions=all_versions,
5768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        debug=self.debug, project_id=self.project_id)
5778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def RunCommand(self):
5798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Abstract function in base class. Subclasses must implement this.
5808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    The return value of this function will be used as the exit status of the
5828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    process, so subclass commands should return an integer exit code (0 for
5838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    success, a value in [1,255] for failure).
5848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
5858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    raise CommandException('Command %s is missing its RunCommand() '
5868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                           'implementation' % self.command_name)
5878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  ############################################################
5898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Shared helper functions that depend on base class state. #
5908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  ############################################################
5918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def ApplyAclFunc(self, acl_func, acl_excep_handler, url_strs):
5938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Sets the standard or default object ACL depending on self.command_name.
5948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
5958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
5968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      acl_func: ACL function to be passed to Apply.
5978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      acl_excep_handler: ACL exception handler to be passed to Apply.
5988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url_strs: URL strings on which to set ACL.
5998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises:
6018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      CommandException if an ACL could not be set.
6028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
6038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    multi_threaded_url_args = []
6048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Handle bucket ACL setting operations single-threaded, because
6058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # our threading machinery currently assumes it's working with objects
6068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # (name_expansion_iterator), and normally we wouldn't expect users to need
6078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # to set ACLs on huge numbers of buckets at once anyway.
6088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    for url_str in url_strs:
6098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url = StorageUrlFromString(url_str)
6108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if url.IsCloudUrl() and url.IsBucket():
6118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if self.recursion_requested:
6128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # If user specified -R option, convert any bucket args to bucket
6138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # wildcards (e.g., gs://bucket/*), to prevent the operation from
6148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # being applied to the buckets themselves.
6158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          url.object_name = '*'
6168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          multi_threaded_url_args.append(url.url_string)
6178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        else:
6188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # Convert to a NameExpansionResult so we can re-use the threaded
6198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # function for the single-threaded implementation.  RefType is unused.
6208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          for blr in self.WildcardIterator(url.url_string).IterBuckets(
6218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              bucket_fields=['id']):
6228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            name_expansion_for_url = NameExpansionResult(
6238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                url, False, False, blr.storage_url)
6248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            acl_func(self, name_expansion_for_url)
6258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
6268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        multi_threaded_url_args.append(url_str)
6278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if len(multi_threaded_url_args) >= 1:
6298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      name_expansion_iterator = NameExpansionIterator(
6308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.command_name, self.debug,
6318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.logger, self.gsutil_api,
6328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          multi_threaded_url_args, self.recursion_requested,
6338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          all_versions=self.all_versions,
6348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          continue_on_error=self.continue_on_error or self.parallel_operations)
6358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Perform requests in parallel (-m) mode, if requested, using
6378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # configured number of parallel processes and threads. Otherwise,
6388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # perform requests with sequential function calls in current process.
6398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.Apply(acl_func, name_expansion_iterator, acl_excep_handler,
6408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 fail_on_error=not self.continue_on_error)
6418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not self.everything_set_okay and not self.continue_on_error:
6438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('ACLs for some objects could not be set.')
6448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def SetAclFunc(self, name_expansion_result, thread_state=None):
6468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Sets the object ACL for the name_expansion_result provided.
6478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
6498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      name_expansion_result: NameExpansionResult describing the target object.
6508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      thread_state: If present, use this gsutil Cloud API instance for the set.
6518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
6528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if thread_state:
6538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      assert not self.def_acl
6548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api = thread_state
6558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
6568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api = self.gsutil_api
6578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    op_string = 'default object ACL' if self.def_acl else 'ACL'
6588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    url = name_expansion_result.expanded_storage_url
6598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.logger.info('Setting %s on %s...', op_string, url)
6608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if (gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
6618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        and url.scheme != 'gs'):
6628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # If we are called with a non-google ACL model, we need to use the XML
6638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # passthrough. acl_arg should either be a canned ACL or an XML ACL.
6648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._SetAclXmlPassthrough(url, gsutil_api)
6658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
6668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Normal Cloud API path. acl_arg is a JSON ACL or a canned ACL.
6678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._SetAclGsutilApi(url, gsutil_api)
6688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _SetAclXmlPassthrough(self, url, gsutil_api):
6708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Sets the ACL for the URL provided using the XML passthrough functions.
6718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    This function assumes that self.def_acl, self.canned,
6738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    and self.continue_on_error are initialized, and that self.acl_arg is
6748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    either an XML string or a canned ACL string.
6758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
6778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url: CloudURL to set the ACL on.
6788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api: gsutil Cloud API to use for the ACL set. Must support XML
6798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          passthrough functions.
6808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
6818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
6828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      orig_prefer_api = gsutil_api.prefer_api
6838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api.prefer_api = ApiSelector.XML
6848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api.XmlPassThroughSetAcl(
6858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.acl_arg, url, canned=self.canned,
6868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          def_obj_acl=self.def_acl, provider=url.scheme)
6878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except ServiceException as e:
6888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if self.continue_on_error:
6898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.everything_set_okay = False
6908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.logger.error(e)
6918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
6928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise
6938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    finally:
6948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api.prefer_api = orig_prefer_api
6958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _SetAclGsutilApi(self, url, gsutil_api):
6978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Sets the ACL for the URL provided using the gsutil Cloud API.
6988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
6998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    This function assumes that self.def_acl, self.canned,
7008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    and self.continue_on_error are initialized, and that self.acl_arg is
7018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    either a JSON string or a canned ACL string.
7028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
7048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url: CloudURL to set the ACL on.
7058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api: gsutil Cloud API to use for the ACL set.
7068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
7078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
7088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if url.IsBucket():
7098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if self.def_acl:
7108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if self.canned:
7118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gsutil_api.PatchBucket(
7128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                url.bucket_name, apitools_messages.Bucket(),
7138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                canned_def_acl=self.acl_arg, provider=url.scheme, fields=['id'])
7148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          else:
7158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            def_obj_acl = AclTranslation.JsonToMessage(
7168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                self.acl_arg, apitools_messages.ObjectAccessControl)
717cef7893435aa41160dd1255c43cb8498279738ccChris Craik            if not def_obj_acl:
718cef7893435aa41160dd1255c43cb8498279738ccChris Craik              # Use a sentinel value to indicate a private (no entries) default
719cef7893435aa41160dd1255c43cb8498279738ccChris Craik              # object ACL.
720cef7893435aa41160dd1255c43cb8498279738ccChris Craik              def_obj_acl.append(PRIVATE_DEFAULT_OBJ_ACL)
7218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            bucket_metadata = apitools_messages.Bucket(
7228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                defaultObjectAcl=def_obj_acl)
7238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
7248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                   provider=url.scheme, fields=['id'])
7258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        else:
7268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if self.canned:
7278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gsutil_api.PatchBucket(
7288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                url.bucket_name, apitools_messages.Bucket(),
7298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                canned_acl=self.acl_arg, provider=url.scheme, fields=['id'])
7308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          else:
7318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            bucket_acl = AclTranslation.JsonToMessage(
7328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                self.acl_arg, apitools_messages.BucketAccessControl)
7338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            bucket_metadata = apitools_messages.Bucket(acl=bucket_acl)
7348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gsutil_api.PatchBucket(url.bucket_name, bucket_metadata,
7358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                   provider=url.scheme, fields=['id'])
7368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:  # url.IsObject()
7378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if self.canned:
7388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          gsutil_api.PatchObjectMetadata(
7398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              url.bucket_name, url.object_name, apitools_messages.Object(),
7408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              provider=url.scheme, generation=url.generation,
7418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              canned_acl=self.acl_arg)
7428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        else:
7438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          object_acl = AclTranslation.JsonToMessage(
7448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              self.acl_arg, apitools_messages.ObjectAccessControl)
7458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          object_metadata = apitools_messages.Object(acl=object_acl)
7468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          gsutil_api.PatchObjectMetadata(url.bucket_name, url.object_name,
7478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                         object_metadata, provider=url.scheme,
7488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                         generation=url.generation)
7498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except ArgumentException, e:
7508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise
7518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except ServiceException, e:
7528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if self.continue_on_error:
7538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.everything_set_okay = False
7548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.logger.error(e)
7558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
7568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise
7578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def SetAclCommandHelper(self, acl_func, acl_excep_handler):
7598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Sets ACLs on the self.args using the passed-in acl function.
7608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
7628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      acl_func: ACL function to be passed to Apply.
7638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      acl_excep_handler: ACL exception handler to be passed to Apply.
7648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
7658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    acl_arg = self.args[0]
7668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    url_args = self.args[1:]
7678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Disallow multi-provider setacl requests, because there are differences in
7688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the ACL models.
7698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not UrlsAreForSingleProvider(url_args):
7708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('"%s" command spanning providers not allowed.' %
7718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             self.command_name)
7728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Determine whether acl_arg names a file containing XML ACL text vs. the
7748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # string name of a canned ACL.
7758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if os.path.isfile(acl_arg):
7768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      with codecs.open(acl_arg, 'r', UTF8) as f:
7778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        acl_arg = f.read()
7788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.canned = False
7798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
7808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # No file exists, so expect a canned ACL string.
7818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # validate=False because we allow wildcard urls.
7828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      storage_uri = boto.storage_uri(
7838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          url_args[0], debug=self.debug, validate=False,
7848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          bucket_storage_uri_class=self.bucket_storage_uri_class)
7858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      canned_acls = storage_uri.canned_acls()
7878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if acl_arg not in canned_acls:
7888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise CommandException('Invalid canned ACL "%s".' % acl_arg)
7898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.canned = True
7908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Used to track if any ACLs failed to be set.
7928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.everything_set_okay = True
7938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.acl_arg = acl_arg
7948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.ApplyAclFunc(acl_func, acl_excep_handler, url_args)
7968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not self.everything_set_okay and not self.continue_on_error:
7978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('ACLs for some objects could not be set.')
7988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
7998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _WarnServiceAccounts(self):
8008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Warns service account users who have received an AccessDenied error.
8018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    When one of the metadata-related commands fails due to AccessDenied, user
8038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    must ensure that they are listed as an Owner in the API console.
8048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
8058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Import this here so that the value will be set first in
8068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # gcs_oauth2_boto_plugin.
8078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # pylint: disable=g-import-not-at-top
8088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    from gcs_oauth2_boto_plugin.oauth2_plugin import IS_SERVICE_ACCOUNT
8098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if IS_SERVICE_ACCOUNT:
8118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # This method is only called when canned ACLs are used, so the warning
8128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # definitely applies.
8138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.logger.warning('\n'.join(textwrap.wrap(
8148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'It appears that your service account has been denied access while '
8158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'attempting to perform a metadata operation. If you believe that you '
8168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'should have access to this metadata (i.e., if it is associated with '
8178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'your account), please make sure that your service account''s email '
8188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'address is listed as an Owner in the Team tab of the API console. '
8198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'See "gsutil help creds" for further information.\n')))
8208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def GetAndPrintAcl(self, url_str):
8228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Prints the standard or default object ACL depending on self.command_name.
8238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
8258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url_str: URL string to get ACL for.
8268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
8278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    blr = self.GetAclCommandBucketListingReference(url_str)
8288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    url = StorageUrlFromString(url_str)
8298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if (self.gsutil_api.GetApiSelector(url.scheme) == ApiSelector.XML
8308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        and url.scheme != 'gs'):
8318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Need to use XML passthrough.
8328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      try:
8338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        acl = self.gsutil_api.XmlPassThroughGetAcl(
8348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            url, def_obj_acl=self.def_acl, provider=url.scheme)
8358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        print acl.to_xml()
8368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      except AccessDeniedException, _:
8378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self._WarnServiceAccounts()
8388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise
8398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
8408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if self.command_name == 'defacl':
8418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        acl = blr.root_object.defaultObjectAcl
8428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if not acl:
8438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.logger.warn(
8448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'No default object ACL present for %s. This could occur if '
8458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'the default object ACL is private, in which case objects '
8468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'created in this bucket will be readable only by their '
8478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'creators. It could also mean you do not have OWNER permission '
8488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'on %s and therefore do not have permission to read the '
8498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'default object ACL.', url_str, url_str)
8508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
8518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        acl = blr.root_object.acl
8528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if not acl:
8538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self._WarnServiceAccounts()
8548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          raise AccessDeniedException('Access denied. Please ensure you have '
8558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                      'OWNER permission on %s.' % url_str)
8568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      print AclTranslation.JsonFromMessage(acl)
8578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def GetAclCommandBucketListingReference(self, url_str):
8598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Gets a single bucket listing reference for an acl get command.
8608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
8628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      url_str: URL string to get the bucket listing reference for.
8638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
8658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      BucketListingReference for the URL string.
8668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises:
8688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      CommandException if string did not result in exactly one reference.
8698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
8708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We're guaranteed by caller that we have the appropriate type of url
8718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # string for the call (ex. we will never be called with an object string
8728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # by getdefacl)
8738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    wildcard_url = StorageUrlFromString(url_str)
8748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if wildcard_url.IsObject():
8758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      plurality_iter = PluralityCheckableIterator(
8768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.WildcardIterator(url_str).IterObjects(
8778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              bucket_listing_fields=['acl']))
8788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
8798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Bucket or provider.  We call IterBuckets explicitly here to ensure that
8808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # the root object is populated with the acl.
8818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if self.command_name == 'defacl':
8828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        bucket_fields = ['defaultObjectAcl']
8838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
8848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        bucket_fields = ['acl']
8858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      plurality_iter = PluralityCheckableIterator(
8868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          self.WildcardIterator(url_str).IterBuckets(
8878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              bucket_fields=bucket_fields))
8888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if plurality_iter.IsEmpty():
8898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('No URLs matched')
8908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if plurality_iter.HasPlurality():
8918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException(
8928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          '%s matched more than one URL, which is not allowed by the %s '
8938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'command' % (url_str, self.command_name))
8948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return list(plurality_iter)[0]
8958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
896cef7893435aa41160dd1255c43cb8498279738ccChris Craik  def _HandleMultiProcessingSigs(self, signal_num, unused_cur_stack_frame):
8978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Handles signals INT AND TERM during a multi-process/multi-thread request.
8988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
8998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Kills subprocesses.
9008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
9028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      unused_signal_num: signal generated by ^C.
9038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      unused_cur_stack_frame: Current stack frame.
9048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
9058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Note: This only works under Linux/MacOS. See
9068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # https://github.com/GoogleCloudPlatform/gsutil/issues/99 for details
9078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # about why making it work correctly across OS's is harder and still open.
9088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    ShutDownGsutil()
909cef7893435aa41160dd1255c43cb8498279738ccChris Craik    if signal_num == signal.SIGINT:
910cef7893435aa41160dd1255c43cb8498279738ccChris Craik      sys.stderr.write('Caught ^C - exiting\n')
9118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Simply calling sys.exit(1) doesn't work - see above bug for details.
9128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    KillProcess(os.getpid())
9138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def GetSingleBucketUrlFromArg(self, arg, bucket_fields=None):
9158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Gets a single bucket URL based on the command arguments.
9168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
9188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      arg: String argument to get bucket URL for.
9198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      bucket_fields: Fields to populate for the bucket.
9208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
9228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      (StorageUrl referring to a single bucket, Bucket metadata).
9238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises:
9258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      CommandException if args did not match exactly one bucket.
9268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
9278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    plurality_checkable_iterator = self.GetBucketUrlIterFromArg(
9288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        arg, bucket_fields=bucket_fields)
9298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if plurality_checkable_iterator.HasPlurality():
9308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException(
9318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          '%s matched more than one URL, which is not\n'
9328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'allowed by the %s command' % (arg, self.command_name))
9338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    blr = list(plurality_checkable_iterator)[0]
9348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return StorageUrlFromString(blr.url_string), blr.root_object
9358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def GetBucketUrlIterFromArg(self, arg, bucket_fields=None):
9378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Gets a single bucket URL based on the command arguments.
9388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
9408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      arg: String argument to iterate over.
9418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      bucket_fields: Fields to populate for the bucket.
9428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
9448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      PluralityCheckableIterator over buckets.
9458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Raises:
9478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      CommandException if iterator matched no buckets.
9488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
9498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    arg_url = StorageUrlFromString(arg)
9508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not arg_url.IsCloudUrl() or arg_url.IsObject():
9518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('"%s" command must specify a bucket' %
9528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             self.command_name)
9538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    plurality_checkable_iterator = PluralityCheckableIterator(
9558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.WildcardIterator(arg).IterBuckets(
9568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            bucket_fields=bucket_fields))
9578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if plurality_checkable_iterator.IsEmpty():
9588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('No URLs matched')
9598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return plurality_checkable_iterator
9608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  ######################
9628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # Private functions. #
9638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  ######################
9648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _ResetConnectionPool(self):
9668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Each OS process needs to establish its own set of connections to
9678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the server to avoid writes from different OS processes interleaving
9688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # onto the same socket (and garbling the underlying SSL session).
9698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We ensure each process gets its own set of connections here by
9708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # closing all connections in the storage provider connection pool.
9718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    connection_pool = StorageUri.provider_pool
9728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if connection_pool:
9738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for i in connection_pool:
9748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        connection_pool[i].connection.close()
9758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _GetProcessAndThreadCount(self, process_count, thread_count,
9778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                parallel_operations_override):
9788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Determines the values of process_count and thread_count.
9798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    These values are used for parallel operations.
9818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    If we're not performing operations in parallel, then ignore
9828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    existing values and use process_count = thread_count = 1.
9838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
9858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      process_count: A positive integer or None. In the latter case, we read
9868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     the value from the .boto config file.
9878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      thread_count: A positive integer or None. In the latter case, we read
9888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    the value from the .boto config file.
9898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      parallel_operations_override: Used to override self.parallel_operations.
9908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                    This allows the caller to safely override
9918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                    the top-level flag for a single call.
9928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
9938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
9948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      (process_count, thread_count): The number of processes and threads to use,
9958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                     respectively.
9968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
9978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Set OS process and python thread count as a function of options
9988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # and config.
9998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if self.parallel_operations or parallel_operations_override:
10008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if not process_count:
10018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        process_count = boto.config.getint(
10028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            'GSUtil', 'parallel_process_count',
10038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gslib.commands.config.DEFAULT_PARALLEL_PROCESS_COUNT)
10048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if process_count < 1:
10058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise CommandException('Invalid parallel_process_count "%d".' %
10068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                               process_count)
10078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if not thread_count:
10088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        thread_count = boto.config.getint(
10098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            'GSUtil', 'parallel_thread_count',
10108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            gslib.commands.config.DEFAULT_PARALLEL_THREAD_COUNT)
10118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if thread_count < 1:
10128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise CommandException('Invalid parallel_thread_count "%d".' %
10138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                               thread_count)
10148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
10158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # If -m not specified, then assume 1 OS process and 1 Python thread.
10168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      process_count = 1
10178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      thread_count = 1
10188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if IS_WINDOWS and process_count > 1:
10208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('\n'.join(textwrap.wrap(
10218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          ('It is not possible to set process_count > 1 on Windows. Please '
10228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           'update your config file (located at %s) and set '
10238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           '"parallel_process_count = 1".') %
10248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          GetConfigFilePath())))
10258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.logger.debug('process count: %d', process_count)
10268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.logger.debug('thread count: %d', thread_count)
10278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return (process_count, thread_count)
10298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _SetUpPerCallerState(self):
10318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Set up the state for a caller id, corresponding to one Apply call."""
1032cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # pylint: disable=global-variable-undefined,global-variable-not-assigned
1033cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # These variables are initialized in InitializeMultiprocessingVariables or
1034cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # InitializeThreadingVariables
1035cef7893435aa41160dd1255c43cb8498279738ccChris Craik    global global_return_values_map, shared_vars_map, failure_count
1036cef7893435aa41160dd1255c43cb8498279738ccChris Craik    global caller_id_finished_count, shared_vars_list_map, total_tasks
1037cef7893435aa41160dd1255c43cb8498279738ccChris Craik    global need_pool_or_done_cond, call_completed_map, class_map
1038cef7893435aa41160dd1255c43cb8498279738ccChris Craik    global task_queues, caller_id_lock, caller_id_counter
10398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Get a new caller ID.
10408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    with caller_id_lock:
1041cef7893435aa41160dd1255c43cb8498279738ccChris Craik      if isinstance(caller_id_counter, int):
1042cef7893435aa41160dd1255c43cb8498279738ccChris Craik        caller_id_counter += 1
1043cef7893435aa41160dd1255c43cb8498279738ccChris Craik        caller_id = caller_id_counter
1044cef7893435aa41160dd1255c43cb8498279738ccChris Craik      else:
1045cef7893435aa41160dd1255c43cb8498279738ccChris Craik        caller_id_counter.value += 1
1046cef7893435aa41160dd1255c43cb8498279738ccChris Craik        caller_id = caller_id_counter.value
10478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Create a copy of self with an incremented recursive level. This allows
10498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the class to report its level correctly if the function called from it
10508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # also needs to call Apply.
10518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    cls = copy.copy(self)
10528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    cls.recursive_apply_level += 1
10538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Thread-safe loggers can't be pickled, so we will remove it here and
10558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # recreate it later in the WorkerThread. This is not a problem since any
10568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # logger with the same name will be treated as a singleton.
10578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    cls.logger = None
10588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Likewise, the default API connection can't be pickled, but it is unused
10608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # anyway as each thread gets its own API delegator.
10618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    cls.gsutil_api = None
10628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    class_map[caller_id] = cls
10648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    total_tasks[caller_id] = -1  # -1 => the producer hasn't finished yet.
10658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    call_completed_map[caller_id] = False
1066cef7893435aa41160dd1255c43cb8498279738ccChris Craik    caller_id_finished_count[caller_id] = 0
1067cef7893435aa41160dd1255c43cb8498279738ccChris Craik    global_return_values_map[caller_id] = []
10688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return caller_id
10698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _CreateNewConsumerPool(self, num_processes, num_threads):
10718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Create a new pool of processes that call _ApplyThreads."""
10728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    processes = []
10738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    task_queue = _NewMultiprocessingQueue()
10748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    task_queues.append(task_queue)
10758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    current_max_recursive_level.value += 1
10778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if current_max_recursive_level.value > MAX_RECURSIVE_DEPTH:
10788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise CommandException('Recursion depth of Apply calls is too great.')
10798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    for _ in range(num_processes):
10808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      recursive_apply_level = len(consumer_pools)
10818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      p = multiprocessing.Process(
10828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          target=self._ApplyThreads,
10838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          args=(num_threads, num_processes, recursive_apply_level))
10848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      p.daemon = True
10858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      processes.append(p)
10868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      p.start()
10878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    consumer_pool = _ConsumerPool(processes, task_queue)
10888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    consumer_pools.append(consumer_pool)
10898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def Apply(self, func, args_iterator, exception_handler,
10918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            shared_attrs=None, arg_checker=_UrlArgChecker,
10928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            parallel_operations_override=False, process_count=None,
10938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            thread_count=None, should_return_results=False,
10948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            fail_on_error=False):
10958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Calls _Parallel/SequentialApply based on multiprocessing availability.
10968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
10978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
10988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      func: Function to call to process each argument.
10998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      args_iterator: Iterable collection of arguments to be put into the
11008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     work queue.
11018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      exception_handler: Exception handler for WorkerThread class.
11028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      shared_attrs: List of attributes to manage across sub-processes.
11038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      arg_checker: Used to determine whether we should process the current
11048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   argument or simply skip it. Also handles any logging that
11058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   is specific to a particular type of argument.
11068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      parallel_operations_override: Used to override self.parallel_operations.
11078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                    This allows the caller to safely override
11088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                    the top-level flag for a single call.
11098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      process_count: The number of processes to use. If not specified, then
11108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     the configured default will be used.
11118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      thread_count: The number of threads per process. If not speficied, then
11128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    the configured default will be used..
11138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      should_return_results: If true, then return the results of all successful
11148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             calls to func in a list.
11158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      fail_on_error: If true, then raise any exceptions encountered when
11168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     executing func. This is only applicable in the case of
11178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     process_count == thread_count == 1.
11188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Returns:
11208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      Results from spawned threads.
11218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
11228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if shared_attrs:
11238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      original_shared_vars_values = {}  # We'll add these back in at the end.
11248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for name in shared_attrs:
11258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        original_shared_vars_values[name] = getattr(self, name)
11268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # By setting this to 0, we simplify the logic for computing deltas.
11278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # We'll add it back after all of the tasks have been performed.
11288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        setattr(self, name, 0)
11298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    (process_count, thread_count) = self._GetProcessAndThreadCount(
11318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        process_count, thread_count, parallel_operations_override)
11328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    is_main_thread = (self.recursive_apply_level == 0
11348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                      and self.sequential_caller_id == -1)
11358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We don't honor the fail_on_error flag in the case of multiple threads
11378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # or processes.
11388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    fail_on_error = fail_on_error and (process_count * thread_count == 1)
11398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Only check this from the first call in the main thread. Apart from the
11418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # fact that it's  wasteful to try this multiple times in general, it also
11428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # will never work when called from a subprocess since we use daemon
11438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # processes, and daemons can't create other processes.
1144cef7893435aa41160dd1255c43cb8498279738ccChris Craik    if (is_main_thread and not self.multiprocessing_is_available and
1145cef7893435aa41160dd1255c43cb8498279738ccChris Craik        process_count > 1):
1146cef7893435aa41160dd1255c43cb8498279738ccChris Craik      # Run the check again and log the appropriate warnings. This was run
1147cef7893435aa41160dd1255c43cb8498279738ccChris Craik      # before, when the Command object was created, in order to calculate
1148cef7893435aa41160dd1255c43cb8498279738ccChris Craik      # self.multiprocessing_is_available, but we don't want to print the
1149cef7893435aa41160dd1255c43cb8498279738ccChris Craik      # warning until we're sure the user actually tried to use multiple
1150cef7893435aa41160dd1255c43cb8498279738ccChris Craik      # threads or processes.
1151cef7893435aa41160dd1255c43cb8498279738ccChris Craik      CheckMultiprocessingAvailableAndInit(logger=self.logger)
1152cef7893435aa41160dd1255c43cb8498279738ccChris Craik
1153cef7893435aa41160dd1255c43cb8498279738ccChris Craik    caller_id = self._SetUpPerCallerState()
11548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # If any shared attributes passed by caller, create a dictionary of
11568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # shared memory variables for every element in the list of shared
11578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # attributes.
11588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if shared_attrs:
11598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      shared_vars_list_map[caller_id] = shared_attrs
11608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for name in shared_attrs:
1161cef7893435aa41160dd1255c43cb8498279738ccChris Craik        shared_vars_map[(caller_id, name)] = 0
11628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Make all of the requested function calls.
1164cef7893435aa41160dd1255c43cb8498279738ccChris Craik    usable_processes_count = (process_count if self.multiprocessing_is_available
1165cef7893435aa41160dd1255c43cb8498279738ccChris Craik                              else 1)
1166cef7893435aa41160dd1255c43cb8498279738ccChris Craik    if thread_count * usable_processes_count > 1:
11678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._ParallelApply(func, args_iterator, exception_handler, caller_id,
1168cef7893435aa41160dd1255c43cb8498279738ccChris Craik                          arg_checker, usable_processes_count, thread_count,
11698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                          should_return_results, fail_on_error)
11708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
11718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._SequentialApply(func, args_iterator, exception_handler, caller_id,
11728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                            arg_checker, should_return_results, fail_on_error)
11738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if shared_attrs:
11758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for name in shared_attrs:
11768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # This allows us to retain the original value of the shared variable,
11778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # and simply apply the delta after what was done during the call to
11788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # apply.
11798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        final_value = (original_shared_vars_values[name] +
1180cef7893435aa41160dd1255c43cb8498279738ccChris Craik                       shared_vars_map.get((caller_id, name)))
11818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        setattr(self, name, final_value)
11828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if should_return_results:
1184cef7893435aa41160dd1255c43cb8498279738ccChris Craik      return global_return_values_map.get(caller_id)
11858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _MaybeSuggestGsutilDashM(self):
11878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Outputs a sugestion to the user to use gsutil -m."""
11888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not (boto.config.getint('GSUtil', 'parallel_process_count', 0) == 1 and
11898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            boto.config.getint('GSUtil', 'parallel_thread_count', 0) == 1):
11908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.logger.info('\n' + textwrap.fill(
11918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          '==> NOTE: You are performing a sequence of gsutil operations that '
11928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'may run significantly faster if you instead use gsutil -m %s ...\n'
11938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'Please see the -m section under "gsutil help options" for further '
11948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          'information about when gsutil -m can be advantageous.'
11958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          % sys.argv[1]) + '\n')
11968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
11978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # pylint: disable=g-doc-args
11988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _SequentialApply(self, func, args_iterator, exception_handler, caller_id,
11998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                       arg_checker, should_return_results, fail_on_error):
12008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Performs all function calls sequentially in the current thread.
12018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    No other threads or processes will be spawned. This degraded functionality
12038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    is used when the multiprocessing module is not available or the user
12048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    requests only one thread and one process.
12058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
12068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Create a WorkerThread to handle all of the logic needed to actually call
12078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the function. Note that this thread will never be started, and all work
12088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # is done in the current thread.
12098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    worker_thread = WorkerThread(None, False)
12108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    args_iterator = iter(args_iterator)
12118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Count of sequential calls that have been made. Used for producing
12128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # suggestion to use gsutil -m.
12138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    sequential_call_count = 0
12148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    while True:
12158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Try to get the next argument, handling any exceptions that arise.
12178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      try:
12188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        args = args_iterator.next()
12198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      except StopIteration, e:
12208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        break
12218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      except Exception, e:  # pylint: disable=broad-except
12228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        _IncrementFailureCount()
12238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if fail_on_error:
12248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          raise
12258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        else:
12268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          try:
12278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            exception_handler(self, e)
12288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          except Exception, _:  # pylint: disable=broad-except
12298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            self.logger.debug(
12308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                'Caught exception while handling exception for %s:\n%s',
12318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                func, traceback.format_exc())
12328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          continue
12338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      sequential_call_count += 1
12358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if sequential_call_count == OFFER_GSUTIL_M_SUGGESTION_THRESHOLD:
12368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # Output suggestion near beginning of run, so user sees it early and can
12378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # ^C and try gsutil -m.
12388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self._MaybeSuggestGsutilDashM()
12398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if arg_checker(self, args):
12408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # Now that we actually have the next argument, perform the task.
12418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        task = Task(func, args, caller_id, exception_handler,
12428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    should_return_results, arg_checker, fail_on_error)
12438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        worker_thread.PerformTask(task, self)
12448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if sequential_call_count >= gslib.util.GetTermLines():
12458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Output suggestion at end of long run, in case user missed it at the
12468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # start and it scrolled off-screen.
12478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._MaybeSuggestGsutilDashM()
12488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1249cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # If the final iterated argument results in an exception, and that
1250cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # exception modifies shared_attrs, we need to publish the results.
1251cef7893435aa41160dd1255c43cb8498279738ccChris Craik    worker_thread.shared_vars_updater.Update(caller_id, self)
1252cef7893435aa41160dd1255c43cb8498279738ccChris Craik
12538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  # pylint: disable=g-doc-args
12548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _ParallelApply(self, func, args_iterator, exception_handler, caller_id,
12558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     arg_checker, process_count, thread_count,
12568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     should_return_results, fail_on_error):
12578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Dispatches input arguments across a thread/process pool.
12588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Pools are composed of parallel OS processes and/or Python threads,
12608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    based on options (-m or not) and settings in the user's config file.
12618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    If only one OS process is requested/available, dispatch requests across
12638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    threads in the current OS process.
12648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    In the multi-process case, we will create one pool of worker processes for
12668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    each level of the tree of recursive calls to Apply. E.g., if A calls
12678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Apply(B), and B ultimately calls Apply(C) followed by Apply(D), then we
12688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    will only create two sets of worker processes - B will execute in the first,
12698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    and C and D will execute in the second. If C is then changed to call
12708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Apply(E) and D is changed to call Apply(F), then we will automatically
12718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    create a third set of processes (lazily, when needed) that will be used to
12728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    execute calls to E and F. This might look something like:
12738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Pool1 Executes:                B
12758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                  / \
12768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Pool2 Executes:              C   D
12778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                /     \
12788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Pool3 Executes:            E       F
12798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Apply's parallelism is generally broken up into 4 cases:
12818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    - If process_count == thread_count == 1, then all tasks will be executed
12828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      by _SequentialApply.
12838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    - If process_count > 1 and thread_count == 1, then the main thread will
12848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      create a new pool of processes (if they don't already exist) and each of
12858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      those processes will execute the tasks in a single thread.
12868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    - If process_count == 1 and thread_count > 1, then this process will create
12878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      a new pool of threads to execute the tasks.
12888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    - If process_count > 1 and thread_count > 1, then the main thread will
12898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      create a new pool of processes (if they don't already exist) and each of
12908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      those processes will, upon creation, create a pool of threads to
12918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      execute the tasks.
12928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
12948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      caller_id: The caller ID unique to this call to command.Apply.
12958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      See command.Apply for description of other arguments.
12968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
12978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    is_main_thread = self.recursive_apply_level == 0
12988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
12998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Catch SIGINT and SIGTERM under Linux/MacOs so we can do cleanup before
13008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # exiting.
13018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not IS_WINDOWS and is_main_thread:
13028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Register as a final signal handler because this handler kills the
13038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # main gsutil process (so it must run last).
13048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      RegisterSignalHandler(signal.SIGINT, self._HandleMultiProcessingSigs,
13058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                            is_final_handler=True)
13068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      RegisterSignalHandler(signal.SIGTERM, self._HandleMultiProcessingSigs,
13078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                            is_final_handler=True)
13088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if not task_queues:
13108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # The process we create will need to access the next recursive level
13118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # of task queues if it makes a call to Apply, so we always keep around
13128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # one more queue than we know we need. OTOH, if we don't create a new
13138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # process, the existing process still needs a task queue to use.
1314cef7893435aa41160dd1255c43cb8498279738ccChris Craik      if process_count > 1:
1315cef7893435aa41160dd1255c43cb8498279738ccChris Craik        task_queues.append(_NewMultiprocessingQueue())
1316cef7893435aa41160dd1255c43cb8498279738ccChris Craik      else:
1317cef7893435aa41160dd1255c43cb8498279738ccChris Craik        task_queues.append(_NewThreadsafeQueue())
13188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if process_count > 1:  # Handle process pool creation.
13208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Check whether this call will need a new set of workers.
13218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Each worker must acquire a shared lock before notifying the main thread
13238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # that it needs a new worker pool, so that at most one worker asks for
13248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # a new worker pool at once.
13258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      try:
13268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if not is_main_thread:
13278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          worker_checking_level_lock.acquire()
13288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if self.recursive_apply_level >= current_max_recursive_level.value:
13298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          with need_pool_or_done_cond:
13308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            # Only the main thread is allowed to create new processes -
13318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            # otherwise, we will run into some Python bugs.
13328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            if is_main_thread:
13338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              self._CreateNewConsumerPool(process_count, thread_count)
13348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            else:
13358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              # Notify the main thread that we need a new consumer pool.
13368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              new_pool_needed.value = 1
13378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              need_pool_or_done_cond.notify_all()
13388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              # The main thread will notify us when it finishes.
13398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              need_pool_or_done_cond.wait()
13408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      finally:
13418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if not is_main_thread:
13428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          worker_checking_level_lock.release()
13438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # If we're running in this process, create a separate task queue. Otherwise,
13458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # if Apply has already been called with process_count > 1, then there will
13468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # be consumer pools trying to use our processes.
13478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if process_count > 1:
13488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task_queue = task_queues[self.recursive_apply_level]
1349cef7893435aa41160dd1255c43cb8498279738ccChris Craik    elif self.multiprocessing_is_available:
13508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task_queue = _NewMultiprocessingQueue()
1351cef7893435aa41160dd1255c43cb8498279738ccChris Craik    else:
1352cef7893435aa41160dd1255c43cb8498279738ccChris Craik      task_queue = _NewThreadsafeQueue()
13538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Kick off a producer thread to throw tasks in the global task queue. We
13558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # do this asynchronously so that the main thread can be free to create new
13568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # consumer pools when needed (otherwise, any thread with a task that needs
13578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # a new consumer pool must block until we're completely done producing; in
13588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the worst case, every worker blocks on such a call and the producer fills
13598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # up the task queue before it finishes, so we block forever).
13608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    producer_thread = ProducerThread(copy.copy(self), args_iterator, caller_id,
13618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                     func, task_queue, should_return_results,
13628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                     exception_handler, arg_checker,
13638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                     fail_on_error)
13648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if process_count > 1:
13668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Wait here until either:
13678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      #   1. We're the main thread and someone needs a new consumer pool - in
13688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      #      which case we create one and continue waiting.
13698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      #   2. Someone notifies us that all of the work we requested is done, in
13708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      #      which case we retrieve the results (if applicable) and stop
13718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      #      waiting.
13728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      while True:
13738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        with need_pool_or_done_cond:
13748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # Either our call is done, or someone needs a new level of consumer
13758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # pools, or we the wakeup call was meant for someone else. It's
13768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # impossible for both conditions to be true, since the main thread is
13778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # blocked on any other ongoing calls to Apply, and a thread would not
13788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # ask for a new consumer pool unless it had more work to do.
13798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if call_completed_map[caller_id]:
13808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            break
13818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          elif is_main_thread and new_pool_needed.value:
13828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            new_pool_needed.value = 0
13838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            self._CreateNewConsumerPool(process_count, thread_count)
13848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            need_pool_or_done_cond.notify_all()
13858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # Note that we must check the above conditions before the wait() call;
13878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # otherwise, the notification can happen before we start waiting, in
13888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # which case we'll block forever.
13898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          need_pool_or_done_cond.wait()
13908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:  # Using a single process.
13918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self._ApplyThreads(thread_count, process_count,
13928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                         self.recursive_apply_level,
13938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                         is_blocking_call=True, task_queue=task_queue)
13948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
13958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We encountered an exception from the producer thread before any arguments
13968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # were enqueued, but it wouldn't have been propagated, so we'll now
13978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # explicitly raise it here.
13988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if producer_thread.unknown_exception:
13998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # pylint: disable=raising-bad-type
14008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise producer_thread.unknown_exception
14018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # We encountered an exception from the producer thread while iterating over
14038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # the arguments, so raise it here if we're meant to fail on error.
14048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if producer_thread.iterator_exception and fail_on_error:
14058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # pylint: disable=raising-bad-type
14068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      raise producer_thread.iterator_exception
14078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def _ApplyThreads(self, thread_count, process_count, recursive_apply_level,
14098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    is_blocking_call=False, task_queue=None):
14108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Assigns the work from the multi-process global task queue.
14118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Work is assigned to an individual process for later consumption either by
14138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    the WorkerThreads or (if thread_count == 1) this thread.
14148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
14168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      thread_count: The number of threads used to perform the work. If 1, then
14178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                    perform all work in this thread.
14188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      process_count: The number of processes used to perform the work.
14198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      recursive_apply_level: The depth in the tree of recursive calls to Apply
14208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             of this thread.
14218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      is_blocking_call: True iff the call to Apply is blocked on this call
14228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        (which is true iff process_count == 1), implying that
14238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        _ApplyThreads must behave as a blocking call.
14248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
14258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self._ResetConnectionPool()
14268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.recursive_apply_level = recursive_apply_level
14278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    task_queue = task_queue or task_queues[recursive_apply_level]
14298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1430cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # Ensure fairness across processes by filling our WorkerPool only with
1431cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # as many tasks as it has WorkerThreads. This semaphore is acquired each
1432cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # time that a task is retrieved from the queue and released each time
1433cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # a task is completed by a WorkerThread.
1434cef7893435aa41160dd1255c43cb8498279738ccChris Craik    worker_semaphore = threading.BoundedSemaphore(thread_count)
1435cef7893435aa41160dd1255c43cb8498279738ccChris Craik
14368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    assert thread_count * process_count > 1, (
14378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        'Invalid state, calling command._ApplyThreads with only one thread '
14388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        'and process.')
1439cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # TODO: Presently, this pool gets recreated with each call to Apply. We
1440cef7893435aa41160dd1255c43cb8498279738ccChris Craik    # should be able to do it just once, at process creation time.
14418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    worker_pool = WorkerPool(
1442cef7893435aa41160dd1255c43cb8498279738ccChris Craik        thread_count, self.logger, worker_semaphore,
14438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        bucket_storage_uri_class=self.bucket_storage_uri_class,
14448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        gsutil_api_map=self.gsutil_api_map, debug=self.debug)
14458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    num_enqueued = 0
14478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    while True:
1448cef7893435aa41160dd1255c43cb8498279738ccChris Craik      worker_semaphore.acquire()
14498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task = task_queue.get()
14508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if task.args != ZERO_TASKS_TO_DO_ARGUMENT:
14518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # If we have no tasks to do and we're performing a blocking call, we
14528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # need a special signal to tell us to stop - otherwise, we block on
14538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # the call to task_queue.get() forever.
14548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        worker_pool.AddTask(task)
14558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        num_enqueued += 1
1456cef7893435aa41160dd1255c43cb8498279738ccChris Craik      else:
1457cef7893435aa41160dd1255c43cb8498279738ccChris Craik        # No tasks remain; don't block the semaphore on WorkerThread completion.
1458cef7893435aa41160dd1255c43cb8498279738ccChris Craik        worker_semaphore.release()
14598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if is_blocking_call:
14618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        num_to_do = total_tasks[task.caller_id]
14628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # The producer thread won't enqueue the last task until after it has
14638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # updated total_tasks[caller_id], so we know that num_to_do < 0 implies
14648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # we will do this check again.
14658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if num_to_do >= 0 and num_enqueued == num_to_do:
14668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if thread_count == 1:
14678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            return
14688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          else:
14698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            while True:
14708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              with need_pool_or_done_cond:
14718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                if call_completed_map[task.caller_id]:
14728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  # We need to check this first, in case the condition was
14738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  # notified before we grabbed the lock.
14748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  return
14758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                need_pool_or_done_cond.wait()
14768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Below here lie classes and functions related to controlling the flow of tasks
14798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# between various threads and processes.
14808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _ConsumerPool(object):
14838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def __init__(self, processes, task_queue):
14858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.processes = processes
14868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.task_queue = task_queue
14878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def ShutDown(self):
14898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    for process in self.processes:
14908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      KillProcess(process.pid)
14918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef KillProcess(pid):
14948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Make best effort to kill the given process.
14958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  We ignore all exceptions so a caller looping through a list of processes will
14978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  continue attempting to kill each, even if one encounters a problem.
14988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
14998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Args:
15008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    pid: The process ID.
15018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
15028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  try:
15038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # os.kill doesn't work in 2.X or 3.Y on Windows for any X < 7 or Y < 2.
15048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if IS_WINDOWS and ((2, 6) <= sys.version_info[:3] < (2, 7) or
15058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                       (3, 0) <= sys.version_info[:3] < (3, 2)):
15068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      kernel32 = ctypes.windll.kernel32
15078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      handle = kernel32.OpenProcess(1, 0, pid)
15088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      kernel32.TerminateProcess(handle, 0)
15098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:
15108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      os.kill(pid, signal.SIGKILL)
15118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  except:  # pylint: disable=bare-except
15128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    pass
15138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass Task(namedtuple('Task', (
15168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'func args caller_id exception_handler should_return_results arg_checker '
15178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    'fail_on_error'))):
15188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Task class representing work to be completed.
15198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Args:
15218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    func: The function to be executed.
15228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    args: The arguments to func.
15238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    caller_id: The globally-unique caller ID corresponding to the Apply call.
15248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    exception_handler: The exception handler to use if the call to func fails.
15258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    should_return_results: True iff the results of this function should be
15268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                           returned from the Apply call.
15278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    arg_checker: Used to determine whether we should process the current
15288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 argument or simply skip it. Also handles any logging that
15298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                 is specific to a particular type of argument.
15308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    fail_on_error: If true, then raise any exceptions encountered when
15318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   executing func. This is only applicable in the case of
15328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   process_count == thread_count == 1.
15338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
15348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  pass
15358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass ProducerThread(threading.Thread):
15388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Thread used to enqueue work for other processes and threads."""
15398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def __init__(self, cls, args_iterator, caller_id, func, task_queue,
15418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi               should_return_results, exception_handler, arg_checker,
15428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi               fail_on_error):
15438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Initializes the producer thread.
15448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
15468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      cls: Instance of Command for which this ProducerThread was created.
15478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      args_iterator: Iterable collection of arguments to be put into the
15488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     work queue.
15498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      caller_id: Globally-unique caller ID corresponding to this call to Apply.
15508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      func: The function to be called on each element of args_iterator.
15518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task_queue: The queue into which tasks will be put, to later be consumed
15528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  by Command._ApplyThreads.
15538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      should_return_results: True iff the results for this call to command.Apply
15548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                             were requested.
15558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      exception_handler: The exception handler to use when errors are
15568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                         encountered during calls to func.
15578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      arg_checker: Used to determine whether we should process the current
15588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   argument or simply skip it. Also handles any logging that
15598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                   is specific to a particular type of argument.
15608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      fail_on_error: If true, then raise any exceptions encountered when
15618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     executing func. This is only applicable in the case of
15628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                     process_count == thread_count == 1.
15638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
15648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    super(ProducerThread, self).__init__()
15658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.func = func
15668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.cls = cls
15678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.args_iterator = args_iterator
15688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.caller_id = caller_id
15698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.task_queue = task_queue
15708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.arg_checker = arg_checker
15718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.exception_handler = exception_handler
15728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.should_return_results = should_return_results
15738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.fail_on_error = fail_on_error
15748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.shared_variables_updater = _SharedVariablesUpdater()
15758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.daemon = True
15768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.unknown_exception = None
15778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.iterator_exception = None
15788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.start()
15798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
15808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def run(self):
15818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    num_tasks = 0
15828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    cur_task = None
15838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    last_task = None
15848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
15858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      args_iterator = iter(self.args_iterator)
15868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      while True:
15878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        try:
15888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          args = args_iterator.next()
15898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        except StopIteration, e:
15908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          break
15918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        except Exception, e:  # pylint: disable=broad-except
15928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          _IncrementFailureCount()
15938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if self.fail_on_error:
15948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            self.iterator_exception = e
15958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            raise
15968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          else:
15978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            try:
15988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              self.exception_handler(self.cls, e)
15998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            except Exception, _:  # pylint: disable=broad-except
16008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              self.cls.logger.debug(
16018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  'Caught exception while handling exception for %s:\n%s',
16028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  self.func, traceback.format_exc())
16038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            self.shared_variables_updater.Update(self.caller_id, self.cls)
16048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            continue
16058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        if self.arg_checker(self.cls, args):
16078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          num_tasks += 1
16088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          last_task = cur_task
16098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          cur_task = Task(self.func, args, self.caller_id,
16108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                          self.exception_handler, self.should_return_results,
16118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                          self.arg_checker, self.fail_on_error)
16128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          if last_task:
16138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi            self.task_queue.put(last_task)
16148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except Exception, e:  # pylint: disable=broad-except
16158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # This will also catch any exception raised due to an error in the
16168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # iterator when fail_on_error is set, so check that we failed for some
16178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # other reason before claiming that we had an unknown exception.
16188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if not self.iterator_exception:
16198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.unknown_exception = e
16208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    finally:
16218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # We need to make sure to update total_tasks[caller_id] before we enqueue
16228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # the last task. Otherwise, a worker can retrieve the last task and
16238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # complete it, then check total_tasks and determine that we're not done
16248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # producing all before we update total_tasks. This approach forces workers
16258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # to wait on the last task until after we've updated total_tasks.
16268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      total_tasks[self.caller_id] = num_tasks
16278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if not cur_task:
16288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # This happens if there were zero arguments to be put in the queue.
16298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        cur_task = Task(None, ZERO_TASKS_TO_DO_ARGUMENT, self.caller_id,
16308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                        None, None, None, None)
16318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.task_queue.put(cur_task)
16328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # It's possible that the workers finished before we updated total_tasks,
16348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # so we need to check here as well.
16358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      _NotifyIfDone(self.caller_id,
1636cef7893435aa41160dd1255c43cb8498279738ccChris Craik                    caller_id_finished_count.get(self.caller_id))
16378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass WorkerPool(object):
16408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Pool of worker threads to which tasks can be added."""
16418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1642cef7893435aa41160dd1255c43cb8498279738ccChris Craik  def __init__(self, thread_count, logger, worker_semaphore,
1643cef7893435aa41160dd1255c43cb8498279738ccChris Craik               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
16448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.task_queue = _NewThreadsafeQueue()
16458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.threads = []
16468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    for _ in range(thread_count):
16478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      worker_thread = WorkerThread(
1648cef7893435aa41160dd1255c43cb8498279738ccChris Craik          self.task_queue, logger, worker_semaphore=worker_semaphore,
16498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          bucket_storage_uri_class=bucket_storage_uri_class,
16508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          gsutil_api_map=gsutil_api_map, debug=debug)
16518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.threads.append(worker_thread)
16528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      worker_thread.start()
16538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def AddTask(self, task):
16558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.task_queue.put(task)
16568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass WorkerThread(threading.Thread):
16598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Thread where all the work will be performed.
16608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  This makes the function calls for Apply and takes care of all error handling,
16628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  return value propagation, and shared_vars.
16638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Note that this thread is NOT started upon instantiation because the function-
16658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  calling logic is also used in the single-threaded case.
16668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
16678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1668cef7893435aa41160dd1255c43cb8498279738ccChris Craik  def __init__(self, task_queue, logger, worker_semaphore=None,
1669cef7893435aa41160dd1255c43cb8498279738ccChris Craik               bucket_storage_uri_class=None, gsutil_api_map=None, debug=0):
16708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Initializes the worker thread.
16718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
16738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task_queue: The thread-safe queue from which this thread should obtain
16748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                  its work.
16758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      logger: Logger to use for this thread.
1676cef7893435aa41160dd1255c43cb8498279738ccChris Craik      worker_semaphore: threading.BoundedSemaphore to be released each time a
1677cef7893435aa41160dd1255c43cb8498279738ccChris Craik          task is completed, or None for single-threaded execution.
16788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
16798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                                Settable for testing/mocking.
16808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      gsutil_api_map: Map of providers and API selector tuples to api classes
16818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                      which can be used to communicate with those providers.
16828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi                      Used for the instantiating CloudApiDelegator class.
16838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      debug: debug level for the CloudApiDelegator class.
16848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
16858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    super(WorkerThread, self).__init__()
16868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.task_queue = task_queue
1687cef7893435aa41160dd1255c43cb8498279738ccChris Craik    self.worker_semaphore = worker_semaphore
16888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.daemon = True
16898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.cached_classes = {}
16908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.shared_vars_updater = _SharedVariablesUpdater()
16918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.thread_gsutil_api = None
16938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if bucket_storage_uri_class and gsutil_api_map:
16948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.thread_gsutil_api = CloudApiDelegator(
16958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          bucket_storage_uri_class, gsutil_api_map, logger, debug=debug)
16968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
16978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def PerformTask(self, task, cls):
16988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Makes the function call for a task.
16998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    Args:
17018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task: The Task to perform.
17028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      cls: The instance of a class which gives context to the functions called
17038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi           by the Task's function. E.g., see SetAclFuncWrapper.
17048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """
17058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    caller_id = task.caller_id
17068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
17078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      results = task.func(cls, task.args, thread_state=self.thread_gsutil_api)
17088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if task.should_return_results:
1709cef7893435aa41160dd1255c43cb8498279738ccChris Craik        global_return_values_map.Increment(caller_id, [results],
1710cef7893435aa41160dd1255c43cb8498279738ccChris Craik                                           default_value=[])
17118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except Exception, e:  # pylint: disable=broad-except
17128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      _IncrementFailureCount()
17138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if task.fail_on_error:
17148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        raise  # Only happens for single thread and process case.
17158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      else:
17168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        try:
17178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          task.exception_handler(cls, e)
17188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        except Exception, _:  # pylint: disable=broad-except
17198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # Don't allow callers to raise exceptions here and kill the worker
17208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          # threads.
17218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi          cls.logger.debug(
17228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              'Caught exception while handling exception for %s:\n%s',
17238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi              task, traceback.format_exc())
17248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    finally:
1725cef7893435aa41160dd1255c43cb8498279738ccChris Craik      if self.worker_semaphore:
1726cef7893435aa41160dd1255c43cb8498279738ccChris Craik        self.worker_semaphore.release()
17278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.shared_vars_updater.Update(caller_id, cls)
17288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Even if we encounter an exception, we still need to claim that that
17308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # the function finished executing. Otherwise, we won't know when to
17318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # stop waiting and return results.
1732cef7893435aa41160dd1255c43cb8498279738ccChris Craik      num_done = caller_id_finished_count.Increment(caller_id, 1)
1733cef7893435aa41160dd1255c43cb8498279738ccChris Craik      _NotifyIfDone(caller_id, num_done)
17348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def run(self):
17368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    while True:
17378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      task = self.task_queue.get()
17388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      caller_id = task.caller_id
17398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      # Get the instance of the command with the appropriate context.
17418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      cls = self.cached_classes.get(caller_id, None)
17428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      if not cls:
17438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        cls = copy.copy(class_map[caller_id])
17448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        cls.logger = CreateGsutilLogger(cls.command_name)
17458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.cached_classes[caller_id] = cls
17468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      self.PerformTask(task, cls)
17488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _SharedVariablesUpdater(object):
17518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Used to update shared variable for a class in the global map.
17528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi     Note that each thread will have its own instance of the calling class for
17548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi     context, and it will also have its own instance of a
17558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi     _SharedVariablesUpdater.  This is used in the following way:
17568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi     1. Before any tasks are performed, each thread will get a copy of the
17588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        calling class, and the globally-consistent value of this shared variable
17598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        will be initialized to whatever it was before the call to Apply began.
17608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi     2. After each time a thread performs a task, it will look at the current
17628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        values of the shared variables in its instance of the calling class.
17638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        2.A. For each such variable, it computes the delta of this variable
17658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             between the last known value for this class (which is stored in
17668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             a dict local to this class) and the current value of the variable
17678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             in the class.
17688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        2.B. Using this delta, we update the last known value locally as well
17708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             as the globally-consistent value shared across all classes (the
17718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             globally consistent value is simply increased by the computed
17728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi             delta).
17738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
17748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def __init__(self):
17768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    self.last_shared_var_values = {}
17778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  def Update(self, caller_id, cls):
17798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    """Update any shared variables with their deltas."""
17808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    shared_vars = shared_vars_list_map.get(caller_id, None)
17818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if shared_vars:
17828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      for name in shared_vars:
17838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        key = (caller_id, name)
17848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        last_value = self.last_shared_var_values.get(key, 0)
17858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # Compute the change made since the last time we updated here. This is
17868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # calculated by simply subtracting the last known value from the current
17878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # value in the class instance.
17888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        delta = getattr(cls, name) - last_value
17898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        self.last_shared_var_values[key] = delta + last_value
17908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # Update the globally-consistent value by simply increasing it by the
17928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi        # computed delta.
1793cef7893435aa41160dd1255c43cb8498279738ccChris Craik        shared_vars_map.Increment(key, delta)
17948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _NotifyIfDone(caller_id, num_done):
17978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Notify any threads waiting for results that something has finished.
17988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
17998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Each waiting thread will then need to check the call_completed_map to see if
18008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  its work is done.
18018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Note that num_done could be calculated here, but it is passed in as an
18038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  optimization so that we have one less call to a globally-locked data
18048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  structure.
18058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  Args:
18078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    caller_id: The caller_id of the function whose progress we're checking.
18088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    num_done: The number of tasks currently completed for that caller_id.
18098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """
18108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  num_to_do = total_tasks[caller_id]
18118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  if num_to_do == num_done and num_to_do >= 0:
18128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    # Notify the Apply call that's sleeping that it's ready to return.
18138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    with need_pool_or_done_cond:
18148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      call_completed_map[caller_id] = True
18158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      need_pool_or_done_cond.notify_all()
18168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef ShutDownGsutil():
18198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Shut down all processes in consumer pools in preparation for exiting."""
18208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  for q in queues:
18218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    try:
18228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      q.cancel_join_thread()
18238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    except:  # pylint: disable=bare-except
18248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      pass
18258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  for consumer_pool in consumer_pools:
18268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    consumer_pool.ShutDown()
18278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# pylint: disable=global-variable-undefined
18308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _IncrementFailureCount():
18318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  global failure_count
18328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  if isinstance(failure_count, int):
18338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    failure_count += 1
18348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  else:  # Otherwise it's a multiprocessing.Value() of type 'i'.
18358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    failure_count.value += 1
18368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# pylint: disable=global-variable-undefined
18398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef GetFailureCount():
18408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Returns the number of failures processed during calls to Apply()."""
18418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  try:
18428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if isinstance(failure_count, int):
18438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      return failure_count
18448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:  # It's a multiprocessing.Value() of type 'i'.
18458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      return failure_count.value
18468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  except NameError:  # If it wasn't initialized, Apply() wasn't called.
18478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    return 0
18488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
18508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef ResetFailureCount():
18518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  """Resets the failure_count variable to 0 - useful if error is expected."""
18528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  try:
18538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    global failure_count
18548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    if isinstance(failure_count, int):
18558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      failure_count = 0
18568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    else:  # It's a multiprocessing.Value() of type 'i'.
18578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi      failure_count = multiprocessing.Value('i', 0)
18588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi  except NameError:  # If it wasn't initialized, Apply() wasn't called.
18598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi    pass
1860