1# -*- coding: utf-8 -*- 2# Copyright 2012 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Name expansion iterator and result classes. 16 17Name expansion support for the various ways gsutil lets users refer to 18collections of data (via explicit wildcarding as well as directory, 19bucket, and bucket subdir implicit wildcarding). This class encapsulates 20the various rules for determining how these expansions are done. 21""" 22 23# Disable warnings for NameExpansionIteratorQueue functions; they implement 24# an interface which does not follow lint guidelines. 25# pylint: disable=invalid-name 26 27from __future__ import absolute_import 28 29import os 30import sys 31 32import gslib 33from gslib.exception import CommandException 34from gslib.plurality_checkable_iterator import PluralityCheckableIterator 35import gslib.wildcard_iterator 36from gslib.wildcard_iterator import StorageUrlFromString 37 38 39class NameExpansionResult(object): 40 """Holds one fully expanded result from iterating over NameExpansionIterator. 41 42 The member data in this class need to be pickleable because 43 NameExpansionResult instances are passed through Multiprocessing.Queue. In 44 particular, don't include any boto state like StorageUri, since that pulls 45 in a big tree of objects, some of which aren't pickleable (and even if 46 they were, pickling/unpickling such a large object tree would result in 47 significant overhead). 48 49 The state held in this object is needed for handling the various naming cases 50 (e.g., copying from a single source URL to a directory generates different 51 dest URL names than copying multiple URLs to a directory, to be consistent 52 with naming rules used by the Unix cp command). For more details see comments 53 in _NameExpansionIterator. 54 """ 55 56 def __init__(self, source_storage_url, is_multi_source_request, 57 names_container, expanded_storage_url): 58 """Instantiates a result from name expansion. 59 60 Args: 61 source_storage_url: StorageUrl that was being expanded. 62 is_multi_source_request: bool indicator whether src_url_str expanded to 63 more than one BucketListingRef. 64 names_container: Bool indicator whether src_url names a container. 65 expanded_storage_url: StorageUrl that was expanded. 66 """ 67 self.source_storage_url = source_storage_url 68 self.is_multi_source_request = is_multi_source_request 69 self.names_container = names_container 70 self.expanded_storage_url = expanded_storage_url 71 72 def __repr__(self): 73 return '%s' % self.expanded_storage_url 74 75 76class _NameExpansionIterator(object): 77 """Class that iterates over all source URLs passed to the iterator. 78 79 See details in __iter__ function doc. 80 """ 81 82 def __init__(self, command_name, debug, logger, gsutil_api, url_strs, 83 recursion_requested, all_versions=False, 84 cmd_supports_recursion=True, project_id=None, 85 continue_on_error=False): 86 """Creates a NameExpansionIterator. 87 88 Args: 89 command_name: name of command being run. 90 debug: Debug level to pass to underlying iterators (range 0..3). 91 logger: logging.Logger object. 92 gsutil_api: Cloud storage interface. Settable for testing/mocking. 93 url_strs: PluralityCheckableIterator of URL strings needing expansion. 94 recursion_requested: True if -r specified on command-line. If so, 95 listings will be flattened so mapped-to results contain objects 96 spanning subdirectories. 97 all_versions: Bool indicating whether to iterate over all object versions. 98 cmd_supports_recursion: Bool indicating whether this command supports a 99 '-r' flag. Useful for printing helpful error messages. 100 project_id: Project id to use for bucket retrieval. 101 continue_on_error: If true, yield no-match exceptions encountered during 102 iteration instead of raising them. 103 104 Examples of _NameExpansionIterator with recursion_requested=True: 105 - Calling with one of the url_strs being 'gs://bucket' will enumerate all 106 top-level objects, as will 'gs://bucket/' and 'gs://bucket/*'. 107 - 'gs://bucket/**' will enumerate all objects in the bucket. 108 - 'gs://bucket/abc' will enumerate either the single object abc or, if 109 abc is a subdirectory, all objects under abc and any of its 110 subdirectories. 111 - 'gs://bucket/abc/**' will enumerate all objects under abc or any of its 112 subdirectories. 113 - 'file:///tmp' will enumerate all files under /tmp, as will 114 'file:///tmp/*' 115 - 'file:///tmp/**' will enumerate all files under /tmp or any of its 116 subdirectories. 117 118 Example if recursion_requested=False: 119 calling with gs://bucket/abc/* lists matching objects 120 or subdirs, but not sub-subdirs or objects beneath subdirs. 121 122 Note: In step-by-step comments below we give examples assuming there's a 123 gs://bucket with object paths: 124 abcd/o1.txt 125 abcd/o2.txt 126 xyz/o1.txt 127 xyz/o2.txt 128 and a directory file://dir with file paths: 129 dir/a.txt 130 dir/b.txt 131 dir/c/ 132 """ 133 self.command_name = command_name 134 self.debug = debug 135 self.logger = logger 136 self.gsutil_api = gsutil_api 137 self.url_strs = url_strs 138 self.recursion_requested = recursion_requested 139 self.all_versions = all_versions 140 # Check self.url_strs.HasPlurality() at start because its value can change 141 # if url_strs is itself an iterator. 142 self.url_strs.has_plurality = self.url_strs.HasPlurality() 143 self.cmd_supports_recursion = cmd_supports_recursion 144 self.project_id = project_id 145 self.continue_on_error = continue_on_error 146 147 # Map holding wildcard strings to use for flat vs subdir-by-subdir listings. 148 # (A flat listing means show all objects expanded all the way down.) 149 self._flatness_wildcard = {True: '**', False: '*'} 150 151 def __iter__(self): 152 """Iterates over all source URLs passed to the iterator. 153 154 For each src url, expands wildcards, object-less bucket names, 155 subdir bucket names, and directory names, and generates a flat listing of 156 all the matching objects/files. 157 158 You should instantiate this object using the static factory function 159 NameExpansionIterator, because consumers of this iterator need the 160 PluralityCheckableIterator wrapper built by that function. 161 162 Yields: 163 gslib.name_expansion.NameExpansionResult. 164 165 Raises: 166 CommandException: if errors encountered. 167 """ 168 for url_str in self.url_strs: 169 storage_url = StorageUrlFromString(url_str) 170 171 if storage_url.IsFileUrl() and storage_url.IsStream(): 172 if self.url_strs.has_plurality: 173 raise CommandException('Multiple URL strings are not supported ' 174 'with streaming ("-") URLs.') 175 yield NameExpansionResult(storage_url, False, False, storage_url) 176 continue 177 178 # Step 1: Expand any explicitly specified wildcards. The output from this 179 # step is an iterator of BucketListingRef. 180 # Starting with gs://buck*/abc* this step would expand to gs://bucket/abcd 181 182 src_names_bucket = False 183 if (storage_url.IsCloudUrl() and storage_url.IsBucket() 184 and not self.recursion_requested): 185 # UNIX commands like rm and cp will omit directory references. 186 # If url_str refers only to buckets and we are not recursing, 187 # then produce references of type BUCKET, because they are guaranteed 188 # to pass through Step 2 and be omitted in Step 3. 189 post_step1_iter = PluralityCheckableIterator( 190 self.WildcardIterator(url_str).IterBuckets( 191 bucket_fields=['id'])) 192 else: 193 # Get a list of objects and prefixes, expanding the top level for 194 # any listed buckets. If our source is a bucket, however, we need 195 # to treat all of the top level expansions as names_container=True. 196 post_step1_iter = PluralityCheckableIterator( 197 self.WildcardIterator(url_str).IterAll( 198 bucket_listing_fields=['name'], 199 expand_top_level_buckets=True)) 200 if storage_url.IsCloudUrl() and storage_url.IsBucket(): 201 src_names_bucket = True 202 203 # Step 2: Expand bucket subdirs. The output from this 204 # step is an iterator of (names_container, BucketListingRef). 205 # Starting with gs://bucket/abcd this step would expand to: 206 # iter([(True, abcd/o1.txt), (True, abcd/o2.txt)]). 207 subdir_exp_wildcard = self._flatness_wildcard[self.recursion_requested] 208 if self.recursion_requested: 209 post_step2_iter = _ImplicitBucketSubdirIterator( 210 self, post_step1_iter, subdir_exp_wildcard) 211 else: 212 post_step2_iter = _NonContainerTuplifyIterator(post_step1_iter) 213 post_step2_iter = PluralityCheckableIterator(post_step2_iter) 214 215 # Because we actually perform and check object listings here, this will 216 # raise if url_args includes a non-existent object. However, 217 # plurality_checkable_iterator will buffer the exception for us, not 218 # raising it until the iterator is actually asked to yield the first 219 # result. 220 if post_step2_iter.IsEmpty(): 221 if self.continue_on_error: 222 try: 223 raise CommandException('No URLs matched: %s' % url_str) 224 except CommandException, e: 225 # Yield a specialized tuple of (exception, stack_trace) to 226 # the wrapping PluralityCheckableIterator. 227 yield (e, sys.exc_info()[2]) 228 else: 229 raise CommandException('No URLs matched: %s' % url_str) 230 231 # Step 3. Omit any directories, buckets, or bucket subdirectories for 232 # non-recursive expansions. 233 post_step3_iter = PluralityCheckableIterator(_OmitNonRecursiveIterator( 234 post_step2_iter, self.recursion_requested, self.command_name, 235 self.cmd_supports_recursion, self.logger)) 236 237 src_url_expands_to_multi = post_step3_iter.HasPlurality() 238 is_multi_source_request = (self.url_strs.has_plurality 239 or src_url_expands_to_multi) 240 241 # Step 4. Expand directories and buckets. This step yields the iterated 242 # values. Starting with gs://bucket this step would expand to: 243 # [abcd/o1.txt, abcd/o2.txt, xyz/o1.txt, xyz/o2.txt] 244 # Starting with file://dir this step would expand to: 245 # [dir/a.txt, dir/b.txt, dir/c/] 246 for (names_container, blr) in post_step3_iter: 247 src_names_container = src_names_bucket or names_container 248 249 if blr.IsObject(): 250 yield NameExpansionResult( 251 storage_url, is_multi_source_request, src_names_container, 252 blr.storage_url) 253 else: 254 # Use implicit wildcarding to do the enumeration. 255 # At this point we are guaranteed that: 256 # - Recursion has been requested because non-object entries are 257 # filtered in step 3 otherwise. 258 # - This is a prefix or bucket subdirectory because only 259 # non-recursive iterations product bucket references. 260 expanded_url = StorageUrlFromString(blr.url_string) 261 if expanded_url.IsFileUrl(): 262 # Convert dir to implicit recursive wildcard. 263 url_to_iterate = '%s%s%s' % (blr, os.sep, subdir_exp_wildcard) 264 else: 265 # Convert subdir to implicit recursive wildcard. 266 url_to_iterate = expanded_url.CreatePrefixUrl( 267 wildcard_suffix=subdir_exp_wildcard) 268 269 wc_iter = PluralityCheckableIterator( 270 self.WildcardIterator(url_to_iterate).IterObjects( 271 bucket_listing_fields=['name'])) 272 src_url_expands_to_multi = (src_url_expands_to_multi 273 or wc_iter.HasPlurality()) 274 is_multi_source_request = (self.url_strs.has_plurality 275 or src_url_expands_to_multi) 276 # This will be a flattened listing of all underlying objects in the 277 # subdir. 278 for blr in wc_iter: 279 yield NameExpansionResult( 280 storage_url, is_multi_source_request, True, blr.storage_url) 281 282 def WildcardIterator(self, url_string): 283 """Helper to instantiate gslib.WildcardIterator. 284 285 Args are same as gslib.WildcardIterator interface, but this method fills 286 in most of the values from instance state. 287 288 Args: 289 url_string: URL string naming wildcard objects to iterate. 290 291 Returns: 292 Wildcard iterator over URL string. 293 """ 294 return gslib.wildcard_iterator.CreateWildcardIterator( 295 url_string, self.gsutil_api, debug=self.debug, 296 all_versions=self.all_versions, 297 project_id=self.project_id) 298 299 300def NameExpansionIterator(command_name, debug, logger, gsutil_api, url_strs, 301 recursion_requested, all_versions=False, 302 cmd_supports_recursion=True, project_id=None, 303 continue_on_error=False): 304 """Static factory function for instantiating _NameExpansionIterator. 305 306 This wraps the resulting iterator in a PluralityCheckableIterator and checks 307 that it is non-empty. Also, allows url_strs to be either an array or an 308 iterator. 309 310 Args: 311 command_name: name of command being run. 312 debug: Debug level to pass to underlying iterators (range 0..3). 313 logger: logging.Logger object. 314 gsutil_api: Cloud storage interface. Settable for testing/mocking. 315 url_strs: Iterable URL strings needing expansion. 316 recursion_requested: True if -r specified on command-line. If so, 317 listings will be flattened so mapped-to results contain objects 318 spanning subdirectories. 319 all_versions: Bool indicating whether to iterate over all object versions. 320 cmd_supports_recursion: Bool indicating whether this command supports a '-r' 321 flag. Useful for printing helpful error messages. 322 project_id: Project id to use for the current command. 323 continue_on_error: If true, yield no-match exceptions encountered during 324 iteration instead of raising them. 325 326 Raises: 327 CommandException if underlying iterator is empty. 328 329 Returns: 330 Name expansion iterator instance. 331 332 For example semantics, see comments in NameExpansionIterator.__init__. 333 """ 334 url_strs = PluralityCheckableIterator(url_strs) 335 name_expansion_iterator = _NameExpansionIterator( 336 command_name, debug, logger, gsutil_api, url_strs, recursion_requested, 337 all_versions=all_versions, cmd_supports_recursion=cmd_supports_recursion, 338 project_id=project_id, continue_on_error=continue_on_error) 339 name_expansion_iterator = PluralityCheckableIterator(name_expansion_iterator) 340 if name_expansion_iterator.IsEmpty(): 341 raise CommandException('No URLs matched') 342 return name_expansion_iterator 343 344 345class NameExpansionIteratorQueue(object): 346 """Wrapper around NameExpansionIterator with Multiprocessing.Queue interface. 347 348 Only a blocking get() function can be called, and the block and timeout 349 params on that function are ignored. All other class functions raise 350 NotImplementedError. 351 352 This class is thread safe. 353 """ 354 355 def __init__(self, name_expansion_iterator, final_value): 356 self.name_expansion_iterator = name_expansion_iterator 357 self.final_value = final_value 358 self.lock = gslib.util.manager.Lock() 359 360 def qsize(self): 361 raise NotImplementedError( 362 'NameExpansionIteratorQueue.qsize() not implemented') 363 364 def empty(self): 365 raise NotImplementedError( 366 'NameExpansionIteratorQueue.empty() not implemented') 367 368 def full(self): 369 raise NotImplementedError( 370 'NameExpansionIteratorQueue.full() not implemented') 371 372 # pylint: disable=unused-argument 373 def put(self, obj=None, block=None, timeout=None): 374 raise NotImplementedError( 375 'NameExpansionIteratorQueue.put() not implemented') 376 377 def put_nowait(self, obj): 378 raise NotImplementedError( 379 'NameExpansionIteratorQueue.put_nowait() not implemented') 380 381 # pylint: disable=unused-argument 382 def get(self, block=None, timeout=None): 383 self.lock.acquire() 384 try: 385 if self.name_expansion_iterator.IsEmpty(): 386 return self.final_value 387 return self.name_expansion_iterator.next() 388 finally: 389 self.lock.release() 390 391 def get_nowait(self): 392 raise NotImplementedError( 393 'NameExpansionIteratorQueue.get_nowait() not implemented') 394 395 def get_no_wait(self): 396 raise NotImplementedError( 397 'NameExpansionIteratorQueue.get_no_wait() not implemented') 398 399 def close(self): 400 raise NotImplementedError( 401 'NameExpansionIteratorQueue.close() not implemented') 402 403 def join_thread(self): 404 raise NotImplementedError( 405 'NameExpansionIteratorQueue.join_thread() not implemented') 406 407 def cancel_join_thread(self): 408 raise NotImplementedError( 409 'NameExpansionIteratorQueue.cancel_join_thread() not implemented') 410 411 412class _NonContainerTuplifyIterator(object): 413 """Iterator that produces the tuple (False, blr) for each iterated value. 414 415 Used for cases where blr_iter iterates over a set of 416 BucketListingRefs known not to name containers. 417 """ 418 419 def __init__(self, blr_iter): 420 """Instantiates iterator. 421 422 Args: 423 blr_iter: iterator of BucketListingRef. 424 """ 425 self.blr_iter = blr_iter 426 427 def __iter__(self): 428 for blr in self.blr_iter: 429 yield (False, blr) 430 431 432class _OmitNonRecursiveIterator(object): 433 """Iterator wrapper for that omits certain values for non-recursive requests. 434 435 This iterates over tuples of (names_container, BucketListingReference) and 436 omits directories, prefixes, and buckets from non-recurisve requests 437 so that we can properly calculate whether the source URL expands to multiple 438 URLs. 439 440 For example, if we have a bucket containing two objects: bucket/foo and 441 bucket/foo/bar and we do a non-recursive iteration, only bucket/foo will be 442 yielded. 443 """ 444 445 def __init__(self, tuple_iter, recursion_requested, command_name, 446 cmd_supports_recursion, logger): 447 """Instanties the iterator. 448 449 Args: 450 tuple_iter: Iterator over names_container, BucketListingReference 451 from step 2 in the NameExpansionIterator 452 recursion_requested: If false, omit buckets, dirs, and subdirs 453 command_name: Command name for user messages 454 cmd_supports_recursion: Command recursion support for user messages 455 logger: Log object for user messages 456 """ 457 self.tuple_iter = tuple_iter 458 self.recursion_requested = recursion_requested 459 self.command_name = command_name 460 self.cmd_supports_recursion = cmd_supports_recursion 461 self.logger = logger 462 463 def __iter__(self): 464 for (names_container, blr) in self.tuple_iter: 465 if not self.recursion_requested and not blr.IsObject(): 466 # At this point we either have a bucket or a prefix, 467 # so if recursion is not requested, we're going to omit it. 468 expanded_url = StorageUrlFromString(blr.url_string) 469 if expanded_url.IsFileUrl(): 470 desc = 'directory' 471 else: 472 desc = blr.type_name 473 if self.cmd_supports_recursion: 474 self.logger.info( 475 'Omitting %s "%s". (Did you mean to do %s -r?)', 476 desc, blr.url_string, self.command_name) 477 else: 478 self.logger.info('Omitting %s "%s".', desc, blr.url_string) 479 else: 480 yield (names_container, blr) 481 482 483class _ImplicitBucketSubdirIterator(object): 484 """Iterator wrapper that performs implicit bucket subdir expansion. 485 486 Each iteration yields tuple (names_container, expanded BucketListingRefs) 487 where names_container is true if URL names a directory, bucket, 488 or bucket subdir. 489 490 For example, iterating over [BucketListingRef("gs://abc")] would expand to: 491 [BucketListingRef("gs://abc/o1"), BucketListingRef("gs://abc/o2")] 492 if those subdir objects exist, and [BucketListingRef("gs://abc") otherwise. 493 """ 494 495 def __init__(self, name_exp_instance, blr_iter, subdir_exp_wildcard): 496 """Instantiates the iterator. 497 498 Args: 499 name_exp_instance: calling instance of NameExpansion class. 500 blr_iter: iterator over BucketListingRef prefixes and objects. 501 subdir_exp_wildcard: wildcard for expanding subdirectories; 502 expected values are ** if the mapped-to results should contain 503 objects spanning subdirectories, or * if only one level should 504 be listed. 505 """ 506 self.blr_iter = blr_iter 507 self.name_exp_instance = name_exp_instance 508 self.subdir_exp_wildcard = subdir_exp_wildcard 509 510 def __iter__(self): 511 for blr in self.blr_iter: 512 if blr.IsPrefix(): 513 # This is a bucket subdirectory, list objects according to the wildcard. 514 prefix_url = StorageUrlFromString(blr.url_string).CreatePrefixUrl( 515 wildcard_suffix=self.subdir_exp_wildcard) 516 implicit_subdir_iterator = PluralityCheckableIterator( 517 self.name_exp_instance.WildcardIterator( 518 prefix_url).IterAll(bucket_listing_fields=['name'])) 519 if not implicit_subdir_iterator.IsEmpty(): 520 for exp_blr in implicit_subdir_iterator: 521 yield (True, exp_blr) 522 else: 523 # Prefix that contains no objects, for example in the $folder$ case 524 # or an empty filesystem directory. 525 yield (False, blr) 526 elif blr.IsObject(): 527 yield (False, blr) 528 else: 529 raise CommandException( 530 '_ImplicitBucketSubdirIterator got a bucket reference %s' % blr) 531