1# -*- coding: utf-8 -*-
2# Copyright 2012 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Name expansion iterator and result classes.
16
17Name expansion support for the various ways gsutil lets users refer to
18collections of data (via explicit wildcarding as well as directory,
19bucket, and bucket subdir implicit wildcarding). This class encapsulates
20the various rules for determining how these expansions are done.
21"""
22
23# Disable warnings for NameExpansionIteratorQueue functions; they implement
24# an interface which does not follow lint guidelines.
25# pylint: disable=invalid-name
26
27from __future__ import absolute_import
28
29import os
30import sys
31
32import gslib
33from gslib.exception import CommandException
34from gslib.plurality_checkable_iterator import PluralityCheckableIterator
35import gslib.wildcard_iterator
36from gslib.wildcard_iterator import StorageUrlFromString
37
38
class NameExpansionResult(object):
  """One fully expanded entry produced while iterating a name expansion.

  Instances travel through Multiprocessing.Queue, so every attribute stored
  here must be pickleable. Do not attach boto state such as StorageUri: it
  drags in a large object tree, parts of which cannot be pickled (and
  pickling/unpickling the rest would add significant overhead).

  The attributes carry what is needed to apply the destination naming rules
  (e.g., copying a single source URL to a directory produces different dest
  URL names than copying several URLs to a directory, mirroring the Unix cp
  command). See the comments in _NameExpansionIterator for details.
  """

  def __init__(self, source_storage_url, is_multi_source_request,
               names_container, expanded_storage_url):
    """Stores the outcome of expanding one source URL.

    Args:
      source_storage_url: StorageUrl that was being expanded.
      is_multi_source_request: bool indicator whether src_url_str expanded to
          more than one BucketListingRef.
      names_container: Bool indicator whether src_url names a container.
      expanded_storage_url: StorageUrl that was expanded.
    """
    # What the expansion produced.
    self.expanded_storage_url = expanded_storage_url
    self.names_container = names_container
    # What the expansion started from.
    self.is_multi_source_request = is_multi_source_request
    self.source_storage_url = source_storage_url

  def __repr__(self):
    # The expanded URL alone identifies this result in logs and debug output.
    expanded = self.expanded_storage_url
    return '%s' % expanded
74
75
class _NameExpansionIterator(object):
  """Class that iterates over all source URLs passed to the iterator.

  See details in __iter__ function doc.
  """

  def __init__(self, command_name, debug, logger, gsutil_api, url_strs,
               recursion_requested, all_versions=False,
               cmd_supports_recursion=True, project_id=None,
               continue_on_error=False):
    """Creates a NameExpansionIterator.

    Args:
      command_name: name of command being run.
      debug: Debug level to pass to underlying iterators (range 0..3).
      logger: logging.Logger object.
      gsutil_api: Cloud storage interface.  Settable for testing/mocking.
      url_strs: PluralityCheckableIterator of URL strings needing expansion.
      recursion_requested: True if -r specified on command-line.  If so,
          listings will be flattened so mapped-to results contain objects
          spanning subdirectories.
      all_versions: Bool indicating whether to iterate over all object versions.
      cmd_supports_recursion: Bool indicating whether this command supports a
          '-r' flag. Useful for printing helpful error messages.
      project_id: Project id to use for bucket retrieval.
      continue_on_error: If true, yield no-match exceptions encountered during
                         iteration instead of raising them.

    Examples of _NameExpansionIterator with recursion_requested=True:
      - Calling with one of the url_strs being 'gs://bucket' will enumerate all
        top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'.
      - 'gs://bucket/**' will enumerate all objects in the bucket.
      - 'gs://bucket/abc' will enumerate either the single object abc or, if
         abc is a subdirectory, all objects under abc and any of its
         subdirectories.
      - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its
        subdirectories.
      - 'file:///tmp' will enumerate all files under /tmp, as will
        'file:///tmp/*'
      - 'file:///tmp/**' will enumerate all files under /tmp or any of its
        subdirectories.

    Example if recursion_requested=False:
      calling with gs://bucket/abc/* lists matching objects
      or subdirs, but not sub-subdirs or objects beneath subdirs.

    Note: In step-by-step comments below we give examples assuming there's a
    gs://bucket with object paths:
      abcd/o1.txt
      abcd/o2.txt
      xyz/o1.txt
      xyz/o2.txt
    and a directory file://dir with file paths:
      dir/a.txt
      dir/b.txt
      dir/c/
    """
    self.command_name = command_name
    self.debug = debug
    self.logger = logger
    self.gsutil_api = gsutil_api
    self.url_strs = url_strs
    self.recursion_requested = recursion_requested
    self.all_versions = all_versions
    # Check self.url_strs.HasPlurality() at start because its value can change
    # if url_strs is itself an iterator. The snapshot is stored on the
    # url_strs object itself and read back from there in __iter__.
    self.url_strs.has_plurality = self.url_strs.HasPlurality()
    self.cmd_supports_recursion = cmd_supports_recursion
    self.project_id = project_id
    self.continue_on_error = continue_on_error

    # Map holding wildcard strings to use for flat vs subdir-by-subdir listings.
    # (A flat listing means show all objects expanded all the way down.)
    self._flatness_wildcard = {True: '**', False: '*'}

  def __iter__(self):
    """Iterates over all source URLs passed to the iterator.

    For each src url, expands wildcards, object-less bucket names,
    subdir bucket names, and directory names, and generates a flat listing of
    all the matching objects/files.

    You should instantiate this object using the static factory function
    NameExpansionIterator, because consumers of this iterator need the
    PluralityCheckableIterator wrapper built by that function.

    Yields:
      gslib.name_expansion.NameExpansionResult. When continue_on_error is
      set, a URL string that matches nothing instead yields a tuple of
      (CommandException, traceback).

    Raises:
      CommandException: if errors encountered.
    """
    for url_str in self.url_strs:
      storage_url = StorageUrlFromString(url_str)

      # A streaming file URL ("-") is passed through as-is; there is nothing
      # to expand, and it cannot be combined with other URL strings.
      if storage_url.IsFileUrl() and storage_url.IsStream():
        if self.url_strs.has_plurality:
          raise CommandException('Multiple URL strings are not supported '
                                 'with streaming ("-") URLs.')
        yield NameExpansionResult(storage_url, False, False, storage_url)
        continue

      # Step 1: Expand any explicitly specified wildcards. The output from this
      # step is an iterator of BucketListingRef.
      # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd

      src_names_bucket = False
      if (storage_url.IsCloudUrl() and storage_url.IsBucket()
          and not self.recursion_requested):
        # UNIX commands like rm and cp will omit directory references.
        # If url_str refers only to buckets and we are not recursing,
        # then produce references of type BUCKET, because they are guaranteed
        # to pass through Step 2 and be omitted in Step 3.
        post_step1_iter = PluralityCheckableIterator(
            self.WildcardIterator(url_str).IterBuckets(
                bucket_fields=['id']))
      else:
        # Get a list of objects and prefixes, expanding the top level for
        # any listed buckets.  If our source is a bucket, however, we need
        # to treat all of the top level expansions as names_container=True.
        post_step1_iter = PluralityCheckableIterator(
            self.WildcardIterator(url_str).IterAll(
                bucket_listing_fields=['name'],
                expand_top_level_buckets=True))
        if storage_url.IsCloudUrl() and storage_url.IsBucket():
          src_names_bucket = True

      # Step 2: Expand bucket subdirs. The output from this
      # step is an iterator of (names_container, BucketListingRef).
      # Starting with gs://bucket/abcd this step would expand to:
      #   iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]).
      subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested]
      if self.recursion_requested:
        post_step2_iter = _ImplicitBucketSubdirIterator(
            self, post_step1_iter, subdir_exp_wildcard)
      else:
        post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter)
      post_step2_iter = PluralityCheckableIterator(post_step2_iter)

      # Because we actually perform and check object listings here, this will
      # raise if url_args includes a non-existent object.  However,
      # plurality_checkable_iterator will buffer the exception for us, not
      # raising it until the iterator is actually asked to yield the first
      # result.
      if post_step2_iter.IsEmpty():
        if self.continue_on_error:
          # Raise and immediately catch so that sys.exc_info() has a traceback
          # to hand to the consumer along with the exception.
          try:
            raise CommandException('No URLs matched: %s' % url_str)
          except CommandException as e:
            # Yield a specialized tuple of (exception, stack_trace) to
            # the wrapping PluralityCheckableIterator.
            yield (e, sys.exc_info()[2])
        else:
          raise CommandException('No URLs matched: %s' % url_str)

      # Step 3. Omit any directories, buckets, or bucket subdirectories for
      # non-recursive expansions.
      post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator(
          post_step2_iter, self.recursion_requested, self.command_name,
          self.cmd_supports_recursion, self.logger))

      src_url_expands_to_multi = post_step3_iter.HasPlurality()
      is_multi_source_request = (self.url_strs.has_plurality
                                 or src_url_expands_to_multi)

      # Step 4. Expand directories and buckets. This step yields the iterated
      # values. Starting with gs://bucket this step would expand to:
      #  [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt]
      # Starting with file://dir this step would expand to:
      #  [dir/a.txt, dir/b.txt, dir/c/]
      for (names_container, blr) in post_step3_iter:
        src_names_container = src_names_bucket or names_container

        if blr.IsObject():
          yield NameExpansionResult(
              storage_url, is_multi_source_request, src_names_container,
              blr.storage_url)
        else:
          # Use implicit wildcarding to do the enumeration.
          # At this point we are guaranteed that:
          # - Recursion has been requested because non-object entries are
          #   filtered in step 3 otherwise.
          # - This is a prefix or bucket subdirectory because only
          #   non-recursive iterations produce bucket references.
          expanded_url = StorageUrlFromString(blr.url_string)
          if expanded_url.IsFileUrl():
            # Convert dir to implicit recursive wildcard.
            url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard)
          else:
            # Convert subdir to implicit recursive wildcard.
            url_to_iterate = expanded_url.CreatePrefixUrl(
                wildcard_suffix=subdir_exp_wildcard)

          wc_iter = PluralityCheckableIterator(
              self.WildcardIterator(url_to_iterate).IterObjects(
                  bucket_listing_fields=['name']))
          # A container holding several objects makes this a multi-source
          # request even if the source URL itself matched a single entry.
          src_url_expands_to_multi = (src_url_expands_to_multi
                                      or wc_iter.HasPlurality())
          is_multi_source_request = (self.url_strs.has_plurality
                                     or src_url_expands_to_multi)
          # This will be a flattened listing of all underlying objects in the
          # subdir.
          for blr in wc_iter:
            yield NameExpansionResult(
                storage_url, is_multi_source_request, True, blr.storage_url)

  def WildcardIterator(self, url_string):
    """Helper to instantiate gslib.WildcardIterator.

    Args are same as gslib.WildcardIterator interface, but this method fills
    in most of the values from instance state.

    Args:
      url_string: URL string naming wildcard objects to iterate.

    Returns:
      Wildcard iterator over URL string.
    """
    return gslib.wildcard_iterator.CreateWildcardIterator(
        url_string, self.gsutil_api, debug=self.debug,
        all_versions=self.all_versions,
        project_id=self.project_id)
298
299
def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs,
                          recursion_requested, all_versions=False,
                          cmd_supports_recursion=True, project_id=None,
                          continue_on_error=False):
  """Static factory function for instantiating _NameExpansionIterator.

  Wraps url_strs in a PluralityCheckableIterator (so callers may pass either
  an array or an iterator), builds the underlying _NameExpansionIterator, and
  wraps that in a second PluralityCheckableIterator which is verified to be
  non-empty before being handed back.

  Args:
    command_name: name of command being run.
    debug: Debug level to pass to underlying iterators (range 0..3).
    logger: logging.Logger object.
    gsutil_api: Cloud storage interface.  Settable for testing/mocking.
    url_strs: Iterable URL strings needing expansion.
    recursion_requested: True if -r specified on command-line.  If so,
        listings will be flattened so mapped-to results contain objects
        spanning subdirectories.
    all_versions: Bool indicating whether to iterate over all object versions.
    cmd_supports_recursion: Bool indicating whether this command supports a '-r'
        flag. Useful for printing helpful error messages.
    project_id: Project id to use for the current command.
    continue_on_error: If true, yield no-match exceptions encountered during
                       iteration instead of raising them.

  Raises:
    CommandException if underlying iterator is empty.

  Returns:
    Name expansion iterator instance.

  For example semantics, see comments in _NameExpansionIterator.__init__.
  """
  checkable_url_strs = PluralityCheckableIterator(url_strs)
  raw_expansion = _NameExpansionIterator(
      command_name, debug, logger, gsutil_api, checkable_url_strs,
      recursion_requested, all_versions=all_versions,
      cmd_supports_recursion=cmd_supports_recursion, project_id=project_id,
      continue_on_error=continue_on_error)
  checkable_expansion = PluralityCheckableIterator(raw_expansion)
  if not checkable_expansion.IsEmpty():
    return checkable_expansion
  raise CommandException('No URLs matched')
343
344
class NameExpansionIteratorQueue(object):
  """Wrapper around NameExpansionIterator with Multiprocessing.Queue interface.

  The only usable operation is a blocking get(); its block and timeout
  params are accepted but ignored. Every other queue method raises
  NotImplementedError.

  This class is thread safe.
  """

  def __init__(self, name_expansion_iterator, final_value):
    """Instantiates the queue wrapper.

    Args:
      name_expansion_iterator: iterator to draw values from; get() calls its
          IsEmpty() and next() methods.
      final_value: value returned by get() once the iterator is empty.
    """
    self.name_expansion_iterator = name_expansion_iterator
    self.final_value = final_value
    self.lock = gslib.util.manager.Lock()

  def _RaiseNotImplemented(self, method_name):
    """Raises NotImplementedError naming the unsupported queue method."""
    raise NotImplementedError(
        'NameExpansionIteratorQueue.%s() not implemented' % method_name)

  def qsize(self):
    self._RaiseNotImplemented('qsize')

  def empty(self):
    self._RaiseNotImplemented('empty')

  def full(self):
    self._RaiseNotImplemented('full')

  # pylint: disable=unused-argument
  def put(self, obj=None, block=None, timeout=None):
    self._RaiseNotImplemented('put')

  def put_nowait(self, obj):
    self._RaiseNotImplemented('put_nowait')

  # pylint: disable=unused-argument
  def get(self, block=None, timeout=None):
    """Returns the next expansion result, or final_value when exhausted."""
    # The emptiness check and the fetch happen under a single lock hold so
    # they act on the same iterator state.
    self.lock.acquire()
    try:
      source = self.name_expansion_iterator
      if not source.IsEmpty():
        return source.next()
      return self.final_value
    finally:
      self.lock.release()

  def get_nowait(self):
    self._RaiseNotImplemented('get_nowait')

  def get_no_wait(self):
    self._RaiseNotImplemented('get_no_wait')

  def close(self):
    self._RaiseNotImplemented('close')

  def join_thread(self):
    self._RaiseNotImplemented('join_thread')

  def cancel_join_thread(self):
    self._RaiseNotImplemented('cancel_join_thread')
410
411
class _NonContainerTuplifyIterator(object):
  """Iterator that pairs every iterated value with names_container=False.

  Intended for a blr_iter whose BucketListingRefs are known not to name
  containers, so each one is emitted as the tuple (False, blr).
  """

  def __init__(self, blr_iter):
    """Instantiates iterator.

    Args:
      blr_iter: iterator of BucketListingRef.
    """
    self.blr_iter = blr_iter

  def __iter__(self):
    # Kept as a generator so blr_iter is not touched until the first value
    # is requested.
    for listing_ref in self.blr_iter:
      yield False, listing_ref
430
431
class _OmitNonRecursiveIterator(object):
  """Iterator wrapper that omits certain values for non-recursive requests.

  Consumes tuples of (names_container, BucketListingReference) and, for
  non-recursive requests, drops directories, prefixes, and buckets so that
  we can properly calculate whether the source URL expands to multiple URLs.

  For example, if we have a bucket containing two objects: bucket/foo and
  bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be
  yielded.
  """

  def __init__(self, tuple_iter, recursion_requested, command_name,
               cmd_supports_recursion, logger):
    """Instantiates the iterator.

    Args:
      tuple_iter: Iterator over names_container, BucketListingReference
                  from step 2 in the NameExpansionIterator
      recursion_requested: If false, omit buckets, dirs, and subdirs
      command_name: Command name for user messages
      cmd_supports_recursion: Command recursion support for user messages
      logger: Log object for user messages
    """
    self.logger = logger
    self.command_name = command_name
    self.cmd_supports_recursion = cmd_supports_recursion
    self.recursion_requested = recursion_requested
    self.tuple_iter = tuple_iter

  def __iter__(self):
    for entry in self.tuple_iter:
      (names_container, blr) = entry
      if self.recursion_requested or blr.IsObject():
        # Objects always pass through; everything passes when recursing.
        yield (names_container, blr)
        continue

      # What remains is a bucket or a prefix on a non-recursive request:
      # tell the user it is being skipped instead of yielding it.
      if StorageUrlFromString(blr.url_string).IsFileUrl():
        desc = 'directory'
      else:
        desc = blr.type_name
      if not self.cmd_supports_recursion:
        self.logger.info('Omitting %s "%s".', desc, blr.url_string)
      else:
        self.logger.info(
            'Omitting %s "%s". (Did you mean to do %s -r?)',
            desc, blr.url_string, self.command_name)
481
482
class _ImplicitBucketSubdirIterator(object):
  """Iterator wrapper that performs implicit bucket subdir expansion.

  Each iteration yields tuple (names_container, expanded BucketListingRefs)
    where names_container is true if URL names a directory, bucket,
    or bucket subdir.

  For example, iterating over [BucketListingRef("gs://abc")] would expand to:
    [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")]
  if those subdir objects exist, and [BucketListingRef("gs://abc")] otherwise.
  """

  def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard):
    """Instantiates the iterator.

    Args:
      name_exp_instance: calling instance of NameExpansion class.
      blr_iter: iterator over BucketListingRef prefixes and objects.
      subdir_exp_wildcard: wildcard for expanding subdirectories;
          expected values are ** if the mapped-to results should contain
          objects spanning subdirectories, or * if only one level should
          be listed.
    """
    self.name_exp_instance = name_exp_instance
    self.subdir_exp_wildcard = subdir_exp_wildcard
    self.blr_iter = blr_iter

  def __iter__(self):
    for blr in self.blr_iter:
      if not blr.IsPrefix():
        # Only objects are acceptable besides prefixes.
        if not blr.IsObject():
          raise CommandException(
              '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr)
        yield (False, blr)
        continue

      # This is a bucket subdirectory, list objects according to the wildcard.
      subdir_url = StorageUrlFromString(blr.url_string)
      prefix_url = subdir_url.CreatePrefixUrl(
          wildcard_suffix=self.subdir_exp_wildcard)
      subdir_listing = self.name_exp_instance.WildcardIterator(
          prefix_url).IterAll(bucket_listing_fields=['name'])
      implicit_subdir_iterator = PluralityCheckableIterator(subdir_listing)
      if implicit_subdir_iterator.IsEmpty():
        # Prefix that contains no objects, for example in the $folder$ case
        # or an empty filesystem directory.
        yield (False, blr)
      else:
        for exp_blr in implicit_subdir_iterator:
          yield (True, exp_blr)
531