1# -*- coding: utf-8 -*- 2# Copyright 2011 Google Inc. All Rights Reserved. 3# Copyright 2011, Nexenta Systems Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""Helper functions for copy functionality.""" 17 18from __future__ import absolute_import 19 20import base64 21from collections import namedtuple 22import csv 23import datetime 24import errno 25import gzip 26from hashlib import md5 27import json 28import logging 29import mimetypes 30from operator import attrgetter 31import os 32import pickle 33import random 34import re 35import shutil 36import stat 37import subprocess 38import tempfile 39import textwrap 40import time 41import traceback 42 43from boto import config 44import crcmod 45 46import gslib 47from gslib.cloud_api import ArgumentException 48from gslib.cloud_api import CloudApi 49from gslib.cloud_api import NotFoundException 50from gslib.cloud_api import PreconditionException 51from gslib.cloud_api import Preconditions 52from gslib.cloud_api import ResumableDownloadException 53from gslib.cloud_api import ResumableUploadAbortException 54from gslib.cloud_api import ResumableUploadException 55from gslib.cloud_api import ResumableUploadStartOverException 56from gslib.cloud_api_helper import GetDownloadSerializationData 57from gslib.commands.compose import MAX_COMPOSE_ARITY 58from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE 59from gslib.commands.config import DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD 60from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE 61from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS 62from gslib.commands.config import DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD 63from gslib.cs_api_map import ApiSelector 64from gslib.daisy_chain_wrapper import DaisyChainWrapper 65from gslib.exception import CommandException 66from gslib.exception import HashMismatchException 67from gslib.file_part import FilePart 68from gslib.hashing_helper import Base64EncodeHash 69from gslib.hashing_helper import CalculateB64EncodedMd5FromContents 70from gslib.hashing_helper import CalculateHashesFromContents 71from gslib.hashing_helper import CHECK_HASH_IF_FAST_ELSE_FAIL 72from gslib.hashing_helper import CHECK_HASH_NEVER 73from gslib.hashing_helper import ConcatCrc32c 74from gslib.hashing_helper import GetDownloadHashAlgs 75from gslib.hashing_helper import GetUploadHashAlgs 76from gslib.hashing_helper import HashingFileUploadWrapper 77from gslib.parallelism_framework_util import AtomicDict 78from gslib.progress_callback import ConstructAnnounceText 79from gslib.progress_callback import FileProgressCallbackHandler 80from gslib.progress_callback import ProgressCallbackWithBackoff 81from gslib.resumable_streaming_upload import ResumableStreamingJsonUploadWrapper 82from gslib.storage_url import ContainsWildcard 83from gslib.storage_url import StorageUrlFromString 84from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages 85from gslib.tracker_file import DeleteDownloadTrackerFiles 86from gslib.tracker_file import DeleteTrackerFile 87from gslib.tracker_file import GetTrackerFilePath 88from gslib.tracker_file import RaiseUnwritableTrackerFileException 89from gslib.tracker_file import ReadOrCreateDownloadTrackerFile 90from gslib.tracker_file import TrackerFileType 91from gslib.tracker_file import WriteDownloadComponentTrackerFile 92from gslib.translation_helper import AddS3MarkerAclToObjectMetadata 93from gslib.translation_helper import CopyObjectMetadata 94from gslib.translation_helper import DEFAULT_CONTENT_TYPE 95from gslib.translation_helper import GenerationFromUrlAndString 96from gslib.translation_helper import ObjectMetadataFromHeaders 97from gslib.translation_helper import PreconditionsFromHeaders 98from gslib.translation_helper import S3MarkerAclFromObjectMetadata 99from gslib.util import CheckFreeSpace 100from gslib.util import CheckMultiprocessingAvailableAndInit 101from gslib.util import CreateLock 102from gslib.util import DEFAULT_FILE_BUFFER_SIZE 103from gslib.util import DivideAndCeil 104from gslib.util import GetCloudApiInstance 105from gslib.util import GetFileSize 106from gslib.util import GetJsonResumableChunkSize 107from gslib.util import GetMaxRetryDelay 108from gslib.util import GetNumRetries 109from gslib.util import GetStreamFromFileUrl 110from gslib.util import HumanReadableToBytes 111from gslib.util import IS_WINDOWS 112from gslib.util import IsCloudSubdirPlaceholder 113from gslib.util import MakeHumanReadable 114from gslib.util import MIN_SIZE_COMPUTE_LOGGING 115from gslib.util import ResumableThreshold 116from gslib.util import TEN_MIB 117from gslib.util import UsingCrcmodExtension 118from gslib.util import UTF8 119from gslib.wildcard_iterator import CreateWildcardIterator 120 121# pylint: disable=g-import-not-at-top 122if IS_WINDOWS: 123 import msvcrt 124 125# Declare copy_helper_opts as a global because namedtuple isn't aware of 126# assigning to a class member (which breaks pickling done by multiprocessing). 127# For details see 128# http://stackoverflow.com/questions/16377215/how-to-pickle-a-namedtuple-instance-correctly 129# pylint: disable=global-at-module-level 130global global_copy_helper_opts 131 132# In-memory map of local files that are currently opened for write. Used to 133# ensure that if we write to the same file twice (say, for example, because the 134# user specified two identical source URLs), the writes occur serially. 135global open_files_map, open_files_lock 136open_files_map = ( 137 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available 138 else AtomicDict(manager=gslib.util.manager)) 139 140# We don't allow multiple processes on Windows, so using a process-safe lock 141# would be unnecessary. 142open_files_lock = CreateLock() 143 144# For debugging purposes; if True, files and objects that fail hash validation 145# will be saved with the below suffix appended. 146_RENAME_ON_HASH_MISMATCH = False 147_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt' 148 149PARALLEL_UPLOAD_TEMP_NAMESPACE = ( 150 u'/gsutil/tmp/parallel_composite_uploads/for_details_see/gsutil_help_cp/') 151 152PARALLEL_UPLOAD_STATIC_SALT = u""" 153PARALLEL_UPLOAD_SALT_TO_PREVENT_COLLISIONS. 154The theory is that no user will have prepended this to the front of 155one of their object names and then done an MD5 hash of the name, and 156then prepended PARALLEL_UPLOAD_TEMP_NAMESPACE to the front of their object 157name. Note that there will be no problems with object name length since we 158hash the original name. 159""" 160 161# When uploading a file, get the following fields in the response for 162# filling in command output and manifests. 163UPLOAD_RETURN_FIELDS = ['crc32c', 'etag', 'generation', 'md5Hash', 'size'] 164 165# This tuple is used only to encapsulate the arguments needed for 166# command.Apply() in the parallel composite upload case. 167# Note that content_type is used instead of a full apitools Object() because 168# apitools objects are not picklable. 169# filename: String name of file. 170# file_start: start byte of file (may be in the middle of a file for partitioned 171# files). 172# file_length: length of upload (may not be the entire length of a file for 173# partitioned files). 174# src_url: FileUrl describing the source file. 175# dst_url: CloudUrl describing the destination component file. 176# canned_acl: canned_acl to apply to the uploaded file/component. 177# content_type: content-type for final object, used for setting content-type 178# of components and final object. 179# tracker_file: tracker file for this component. 180# tracker_file_lock: tracker file lock for tracker file(s). 181PerformParallelUploadFileToObjectArgs = namedtuple( 182 'PerformParallelUploadFileToObjectArgs', 183 'filename file_start file_length src_url dst_url canned_acl ' 184 'content_type tracker_file tracker_file_lock') 185 186PerformSlicedDownloadObjectToFileArgs = namedtuple( 187 'PerformSlicedDownloadObjectToFileArgs', 188 'component_num src_url src_obj_metadata dst_url download_file_name ' 189 'start_byte end_byte') 190 191PerformSlicedDownloadReturnValues = namedtuple( 192 'PerformSlicedDownloadReturnValues', 193 'component_num crc32c bytes_transferred server_encoding') 194 195ObjectFromTracker = namedtuple('ObjectFromTracker', 196 'object_name generation') 197 198# TODO: Refactor this file to be less cumbersome. In particular, some of the 199# different paths (e.g., uploading a file to an object vs. downloading an 200# object to a file) could be split into separate files. 201 202# Chunk size to use while zipping/unzipping gzip files. 203GZIP_CHUNK_SIZE = 8192 204 205# Number of bytes to wait before updating a sliced download component tracker 206# file. 207TRACKERFILE_UPDATE_THRESHOLD = TEN_MIB 208 209PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD = 150 * 1024 * 1024 210 211# S3 requires special Multipart upload logic (that we currently don't implement) 212# for files > 5GiB in size. 213S3_MAX_UPLOAD_SIZE = 5 * 1024 * 1024 * 1024 214 215# TODO: Create a multiprocessing framework value allocator, then use it instead 216# of a dict. 217global suggested_sliced_transfers, suggested_sliced_transfers_lock 218suggested_sliced_transfers = ( 219 AtomicDict() if not CheckMultiprocessingAvailableAndInit().is_available 220 else AtomicDict(manager=gslib.util.manager)) 221suggested_sliced_transfers_lock = CreateLock() 222 223 224class FileConcurrencySkipError(Exception): 225 """Raised when skipping a file due to a concurrent, duplicate copy.""" 226 227 228def _RmExceptionHandler(cls, e): 229 """Simple exception handler to allow post-completion status.""" 230 cls.logger.error(str(e)) 231 232 233def _ParallelCopyExceptionHandler(cls, e): 234 """Simple exception handler to allow post-completion status.""" 235 cls.logger.error(str(e)) 236 cls.op_failure_count += 1 237 cls.logger.debug('\n\nEncountered exception while copying:\n%s\n', 238 traceback.format_exc()) 239 240 241def _PerformParallelUploadFileToObject(cls, args, thread_state=None): 242 """Function argument to Apply for performing parallel composite uploads. 243 244 Args: 245 cls: Calling Command class. 246 args: PerformParallelUploadFileToObjectArgs tuple describing the target. 247 thread_state: gsutil Cloud API instance to use for the operation. 248 249 Returns: 250 StorageUrl representing a successfully uploaded component. 251 """ 252 fp = FilePart(args.filename, args.file_start, args.file_length) 253 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) 254 with fp: 255 # We take many precautions with the component names that make collisions 256 # effectively impossible. Specifying preconditions will just allow us to 257 # reach a state in which uploads will always fail on retries. 258 preconditions = None 259 260 # Fill in content type if one was provided. 261 dst_object_metadata = apitools_messages.Object( 262 name=args.dst_url.object_name, 263 bucket=args.dst_url.bucket_name, 264 contentType=args.content_type) 265 266 try: 267 if global_copy_helper_opts.canned_acl: 268 # No canned ACL support in JSON, force XML API to be used for 269 # upload/copy operations. 270 orig_prefer_api = gsutil_api.prefer_api 271 gsutil_api.prefer_api = ApiSelector.XML 272 ret = _UploadFileToObject(args.src_url, fp, args.file_length, 273 args.dst_url, dst_object_metadata, 274 preconditions, gsutil_api, cls.logger, cls, 275 _ParallelCopyExceptionHandler, 276 gzip_exts=None, allow_splitting=False) 277 finally: 278 if global_copy_helper_opts.canned_acl: 279 gsutil_api.prefer_api = orig_prefer_api 280 281 component = ret[2] 282 _AppendComponentTrackerToParallelUploadTrackerFile( 283 args.tracker_file, component, args.tracker_file_lock) 284 return ret 285 286 287CopyHelperOpts = namedtuple('CopyHelperOpts', [ 288 'perform_mv', 289 'no_clobber', 290 'daisy_chain', 291 'read_args_from_stdin', 292 'print_ver', 293 'use_manifest', 294 'preserve_acl', 295 'canned_acl', 296 'skip_unsupported_objects', 297 'test_callback_file']) 298 299 300# pylint: disable=global-variable-undefined 301def CreateCopyHelperOpts(perform_mv=False, no_clobber=False, daisy_chain=False, 302 read_args_from_stdin=False, print_ver=False, 303 use_manifest=False, preserve_acl=False, 304 canned_acl=None, skip_unsupported_objects=False, 305 test_callback_file=None): 306 """Creates CopyHelperOpts for passing options to CopyHelper.""" 307 # We create a tuple with union of options needed by CopyHelper and any 308 # copy-related functionality in CpCommand, RsyncCommand, or Command class. 309 global global_copy_helper_opts 310 global_copy_helper_opts = CopyHelperOpts( 311 perform_mv=perform_mv, 312 no_clobber=no_clobber, 313 daisy_chain=daisy_chain, 314 read_args_from_stdin=read_args_from_stdin, 315 print_ver=print_ver, 316 use_manifest=use_manifest, 317 preserve_acl=preserve_acl, 318 canned_acl=canned_acl, 319 skip_unsupported_objects=skip_unsupported_objects, 320 test_callback_file=test_callback_file) 321 return global_copy_helper_opts 322 323 324# pylint: disable=global-variable-undefined 325# pylint: disable=global-variable-not-assigned 326def GetCopyHelperOpts(): 327 """Returns namedtuple holding CopyHelper options.""" 328 global global_copy_helper_opts 329 return global_copy_helper_opts 330 331 332def _SelectDownloadStrategy(dst_url): 333 """Get download strategy based on the destination object. 334 335 Args: 336 dst_url: Destination StorageUrl. 337 338 Returns: 339 gsutil Cloud API DownloadStrategy. 340 """ 341 dst_is_special = False 342 if dst_url.IsFileUrl(): 343 # Check explicitly first because os.stat doesn't work on 'nul' in Windows. 344 if dst_url.object_name == os.devnull: 345 dst_is_special = True 346 try: 347 mode = os.stat(dst_url.object_name).st_mode 348 if stat.S_ISCHR(mode): 349 dst_is_special = True 350 except OSError: 351 pass 352 353 if dst_is_special: 354 return CloudApi.DownloadStrategy.ONE_SHOT 355 else: 356 return CloudApi.DownloadStrategy.RESUMABLE 357 358 359def _GetUploadTrackerData(tracker_file_name, logger): 360 """Reads tracker data from an upload tracker file if it exists. 361 362 Args: 363 tracker_file_name: Tracker file name for this upload. 364 logger: for outputting log messages. 365 366 Returns: 367 Serialization data if the tracker file already exists (resume existing 368 upload), None otherwise. 369 """ 370 tracker_file = None 371 372 # If we already have a matching tracker file, get the serialization data 373 # so that we can resume the upload. 374 try: 375 tracker_file = open(tracker_file_name, 'r') 376 tracker_data = tracker_file.read() 377 return tracker_data 378 except IOError as e: 379 # Ignore non-existent file (happens first time a upload is attempted on an 380 # object, or when re-starting an upload after a 381 # ResumableUploadStartOverException), but warn user for other errors. 382 if e.errno != errno.ENOENT: 383 logger.warn('Couldn\'t read upload tracker file (%s): %s. Restarting ' 384 'upload from scratch.', tracker_file_name, e.strerror) 385 finally: 386 if tracker_file: 387 tracker_file.close() 388 389 390def InsistDstUrlNamesContainer(exp_dst_url, have_existing_dst_container, 391 command_name): 392 """Ensures the destination URL names a container. 393 394 Acceptable containers include directory, bucket, bucket 395 subdir, and non-existent bucket subdir. 396 397 Args: 398 exp_dst_url: Wildcard-expanded destination StorageUrl. 399 have_existing_dst_container: bool indicator of whether exp_dst_url 400 names a container (directory, bucket, or existing bucket subdir). 401 command_name: Name of command making call. May not be the same as the 402 calling class's self.command_name in the case of commands implemented 403 atop other commands (like mv command). 404 405 Raises: 406 CommandException: if the URL being checked does not name a container. 407 """ 408 if ((exp_dst_url.IsFileUrl() and not exp_dst_url.IsDirectory()) or 409 (exp_dst_url.IsCloudUrl() and exp_dst_url.IsBucket() 410 and not have_existing_dst_container)): 411 raise CommandException('Destination URL must name a directory, bucket, ' 412 'or bucket\nsubdirectory for the multiple ' 413 'source form of the %s command.' % command_name) 414 415 416def _ShouldTreatDstUrlAsBucketSubDir(have_multiple_srcs, dst_url, 417 have_existing_dest_subdir, 418 src_url_names_container, 419 recursion_requested): 420 """Checks whether dst_url should be treated as a bucket "sub-directory". 421 422 The decision about whether something constitutes a bucket "sub-directory" 423 depends on whether there are multiple sources in this request and whether 424 there is an existing bucket subdirectory. For example, when running the 425 command: 426 gsutil cp file gs://bucket/abc 427 if there's no existing gs://bucket/abc bucket subdirectory we should copy 428 file to the object gs://bucket/abc. In contrast, if 429 there's an existing gs://bucket/abc bucket subdirectory we should copy 430 file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc 431 exists, when running the command: 432 gsutil cp file1 file2 gs://bucket/abc 433 we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). 434 Finally, for recursive copies, if the source is a container then we should 435 copy to a container as the target. For example, when running the command: 436 gsutil cp -r dir1 gs://bucket/dir2 437 we should copy the subtree of dir1 to gs://bucket/dir2. 438 439 Note that we don't disallow naming a bucket "sub-directory" where there's 440 already an object at that URL. For example it's legitimate (albeit 441 confusing) to have an object called gs://bucket/dir and 442 then run the command 443 gsutil cp file1 file2 gs://bucket/dir 444 Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, 445 and gs://bucket/dir/file2. 446 447 Args: 448 have_multiple_srcs: Bool indicator of whether this is a multi-source 449 operation. 450 dst_url: StorageUrl to check. 451 have_existing_dest_subdir: bool indicator whether dest is an existing 452 subdirectory. 453 src_url_names_container: bool indicator of whether the source URL 454 is a container. 455 recursion_requested: True if a recursive operation has been requested. 456 457 Returns: 458 bool indicator. 459 """ 460 if have_existing_dest_subdir: 461 return True 462 if dst_url.IsCloudUrl(): 463 return (have_multiple_srcs or 464 (src_url_names_container and recursion_requested)) 465 466 467def _ShouldTreatDstUrlAsSingleton(have_multiple_srcs, 468 have_existing_dest_subdir, dst_url, 469 recursion_requested): 470 """Checks that dst_url names a single file/object after wildcard expansion. 471 472 It is possible that an object path might name a bucket sub-directory. 473 474 Args: 475 have_multiple_srcs: Bool indicator of whether this is a multi-source 476 operation. 477 have_existing_dest_subdir: bool indicator whether dest is an existing 478 subdirectory. 479 dst_url: StorageUrl to check. 480 recursion_requested: True if a recursive operation has been requested. 481 482 Returns: 483 bool indicator. 484 """ 485 if recursion_requested: 486 return False 487 if dst_url.IsFileUrl(): 488 return not dst_url.IsDirectory() 489 else: # dst_url.IsCloudUrl() 490 return (not have_multiple_srcs and 491 not have_existing_dest_subdir and 492 dst_url.IsObject()) 493 494 495def ConstructDstUrl(src_url, exp_src_url, src_url_names_container, 496 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, 497 recursion_requested): 498 """Constructs the destination URL for a given exp_src_url/exp_dst_url pair. 499 500 Uses context-dependent naming rules that mimic Linux cp and mv behavior. 501 502 Args: 503 src_url: Source StorageUrl to be copied. 504 exp_src_url: Single StorageUrl from wildcard expansion of src_url. 505 src_url_names_container: True if src_url names a container (including the 506 case of a wildcard-named bucket subdir (like gs://bucket/abc, 507 where gs://bucket/abc/* matched some objects). 508 have_multiple_srcs: True if this is a multi-source request. This can be 509 true if src_url wildcard-expanded to multiple URLs or if there were 510 multiple source URLs in the request. 511 exp_dst_url: the expanded StorageUrl requested for the cp destination. 512 Final written path is constructed from this plus a context-dependent 513 variant of src_url. 514 have_existing_dest_subdir: bool indicator whether dest is an existing 515 subdirectory. 516 recursion_requested: True if a recursive operation has been requested. 517 518 Returns: 519 StorageUrl to use for copy. 520 521 Raises: 522 CommandException if destination object name not specified for 523 source and source is a stream. 524 """ 525 if _ShouldTreatDstUrlAsSingleton( 526 have_multiple_srcs, have_existing_dest_subdir, exp_dst_url, 527 recursion_requested): 528 # We're copying one file or object to one file or object. 529 return exp_dst_url 530 531 if exp_src_url.IsFileUrl() and exp_src_url.IsStream(): 532 if have_existing_dest_subdir: 533 raise CommandException('Destination object name needed when ' 534 'source is a stream') 535 return exp_dst_url 536 537 if not recursion_requested and not have_multiple_srcs: 538 # We're copying one file or object to a subdirectory. Append final comp 539 # of exp_src_url to exp_dst_url. 540 src_final_comp = exp_src_url.object_name.rpartition(src_url.delim)[-1] 541 return StorageUrlFromString('%s%s%s' % ( 542 exp_dst_url.url_string.rstrip(exp_dst_url.delim), 543 exp_dst_url.delim, src_final_comp)) 544 545 # Else we're copying multiple sources to a directory, bucket, or a bucket 546 # "sub-directory". 547 548 # Ensure exp_dst_url ends in delim char if we're doing a multi-src copy or 549 # a copy to a directory. (The check for copying to a directory needs 550 # special-case handling so that the command: 551 # gsutil cp gs://bucket/obj dir 552 # will turn into file://dir/ instead of file://dir -- the latter would cause 553 # the file "dirobj" to be created.) 554 # Note: need to check have_multiple_srcs or src_url.names_container() 555 # because src_url could be a bucket containing a single object, named 556 # as gs://bucket. 557 if ((have_multiple_srcs or src_url_names_container or 558 (exp_dst_url.IsFileUrl() and exp_dst_url.IsDirectory())) 559 and not exp_dst_url.url_string.endswith(exp_dst_url.delim)): 560 exp_dst_url = StorageUrlFromString('%s%s' % (exp_dst_url.url_string, 561 exp_dst_url.delim)) 562 563 # Making naming behavior match how things work with local Linux cp and mv 564 # operations depends on many factors, including whether the destination is a 565 # container, the plurality of the source(s), and whether the mv command is 566 # being used: 567 # 1. For the "mv" command that specifies a non-existent destination subdir, 568 # renaming should occur at the level of the src subdir, vs appending that 569 # subdir beneath the dst subdir like is done for copying. For example: 570 # gsutil rm -r gs://bucket 571 # gsutil cp -r dir1 gs://bucket 572 # gsutil cp -r dir2 gs://bucket/subdir1 573 # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2 574 # would (if using cp naming behavior) end up with paths like: 575 # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops 576 # whereas mv naming behavior should result in: 577 # gs://bucket/subdir2/dir2/.svn/all-wcprops 578 # 2. Copying from directories, buckets, or bucket subdirs should result in 579 # objects/files mirroring the source directory hierarchy. For example: 580 # gsutil cp dir1/dir2 gs://bucket 581 # should create the object gs://bucket/dir2/file2, assuming dir1/dir2 582 # contains file2). 583 # To be consistent with Linux cp behavior, there's one more wrinkle when 584 # working with subdirs: The resulting object names depend on whether the 585 # destination subdirectory exists. For example, if gs://bucket/subdir 586 # exists, the command: 587 # gsutil cp -r dir1/dir2 gs://bucket/subdir 588 # should create objects named like gs://bucket/subdir/dir2/a/b/c. In 589 # contrast, if gs://bucket/subdir does not exist, this same command 590 # should create objects named like gs://bucket/subdir/a/b/c. 591 # 3. Copying individual files or objects to dirs, buckets or bucket subdirs 592 # should result in objects/files named by the final source file name 593 # component. Example: 594 # gsutil cp dir1/*.txt gs://bucket 595 # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, 596 # assuming dir1 contains f1.txt and f2.txt. 597 598 recursive_move_to_new_subdir = False 599 if (global_copy_helper_opts.perform_mv and recursion_requested 600 and src_url_names_container and not have_existing_dest_subdir): 601 # Case 1. Handle naming rules for bucket subdir mv. Here we want to 602 # line up the src_url against its expansion, to find the base to build 603 # the new name. For example, running the command: 604 # gsutil mv gs://bucket/abcd gs://bucket/xyz 605 # when processing exp_src_url=gs://bucket/abcd/123 606 # exp_src_url_tail should become /123 607 # Note: mv.py code disallows wildcard specification of source URL. 608 recursive_move_to_new_subdir = True 609 exp_src_url_tail = ( 610 exp_src_url.url_string[len(src_url.url_string):]) 611 dst_key_name = '%s/%s' % (exp_dst_url.object_name.rstrip('/'), 612 exp_src_url_tail.strip('/')) 613 614 elif src_url_names_container and (exp_dst_url.IsCloudUrl() or 615 exp_dst_url.IsDirectory()): 616 # Case 2. Container copy to a destination other than a file. 617 # Build dst_key_name from subpath of exp_src_url past 618 # where src_url ends. For example, for src_url=gs://bucket/ and 619 # exp_src_url=gs://bucket/src_subdir/obj, dst_key_name should be 620 # src_subdir/obj. 621 src_url_path_sans_final_dir = GetPathBeforeFinalDir(src_url) 622 dst_key_name = exp_src_url.versionless_url_string[ 623 len(src_url_path_sans_final_dir):].lstrip(src_url.delim) 624 # Handle case where dst_url is a non-existent subdir. 625 if not have_existing_dest_subdir: 626 dst_key_name = dst_key_name.partition(src_url.delim)[-1] 627 # Handle special case where src_url was a directory named with '.' or 628 # './', so that running a command like: 629 # gsutil cp -r . gs://dest 630 # will produce obj names of the form gs://dest/abc instead of 631 # gs://dest/./abc. 632 if dst_key_name.startswith('.%s' % os.sep): 633 dst_key_name = dst_key_name[2:] 634 635 else: 636 # Case 3. 637 dst_key_name = exp_src_url.object_name.rpartition(src_url.delim)[-1] 638 639 if (not recursive_move_to_new_subdir and ( 640 exp_dst_url.IsFileUrl() or _ShouldTreatDstUrlAsBucketSubDir( 641 have_multiple_srcs, exp_dst_url, have_existing_dest_subdir, 642 src_url_names_container, recursion_requested))): 643 if exp_dst_url.object_name and exp_dst_url.object_name.endswith( 644 exp_dst_url.delim): 645 dst_key_name = '%s%s%s' % ( 646 exp_dst_url.object_name.rstrip(exp_dst_url.delim), 647 exp_dst_url.delim, dst_key_name) 648 else: 649 delim = exp_dst_url.delim if exp_dst_url.object_name else '' 650 dst_key_name = '%s%s%s' % (exp_dst_url.object_name or '', 651 delim, dst_key_name) 652 653 new_exp_dst_url = exp_dst_url.Clone() 654 new_exp_dst_url.object_name = dst_key_name.replace(src_url.delim, 655 exp_dst_url.delim) 656 return new_exp_dst_url 657 658 659def _CreateDigestsFromDigesters(digesters): 660 digests = {} 661 if digesters: 662 for alg in digesters: 663 digests[alg] = base64.encodestring( 664 digesters[alg].digest()).rstrip('\n') 665 return digests 666 667 668def _CreateDigestsFromLocalFile(logger, algs, file_name, final_file_name, 669 src_obj_metadata): 670 """Creates a base64 CRC32C and/or MD5 digest from file_name. 671 672 Args: 673 logger: For outputting log messages. 674 algs: List of algorithms to compute. 675 file_name: File to digest. 676 final_file_name: Permanent location to be used for the downloaded file 677 after validation (used for logging). 678 src_obj_metadata: Metadata of source object. 679 680 Returns: 681 Dict of algorithm name : base 64 encoded digest 682 """ 683 hash_dict = {} 684 if 'md5' in algs: 685 hash_dict['md5'] = md5() 686 if 'crc32c' in algs: 687 hash_dict['crc32c'] = crcmod.predefined.Crc('crc-32c') 688 with open(file_name, 'rb') as fp: 689 CalculateHashesFromContents( 690 fp, hash_dict, ProgressCallbackWithBackoff( 691 src_obj_metadata.size, 692 FileProgressCallbackHandler( 693 ConstructAnnounceText('Hashing', final_file_name), 694 logger).call)) 695 digests = {} 696 for alg_name, digest in hash_dict.iteritems(): 697 digests[alg_name] = Base64EncodeHash(digest.hexdigest()) 698 return digests 699 700 701def _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, 702 dst_obj_metadata): 703 """Validates integrity of two cloud objects copied via daisy-chain. 704 705 Args: 706 logger: for outputting log messages. 707 src_url: CloudUrl for source cloud object. 708 dst_url: CloudUrl for destination cloud object. 709 src_obj_metadata: Cloud Object metadata for object being downloaded from. 710 dst_obj_metadata: Cloud Object metadata for object being uploaded to. 711 712 Raises: 713 CommandException: if cloud digests don't match local digests. 714 """ 715 checked_one = False 716 download_hashes = {} 717 upload_hashes = {} 718 if src_obj_metadata.md5Hash: 719 download_hashes['md5'] = src_obj_metadata.md5Hash 720 if src_obj_metadata.crc32c: 721 download_hashes['crc32c'] = src_obj_metadata.crc32c 722 if dst_obj_metadata.md5Hash: 723 upload_hashes['md5'] = dst_obj_metadata.md5Hash 724 if dst_obj_metadata.crc32c: 725 upload_hashes['crc32c'] = dst_obj_metadata.crc32c 726 727 for alg, upload_b64_digest in upload_hashes.iteritems(): 728 if alg not in download_hashes: 729 continue 730 731 download_b64_digest = download_hashes[alg] 732 logger.debug( 733 'Comparing source vs destination %s-checksum for %s. (%s/%s)', alg, 734 dst_url, download_b64_digest, upload_b64_digest) 735 if download_b64_digest != upload_b64_digest: 736 raise HashMismatchException( 737 '%s signature for source object (%s) doesn\'t match ' 738 'destination object digest (%s). Object (%s) will be deleted.' % ( 739 alg, download_b64_digest, upload_b64_digest, dst_url)) 740 checked_one = True 741 if not checked_one: 742 # One known way this can currently happen is when downloading objects larger 743 # than 5 GiB from S3 (for which the etag is not an MD5). 744 logger.warn( 745 'WARNING: Found no hashes to validate object downloaded from %s and ' 746 'uploaded to %s. Integrity cannot be assured without hashes.', 747 src_url, dst_url) 748 749 750def _CheckHashes(logger, obj_url, obj_metadata, file_name, digests, 751 is_upload=False): 752 """Validates integrity by comparing cloud digest to local digest. 753 754 Args: 755 logger: for outputting log messages. 756 obj_url: CloudUrl for cloud object. 757 obj_metadata: Cloud Object being downloaded from or uploaded to. 758 file_name: Local file name on disk being downloaded to or uploaded from 759 (used only for logging). 760 digests: Computed Digests for the object. 761 is_upload: If true, comparing for an uploaded object (controls logging). 762 763 Raises: 764 CommandException: if cloud digests don't match local digests. 765 """ 766 local_hashes = digests 767 cloud_hashes = {} 768 if obj_metadata.md5Hash: 769 cloud_hashes['md5'] = obj_metadata.md5Hash.rstrip('\n') 770 if obj_metadata.crc32c: 771 cloud_hashes['crc32c'] = obj_metadata.crc32c.rstrip('\n') 772 773 checked_one = False 774 for alg in local_hashes: 775 if alg not in cloud_hashes: 776 continue 777 778 local_b64_digest = local_hashes[alg] 779 cloud_b64_digest = cloud_hashes[alg] 780 logger.debug( 781 'Comparing local vs cloud %s-checksum for %s. (%s/%s)', alg, file_name, 782 local_b64_digest, cloud_b64_digest) 783 if local_b64_digest != cloud_b64_digest: 784 785 raise HashMismatchException( 786 '%s signature computed for local file (%s) doesn\'t match ' 787 'cloud-supplied digest (%s). %s (%s) will be deleted.' % ( 788 alg, local_b64_digest, cloud_b64_digest, 789 'Cloud object' if is_upload else 'Local file', 790 obj_url if is_upload else file_name)) 791 checked_one = True 792 if not checked_one: 793 if is_upload: 794 logger.warn( 795 'WARNING: Found no hashes to validate object uploaded to %s. ' 796 'Integrity cannot be assured without hashes.', obj_url) 797 else: 798 # One known way this can currently happen is when downloading objects larger 799 # than 5 GB from S3 (for which the etag is not an MD5). 800 logger.warn( 801 'WARNING: Found no hashes to validate object downloaded to %s. ' 802 'Integrity cannot be assured without hashes.', file_name) 803 804 805def IsNoClobberServerException(e): 806 """Checks to see if the server attempted to clobber a file. 807 808 In this case we specified via a precondition that we didn't want the file 809 clobbered. 810 811 Args: 812 e: The Exception that was generated by a failed copy operation 813 814 Returns: 815 bool indicator - True indicates that the server did attempt to clobber 816 an existing file. 817 """ 818 return ((isinstance(e, PreconditionException)) or 819 (isinstance(e, ResumableUploadException) and '412' in e.message)) 820 821 822def CheckForDirFileConflict(exp_src_url, dst_url): 823 """Checks whether copying exp_src_url into dst_url is not possible. 824 825 This happens if a directory exists in local file system where a file 826 needs to go or vice versa. In that case we print an error message and 827 exits. Example: if the file "./x" exists and you try to do: 828 gsutil cp gs://mybucket/x/y . 829 the request can't succeed because it requires a directory where 830 the file x exists. 831 832 Note that we don't enforce any corresponding restrictions for buckets, 833 because the flat namespace semantics for buckets doesn't prohibit such 834 cases the way hierarchical file systems do. For example, if a bucket 835 contains an object called gs://bucket/dir and then you run the command: 836 gsutil cp file1 file2 gs://bucket/dir 837 you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and 838 gs://bucket/dir/file2. 839 840 Args: 841 exp_src_url: Expanded source StorageUrl. 842 dst_url: Destination StorageUrl. 843 844 Raises: 845 CommandException: if errors encountered. 846 """ 847 if dst_url.IsCloudUrl(): 848 # The problem can only happen for file destination URLs. 849 return 850 dst_path = dst_url.object_name 851 final_dir = os.path.dirname(dst_path) 852 if os.path.isfile(final_dir): 853 raise CommandException('Cannot retrieve %s because a file exists ' 854 'where a directory needs to be created (%s).' % 855 (exp_src_url.url_string, final_dir)) 856 if os.path.isdir(dst_path): 857 raise CommandException('Cannot retrieve %s because a directory exists ' 858 '(%s) where the file needs to be created.' % 859 (exp_src_url.url_string, dst_path)) 860 861 862def _PartitionFile(fp, file_size, src_url, content_type, canned_acl, 863 dst_bucket_url, random_prefix, tracker_file, 864 tracker_file_lock): 865 """Partitions a file into FilePart objects to be uploaded and later composed. 866 867 These objects, when composed, will match the original file. This entails 868 splitting the file into parts, naming and forming a destination URL for each 869 part, and also providing the PerformParallelUploadFileToObjectArgs 870 corresponding to each part. 871 872 Args: 873 fp: The file object to be partitioned. 874 file_size: The size of fp, in bytes. 875 src_url: Source FileUrl from the original command. 876 content_type: content type for the component and final objects. 877 canned_acl: The user-provided canned_acl, if applicable. 878 dst_bucket_url: CloudUrl for the destination bucket 879 random_prefix: The randomly-generated prefix used to prevent collisions 880 among the temporary component names. 881 tracker_file: The path to the parallel composite upload tracker file. 882 tracker_file_lock: The lock protecting access to the tracker file. 883 884 Returns: 885 dst_args: The destination URIs for the temporary component objects. 886 """ 887 parallel_composite_upload_component_size = HumanReadableToBytes( 888 config.get('GSUtil', 'parallel_composite_upload_component_size', 889 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_COMPONENT_SIZE)) 890 (num_components, component_size) = _GetPartitionInfo( 891 file_size, MAX_COMPOSE_ARITY, parallel_composite_upload_component_size) 892 893 dst_args = {} # Arguments to create commands and pass to subprocesses. 894 file_names = [] # Used for the 2-step process of forming dst_args. 895 for i in range(num_components): 896 # "Salt" the object name with something a user is very unlikely to have 897 # used in an object name, then hash the extended name to make sure 898 # we don't run into problems with name length. Using a deterministic 899 # naming scheme for the temporary components allows users to take 900 # advantage of resumable uploads for each component. 901 encoded_name = (PARALLEL_UPLOAD_STATIC_SALT + fp.name).encode(UTF8) 902 content_md5 = md5() 903 content_md5.update(encoded_name) 904 digest = content_md5.hexdigest() 905 temp_file_name = (random_prefix + PARALLEL_UPLOAD_TEMP_NAMESPACE + 906 digest + '_' + str(i)) 907 tmp_dst_url = dst_bucket_url.Clone() 908 tmp_dst_url.object_name = temp_file_name 909 910 if i < (num_components - 1): 911 # Every component except possibly the last is the same size. 912 file_part_length = component_size 913 else: 914 # The last component just gets all of the remaining bytes. 915 file_part_length = (file_size - ((num_components -1) * component_size)) 916 offset = i * component_size 917 func_args = PerformParallelUploadFileToObjectArgs( 918 fp.name, offset, file_part_length, src_url, tmp_dst_url, canned_acl, 919 content_type, tracker_file, tracker_file_lock) 920 file_names.append(temp_file_name) 921 dst_args[temp_file_name] = func_args 922 923 return dst_args 924 925 926def _DoParallelCompositeUpload(fp, src_url, dst_url, dst_obj_metadata, 927 canned_acl, file_size, preconditions, gsutil_api, 928 command_obj, copy_exception_handler): 929 """Uploads a local file to a cloud object using parallel composite upload. 930 931 The file is partitioned into parts, and then the parts are uploaded in 932 parallel, composed to form the original destination object, and deleted. 933 934 Args: 935 fp: The file object to be uploaded. 936 src_url: FileUrl representing the local file. 937 dst_url: CloudUrl representing the destination file. 938 dst_obj_metadata: apitools Object describing the destination object. 939 canned_acl: The canned acl to apply to the object, if any. 940 file_size: The size of the source file in bytes. 941 preconditions: Cloud API Preconditions for the final object. 942 gsutil_api: gsutil Cloud API instance to use. 943 command_obj: Command object (for calling Apply). 944 copy_exception_handler: Copy exception handler (for use in Apply). 945 946 Returns: 947 Elapsed upload time, uploaded Object with generation, crc32c, and size 948 fields populated. 949 """ 950 start_time = time.time() 951 dst_bucket_url = StorageUrlFromString(dst_url.bucket_url_string) 952 api_selector = gsutil_api.GetApiSelector(provider=dst_url.scheme) 953 # Determine which components, if any, have already been successfully 954 # uploaded. 955 tracker_file = GetTrackerFilePath(dst_url, TrackerFileType.PARALLEL_UPLOAD, 956 api_selector, src_url) 957 tracker_file_lock = CreateLock() 958 (random_prefix, existing_components) = ( 959 _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock)) 960 961 # Create the initial tracker file for the upload. 962 _CreateParallelUploadTrackerFile(tracker_file, random_prefix, 963 existing_components, tracker_file_lock) 964 965 # Get the set of all components that should be uploaded. 966 dst_args = _PartitionFile( 967 fp, file_size, src_url, dst_obj_metadata.contentType, canned_acl, 968 dst_bucket_url, random_prefix, tracker_file, tracker_file_lock) 969 970 (components_to_upload, existing_components, existing_objects_to_delete) = ( 971 FilterExistingComponents(dst_args, existing_components, dst_bucket_url, 972 gsutil_api)) 973 974 # In parallel, copy all of the file parts that haven't already been 975 # uploaded to temporary objects. 976 cp_results = command_obj.Apply( 977 _PerformParallelUploadFileToObject, components_to_upload, 978 copy_exception_handler, ('op_failure_count', 'total_bytes_transferred'), 979 arg_checker=gslib.command.DummyArgChecker, 980 parallel_operations_override=True, should_return_results=True) 981 uploaded_components = [] 982 for cp_result in cp_results: 983 uploaded_components.append(cp_result[2]) 984 components = uploaded_components + existing_components 985 986 if len(components) == len(dst_args): 987 # Only try to compose if all of the components were uploaded successfully. 988 989 def _GetComponentNumber(component): 990 return int(component.object_name[component.object_name.rfind('_')+1:]) 991 # Sort the components so that they will be composed in the correct order. 992 components = sorted(components, key=_GetComponentNumber) 993 994 request_components = [] 995 for component_url in components: 996 src_obj_metadata = ( 997 apitools_messages.ComposeRequest.SourceObjectsValueListEntry( 998 name=component_url.object_name)) 999 if component_url.HasGeneration(): 1000 src_obj_metadata.generation = long(component_url.generation) 1001 request_components.append(src_obj_metadata) 1002 1003 composed_object = gsutil_api.ComposeObject( 1004 request_components, dst_obj_metadata, preconditions=preconditions, 1005 provider=dst_url.scheme, fields=['generation', 'crc32c', 'size']) 1006 1007 try: 1008 # Make sure only to delete things that we know were successfully 1009 # uploaded (as opposed to all of the objects that we attempted to 1010 # create) so that we don't delete any preexisting objects, except for 1011 # those that were uploaded by a previous, failed run and have since 1012 # changed (but still have an old generation lying around). 1013 objects_to_delete = components + existing_objects_to_delete 1014 command_obj.Apply( 1015 _DeleteTempComponentObjectFn, objects_to_delete, _RmExceptionHandler, 1016 arg_checker=gslib.command.DummyArgChecker, 1017 parallel_operations_override=True) 1018 except Exception: # pylint: disable=broad-except 1019 # If some of the delete calls fail, don't cause the whole command to 1020 # fail. The copy was successful iff the compose call succeeded, so 1021 # reduce this to a warning. 1022 logging.warning( 1023 'Failed to delete some of the following temporary objects:\n' + 1024 '\n'.join(dst_args.keys())) 1025 finally: 1026 with tracker_file_lock: 1027 if os.path.exists(tracker_file): 1028 os.unlink(tracker_file) 1029 else: 1030 # Some of the components failed to upload. In this case, we want to exit 1031 # without deleting the objects. 1032 raise CommandException( 1033 'Some temporary components were not uploaded successfully. ' 1034 'Please retry this upload.') 1035 1036 elapsed_time = time.time() - start_time 1037 return elapsed_time, composed_object 1038 1039 1040def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url, 1041 file_size, canned_acl=None): 1042 """Determines whether parallel composite upload strategy should be used. 1043 1044 Args: 1045 logger: for outputting log messages. 1046 allow_splitting: If false, then this function returns false. 1047 src_url: FileUrl corresponding to a local file. 1048 dst_url: CloudUrl corresponding to destination cloud object. 1049 file_size: The size of the source file, in bytes. 1050 canned_acl: Canned ACL to apply to destination object, if any. 1051 1052 Returns: 1053 True iff a parallel upload should be performed on the source file. 1054 """ 1055 global suggested_slice_transfers, suggested_sliced_transfers_lock 1056 parallel_composite_upload_threshold = HumanReadableToBytes(config.get( 1057 'GSUtil', 'parallel_composite_upload_threshold', 1058 DEFAULT_PARALLEL_COMPOSITE_UPLOAD_THRESHOLD)) 1059 1060 all_factors_but_size = ( 1061 allow_splitting # Don't split the pieces multiple times. 1062 and not src_url.IsStream() # We can't partition streams. 1063 and dst_url.scheme == 'gs' # Compose is only for gs. 1064 and not canned_acl) # TODO: Implement canned ACL support for compose. 1065 1066 # Since parallel composite uploads are disabled by default, make user aware of 1067 # them. 1068 # TODO: Once compiled crcmod is being distributed by major Linux distributions 1069 # remove this check. 1070 if (all_factors_but_size and parallel_composite_upload_threshold == 0 1071 and file_size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD): 1072 with suggested_sliced_transfers_lock: 1073 if not suggested_sliced_transfers.get('suggested'): 1074 logger.info('\n'.join(textwrap.wrap( 1075 '==> NOTE: You are uploading one or more large file(s), which ' 1076 'would run significantly faster if you enable parallel composite ' 1077 'uploads. This feature can be enabled by editing the ' 1078 '"parallel_composite_upload_threshold" value in your .boto ' 1079 'configuration file. However, note that if you do this you and any ' 1080 'users that download such composite files will need to have a ' 1081 'compiled crcmod installed (see "gsutil help crcmod").')) + '\n') 1082 suggested_sliced_transfers['suggested'] = True 1083 1084 return (all_factors_but_size 1085 and parallel_composite_upload_threshold > 0 1086 and file_size >= parallel_composite_upload_threshold) 1087 1088 1089def ExpandUrlToSingleBlr(url_str, gsutil_api, debug, project_id, 1090 treat_nonexistent_object_as_subdir=False): 1091 """Expands wildcard if present in url_str. 1092 1093 Args: 1094 url_str: String representation of requested url. 1095 gsutil_api: gsutil Cloud API instance to use. 1096 debug: debug level to use (for iterators). 1097 project_id: project ID to use (for iterators). 1098 treat_nonexistent_object_as_subdir: indicates if should treat a non-existent 1099 object as a subdir. 1100 1101 Returns: 1102 (exp_url, have_existing_dst_container) 1103 where exp_url is a StorageUrl 1104 and have_existing_dst_container is a bool indicating whether 1105 exp_url names an existing directory, bucket, or bucket subdirectory. 1106 In the case where we match a subdirectory AND an object, the 1107 object is returned. 1108 1109 Raises: 1110 CommandException: if url_str matched more than 1 URL. 1111 """ 1112 # Handle wildcarded url case. 1113 if ContainsWildcard(url_str): 1114 blr_expansion = list(CreateWildcardIterator(url_str, gsutil_api, 1115 debug=debug, 1116 project_id=project_id)) 1117 if len(blr_expansion) != 1: 1118 raise CommandException('Destination (%s) must match exactly 1 URL' % 1119 url_str) 1120 blr = blr_expansion[0] 1121 # BLR is either an OBJECT, PREFIX, or BUCKET; the latter two represent 1122 # directories. 1123 return (StorageUrlFromString(blr.url_string), not blr.IsObject()) 1124 1125 storage_url = StorageUrlFromString(url_str) 1126 1127 # Handle non-wildcarded URL. 1128 if storage_url.IsFileUrl(): 1129 return (storage_url, storage_url.IsDirectory()) 1130 1131 # At this point we have a cloud URL. 1132 if storage_url.IsBucket(): 1133 return (storage_url, True) 1134 1135 # For object/prefix URLs, there are four cases that indicate the destination 1136 # is a cloud subdirectory; these are always considered to be an existing 1137 # container. Checking each case allows gsutil to provide Unix-like 1138 # destination folder semantics, but requires up to three HTTP calls, noted 1139 # below. 1140 1141 # Case 1: If a placeholder object ending with '/' exists. 1142 if IsCloudSubdirPlaceholder(storage_url): 1143 return (storage_url, True) 1144 1145 # HTTP call to make an eventually consistent check for a matching prefix, 1146 # _$folder$, or empty listing. 1147 expansion_empty = True 1148 list_iterator = gsutil_api.ListObjects( 1149 storage_url.bucket_name, prefix=storage_url.object_name, delimiter='/', 1150 provider=storage_url.scheme, fields=['prefixes', 'items/name']) 1151 for obj_or_prefix in list_iterator: 1152 # To conserve HTTP calls for the common case, we make a single listing 1153 # that covers prefixes and object names. Listing object names covers the 1154 # _$folder$ case and the nonexistent-object-as-subdir case. However, if 1155 # there are many existing objects for which the target URL is an exact 1156 # prefix, this listing could be paginated and span multiple HTTP calls. 1157 # If this case becomes common, we could heurestically abort the 1158 # listing operation after the first page of results and just query for the 1159 # _$folder$ object directly using GetObjectMetadata. 1160 expansion_empty = False 1161 1162 if obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.PREFIX: 1163 # Case 2: If there is a matching prefix when listing the destination URL. 1164 return (storage_url, True) 1165 elif (obj_or_prefix.datatype == CloudApi.CsObjectOrPrefixType.OBJECT and 1166 obj_or_prefix.data.name == storage_url.object_name + '_$folder$'): 1167 # Case 3: If a placeholder object matching destination + _$folder$ 1168 # exists. 1169 return (storage_url, True) 1170 1171 # Case 4: If no objects/prefixes matched, and nonexistent objects should be 1172 # treated as subdirectories. 1173 return (storage_url, expansion_empty and treat_nonexistent_object_as_subdir) 1174 1175 1176def FixWindowsNaming(src_url, dst_url): 1177 """Translates Windows pathnames to cloud pathnames. 1178 1179 Rewrites the destination URL built by ConstructDstUrl(). 1180 1181 Args: 1182 src_url: Source StorageUrl to be copied. 1183 dst_url: The destination StorageUrl built by ConstructDstUrl(). 1184 1185 Returns: 1186 StorageUrl to use for copy. 1187 """ 1188 if (src_url.IsFileUrl() and src_url.delim == '\\' 1189 and dst_url.IsCloudUrl()): 1190 trans_url_str = re.sub(r'\\', '/', dst_url.url_string) 1191 dst_url = StorageUrlFromString(trans_url_str) 1192 return dst_url 1193 1194 1195def SrcDstSame(src_url, dst_url): 1196 """Checks if src_url and dst_url represent the same object or file. 1197 1198 We don't handle anything about hard or symbolic links. 1199 1200 Args: 1201 src_url: Source StorageUrl. 1202 dst_url: Destination StorageUrl. 1203 1204 Returns: 1205 Bool indicator. 1206 """ 1207 if src_url.IsFileUrl() and dst_url.IsFileUrl(): 1208 # Translate a/b/./c to a/b/c, so src=dst comparison below works. 1209 new_src_path = os.path.normpath(src_url.object_name) 1210 new_dst_path = os.path.normpath(dst_url.object_name) 1211 return new_src_path == new_dst_path 1212 else: 1213 return (src_url.url_string == dst_url.url_string and 1214 src_url.generation == dst_url.generation) 1215 1216 1217def _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata): 1218 """Logs copy operation, including Content-Type if appropriate. 1219 1220 Args: 1221 logger: logger instance to use for output. 1222 src_url: Source StorageUrl. 1223 dst_url: Destination StorageUrl. 1224 dst_obj_metadata: Object-specific metadata that should be overidden during 1225 the copy. 1226 """ 1227 if (dst_url.IsCloudUrl() and dst_obj_metadata and 1228 dst_obj_metadata.contentType): 1229 content_type_msg = ' [Content-Type=%s]' % dst_obj_metadata.contentType 1230 else: 1231 content_type_msg = '' 1232 if src_url.IsFileUrl() and src_url.IsStream(): 1233 logger.info('Copying from <STDIN>%s...', content_type_msg) 1234 else: 1235 logger.info('Copying %s%s...', src_url.url_string, content_type_msg) 1236 1237 1238# pylint: disable=undefined-variable 1239def _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, 1240 dst_obj_metadata, preconditions, gsutil_api, 1241 logger): 1242 """Performs copy-in-the cloud from specified src to dest object. 1243 1244 Args: 1245 src_url: Source CloudUrl. 1246 src_obj_metadata: Metadata for source object; must include etag and size. 1247 dst_url: Destination CloudUrl. 1248 dst_obj_metadata: Object-specific metadata that should be overidden during 1249 the copy. 1250 preconditions: Preconditions to use for the copy. 1251 gsutil_api: gsutil Cloud API instance to use for the copy. 1252 logger: logging.Logger for log message output. 1253 1254 Returns: 1255 (elapsed_time, bytes_transferred, dst_url with generation, 1256 md5 hash of destination) excluding overhead like initial GET. 1257 1258 Raises: 1259 CommandException: if errors encountered. 1260 """ 1261 start_time = time.time() 1262 1263 progress_callback = FileProgressCallbackHandler( 1264 ConstructAnnounceText('Copying', dst_url.url_string), logger).call 1265 if global_copy_helper_opts.test_callback_file: 1266 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 1267 progress_callback = pickle.loads(test_fp.read()).call 1268 dst_obj = gsutil_api.CopyObject( 1269 src_obj_metadata, dst_obj_metadata, src_generation=src_url.generation, 1270 canned_acl=global_copy_helper_opts.canned_acl, 1271 preconditions=preconditions, progress_callback=progress_callback, 1272 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) 1273 1274 end_time = time.time() 1275 1276 result_url = dst_url.Clone() 1277 result_url.generation = GenerationFromUrlAndString(result_url, 1278 dst_obj.generation) 1279 1280 return (end_time - start_time, src_obj_metadata.size, result_url, 1281 dst_obj.md5Hash) 1282 1283 1284def _SetContentTypeFromFile(src_url, dst_obj_metadata): 1285 """Detects and sets Content-Type if src_url names a local file. 1286 1287 Args: 1288 src_url: Source StorageUrl. 1289 dst_obj_metadata: Object-specific metadata that should be overidden during 1290 the copy. 1291 """ 1292 # contentType == '' if user requested default type. 1293 if (dst_obj_metadata.contentType is None and src_url.IsFileUrl() 1294 and not src_url.IsStream()): 1295 # Only do content type recognition if src_url is a file. Object-to-object 1296 # copies with no -h Content-Type specified re-use the content type of the 1297 # source object. 1298 object_name = src_url.object_name 1299 content_type = None 1300 # Streams (denoted by '-') are expected to be 'application/octet-stream' 1301 # and 'file' would partially consume them. 1302 if object_name != '-': 1303 if config.getbool('GSUtil', 'use_magicfile', False): 1304 p = subprocess.Popen(['file', '--mime-type', object_name], 1305 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1306 output, error = p.communicate() 1307 p.stdout.close() 1308 p.stderr.close() 1309 if p.returncode != 0 or error: 1310 raise CommandException( 1311 'Encountered error running "file --mime-type %s" ' 1312 '(returncode=%d).\n%s' % (object_name, p.returncode, error)) 1313 # Parse output by removing line delimiter and splitting on last ": 1314 content_type = output.rstrip().rpartition(': ')[2] 1315 else: 1316 content_type = mimetypes.guess_type(object_name)[0] 1317 if not content_type: 1318 content_type = DEFAULT_CONTENT_TYPE 1319 dst_obj_metadata.contentType = content_type 1320 1321 1322# pylint: disable=undefined-variable 1323def _UploadFileToObjectNonResumable(src_url, src_obj_filestream, 1324 src_obj_size, dst_url, dst_obj_metadata, 1325 preconditions, gsutil_api, logger): 1326 """Uploads the file using a non-resumable strategy. 1327 1328 Args: 1329 src_url: Source StorageUrl to upload. 1330 src_obj_filestream: File pointer to uploadable bytes. 1331 src_obj_size: Size of the source object. 1332 dst_url: Destination StorageUrl for the upload. 1333 dst_obj_metadata: Metadata for the target object. 1334 preconditions: Preconditions for the upload, if any. 1335 gsutil_api: gsutil Cloud API instance to use for the upload. 1336 logger: For outputting log messages. 1337 1338 Returns: 1339 Elapsed upload time, uploaded Object with generation, md5, and size fields 1340 populated. 1341 """ 1342 progress_callback = FileProgressCallbackHandler( 1343 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call 1344 if global_copy_helper_opts.test_callback_file: 1345 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 1346 progress_callback = pickle.loads(test_fp.read()).call 1347 start_time = time.time() 1348 1349 if src_url.IsStream(): 1350 # TODO: gsutil-beta: Provide progress callbacks for streaming uploads. 1351 uploaded_object = gsutil_api.UploadObjectStreaming( 1352 src_obj_filestream, object_metadata=dst_obj_metadata, 1353 canned_acl=global_copy_helper_opts.canned_acl, 1354 preconditions=preconditions, progress_callback=progress_callback, 1355 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) 1356 else: 1357 uploaded_object = gsutil_api.UploadObject( 1358 src_obj_filestream, object_metadata=dst_obj_metadata, 1359 canned_acl=global_copy_helper_opts.canned_acl, size=src_obj_size, 1360 preconditions=preconditions, progress_callback=progress_callback, 1361 provider=dst_url.scheme, fields=UPLOAD_RETURN_FIELDS) 1362 end_time = time.time() 1363 elapsed_time = end_time - start_time 1364 1365 return elapsed_time, uploaded_object 1366 1367 1368# pylint: disable=undefined-variable 1369def _UploadFileToObjectResumable(src_url, src_obj_filestream, 1370 src_obj_size, dst_url, dst_obj_metadata, 1371 preconditions, gsutil_api, logger): 1372 """Uploads the file using a resumable strategy. 1373 1374 Args: 1375 src_url: Source FileUrl to upload. Must not be a stream. 1376 src_obj_filestream: File pointer to uploadable bytes. 1377 src_obj_size: Size of the source object. 1378 dst_url: Destination StorageUrl for the upload. 1379 dst_obj_metadata: Metadata for the target object. 1380 preconditions: Preconditions for the upload, if any. 1381 gsutil_api: gsutil Cloud API instance to use for the upload. 1382 logger: for outputting log messages. 1383 1384 Returns: 1385 Elapsed upload time, uploaded Object with generation, md5, and size fields 1386 populated. 1387 """ 1388 tracker_file_name = GetTrackerFilePath( 1389 dst_url, TrackerFileType.UPLOAD, 1390 gsutil_api.GetApiSelector(provider=dst_url.scheme)) 1391 1392 def _UploadTrackerCallback(serialization_data): 1393 """Creates a new tracker file for starting an upload from scratch. 1394 1395 This function is called by the gsutil Cloud API implementation and the 1396 the serialization data is implementation-specific. 1397 1398 Args: 1399 serialization_data: Serialization data used in resuming the upload. 1400 """ 1401 tracker_file = None 1402 try: 1403 tracker_file = open(tracker_file_name, 'w') 1404 tracker_file.write(str(serialization_data)) 1405 except IOError as e: 1406 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) 1407 finally: 1408 if tracker_file: 1409 tracker_file.close() 1410 1411 # This contains the upload URL, which will uniquely identify the 1412 # destination object. 1413 tracker_data = _GetUploadTrackerData(tracker_file_name, logger) 1414 if tracker_data: 1415 logger.info( 1416 'Resuming upload for %s', src_url.url_string) 1417 1418 retryable = True 1419 1420 progress_callback = FileProgressCallbackHandler( 1421 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call 1422 if global_copy_helper_opts.test_callback_file: 1423 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 1424 progress_callback = pickle.loads(test_fp.read()).call 1425 1426 start_time = time.time() 1427 num_startover_attempts = 0 1428 # This loop causes us to retry when the resumable upload failed in a way that 1429 # requires starting over with a new upload ID. Retries within a single upload 1430 # ID within the current process are handled in 1431 # gsutil_api.UploadObjectResumable, and retries within a single upload ID 1432 # spanning processes happens if an exception occurs not caught below (which 1433 # will leave the tracker file in place, and cause the upload ID to be reused 1434 # the next time the user runs gsutil and attempts the same upload). 1435 while retryable: 1436 try: 1437 uploaded_object = gsutil_api.UploadObjectResumable( 1438 src_obj_filestream, object_metadata=dst_obj_metadata, 1439 canned_acl=global_copy_helper_opts.canned_acl, 1440 preconditions=preconditions, provider=dst_url.scheme, 1441 size=src_obj_size, serialization_data=tracker_data, 1442 fields=UPLOAD_RETURN_FIELDS, 1443 tracker_callback=_UploadTrackerCallback, 1444 progress_callback=progress_callback) 1445 retryable = False 1446 except ResumableUploadStartOverException, e: 1447 # This can happen, for example, if the server sends a 410 response code. 1448 # In that case the current resumable upload ID can't be reused, so delete 1449 # the tracker file and try again up to max retries. 1450 num_startover_attempts += 1 1451 retryable = (num_startover_attempts < GetNumRetries()) 1452 if not retryable: 1453 raise 1454 1455 # If the server sends a 404 response code, then the upload should only 1456 # be restarted if it was the object (and not the bucket) that was missing. 1457 try: 1458 gsutil_api.GetBucket(dst_obj_metadata.bucket, provider=dst_url.scheme) 1459 except NotFoundException: 1460 raise 1461 1462 logger.info('Restarting upload from scratch after exception %s', e) 1463 DeleteTrackerFile(tracker_file_name) 1464 tracker_data = None 1465 src_obj_filestream.seek(0) 1466 # Reset the progress callback handler. 1467 progress_callback = FileProgressCallbackHandler( 1468 ConstructAnnounceText('Uploading', dst_url.url_string), logger).call 1469 logger.info('\n'.join(textwrap.wrap( 1470 'Resumable upload of %s failed with a response code indicating we ' 1471 'need to start over with a new resumable upload ID. Backing off ' 1472 'and retrying.' % src_url.url_string))) 1473 time.sleep(min(random.random() * (2 ** num_startover_attempts), 1474 GetMaxRetryDelay())) 1475 except ResumableUploadAbortException: 1476 retryable = False 1477 raise 1478 finally: 1479 if not retryable: 1480 DeleteTrackerFile(tracker_file_name) 1481 1482 end_time = time.time() 1483 elapsed_time = end_time - start_time 1484 1485 return (elapsed_time, uploaded_object) 1486 1487 1488def _CompressFileForUpload(src_url, src_obj_filestream, src_obj_size, logger): 1489 """Compresses a to-be-uploaded local file to save bandwidth. 1490 1491 Args: 1492 src_url: Source FileUrl. 1493 src_obj_filestream: Read stream of the source file - will be consumed 1494 and closed. 1495 src_obj_size: Size of the source file. 1496 logger: for outputting log messages. 1497 1498 Returns: 1499 StorageUrl path to compressed file, compressed file size. 1500 """ 1501 # TODO: Compress using a streaming model as opposed to all at once here. 1502 if src_obj_size >= MIN_SIZE_COMPUTE_LOGGING: 1503 logger.info( 1504 'Compressing %s (to tmp)...', src_url) 1505 (gzip_fh, gzip_path) = tempfile.mkstemp() 1506 gzip_fp = None 1507 try: 1508 # Check for temp space. Assume the compressed object is at most 2x 1509 # the size of the object (normally should compress to smaller than 1510 # the object) 1511 if CheckFreeSpace(gzip_path) < 2*int(src_obj_size): 1512 raise CommandException('Inadequate temp space available to compress ' 1513 '%s. See the CHANGING TEMP DIRECTORIES section ' 1514 'of "gsutil help cp" for more info.' % src_url) 1515 gzip_fp = gzip.open(gzip_path, 'wb') 1516 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) 1517 while data: 1518 gzip_fp.write(data) 1519 data = src_obj_filestream.read(GZIP_CHUNK_SIZE) 1520 finally: 1521 if gzip_fp: 1522 gzip_fp.close() 1523 os.close(gzip_fh) 1524 src_obj_filestream.close() 1525 gzip_size = os.path.getsize(gzip_path) 1526 return StorageUrlFromString(gzip_path), gzip_size 1527 1528 1529def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size, 1530 dst_url, dst_obj_metadata, preconditions, gsutil_api, 1531 logger, command_obj, copy_exception_handler, 1532 gzip_exts=None, allow_splitting=True): 1533 """Uploads a local file to an object. 1534 1535 Args: 1536 src_url: Source FileUrl. 1537 src_obj_filestream: Read stream of the source file to be read and closed. 1538 src_obj_size: Size of the source file. 1539 dst_url: Destination CloudUrl. 1540 dst_obj_metadata: Metadata to be applied to the destination object. 1541 preconditions: Preconditions to use for the copy. 1542 gsutil_api: gsutil Cloud API to use for the copy. 1543 logger: for outputting log messages. 1544 command_obj: command object for use in Apply in parallel composite uploads. 1545 copy_exception_handler: For handling copy exceptions during Apply. 1546 gzip_exts: List of file extensions to gzip prior to upload, if any. 1547 allow_splitting: Whether to allow the file to be split into component 1548 pieces for an parallel composite upload. 1549 1550 Returns: 1551 (elapsed_time, bytes_transferred, dst_url with generation, 1552 md5 hash of destination) excluding overhead like initial GET. 1553 1554 Raises: 1555 CommandException: if errors encountered. 1556 """ 1557 if not dst_obj_metadata or not dst_obj_metadata.contentLanguage: 1558 content_language = config.get_value('GSUtil', 'content_language') 1559 if content_language: 1560 dst_obj_metadata.contentLanguage = content_language 1561 1562 fname_parts = src_url.object_name.split('.') 1563 upload_url = src_url 1564 upload_stream = src_obj_filestream 1565 upload_size = src_obj_size 1566 zipped_file = False 1567 if gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts: 1568 upload_url, upload_size = _CompressFileForUpload( 1569 src_url, src_obj_filestream, src_obj_size, logger) 1570 upload_stream = open(upload_url.object_name, 'rb') 1571 dst_obj_metadata.contentEncoding = 'gzip' 1572 zipped_file = True 1573 1574 elapsed_time = None 1575 uploaded_object = None 1576 hash_algs = GetUploadHashAlgs() 1577 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) 1578 1579 parallel_composite_upload = _ShouldDoParallelCompositeUpload( 1580 logger, allow_splitting, upload_url, dst_url, src_obj_size, 1581 canned_acl=global_copy_helper_opts.canned_acl) 1582 1583 if (src_url.IsStream() and 1584 gsutil_api.GetApiSelector(provider=dst_url.scheme) == ApiSelector.JSON): 1585 orig_stream = upload_stream 1586 # Add limited seekable properties to the stream via buffering. 1587 upload_stream = ResumableStreamingJsonUploadWrapper( 1588 orig_stream, GetJsonResumableChunkSize()) 1589 1590 if not parallel_composite_upload and len(hash_algs): 1591 # Parallel composite uploads calculate hashes per-component in subsequent 1592 # calls to this function, but the composition of the final object is a 1593 # cloud-only operation. 1594 wrapped_filestream = HashingFileUploadWrapper(upload_stream, digesters, 1595 hash_algs, upload_url, logger) 1596 else: 1597 wrapped_filestream = upload_stream 1598 1599 try: 1600 if parallel_composite_upload: 1601 elapsed_time, uploaded_object = _DoParallelCompositeUpload( 1602 upload_stream, upload_url, dst_url, dst_obj_metadata, 1603 global_copy_helper_opts.canned_acl, upload_size, preconditions, 1604 gsutil_api, command_obj, copy_exception_handler) 1605 elif upload_size < ResumableThreshold() or src_url.IsStream(): 1606 elapsed_time, uploaded_object = _UploadFileToObjectNonResumable( 1607 upload_url, wrapped_filestream, upload_size, dst_url, 1608 dst_obj_metadata, preconditions, gsutil_api, logger) 1609 else: 1610 elapsed_time, uploaded_object = _UploadFileToObjectResumable( 1611 upload_url, wrapped_filestream, upload_size, dst_url, 1612 dst_obj_metadata, preconditions, gsutil_api, logger) 1613 1614 finally: 1615 if zipped_file: 1616 try: 1617 os.unlink(upload_url.object_name) 1618 # Windows sometimes complains the temp file is locked when you try to 1619 # delete it. 1620 except Exception: # pylint: disable=broad-except 1621 logger.warning( 1622 'Could not delete %s. This can occur in Windows because the ' 1623 'temporary file is still locked.', upload_url.object_name) 1624 # In the gzip case, this is the gzip stream. _CompressFileForUpload will 1625 # have already closed the original source stream. 1626 upload_stream.close() 1627 1628 if not parallel_composite_upload: 1629 try: 1630 digests = _CreateDigestsFromDigesters(digesters) 1631 _CheckHashes(logger, dst_url, uploaded_object, src_url.object_name, 1632 digests, is_upload=True) 1633 except HashMismatchException: 1634 if _RENAME_ON_HASH_MISMATCH: 1635 corrupted_obj_metadata = apitools_messages.Object( 1636 name=dst_obj_metadata.name, 1637 bucket=dst_obj_metadata.bucket, 1638 etag=uploaded_object.etag) 1639 dst_obj_metadata.name = (dst_url.object_name + 1640 _RENAME_ON_HASH_MISMATCH_SUFFIX) 1641 gsutil_api.CopyObject(corrupted_obj_metadata, 1642 dst_obj_metadata, provider=dst_url.scheme) 1643 # If the digest doesn't match, delete the object. 1644 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, 1645 generation=uploaded_object.generation, 1646 provider=dst_url.scheme) 1647 raise 1648 1649 result_url = dst_url.Clone() 1650 1651 result_url.generation = uploaded_object.generation 1652 result_url.generation = GenerationFromUrlAndString( 1653 result_url, uploaded_object.generation) 1654 1655 return (elapsed_time, uploaded_object.size, result_url, 1656 uploaded_object.md5Hash) 1657 1658 1659def _GetDownloadFile(dst_url, src_obj_metadata, logger): 1660 """Creates a new download file, and deletes the file that will be replaced. 1661 1662 Names and creates a temporary file for this download. Also, if there is an 1663 existing file at the path where this file will be placed after the download 1664 is completed, that file will be deleted. 1665 1666 Args: 1667 dst_url: Destination FileUrl. 1668 src_obj_metadata: Metadata from the source object. 1669 logger: for outputting log messages. 1670 1671 Returns: 1672 (download_file_name, need_to_unzip) 1673 download_file_name: The name of the temporary file to which the object will 1674 be downloaded. 1675 need_to_unzip: If true, a temporary zip file was used and must be 1676 uncompressed as part of validation. 1677 """ 1678 dir_name = os.path.dirname(dst_url.object_name) 1679 if dir_name and not os.path.exists(dir_name): 1680 # Do dir creation in try block so can ignore case where dir already 1681 # exists. This is needed to avoid a race condition when running gsutil 1682 # -m cp. 1683 try: 1684 os.makedirs(dir_name) 1685 except OSError, e: 1686 if e.errno != errno.EEXIST: 1687 raise 1688 1689 need_to_unzip = False 1690 # For gzipped objects download to a temp file and unzip. For the XML API, 1691 # this represents the result of a HEAD request. For the JSON API, this is 1692 # the stored encoding which the service may not respect. However, if the 1693 # server sends decompressed bytes for a file that is stored compressed 1694 # (double compressed case), there is no way we can validate the hash and 1695 # we will fail our hash check for the object. 1696 if (src_obj_metadata.contentEncoding and 1697 src_obj_metadata.contentEncoding.lower().endswith('gzip')): 1698 need_to_unzip = True 1699 download_file_name = _GetDownloadTempZipFileName(dst_url) 1700 logger.info( 1701 'Downloading to temp gzip filename %s', download_file_name) 1702 else: 1703 download_file_name = _GetDownloadTempFileName(dst_url) 1704 1705 # If a file exists at the permanent destination (where the file will be moved 1706 # after the download is completed), delete it here to reduce disk space 1707 # requirements. 1708 if os.path.exists(dst_url.object_name): 1709 os.unlink(dst_url.object_name) 1710 1711 # Downloads open the temporary download file in r+b mode, which requires it 1712 # to already exist, so we create it here if it doesn't exist already. 1713 fp = open(download_file_name, 'ab') 1714 fp.close() 1715 return download_file_name, need_to_unzip 1716 1717 1718def _ShouldDoSlicedDownload(download_strategy, src_obj_metadata, 1719 allow_splitting, logger): 1720 """Determines whether the sliced download strategy should be used. 1721 1722 Args: 1723 download_strategy: CloudApi download strategy. 1724 src_obj_metadata: Metadata from the source object. 1725 allow_splitting: If false, then this function returns false. 1726 logger: logging.Logger for log message output. 1727 1728 Returns: 1729 True iff a sliced download should be performed on the source file. 1730 """ 1731 sliced_object_download_threshold = HumanReadableToBytes(config.get( 1732 'GSUtil', 'sliced_object_download_threshold', 1733 DEFAULT_SLICED_OBJECT_DOWNLOAD_THRESHOLD)) 1734 1735 max_components = config.getint( 1736 'GSUtil', 'sliced_object_download_max_components', 1737 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) 1738 1739 # Don't use sliced download if it will prevent us from performing an 1740 # integrity check. 1741 check_hashes_config = config.get( 1742 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) 1743 parallel_hashing = src_obj_metadata.crc32c and UsingCrcmodExtension(crcmod) 1744 hashing_okay = parallel_hashing or check_hashes_config == CHECK_HASH_NEVER 1745 1746 use_slice = ( 1747 allow_splitting 1748 and download_strategy is not CloudApi.DownloadStrategy.ONE_SHOT 1749 and max_components > 1 1750 and hashing_okay 1751 and sliced_object_download_threshold > 0 1752 and src_obj_metadata.size >= sliced_object_download_threshold) 1753 1754 if (not use_slice 1755 and src_obj_metadata.size >= PARALLEL_COMPOSITE_SUGGESTION_THRESHOLD 1756 and not UsingCrcmodExtension(crcmod) 1757 and check_hashes_config != CHECK_HASH_NEVER): 1758 with suggested_sliced_transfers_lock: 1759 if not suggested_sliced_transfers.get('suggested'): 1760 logger.info('\n'.join(textwrap.wrap( 1761 '==> NOTE: You are downloading one or more large file(s), which ' 1762 'would run significantly faster if you enabled sliced object ' 1763 'uploads. This feature is enabled by default but requires that ' 1764 'compiled crcmod be installed (see "gsutil help crcmod").')) + '\n') 1765 suggested_sliced_transfers['suggested'] = True 1766 1767 return use_slice 1768 1769 1770def _PerformSlicedDownloadObjectToFile(cls, args, thread_state=None): 1771 """Function argument to Apply for performing sliced downloads. 1772 1773 Args: 1774 cls: Calling Command class. 1775 args: PerformSlicedDownloadObjectToFileArgs tuple describing the target. 1776 thread_state: gsutil Cloud API instance to use for the operation. 1777 1778 Returns: 1779 PerformSlicedDownloadReturnValues named-tuple filled with: 1780 component_num: The component number for this download. 1781 crc32c: CRC32C hash value (integer) of the downloaded bytes. 1782 bytes_transferred: The number of bytes transferred, potentially less 1783 than the component size if the download was resumed. 1784 """ 1785 gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state) 1786 hash_algs = GetDownloadHashAlgs( 1787 cls.logger, consider_crc32c=args.src_obj_metadata.crc32c) 1788 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) 1789 1790 (bytes_transferred, server_encoding) = ( 1791 _DownloadObjectToFileResumable(args.src_url, args.src_obj_metadata, 1792 args.dst_url, args.download_file_name, 1793 gsutil_api, cls.logger, digesters, 1794 component_num=args.component_num, 1795 start_byte=args.start_byte, 1796 end_byte=args.end_byte)) 1797 1798 crc32c_val = None 1799 if 'crc32c' in digesters: 1800 crc32c_val = digesters['crc32c'].crcValue 1801 return PerformSlicedDownloadReturnValues( 1802 args.component_num, crc32c_val, bytes_transferred, server_encoding) 1803 1804 1805def _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, 1806 download_file_name, logger, 1807 api_selector, num_components): 1808 """Maintains sliced download tracker files in order to permit resumability. 1809 1810 Reads or creates a sliced download tracker file representing this object 1811 download. Upon an attempt at cross-process resumption, the contents of the 1812 sliced download tracker file are verified to make sure a resumption is 1813 possible and appropriate. In the case that a resumption should not be 1814 attempted, existing component tracker files are deleted (to prevent child 1815 processes from attempting resumption), and a new sliced download tracker 1816 file is created. 1817 1818 Args: 1819 src_obj_metadata: Metadata from the source object. Must include etag and 1820 generation. 1821 dst_url: Destination FileUrl. 1822 download_file_name: Temporary file name to be used for the download. 1823 logger: for outputting log messages. 1824 api_selector: The Cloud API implementation used. 1825 num_components: The number of components to perform this download with. 1826 """ 1827 assert src_obj_metadata.etag 1828 tracker_file = None 1829 1830 # Only can happen if the resumable threshold is set higher than the 1831 # parallel transfer threshold. 1832 if src_obj_metadata.size < ResumableThreshold(): 1833 return 1834 1835 tracker_file_name = GetTrackerFilePath(dst_url, 1836 TrackerFileType.SLICED_DOWNLOAD, 1837 api_selector) 1838 1839 # Check to see if we should attempt resuming the download. 1840 try: 1841 fp = open(download_file_name, 'rb') 1842 existing_file_size = GetFileSize(fp) 1843 # A parallel resumption should be attempted only if the destination file 1844 # size is exactly the same as the source size and the tracker file matches. 1845 if existing_file_size == src_obj_metadata.size: 1846 tracker_file = open(tracker_file_name, 'r') 1847 tracker_file_data = json.load(tracker_file) 1848 if (tracker_file_data['etag'] == src_obj_metadata.etag and 1849 tracker_file_data['generation'] == src_obj_metadata.generation and 1850 tracker_file_data['num_components'] == num_components): 1851 return 1852 else: 1853 tracker_file.close() 1854 logger.warn('Sliced download tracker file doesn\'t match for ' 1855 'download of %s. Restarting download from scratch.' % 1856 dst_url.object_name) 1857 1858 except (IOError, ValueError) as e: 1859 # Ignore non-existent file (happens first time a download 1860 # is attempted on an object), but warn user for other errors. 1861 if isinstance(e, ValueError) or e.errno != errno.ENOENT: 1862 logger.warn('Couldn\'t read sliced download tracker file (%s): %s. ' 1863 'Restarting download from scratch.' % 1864 (tracker_file_name, str(e))) 1865 finally: 1866 if fp: 1867 fp.close() 1868 if tracker_file: 1869 tracker_file.close() 1870 1871 # Delete component tracker files to guarantee download starts from scratch. 1872 DeleteDownloadTrackerFiles(dst_url, api_selector) 1873 1874 # Create a new sliced download tracker file to represent this download. 1875 try: 1876 with open(tracker_file_name, 'w') as tracker_file: 1877 tracker_file_data = {'etag': src_obj_metadata.etag, 1878 'generation': src_obj_metadata.generation, 1879 'num_components': num_components} 1880 tracker_file.write(json.dumps(tracker_file_data)) 1881 except IOError as e: 1882 RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror) 1883 1884 1885class SlicedDownloadFileWrapper(object): 1886 """Wraps a file object to be used in GetObjectMedia for sliced downloads. 1887 1888 In order to allow resumability, the file object used by each thread in a 1889 sliced object download should be wrapped using SlicedDownloadFileWrapper. 1890 Passing a SlicedDownloadFileWrapper object to GetObjectMedia will allow the 1891 download component tracker file for this component to be updated periodically, 1892 while the downloaded bytes are normally written to file. 1893 """ 1894 1895 def __init__(self, fp, tracker_file_name, src_obj_metadata, start_byte, 1896 end_byte): 1897 """Initializes the SlicedDownloadFileWrapper. 1898 1899 Args: 1900 fp: The already-open file object to be used for writing in 1901 GetObjectMedia. Data will be written to file starting at the current 1902 seek position. 1903 tracker_file_name: The name of the tracker file for this component. 1904 src_obj_metadata: Metadata from the source object. Must include etag and 1905 generation. 1906 start_byte: The first byte to be downloaded for this parallel component. 1907 end_byte: The last byte to be downloaded for this parallel component. 1908 """ 1909 self._orig_fp = fp 1910 self._tracker_file_name = tracker_file_name 1911 self._src_obj_metadata = src_obj_metadata 1912 self._last_tracker_file_byte = None 1913 self._start_byte = start_byte 1914 self._end_byte = end_byte 1915 1916 def write(self, data): # pylint: disable=invalid-name 1917 current_file_pos = self._orig_fp.tell() 1918 assert (self._start_byte <= current_file_pos and 1919 current_file_pos + len(data) <= self._end_byte + 1) 1920 1921 self._orig_fp.write(data) 1922 current_file_pos = self._orig_fp.tell() 1923 1924 threshold = TRACKERFILE_UPDATE_THRESHOLD 1925 if (self._last_tracker_file_byte is None or 1926 current_file_pos - self._last_tracker_file_byte > threshold or 1927 current_file_pos == self._end_byte + 1): 1928 WriteDownloadComponentTrackerFile( 1929 self._tracker_file_name, self._src_obj_metadata, current_file_pos) 1930 self._last_tracker_file_byte = current_file_pos 1931 1932 def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name 1933 if whence == os.SEEK_END: 1934 self._orig_fp.seek(offset + self._end_byte + 1) 1935 else: 1936 self._orig_fp.seek(offset, whence) 1937 assert self._start_byte <= self._orig_fp.tell() <= self._end_byte + 1 1938 1939 def tell(self): # pylint: disable=invalid-name 1940 return self._orig_fp.tell() 1941 1942 def flush(self): # pylint: disable=invalid-name 1943 self._orig_fp.flush() 1944 1945 def close(self): # pylint: disable=invalid-name 1946 if self._orig_fp: 1947 self._orig_fp.close() 1948 1949 1950def _PartitionObject(src_url, src_obj_metadata, dst_url, 1951 download_file_name): 1952 """Partitions an object into components to be downloaded. 1953 1954 Each component is a byte range of the object. The byte ranges 1955 of the returned components are mutually exclusive and collectively 1956 exhaustive. The byte ranges are inclusive at both end points. 1957 1958 Args: 1959 src_url: Source CloudUrl. 1960 src_obj_metadata: Metadata from the source object. 1961 dst_url: Destination FileUrl. 1962 download_file_name: Temporary file name to be used for the download. 1963 1964 Returns: 1965 components_to_download: A list of PerformSlicedDownloadObjectToFileArgs 1966 to be used in Apply for the sliced download. 1967 """ 1968 sliced_download_component_size = HumanReadableToBytes( 1969 config.get('GSUtil', 'sliced_object_download_component_size', 1970 DEFAULT_SLICED_OBJECT_DOWNLOAD_COMPONENT_SIZE)) 1971 1972 max_components = config.getint( 1973 'GSUtil', 'sliced_object_download_max_components', 1974 DEFAULT_SLICED_OBJECT_DOWNLOAD_MAX_COMPONENTS) 1975 1976 num_components, component_size = _GetPartitionInfo( 1977 src_obj_metadata.size, max_components, sliced_download_component_size) 1978 1979 components_to_download = [] 1980 component_lengths = [] 1981 for i in range(num_components): 1982 start_byte = i * component_size 1983 end_byte = min((i + 1) * (component_size) - 1, src_obj_metadata.size - 1) 1984 component_lengths.append(end_byte - start_byte + 1) 1985 components_to_download.append( 1986 PerformSlicedDownloadObjectToFileArgs( 1987 i, src_url, src_obj_metadata, dst_url, download_file_name, 1988 start_byte, end_byte)) 1989 return components_to_download, component_lengths 1990 1991 1992def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name, 1993 command_obj, logger, copy_exception_handler, 1994 api_selector): 1995 """Downloads a cloud object to a local file using sliced download. 1996 1997 Byte ranges are decided for each thread/process, and then the parts are 1998 downloaded in parallel. 1999 2000 Args: 2001 src_url: Source CloudUrl. 2002 src_obj_metadata: Metadata from the source object. 2003 dst_url: Destination FileUrl. 2004 download_file_name: Temporary file name to be used for download. 2005 command_obj: command object for use in Apply in parallel composite uploads. 2006 logger: for outputting log messages. 2007 copy_exception_handler: For handling copy exceptions during Apply. 2008 api_selector: The Cloud API implementation used. 2009 2010 Returns: 2011 (bytes_transferred, crc32c) 2012 bytes_transferred: Number of bytes transferred from server this call. 2013 crc32c: a crc32c hash value (integer) for the downloaded bytes, or None if 2014 crc32c hashing wasn't performed. 2015 """ 2016 components_to_download, component_lengths = _PartitionObject( 2017 src_url, src_obj_metadata, dst_url, download_file_name) 2018 2019 num_components = len(components_to_download) 2020 _MaintainSlicedDownloadTrackerFiles(src_obj_metadata, dst_url, 2021 download_file_name, logger, 2022 api_selector, num_components) 2023 2024 # Resize the download file so each child process can seek to its start byte. 2025 with open(download_file_name, 'ab') as fp: 2026 fp.truncate(src_obj_metadata.size) 2027 2028 cp_results = command_obj.Apply( 2029 _PerformSlicedDownloadObjectToFile, components_to_download, 2030 copy_exception_handler, arg_checker=gslib.command.DummyArgChecker, 2031 parallel_operations_override=True, should_return_results=True) 2032 2033 if len(cp_results) < num_components: 2034 raise CommandException( 2035 'Some components of %s were not downloaded successfully. ' 2036 'Please retry this download.' % dst_url.object_name) 2037 2038 # Crc32c hashes have to be concatenated in the correct order. 2039 cp_results = sorted(cp_results, key=attrgetter('component_num')) 2040 crc32c = cp_results[0].crc32c 2041 if crc32c is not None: 2042 for i in range(1, num_components): 2043 crc32c = ConcatCrc32c(crc32c, cp_results[i].crc32c, 2044 component_lengths[i]) 2045 2046 bytes_transferred = 0 2047 expect_gzip = (src_obj_metadata.contentEncoding and 2048 src_obj_metadata.contentEncoding.lower().endswith('gzip')) 2049 for cp_result in cp_results: 2050 bytes_transferred += cp_result.bytes_transferred 2051 server_gzip = (cp_result.server_encoding and 2052 cp_result.server_encoding.lower().endswith('gzip')) 2053 # If the server gzipped any components on the fly, we will have no chance of 2054 # properly reconstructing the file. 2055 if server_gzip and not expect_gzip: 2056 raise CommandException( 2057 'Download of %s failed because the server sent back data with an ' 2058 'unexpected encoding.' % dst_url.object_name) 2059 2060 return bytes_transferred, crc32c 2061 2062 2063def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, 2064 download_file_name, gsutil_api, logger, 2065 digesters, component_num=None, start_byte=0, 2066 end_byte=None): 2067 """Downloads an object to a local file using the resumable strategy. 2068 2069 Args: 2070 src_url: Source CloudUrl. 2071 src_obj_metadata: Metadata from the source object. 2072 dst_url: Destination FileUrl. 2073 download_file_name: Temporary file name to be used for download. 2074 gsutil_api: gsutil Cloud API instance to use for the download. 2075 logger: for outputting log messages. 2076 digesters: Digesters corresponding to the hash algorithms that will be used 2077 for validation. 2078 component_num: Which component of a sliced download this call is for, or 2079 None if this is not a sliced download. 2080 start_byte: The first byte of a byte range for a sliced download. 2081 end_byte: The last byte of a byte range for a sliced download. 2082 2083 Returns: 2084 (bytes_transferred, server_encoding) 2085 bytes_transferred: Number of bytes transferred from server this call. 2086 server_encoding: Content-encoding string if it was detected that the server 2087 sent encoded bytes during transfer, None otherwise. 2088 """ 2089 if end_byte is None: 2090 end_byte = src_obj_metadata.size - 1 2091 download_size = end_byte - start_byte + 1 2092 2093 is_sliced = component_num is not None 2094 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) 2095 server_encoding = None 2096 2097 # Used for logging 2098 download_name = dst_url.object_name 2099 if is_sliced: 2100 download_name += ' component %d' % component_num 2101 2102 try: 2103 fp = open(download_file_name, 'r+b') 2104 fp.seek(start_byte) 2105 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) 2106 existing_file_size = GetFileSize(fp) 2107 2108 tracker_file_name, download_start_byte = ( 2109 ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger, 2110 api_selector, start_byte, 2111 existing_file_size, component_num)) 2112 2113 if download_start_byte < start_byte or download_start_byte > end_byte + 1: 2114 DeleteTrackerFile(tracker_file_name) 2115 raise CommandException( 2116 'Resumable download start point for %s is not in the correct byte ' 2117 'range. Deleting tracker file, so if you re-try this download it ' 2118 'will start from scratch' % download_name) 2119 2120 download_complete = (download_start_byte == start_byte + download_size) 2121 resuming = (download_start_byte != start_byte) and not download_complete 2122 if resuming: 2123 logger.info('Resuming download for %s', download_name) 2124 elif download_complete: 2125 logger.info( 2126 'Download already complete for %s, skipping download but ' 2127 'will run integrity checks.', download_name) 2128 2129 # This is used for resuming downloads, but also for passing the mediaLink 2130 # and size into the download for new downloads so that we can avoid 2131 # making an extra HTTP call. 2132 serialization_data = GetDownloadSerializationData( 2133 src_obj_metadata, progress=download_start_byte) 2134 2135 if resuming or download_complete: 2136 # Catch up our digester with the hash data. 2137 bytes_digested = 0 2138 total_bytes_to_digest = download_start_byte - start_byte 2139 hash_callback = ProgressCallbackWithBackoff( 2140 total_bytes_to_digest, 2141 FileProgressCallbackHandler( 2142 ConstructAnnounceText('Hashing', 2143 dst_url.url_string), logger).call) 2144 2145 while bytes_digested < total_bytes_to_digest: 2146 bytes_to_read = min(DEFAULT_FILE_BUFFER_SIZE, 2147 total_bytes_to_digest - bytes_digested) 2148 data = fp.read(bytes_to_read) 2149 bytes_digested += bytes_to_read 2150 for alg_name in digesters: 2151 digesters[alg_name].update(data) 2152 hash_callback.Progress(len(data)) 2153 2154 elif not is_sliced: 2155 # Delete file contents and start entire object download from scratch. 2156 fp.truncate(0) 2157 existing_file_size = 0 2158 2159 progress_callback = FileProgressCallbackHandler( 2160 ConstructAnnounceText('Downloading', dst_url.url_string), logger, 2161 start_byte, download_size).call 2162 2163 if global_copy_helper_opts.test_callback_file: 2164 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 2165 progress_callback = pickle.loads(test_fp.read()).call 2166 2167 if is_sliced and src_obj_metadata.size >= ResumableThreshold(): 2168 fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata, 2169 start_byte, end_byte) 2170 2171 # TODO: With gzip encoding (which may occur on-the-fly and not be part of 2172 # the object's metadata), when we request a range to resume, it's possible 2173 # that the server will just resend the entire object, which means our 2174 # caught-up hash will be incorrect. We recalculate the hash on 2175 # the local file in the case of a failed gzip hash anyway, but it would 2176 # be better if we actively detected this case. 2177 if not download_complete: 2178 fp.seek(download_start_byte) 2179 server_encoding = gsutil_api.GetObjectMedia( 2180 src_url.bucket_name, src_url.object_name, fp, 2181 start_byte=download_start_byte, end_byte=end_byte, 2182 generation=src_url.generation, object_size=src_obj_metadata.size, 2183 download_strategy=CloudApi.DownloadStrategy.RESUMABLE, 2184 provider=src_url.scheme, serialization_data=serialization_data, 2185 digesters=digesters, progress_callback=progress_callback) 2186 2187 except ResumableDownloadException as e: 2188 logger.warning('Caught ResumableDownloadException (%s) for download of %s.', 2189 e.reason, download_name) 2190 raise 2191 finally: 2192 if fp: 2193 fp.close() 2194 2195 bytes_transferred = end_byte - download_start_byte + 1 2196 return bytes_transferred, server_encoding 2197 2198 2199def _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, 2200 download_file_name, gsutil_api, logger, 2201 digesters): 2202 """Downloads an object to a local file using the non-resumable strategy. 2203 2204 Args: 2205 src_url: Source CloudUrl. 2206 src_obj_metadata: Metadata from the source object. 2207 dst_url: Destination FileUrl. 2208 download_file_name: Temporary file name to be used for download. 2209 gsutil_api: gsutil Cloud API instance to use for the download. 2210 logger: for outputting log messages. 2211 digesters: Digesters corresponding to the hash algorithms that will be used 2212 for validation. 2213 Returns: 2214 (bytes_transferred, server_encoding) 2215 bytes_transferred: Number of bytes transferred from server this call. 2216 server_encoding: Content-encoding string if it was detected that the server 2217 sent encoded bytes during transfer, None otherwise. 2218 """ 2219 try: 2220 fp = open(download_file_name, 'w') 2221 2222 # This is used to pass the mediaLink and the size into the download so that 2223 # we can avoid making an extra HTTP call. 2224 serialization_data = GetDownloadSerializationData(src_obj_metadata) 2225 2226 progress_callback = FileProgressCallbackHandler( 2227 ConstructAnnounceText('Downloading', dst_url.url_string), logger).call 2228 2229 if global_copy_helper_opts.test_callback_file: 2230 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 2231 progress_callback = pickle.loads(test_fp.read()).call 2232 2233 server_encoding = gsutil_api.GetObjectMedia( 2234 src_url.bucket_name, src_url.object_name, fp, 2235 generation=src_url.generation, object_size=src_obj_metadata.size, 2236 download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, 2237 provider=src_url.scheme, serialization_data=serialization_data, 2238 digesters=digesters, progress_callback=progress_callback) 2239 finally: 2240 if fp: 2241 fp.close() 2242 2243 return src_obj_metadata.size, server_encoding 2244 2245 2246def _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, 2247 gsutil_api, logger, command_obj, 2248 copy_exception_handler, allow_splitting=True): 2249 """Downloads an object to a local file. 2250 2251 Args: 2252 src_url: Source CloudUrl. 2253 src_obj_metadata: Metadata from the source object. 2254 dst_url: Destination FileUrl. 2255 gsutil_api: gsutil Cloud API instance to use for the download. 2256 logger: for outputting log messages. 2257 command_obj: command object for use in Apply in sliced downloads. 2258 copy_exception_handler: For handling copy exceptions during Apply. 2259 allow_splitting: Whether or not to allow sliced download. 2260 Returns: 2261 (elapsed_time, bytes_transferred, dst_url, md5), where time elapsed 2262 excludes initial GET. 2263 2264 Raises: 2265 FileConcurrencySkipError: if this download is already in progress. 2266 CommandException: if other errors encountered. 2267 """ 2268 global open_files_map, open_files_lock 2269 if dst_url.object_name.endswith(dst_url.delim): 2270 logger.warn('\n'.join(textwrap.wrap( 2271 'Skipping attempt to download to filename ending with slash (%s). This ' 2272 'typically happens when using gsutil to download from a subdirectory ' 2273 'created by the Cloud Console (https://cloud.google.com/console)' 2274 % dst_url.object_name))) 2275 return (0, 0, dst_url, '') 2276 2277 api_selector = gsutil_api.GetApiSelector(provider=src_url.scheme) 2278 download_strategy = _SelectDownloadStrategy(dst_url) 2279 sliced_download = _ShouldDoSlicedDownload( 2280 download_strategy, src_obj_metadata, allow_splitting, logger) 2281 2282 download_file_name, need_to_unzip = _GetDownloadFile( 2283 dst_url, src_obj_metadata, logger) 2284 2285 # Ensure another process/thread is not already writing to this file. 2286 with open_files_lock: 2287 if open_files_map.get(download_file_name, False): 2288 raise FileConcurrencySkipError 2289 open_files_map[download_file_name] = True 2290 2291 # Set up hash digesters. 2292 consider_md5 = src_obj_metadata.md5Hash and not sliced_download 2293 hash_algs = GetDownloadHashAlgs(logger, consider_md5=consider_md5, 2294 consider_crc32c=src_obj_metadata.crc32c) 2295 digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {}) 2296 2297 # Tracks whether the server used a gzip encoding. 2298 server_encoding = None 2299 download_complete = (src_obj_metadata.size == 0) 2300 bytes_transferred = 0 2301 2302 start_time = time.time() 2303 if not download_complete: 2304 if sliced_download: 2305 (bytes_transferred, crc32c) = ( 2306 _DoSlicedDownload(src_url, src_obj_metadata, dst_url, 2307 download_file_name, command_obj, logger, 2308 copy_exception_handler, api_selector)) 2309 if 'crc32c' in digesters: 2310 digesters['crc32c'].crcValue = crc32c 2311 elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: 2312 (bytes_transferred, server_encoding) = ( 2313 _DownloadObjectToFileNonResumable(src_url, src_obj_metadata, dst_url, 2314 download_file_name, gsutil_api, 2315 logger, digesters)) 2316 elif download_strategy is CloudApi.DownloadStrategy.RESUMABLE: 2317 (bytes_transferred, server_encoding) = ( 2318 _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url, 2319 download_file_name, gsutil_api, logger, 2320 digesters)) 2321 else: 2322 raise CommandException('Invalid download strategy %s chosen for' 2323 'file %s' % (download_strategy, 2324 download_file_name)) 2325 end_time = time.time() 2326 2327 server_gzip = server_encoding and server_encoding.lower().endswith('gzip') 2328 local_md5 = _ValidateAndCompleteDownload( 2329 logger, src_url, src_obj_metadata, dst_url, need_to_unzip, server_gzip, 2330 digesters, hash_algs, download_file_name, api_selector, bytes_transferred) 2331 2332 with open_files_lock: 2333 open_files_map.delete(download_file_name) 2334 2335 return (end_time - start_time, bytes_transferred, dst_url, local_md5) 2336 2337 2338def _GetDownloadTempZipFileName(dst_url): 2339 """Returns temporary file name for a temporarily compressed download.""" 2340 return '%s_.gztmp' % dst_url.object_name 2341 2342 2343def _GetDownloadTempFileName(dst_url): 2344 """Returns temporary download file name for uncompressed downloads.""" 2345 return '%s_.gstmp' % dst_url.object_name 2346 2347 2348def _ValidateAndCompleteDownload(logger, src_url, src_obj_metadata, dst_url, 2349 need_to_unzip, server_gzip, digesters, 2350 hash_algs, download_file_name, 2351 api_selector, bytes_transferred): 2352 """Validates and performs necessary operations on a downloaded file. 2353 2354 Validates the integrity of the downloaded file using hash_algs. If the file 2355 was compressed (temporarily), the file will be decompressed. Then, if the 2356 integrity of the file was successfully validated, the file will be moved 2357 from its temporary download location to its permanent location on disk. 2358 2359 Args: 2360 logger: For outputting log messages. 2361 src_url: StorageUrl for the source object. 2362 src_obj_metadata: Metadata for the source object, potentially containing 2363 hash values. 2364 dst_url: StorageUrl describing the destination file. 2365 need_to_unzip: If true, a temporary zip file was used and must be 2366 uncompressed as part of validation. 2367 server_gzip: If true, the server gzipped the bytes (regardless of whether 2368 the object metadata claimed it was gzipped). 2369 digesters: dict of {string, hash digester} that contains up-to-date digests 2370 computed during the download. If a digester for a particular 2371 algorithm is None, an up-to-date digest is not available and the 2372 hash must be recomputed from the local file. 2373 hash_algs: dict of {string, hash algorithm} that can be used if digesters 2374 don't have up-to-date digests. 2375 download_file_name: Temporary file name that was used for download. 2376 api_selector: The Cloud API implementation used (used tracker file naming). 2377 bytes_transferred: Number of bytes downloaded (used for logging). 2378 2379 Returns: 2380 An MD5 of the local file, if one was calculated as part of the integrity 2381 check. 2382 """ 2383 final_file_name = dst_url.object_name 2384 file_name = download_file_name 2385 digesters_succeeded = True 2386 2387 for alg in digesters: 2388 # If we get a digester with a None algorithm, the underlying 2389 # implementation failed to calculate a digest, so we will need to 2390 # calculate one from scratch. 2391 if not digesters[alg]: 2392 digesters_succeeded = False 2393 break 2394 2395 if digesters_succeeded: 2396 local_hashes = _CreateDigestsFromDigesters(digesters) 2397 else: 2398 local_hashes = _CreateDigestsFromLocalFile( 2399 logger, hash_algs, file_name, final_file_name, src_obj_metadata) 2400 2401 digest_verified = True 2402 hash_invalid_exception = None 2403 try: 2404 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, 2405 local_hashes) 2406 DeleteDownloadTrackerFiles(dst_url, api_selector) 2407 except HashMismatchException, e: 2408 # If an non-gzipped object gets sent with gzip content encoding, the hash 2409 # we calculate will match the gzipped bytes, not the original object. Thus, 2410 # we'll need to calculate and check it after unzipping. 2411 if server_gzip: 2412 logger.debug( 2413 'Hash did not match but server gzipped the content, will ' 2414 'recalculate.') 2415 digest_verified = False 2416 elif api_selector == ApiSelector.XML: 2417 logger.debug( 2418 'Hash did not match but server may have gzipped the content, will ' 2419 'recalculate.') 2420 # Save off the exception in case this isn't a gzipped file. 2421 hash_invalid_exception = e 2422 digest_verified = False 2423 else: 2424 DeleteDownloadTrackerFiles(dst_url, api_selector) 2425 if _RENAME_ON_HASH_MISMATCH: 2426 os.rename(file_name, 2427 final_file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) 2428 else: 2429 os.unlink(file_name) 2430 raise 2431 2432 if need_to_unzip or server_gzip: 2433 # Log that we're uncompressing if the file is big enough that 2434 # decompressing would make it look like the transfer "stalled" at the end. 2435 if bytes_transferred > TEN_MIB: 2436 logger.info( 2437 'Uncompressing temporarily gzipped file to %s...', final_file_name) 2438 2439 gzip_fp = None 2440 try: 2441 # Downloaded temporarily gzipped file, unzip to file without '_.gztmp' 2442 # suffix. 2443 gzip_fp = gzip.open(file_name, 'rb') 2444 with open(final_file_name, 'wb') as f_out: 2445 data = gzip_fp.read(GZIP_CHUNK_SIZE) 2446 while data: 2447 f_out.write(data) 2448 data = gzip_fp.read(GZIP_CHUNK_SIZE) 2449 except IOError, e: 2450 # In the XML case where we don't know if the file was gzipped, raise 2451 # the original hash exception if we find that it wasn't. 2452 if 'Not a gzipped file' in str(e) and hash_invalid_exception: 2453 # Linter improperly thinks we're raising None despite the above check. 2454 # pylint: disable=raising-bad-type 2455 raise hash_invalid_exception 2456 finally: 2457 if gzip_fp: 2458 gzip_fp.close() 2459 2460 os.unlink(file_name) 2461 file_name = final_file_name 2462 2463 if not digest_verified: 2464 try: 2465 # Recalculate hashes on the unzipped local file. 2466 local_hashes = _CreateDigestsFromLocalFile( 2467 logger, hash_algs, file_name, final_file_name, src_obj_metadata) 2468 _CheckHashes(logger, src_url, src_obj_metadata, final_file_name, 2469 local_hashes) 2470 DeleteDownloadTrackerFiles(dst_url, api_selector) 2471 except HashMismatchException: 2472 DeleteDownloadTrackerFiles(dst_url, api_selector) 2473 if _RENAME_ON_HASH_MISMATCH: 2474 os.rename(file_name, 2475 file_name + _RENAME_ON_HASH_MISMATCH_SUFFIX) 2476 else: 2477 os.unlink(file_name) 2478 raise 2479 2480 if file_name != final_file_name: 2481 # Data is still in a temporary file, so move it to a permanent location. 2482 if os.path.exists(final_file_name): 2483 os.unlink(final_file_name) 2484 os.rename(file_name, 2485 final_file_name) 2486 2487 if 'md5' in local_hashes: 2488 return local_hashes['md5'] 2489 2490 2491def _CopyFileToFile(src_url, dst_url): 2492 """Copies a local file to a local file. 2493 2494 Args: 2495 src_url: Source FileUrl. 2496 dst_url: Destination FileUrl. 2497 Returns: 2498 (elapsed_time, bytes_transferred, dst_url, md5=None). 2499 2500 Raises: 2501 CommandException: if errors encountered. 2502 """ 2503 src_fp = GetStreamFromFileUrl(src_url) 2504 dir_name = os.path.dirname(dst_url.object_name) 2505 if dir_name and not os.path.exists(dir_name): 2506 os.makedirs(dir_name) 2507 dst_fp = open(dst_url.object_name, 'wb') 2508 start_time = time.time() 2509 shutil.copyfileobj(src_fp, dst_fp) 2510 end_time = time.time() 2511 return (end_time - start_time, os.path.getsize(dst_url.object_name), 2512 dst_url, None) 2513 2514 2515def _DummyTrackerCallback(_): 2516 pass 2517 2518 2519# pylint: disable=undefined-variable 2520def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url, 2521 dst_obj_metadata, preconditions, gsutil_api, 2522 logger): 2523 """Copies from src_url to dst_url in "daisy chain" mode. 2524 2525 See -D OPTION documentation about what daisy chain mode is. 2526 2527 Args: 2528 src_url: Source CloudUrl 2529 src_obj_metadata: Metadata from source object 2530 dst_url: Destination CloudUrl 2531 dst_obj_metadata: Object-specific metadata that should be overidden during 2532 the copy. 2533 preconditions: Preconditions to use for the copy. 2534 gsutil_api: gsutil Cloud API to use for the copy. 2535 logger: For outputting log messages. 2536 2537 Returns: 2538 (elapsed_time, bytes_transferred, dst_url with generation, 2539 md5 hash of destination) excluding overhead like initial GET. 2540 2541 Raises: 2542 CommandException: if errors encountered. 2543 """ 2544 # We don't attempt to preserve ACLs across providers because 2545 # GCS and S3 support different ACLs and disjoint principals. 2546 if (global_copy_helper_opts.preserve_acl 2547 and src_url.scheme != dst_url.scheme): 2548 raise NotImplementedError( 2549 'Cross-provider cp -p not supported') 2550 if not global_copy_helper_opts.preserve_acl: 2551 dst_obj_metadata.acl = [] 2552 2553 # Don't use callbacks for downloads on the daisy chain wrapper because 2554 # upload callbacks will output progress, but respect test hooks if present. 2555 progress_callback = None 2556 if global_copy_helper_opts.test_callback_file: 2557 with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp: 2558 progress_callback = pickle.loads(test_fp.read()).call 2559 2560 start_time = time.time() 2561 upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api, 2562 progress_callback=progress_callback) 2563 uploaded_object = None 2564 if src_obj_metadata.size == 0: 2565 # Resumable uploads of size 0 are not supported. 2566 uploaded_object = gsutil_api.UploadObject( 2567 upload_fp, object_metadata=dst_obj_metadata, 2568 canned_acl=global_copy_helper_opts.canned_acl, 2569 preconditions=preconditions, provider=dst_url.scheme, 2570 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size) 2571 else: 2572 # TODO: Support process-break resumes. This will resume across connection 2573 # breaks and server errors, but the tracker callback is a no-op so this 2574 # won't resume across gsutil runs. 2575 # TODO: Test retries via test_callback_file. 2576 uploaded_object = gsutil_api.UploadObjectResumable( 2577 upload_fp, object_metadata=dst_obj_metadata, 2578 canned_acl=global_copy_helper_opts.canned_acl, 2579 preconditions=preconditions, provider=dst_url.scheme, 2580 fields=UPLOAD_RETURN_FIELDS, size=src_obj_metadata.size, 2581 progress_callback=FileProgressCallbackHandler( 2582 ConstructAnnounceText('Uploading', dst_url.url_string), 2583 logger).call, 2584 tracker_callback=_DummyTrackerCallback) 2585 end_time = time.time() 2586 2587 try: 2588 _CheckCloudHashes(logger, src_url, dst_url, src_obj_metadata, 2589 uploaded_object) 2590 except HashMismatchException: 2591 if _RENAME_ON_HASH_MISMATCH: 2592 corrupted_obj_metadata = apitools_messages.Object( 2593 name=dst_obj_metadata.name, 2594 bucket=dst_obj_metadata.bucket, 2595 etag=uploaded_object.etag) 2596 dst_obj_metadata.name = (dst_url.object_name + 2597 _RENAME_ON_HASH_MISMATCH_SUFFIX) 2598 gsutil_api.CopyObject(corrupted_obj_metadata, 2599 dst_obj_metadata, provider=dst_url.scheme) 2600 # If the digest doesn't match, delete the object. 2601 gsutil_api.DeleteObject(dst_url.bucket_name, dst_url.object_name, 2602 generation=uploaded_object.generation, 2603 provider=dst_url.scheme) 2604 raise 2605 2606 result_url = dst_url.Clone() 2607 result_url.generation = GenerationFromUrlAndString( 2608 result_url, uploaded_object.generation) 2609 2610 return (end_time - start_time, src_obj_metadata.size, result_url, 2611 uploaded_object.md5Hash) 2612 2613 2614# pylint: disable=undefined-variable 2615# pylint: disable=too-many-statements 2616def PerformCopy(logger, src_url, dst_url, gsutil_api, command_obj, 2617 copy_exception_handler, allow_splitting=True, 2618 headers=None, manifest=None, gzip_exts=None): 2619 """Performs copy from src_url to dst_url, handling various special cases. 2620 2621 Args: 2622 logger: for outputting log messages. 2623 src_url: Source StorageUrl. 2624 dst_url: Destination StorageUrl. 2625 gsutil_api: gsutil Cloud API instance to use for the copy. 2626 command_obj: command object for use in Apply in parallel composite uploads 2627 and sliced object downloads. 2628 copy_exception_handler: for handling copy exceptions during Apply. 2629 allow_splitting: Whether to allow the file to be split into component 2630 pieces for an parallel composite upload or download. 2631 headers: optional headers to use for the copy operation. 2632 manifest: optional manifest for tracking copy operations. 2633 gzip_exts: List of file extensions to gzip for uploads, if any. 2634 2635 Returns: 2636 (elapsed_time, bytes_transferred, version-specific dst_url) excluding 2637 overhead like initial GET. 2638 2639 Raises: 2640 ItemExistsError: if no clobber flag is specified and the destination 2641 object already exists. 2642 SkipUnsupportedObjectError: if skip_unsupported_objects flag is specified 2643 and the source is an unsupported type. 2644 CommandException: if other errors encountered. 2645 """ 2646 if headers: 2647 dst_obj_headers = headers.copy() 2648 else: 2649 dst_obj_headers = {} 2650 2651 # Create a metadata instance for each destination object so metadata 2652 # such as content-type can be applied per-object. 2653 # Initialize metadata from any headers passed in via -h. 2654 dst_obj_metadata = ObjectMetadataFromHeaders(dst_obj_headers) 2655 2656 if dst_url.IsCloudUrl() and dst_url.scheme == 'gs': 2657 preconditions = PreconditionsFromHeaders(dst_obj_headers) 2658 else: 2659 preconditions = Preconditions() 2660 2661 src_obj_metadata = None 2662 src_obj_filestream = None 2663 if src_url.IsCloudUrl(): 2664 src_obj_fields = None 2665 if dst_url.IsCloudUrl(): 2666 # For cloud or daisy chain copy, we need every copyable field. 2667 # If we're not modifying or overriding any of the fields, we can get 2668 # away without retrieving the object metadata because the copy 2669 # operation can succeed with just the destination bucket and object 2670 # name. But if we are sending any metadata, the JSON API will expect a 2671 # complete object resource. Since we want metadata like the object size 2672 # for our own tracking, we just get all of the metadata here. 2673 src_obj_fields = ['cacheControl', 'componentCount', 2674 'contentDisposition', 'contentEncoding', 2675 'contentLanguage', 'contentType', 'crc32c', 2676 'etag', 'generation', 'md5Hash', 'mediaLink', 2677 'metadata', 'metageneration', 'size'] 2678 # We only need the ACL if we're going to preserve it. 2679 if global_copy_helper_opts.preserve_acl: 2680 src_obj_fields.append('acl') 2681 if (src_url.scheme == dst_url.scheme 2682 and not global_copy_helper_opts.daisy_chain): 2683 copy_in_the_cloud = True 2684 else: 2685 copy_in_the_cloud = False 2686 else: 2687 # Just get the fields needed to validate the download. 2688 src_obj_fields = ['crc32c', 'contentEncoding', 'contentType', 'etag', 2689 'mediaLink', 'md5Hash', 'size', 'generation'] 2690 2691 if (src_url.scheme == 's3' and 2692 global_copy_helper_opts.skip_unsupported_objects): 2693 src_obj_fields.append('storageClass') 2694 2695 try: 2696 src_generation = GenerationFromUrlAndString(src_url, src_url.generation) 2697 src_obj_metadata = gsutil_api.GetObjectMetadata( 2698 src_url.bucket_name, src_url.object_name, 2699 generation=src_generation, provider=src_url.scheme, 2700 fields=src_obj_fields) 2701 except NotFoundException: 2702 raise CommandException( 2703 'NotFoundException: Could not retrieve source object %s.' % 2704 src_url.url_string) 2705 if (src_url.scheme == 's3' and 2706 global_copy_helper_opts.skip_unsupported_objects and 2707 src_obj_metadata.storageClass == 'GLACIER'): 2708 raise SkipGlacierError() 2709 2710 src_obj_size = src_obj_metadata.size 2711 dst_obj_metadata.contentType = src_obj_metadata.contentType 2712 if global_copy_helper_opts.preserve_acl: 2713 dst_obj_metadata.acl = src_obj_metadata.acl 2714 # Special case for S3-to-S3 copy URLs using 2715 # global_copy_helper_opts.preserve_acl. 2716 # dst_url will be verified in _CopyObjToObjDaisyChainMode if it 2717 # is not s3 (and thus differs from src_url). 2718 if src_url.scheme == 's3': 2719 acl_text = S3MarkerAclFromObjectMetadata(src_obj_metadata) 2720 if acl_text: 2721 AddS3MarkerAclToObjectMetadata(dst_obj_metadata, acl_text) 2722 else: 2723 try: 2724 src_obj_filestream = GetStreamFromFileUrl(src_url) 2725 except Exception, e: # pylint: disable=broad-except 2726 if command_obj.continue_on_error: 2727 message = 'Error copying %s: %s' % (src_url, str(e)) 2728 command_obj.op_failure_count += 1 2729 logger.error(message) 2730 return 2731 else: 2732 raise CommandException('Error opening file "%s": %s.' % (src_url, 2733 e.message)) 2734 if src_url.IsStream(): 2735 src_obj_size = None 2736 else: 2737 src_obj_size = os.path.getsize(src_url.object_name) 2738 2739 if global_copy_helper_opts.use_manifest: 2740 # Set the source size in the manifest. 2741 manifest.Set(src_url.url_string, 'size', src_obj_size) 2742 2743 if (dst_url.scheme == 's3' and src_obj_size > S3_MAX_UPLOAD_SIZE 2744 and src_url != 's3'): 2745 raise CommandException( 2746 '"%s" exceeds the maximum gsutil-supported size for an S3 upload. S3 ' 2747 'objects greater than %s in size require multipart uploads, which ' 2748 'gsutil does not support.' % (src_url, 2749 MakeHumanReadable(S3_MAX_UPLOAD_SIZE))) 2750 2751 # On Windows, stdin is opened as text mode instead of binary which causes 2752 # problems when piping a binary file, so this switches it to binary mode. 2753 if IS_WINDOWS and src_url.IsFileUrl() and src_url.IsStream(): 2754 msvcrt.setmode(GetStreamFromFileUrl(src_url).fileno(), os.O_BINARY) 2755 2756 if global_copy_helper_opts.no_clobber: 2757 # There are two checks to prevent clobbering: 2758 # 1) The first check is to see if the URL 2759 # already exists at the destination and prevent the upload/download 2760 # from happening. This is done by the exists() call. 2761 # 2) The second check is only relevant if we are writing to gs. We can 2762 # enforce that the server only writes the object if it doesn't exist 2763 # by specifying the header below. This check only happens at the 2764 # server after the complete file has been uploaded. We specify this 2765 # header to prevent a race condition where a destination file may 2766 # be created after the first check and before the file is fully 2767 # uploaded. 2768 # In order to save on unnecessary uploads/downloads we perform both 2769 # checks. However, this may come at the cost of additional HTTP calls. 2770 if preconditions.gen_match: 2771 raise ArgumentException('Specifying x-goog-if-generation-match is ' 2772 'not supported with cp -n') 2773 else: 2774 preconditions.gen_match = 0 2775 if dst_url.IsFileUrl() and os.path.exists(dst_url.object_name): 2776 # The local file may be a partial. Check the file sizes. 2777 if src_obj_size == os.path.getsize(dst_url.object_name): 2778 raise ItemExistsError() 2779 elif dst_url.IsCloudUrl(): 2780 try: 2781 dst_object = gsutil_api.GetObjectMetadata( 2782 dst_url.bucket_name, dst_url.object_name, provider=dst_url.scheme) 2783 except NotFoundException: 2784 dst_object = None 2785 if dst_object: 2786 raise ItemExistsError() 2787 2788 if dst_url.IsCloudUrl(): 2789 # Cloud storage API gets object and bucket name from metadata. 2790 dst_obj_metadata.name = dst_url.object_name 2791 dst_obj_metadata.bucket = dst_url.bucket_name 2792 if src_url.IsCloudUrl(): 2793 # Preserve relevant metadata from the source object if it's not already 2794 # provided from the headers. 2795 CopyObjectMetadata(src_obj_metadata, dst_obj_metadata, override=False) 2796 src_obj_metadata.name = src_url.object_name 2797 src_obj_metadata.bucket = src_url.bucket_name 2798 else: 2799 _SetContentTypeFromFile(src_url, dst_obj_metadata) 2800 else: 2801 # Files don't have Cloud API metadata. 2802 dst_obj_metadata = None 2803 2804 _LogCopyOperation(logger, src_url, dst_url, dst_obj_metadata) 2805 2806 if src_url.IsCloudUrl(): 2807 if dst_url.IsFileUrl(): 2808 return _DownloadObjectToFile(src_url, src_obj_metadata, dst_url, 2809 gsutil_api, logger, command_obj, 2810 copy_exception_handler, 2811 allow_splitting=allow_splitting) 2812 elif copy_in_the_cloud: 2813 return _CopyObjToObjInTheCloud(src_url, src_obj_metadata, dst_url, 2814 dst_obj_metadata, preconditions, 2815 gsutil_api, logger) 2816 else: 2817 return _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, 2818 dst_url, dst_obj_metadata, 2819 preconditions, gsutil_api, logger) 2820 else: # src_url.IsFileUrl() 2821 if dst_url.IsCloudUrl(): 2822 return _UploadFileToObject( 2823 src_url, src_obj_filestream, src_obj_size, dst_url, 2824 dst_obj_metadata, preconditions, gsutil_api, logger, command_obj, 2825 copy_exception_handler, gzip_exts=gzip_exts, 2826 allow_splitting=allow_splitting) 2827 else: # dst_url.IsFileUrl() 2828 return _CopyFileToFile(src_url, dst_url) 2829 2830 2831class Manifest(object): 2832 """Stores the manifest items for the CpCommand class.""" 2833 2834 def __init__(self, path): 2835 # self.items contains a dictionary of rows 2836 self.items = {} 2837 self.manifest_filter = {} 2838 self.lock = CreateLock() 2839 2840 self.manifest_path = os.path.expanduser(path) 2841 self._ParseManifest() 2842 self._CreateManifestFile() 2843 2844 def _ParseManifest(self): 2845 """Load and parse a manifest file. 2846 2847 This information will be used to skip any files that have a skip or OK 2848 status. 2849 """ 2850 try: 2851 if os.path.exists(self.manifest_path): 2852 with open(self.manifest_path, 'rb') as f: 2853 first_row = True 2854 reader = csv.reader(f) 2855 for row in reader: 2856 if first_row: 2857 try: 2858 source_index = row.index('Source') 2859 result_index = row.index('Result') 2860 except ValueError: 2861 # No header and thus not a valid manifest file. 2862 raise CommandException( 2863 'Missing headers in manifest file: %s' % self.manifest_path) 2864 first_row = False 2865 source = row[source_index] 2866 result = row[result_index] 2867 if result in ['OK', 'skip']: 2868 # We're always guaranteed to take the last result of a specific 2869 # source url. 2870 self.manifest_filter[source] = result 2871 except IOError: 2872 raise CommandException('Could not parse %s' % self.manifest_path) 2873 2874 def WasSuccessful(self, src): 2875 """Returns whether the specified src url was marked as successful.""" 2876 return src in self.manifest_filter 2877 2878 def _CreateManifestFile(self): 2879 """Opens the manifest file and assigns it to the file pointer.""" 2880 try: 2881 if ((not os.path.exists(self.manifest_path)) 2882 or (os.stat(self.manifest_path).st_size == 0)): 2883 # Add headers to the new file. 2884 with open(self.manifest_path, 'wb', 1) as f: 2885 writer = csv.writer(f) 2886 writer.writerow(['Source', 2887 'Destination', 2888 'Start', 2889 'End', 2890 'Md5', 2891 'UploadId', 2892 'Source Size', 2893 'Bytes Transferred', 2894 'Result', 2895 'Description']) 2896 except IOError: 2897 raise CommandException('Could not create manifest file.') 2898 2899 def Set(self, url, key, value): 2900 if value is None: 2901 # In case we don't have any information to set we bail out here. 2902 # This is so that we don't clobber existing information. 2903 # To zero information pass '' instead of None. 2904 return 2905 if url in self.items: 2906 self.items[url][key] = value 2907 else: 2908 self.items[url] = {key: value} 2909 2910 def Initialize(self, source_url, destination_url): 2911 # Always use the source_url as the key for the item. This is unique. 2912 self.Set(source_url, 'source_uri', source_url) 2913 self.Set(source_url, 'destination_uri', destination_url) 2914 self.Set(source_url, 'start_time', datetime.datetime.utcnow()) 2915 2916 def SetResult(self, source_url, bytes_transferred, result, 2917 description=''): 2918 self.Set(source_url, 'bytes', bytes_transferred) 2919 self.Set(source_url, 'result', result) 2920 self.Set(source_url, 'description', description) 2921 self.Set(source_url, 'end_time', datetime.datetime.utcnow()) 2922 self._WriteRowToManifestFile(source_url) 2923 self._RemoveItemFromManifest(source_url) 2924 2925 def _WriteRowToManifestFile(self, url): 2926 """Writes a manifest entry to the manifest file for the url argument.""" 2927 row_item = self.items[url] 2928 data = [ 2929 str(row_item['source_uri'].encode(UTF8)), 2930 str(row_item['destination_uri'].encode(UTF8)), 2931 '%sZ' % row_item['start_time'].isoformat(), 2932 '%sZ' % row_item['end_time'].isoformat(), 2933 row_item['md5'] if 'md5' in row_item else '', 2934 row_item['upload_id'] if 'upload_id' in row_item else '', 2935 str(row_item['size']) if 'size' in row_item else '', 2936 str(row_item['bytes']) if 'bytes' in row_item else '', 2937 row_item['result'], 2938 row_item['description'].encode(UTF8)] 2939 2940 # Aquire a lock to prevent multiple threads writing to the same file at 2941 # the same time. This would cause a garbled mess in the manifest file. 2942 with self.lock: 2943 with open(self.manifest_path, 'a', 1) as f: # 1 == line buffered 2944 writer = csv.writer(f) 2945 writer.writerow(data) 2946 2947 def _RemoveItemFromManifest(self, url): 2948 # Remove the item from the dictionary since we're done with it and 2949 # we don't want the dictionary to grow too large in memory for no good 2950 # reason. 2951 del self.items[url] 2952 2953 2954class ItemExistsError(Exception): 2955 """Exception class for objects that are skipped because they already exist.""" 2956 pass 2957 2958 2959class SkipUnsupportedObjectError(Exception): 2960 """Exception for objects skipped because they are an unsupported type.""" 2961 2962 def __init__(self): 2963 super(SkipUnsupportedObjectError, self).__init__() 2964 self.unsupported_type = 'Unknown' 2965 2966 2967class SkipGlacierError(SkipUnsupportedObjectError): 2968 """Exception for objects skipped because they are an unsupported type.""" 2969 2970 def __init__(self): 2971 super(SkipGlacierError, self).__init__() 2972 self.unsupported_type = 'GLACIER' 2973 2974 2975def GetPathBeforeFinalDir(url): 2976 """Returns the path section before the final directory component of the URL. 2977 2978 This handles cases for file system directories, bucket, and bucket 2979 subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket', 2980 and for file://dir we'll return file:// 2981 2982 Args: 2983 url: StorageUrl representing a filesystem directory, cloud bucket or 2984 bucket subdir. 2985 2986 Returns: 2987 String name of above-described path, sans final path separator. 2988 """ 2989 sep = url.delim 2990 if url.IsFileUrl(): 2991 past_scheme = url.url_string[len('file://'):] 2992 if past_scheme.find(sep) == -1: 2993 return 'file://' 2994 else: 2995 return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] 2996 if url.IsBucket(): 2997 return '%s://' % url.scheme 2998 # Else it names a bucket subdir. 2999 return url.url_string.rstrip(sep).rpartition(sep)[0] 3000 3001 3002def _GetPartitionInfo(file_size, max_components, default_component_size): 3003 """Gets info about a file partition for parallel file/object transfers. 3004 3005 Args: 3006 file_size: The number of bytes in the file to be partitioned. 3007 max_components: The maximum number of components that can be composed. 3008 default_component_size: The size of a component, assuming that 3009 max_components is infinite. 3010 Returns: 3011 The number of components in the partitioned file, and the size of each 3012 component (except the last, which will have a different size iff 3013 file_size != 0 (mod num_components)). 3014 """ 3015 # num_components = ceil(file_size / default_component_size) 3016 num_components = DivideAndCeil(file_size, default_component_size) 3017 3018 # num_components must be in the range [2, max_components] 3019 num_components = max(min(num_components, max_components), 2) 3020 3021 # component_size = ceil(file_size / num_components) 3022 component_size = DivideAndCeil(file_size, num_components) 3023 return (num_components, component_size) 3024 3025 3026def _DeleteTempComponentObjectFn(cls, url_to_delete, thread_state=None): 3027 """Wrapper func to be used with command.Apply to delete temporary objects.""" 3028 gsutil_api = GetCloudApiInstance(cls, thread_state) 3029 try: 3030 gsutil_api.DeleteObject( 3031 url_to_delete.bucket_name, url_to_delete.object_name, 3032 generation=url_to_delete.generation, provider=url_to_delete.scheme) 3033 except NotFoundException: 3034 # The temporary object could already be gone if a retry was 3035 # issued at a lower layer but the original request succeeded. 3036 # Barring other errors, the top-level command should still report success, 3037 # so don't raise here. 3038 pass 3039 3040 3041def _ParseParallelUploadTrackerFile(tracker_file, tracker_file_lock): 3042 """Parse the tracker file from the last parallel composite upload attempt. 3043 3044 If it exists, the tracker file is of the format described in 3045 _CreateParallelUploadTrackerFile. If the file doesn't exist or cannot be 3046 read, then the upload will start from the beginning. 3047 3048 Args: 3049 tracker_file: The name of the file to parse. 3050 tracker_file_lock: Lock protecting access to the tracker file. 3051 3052 Returns: 3053 random_prefix: A randomly-generated prefix to the name of the 3054 temporary components. 3055 existing_objects: A list of ObjectFromTracker objects representing 3056 the set of files that have already been uploaded. 3057 """ 3058 3059 def GenerateRandomPrefix(): 3060 return str(random.randint(1, (10 ** 10) - 1)) 3061 3062 existing_objects = [] 3063 try: 3064 with tracker_file_lock: 3065 with open(tracker_file, 'r') as fp: 3066 lines = fp.readlines() 3067 lines = [line.strip() for line in lines] 3068 if not lines: 3069 print('Parallel upload tracker file (%s) was invalid. ' 3070 'Restarting upload from scratch.' % tracker_file) 3071 lines = [GenerateRandomPrefix()] 3072 3073 except IOError as e: 3074 # We can't read the tracker file, so generate a new random prefix. 3075 lines = [GenerateRandomPrefix()] 3076 3077 # Ignore non-existent file (happens first time an upload 3078 # is attempted on a file), but warn user for other errors. 3079 if e.errno != errno.ENOENT: 3080 # Will restart because we failed to read in the file. 3081 print('Couldn\'t read parallel upload tracker file (%s): %s. ' 3082 'Restarting upload from scratch.' % (tracker_file, e.strerror)) 3083 3084 # The first line contains the randomly-generated prefix. 3085 random_prefix = lines[0] 3086 3087 # The remaining lines were written in pairs to describe a single component 3088 # in the form: 3089 # object_name (without random prefix) 3090 # generation 3091 # Newlines are used as the delimiter because only newlines and carriage 3092 # returns are invalid characters in object names, and users can specify 3093 # a custom prefix in the config file. 3094 i = 1 3095 while i < len(lines): 3096 (name, generation) = (lines[i], lines[i+1]) 3097 if not generation: 3098 # Cover the '' case. 3099 generation = None 3100 existing_objects.append(ObjectFromTracker(name, generation)) 3101 i += 2 3102 return (random_prefix, existing_objects) 3103 3104 3105def _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, component, 3106 tracker_file_lock): 3107 """Appends info about the uploaded component to an existing tracker file. 3108 3109 Follows the format described in _CreateParallelUploadTrackerFile. 3110 3111 Args: 3112 tracker_file: Tracker file to append to. 3113 component: Component that was uploaded. 3114 tracker_file_lock: Thread and process-safe Lock for the tracker file. 3115 """ 3116 lines = _GetParallelUploadTrackerFileLinesForComponents([component]) 3117 lines = [line + '\n' for line in lines] 3118 with tracker_file_lock: 3119 with open(tracker_file, 'a') as f: 3120 f.writelines(lines) 3121 3122 3123def _CreateParallelUploadTrackerFile(tracker_file, random_prefix, components, 3124 tracker_file_lock): 3125 """Writes information about components that were successfully uploaded. 3126 3127 This way the upload can be resumed at a later date. The tracker file has 3128 the format: 3129 random_prefix 3130 temp_object_1_name 3131 temp_object_1_generation 3132 . 3133 . 3134 . 3135 temp_object_N_name 3136 temp_object_N_generation 3137 where N is the number of components that have been successfully uploaded. 3138 3139 Args: 3140 tracker_file: The name of the parallel upload tracker file. 3141 random_prefix: The randomly-generated prefix that was used for 3142 for uploading any existing components. 3143 components: A list of ObjectFromTracker objects that were uploaded. 3144 tracker_file_lock: The lock protecting access to the tracker file. 3145 """ 3146 lines = [random_prefix] 3147 lines += _GetParallelUploadTrackerFileLinesForComponents(components) 3148 lines = [line + '\n' for line in lines] 3149 try: 3150 with tracker_file_lock: 3151 open(tracker_file, 'w').close() # Clear the file. 3152 with open(tracker_file, 'w') as f: 3153 f.writelines(lines) 3154 except IOError as e: 3155 RaiseUnwritableTrackerFileException(tracker_file, e.strerror) 3156 3157 3158def _GetParallelUploadTrackerFileLinesForComponents(components): 3159 """Return a list of the lines for use in a parallel upload tracker file. 3160 3161 The lines represent the given components, using the format as described in 3162 _CreateParallelUploadTrackerFile. 3163 3164 Args: 3165 components: A list of ObjectFromTracker objects that were uploaded. 3166 3167 Returns: 3168 Lines describing components with their generation for outputting to the 3169 tracker file. 3170 """ 3171 lines = [] 3172 for component in components: 3173 generation = None 3174 generation = component.generation 3175 if not generation: 3176 generation = '' 3177 lines += [component.object_name, str(generation)] 3178 return lines 3179 3180 3181def FilterExistingComponents(dst_args, existing_components, bucket_url, 3182 gsutil_api): 3183 """Determines course of action for component objects. 3184 3185 Given the list of all target objects based on partitioning the file and 3186 the list of objects that have already been uploaded successfully, 3187 this function determines which objects should be uploaded, which 3188 existing components are still valid, and which existing components should 3189 be deleted. 3190 3191 Args: 3192 dst_args: The map of file_name -> PerformParallelUploadFileToObjectArgs 3193 calculated by partitioning the file. 3194 existing_components: A list of ObjectFromTracker objects that have been 3195 uploaded in the past. 3196 bucket_url: CloudUrl of the bucket in which the components exist. 3197 gsutil_api: gsutil Cloud API instance to use for retrieving object metadata. 3198 3199 Returns: 3200 components_to_upload: List of components that need to be uploaded. 3201 uploaded_components: List of components that have already been 3202 uploaded and are still valid. 3203 existing_objects_to_delete: List of components that have already 3204 been uploaded, but are no longer valid 3205 and are in a versioned bucket, and 3206 therefore should be deleted. 3207 """ 3208 components_to_upload = [] 3209 existing_component_names = [component.object_name 3210 for component in existing_components] 3211 for component_name in dst_args: 3212 if component_name not in existing_component_names: 3213 components_to_upload.append(dst_args[component_name]) 3214 3215 objects_already_chosen = [] 3216 3217 # Don't reuse any temporary components whose MD5 doesn't match the current 3218 # MD5 of the corresponding part of the file. If the bucket is versioned, 3219 # also make sure that we delete the existing temporary version. 3220 existing_objects_to_delete = [] 3221 uploaded_components = [] 3222 for tracker_object in existing_components: 3223 if (tracker_object.object_name not in dst_args.keys() 3224 or tracker_object.object_name in objects_already_chosen): 3225 # This could happen if the component size has changed. This also serves 3226 # to handle object names that get duplicated in the tracker file due 3227 # to people doing things they shouldn't (e.g., overwriting an existing 3228 # temporary component in a versioned bucket). 3229 3230 url = bucket_url.Clone() 3231 url.object_name = tracker_object.object_name 3232 url.generation = tracker_object.generation 3233 existing_objects_to_delete.append(url) 3234 continue 3235 3236 dst_arg = dst_args[tracker_object.object_name] 3237 file_part = FilePart(dst_arg.filename, dst_arg.file_start, 3238 dst_arg.file_length) 3239 # TODO: calculate MD5's in parallel when possible. 3240 content_md5 = CalculateB64EncodedMd5FromContents(file_part) 3241 3242 try: 3243 # Get the MD5 of the currently-existing component. 3244 dst_url = dst_arg.dst_url 3245 dst_metadata = gsutil_api.GetObjectMetadata( 3246 dst_url.bucket_name, dst_url.object_name, 3247 generation=dst_url.generation, provider=dst_url.scheme, 3248 fields=['md5Hash', 'etag']) 3249 cloud_md5 = dst_metadata.md5Hash 3250 except Exception: # pylint: disable=broad-except 3251 # We don't actually care what went wrong - we couldn't retrieve the 3252 # object to check the MD5, so just upload it again. 3253 cloud_md5 = None 3254 3255 if cloud_md5 != content_md5: 3256 components_to_upload.append(dst_arg) 3257 objects_already_chosen.append(tracker_object.object_name) 3258 if tracker_object.generation: 3259 # If the old object doesn't have a generation (i.e., it isn't in a 3260 # versioned bucket), then we will just overwrite it anyway. 3261 invalid_component_with_generation = dst_arg.dst_url.Clone() 3262 invalid_component_with_generation.generation = tracker_object.generation 3263 existing_objects_to_delete.append(invalid_component_with_generation) 3264 else: 3265 url = dst_arg.dst_url.Clone() 3266 url.generation = tracker_object.generation 3267 uploaded_components.append(url) 3268 objects_already_chosen.append(tracker_object.object_name) 3269 3270 if uploaded_components: 3271 logging.info('Found %d existing temporary components to reuse.', 3272 len(uploaded_components)) 3273 3274 return (components_to_upload, uploaded_components, 3275 existing_objects_to_delete) 3276