1# -*- coding: utf-8 -*-
2# Copyright 2014 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Implementation of Unix-like rsync command."""
16
17from __future__ import absolute_import
18
19import errno
20import heapq
21import io
22from itertools import islice
23import os
24import re
25import tempfile
26import textwrap
27import traceback
28import urllib
29
30from boto import config
31import crcmod
32
33from gslib import copy_helper
34from gslib.bucket_listing_ref import BucketListingObject
35from gslib.cloud_api import NotFoundException
36from gslib.command import Command
37from gslib.command import DummyArgChecker
38from gslib.command_argument import CommandArgument
39from gslib.copy_helper import CreateCopyHelperOpts
40from gslib.copy_helper import SkipUnsupportedObjectError
41from gslib.cs_api_map import ApiSelector
42from gslib.exception import CommandException
43from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
44from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
45from gslib.hashing_helper import SLOW_CRCMOD_WARNING
46from gslib.plurality_checkable_iterator import PluralityCheckableIterator
47from gslib.sig_handling import GetCaughtSignals
48from gslib.sig_handling import RegisterSignalHandler
49from gslib.storage_url import StorageUrlFromString
50from gslib.util import GetCloudApiInstance
51from gslib.util import IsCloudSubdirPlaceholder
52from gslib.util import TEN_MIB
53from gslib.util import UsingCrcmodExtension
54from gslib.util import UTF8
55from gslib.wildcard_iterator import CreateWildcardIterator
56
57
58_SYNOPSIS = """
59  gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
60"""
61
62_DETAILED_HELP_TEXT = ("""
63<B>SYNOPSIS</B>
64""" + _SYNOPSIS + """
65
66
67<B>DESCRIPTION</B>
68  The gsutil rsync command makes the contents under dst_url the same as the
69  contents under src_url, by copying any missing files/objects, and (if the
70  -d option is specified) deleting any extra files/objects. For example, to
71  make gs://mybucket/data match the contents of the local directory "data"
72  you could do:
73
74    gsutil rsync -d data gs://mybucket/data
75
76  To recurse into directories use the -r option:
77
78    gsutil rsync -d -r data gs://mybucket/data
79
80  To copy only new/changed files without deleting extra files from
81  gs://mybucket/data leave off the -d option:
82
83    gsutil rsync -r data gs://mybucket/data
84
85  If you have a large number of objects to synchronize you might want to use the
86  gsutil -m option, to perform parallel (multi-threaded/multi-processing)
87  synchronization:
88
89    gsutil -m rsync -d -r data gs://mybucket/data
90
91  The -m option typically will provide a large performance boost if either the
92  source or destination (or both) is a cloud URL. If both source and
93  destination are file URLs the -m option will typically thrash the disk and
94  slow synchronization down.
95
96  To make the local directory "data" the same as the contents of
97  gs://mybucket/data:
98
99    gsutil rsync -d -r gs://mybucket/data data
100
101  To make the contents of gs://mybucket2 the same as gs://mybucket1:
102
103    gsutil rsync -d -r gs://mybucket1 gs://mybucket2
104
105  You can also mirror data across local directories:
106
107    gsutil rsync -d -r dir1 dir2
108
109  To mirror your content across clouds:
110
111    gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket
112
113  Note: If you are synchronizing a large amount of data between clouds you might
114  consider setting up a
115  `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
116  account and running gsutil there. Since cross-provider gsutil data transfers
117  flow through the machine where gsutil is running, doing this can make your
118  transfer run significantly faster than running gsutil on your local
119  workstation.
120
121
122<B>BE CAREFUL WHEN USING -d OPTION!</B>
123  The rsync -d option is very useful and commonly used, because it provides a
124  means of making the contents of a destination bucket or directory match those
125  of a source bucket or directory. However, please exercise caution when you
126  use this option: It's possible to delete large amounts of data accidentally
127  if, for example, you erroneously reverse source and destination. For example,
128  if you meant to synchronize a local directory from a bucket in the cloud but
129  instead run the command:
130
131    gsutil -m rsync -r -d ./your-dir gs://your-bucket
132
133  and your-dir is currently empty, you will quickly delete all of the objects in
134  gs://your-bucket.
135
136  You can also cause large amounts of data to be lost quickly by specifying a
137  subdirectory of the destination as the source of an rsync. For example, the
138  command:
139
140    gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket
141
142  would cause most or all of the objects in gs://your-bucket to be deleted
143  (some objects may survive if there are any with names that sort lower than
144  "data" under gs://your-bucket/data).
145
146  In addition to paying careful attention to the source and destination you
147  specify with the rsync command, there are two more safety measures your can
148  take when using gsutil rsync -d:
149
150    1. Try running the command with the rsync -n option first, to see what it
151       would do without actually performing the operations. For example, if
152       you run the command:
153
154         gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket
155
156       it will be immediately evident that running that command without the -n
157       option would cause many objects to be deleted.
158
159    2. Enable object versioning in your bucket, which will allow you to restore
160       objects if you accidentally delete them. For more details see
161       "gsutil help versions".
162
163
164<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
165  The rsync command operates by listing the source and destination URLs, and
166  then performing copy and remove operations according to the differences
167  between these listings. Because bucket listing is eventually (not strongly)
168  consistent, if you upload new objects or delete objects from a bucket and then
169  immediately run gsutil rsync with that bucket as the source or destination,
170  it's possible the rsync command will not see the recent updates and thus
171  synchronize incorrectly. You can rerun the rsync operation again later to
172  correct the incorrect synchronization.
173
174
175<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
176  At the end of every upload or download, the gsutil rsync command validates
177  that the checksum of the source file/object matches the checksum of the
178  destination file/object. If the checksums do not match, gsutil will delete
179  the invalid copy and print a warning message. This very rarely happens, but
180  if it does, please contact gs-team@google.com.
181
182  The rsync command will retry when failures occur, but if enough failures
183  happen during a particular copy or delete operation the command will skip that
184  object and move on. At the end of the synchronization run if any failures were
185  not successfully retried, the rsync command will report the count of failures,
186  and exit with non-zero status. At this point you can run the rsync command
187  again, and it will attempt any remaining needed copy and/or delete operations.
188
189  Note that there are cases where retrying will never succeed, such as if you
190  don't have write permission to the destination bucket or if the destination
191  path for some objects is longer than the maximum allowed length.
192
193  For more details about gsutil's retry handling, please see
194  "gsutil help retries".
195
196
197<B>CHANGE DETECTION ALGORITHM</B>
198  To determine if a file or object has changed gsutil rsync first checks whether
199  the source and destination sizes match. If they match, it next checks if their
200  checksums match, using checksums if available (see below). Unlike the Unix
201  rsync command, gsutil rsync does not use timestamps to determine if the
202  file/object changed, because the GCS API does not permit the caller to set an
203  object's timestamp (hence, timestamps of identical files/objects cannot be
204  made to match).
205
206  Checksums will not be available in two cases:
207
208  1. When synchronizing to or from a file system. By default, gsutil does not
209     checksum files, because of the slowdown caused when working with large
210     files. You can cause gsutil to checksum files by using the gsutil rsync -c
211     option, at the cost of increased local disk I/O and run time when working
212     with large files. You should consider using the -c option if your files can
213     change without changing sizes (e.g., if you have files that contain fixed
214     width data, such as timestamps).
215
216  2. When comparing composite GCS objects with objects at a cloud provider that
217     does not support CRC32C (which is the only checksum available for composite
218     objects). See 'gsutil help compose' for details about composite objects.
219
220
221<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
222  If both the source and destination URL are cloud URLs from the same provider,
223  gsutil copies data "in the cloud" (i.e., without downloading to and uploading
224  from the machine where you run gsutil). In addition to the performance and
225  cost advantages of doing this, copying in the cloud preserves metadata (like
226  Content-Type and Cache-Control). In contrast, when you download data from the
227  cloud it ends up in a file, which has no associated metadata. Thus, unless you
228  have some way to hold on to or re-create that metadata, synchronizing a bucket
229  to a directory in the local file system will not retain the metadata.
230
231  Note that by default, the gsutil rsync command does not copy the ACLs of
232  objects being synchronized and instead will use the default bucket ACL (see
233  "gsutil help defacl"). You can override this behavior with the -p option (see
234  OPTIONS below).
235
236
237<B>SLOW CHECKSUMS</B>
238  If you find that CRC32C checksum computation runs slowly, this is likely
239  because you don't have a compiled CRC32c on your system. Try running:
240
241    gsutil ver -l
242
243  If the output contains:
244
245    compiled crcmod: False
246
247  you are running a Python library for computing CRC32C, which is much slower
248  than using the compiled code. For information on getting a compiled CRC32C
249  implementation, see 'gsutil help crc32c'.
250
251
252<B>LIMITATIONS</B>
253  1. The gsutil rsync command doesn't make the destination object's timestamps
254     match those of the source object (it can't; timestamp setting is not
255     allowed by the GCS API).
256
257  2. The gsutil rsync command considers only the current object generations in
258     the source and destination buckets when deciding what to copy / delete. If
259     versioning is enabled in the destination bucket then gsutil rsync's
260     overwriting or deleting objects will end up creating versions, but the
261     command doesn't try to make the archived generations match in the source
262     and destination buckets.
263
264
265
266<B>OPTIONS</B>
267  -c            Causes the rsync command to compute checksums for files if the
268                size of source and destination match, and then compare
269                checksums.  This option increases local disk I/O and run time
270                if either src_url or dst_url are on the local file system.
271
272  -C            If an error occurs, continue to attempt to copy the remaining
273                files. If errors occurred, gsutil's exit status will be non-zero
274                even if this flag is set. This option is implicitly set when
275                running "gsutil -m rsync...".  Note: -C only applies to the
276                actual copying operation. If an error occurs while iterating
277                over the files in the local directory (e.g., invalid Unicode
278                file name) gsutil will print an error message and abort.
279
280  -d            Delete extra files under dst_url not found under src_url. By
281                default extra files are not deleted. Note: this option can
282                delete data quickly if you specify the wrong source/destination
283                combination. See the help section above,
284                "BE CAREFUL WHEN USING -d OPTION!".
285
286  -e            Exclude symlinks. When specified, symbolic links will be
287                ignored.
288
289  -n            Causes rsync to run in "dry run" mode, i.e., just outputting
290                what would be copied or deleted without actually doing any
291                copying/deleting.
292
293  -p            Causes ACLs to be preserved when synchronizing in the cloud.
294                Note that this option has performance and cost implications when
295                using the XML API, as it requires separate HTTP calls for
296                interacting with ACLs. The performance issue can be mitigated to
297                some degree by using gsutil -m rsync to cause parallel
298                synchronization. Also, this option only works if you have OWNER
299                access to all of the objects that are copied.
300
301                You can avoid the additional performance and cost of using
302                rsync -p if you want all objects in the destination bucket to
303                end up with the same ACL by setting a default object ACL on that
304                bucket instead of using rsync -p. See 'help gsutil defacl'.
305
306  -R, -r        Causes directories, buckets, and bucket subdirectories to be
307                synchronized recursively. If you neglect to use this option
308                gsutil will make only the top-level directory in the source
309                and destination URLs match, skipping any sub-directories.
310
311  -U            Skip objects with unsupported object types instead of failing.
312                Unsupported object types are Amazon S3 Objects in the GLACIER
313                storage class.
314
315  -x pattern    Causes files/objects matching pattern to be excluded, i.e., any
316                matching files/objects will not be copied or deleted. Note that
317                the pattern is a Python regular expression, not a wildcard (so,
318                matching any string ending in 'abc' would be specified using
319                '.*abc' rather than '*abc'). Note also that the exclude path is
320                always relative (similar to Unix rsync or tar exclude options).
321                For example, if you run the command:
322
323                  gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket
324
325                it will skip the file dir/data1/a.txt.
326
327                You can use regex alternation to specify multiple exclusions,
328                for example:
329
330                  gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
331""")
332
333
334class _DiffAction(object):
335  COPY = 'copy'
336  REMOVE = 'remove'
337
338
339_NA = '-'
340_OUTPUT_BUFFER_SIZE = 64 * 1024
341_PROGRESS_REPORT_LISTING_COUNT = 10000
342
343
344# Tracks files we need to clean up at end or if interrupted.
345_tmp_files = []
346
347
348# pylint: disable=unused-argument
349def _HandleSignals(signal_num, cur_stack_frame):
350  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
351  CleanUpTempFiles()
352
353
354def CleanUpTempFiles():
355  """Cleans up temp files.
356
357  This function allows the main (RunCommand) function to clean up at end of
358  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is necessary
359  because tempfile.NamedTemporaryFile doesn't allow the created file to be
360  re-opened in read mode on Windows, so we have to use tempfile.mkstemp, which
361  doesn't automatically delete temp files.
362  """
363  try:
364    for fname in _tmp_files:
365      os.unlink(fname)
366  except:  # pylint: disable=bare-except
367    pass
368
369
370class _DiffToApply(object):
371  """Class that encapsulates info needed to apply diff for one object."""
372
373  def __init__(self, src_url_str, dst_url_str, diff_action):
374    """Constructor.
375
376    Args:
377      src_url_str: The source URL string, or None if diff_action is REMOVE.
378      dst_url_str: The destination URL string.
379      diff_action: _DiffAction to be applied.
380    """
381    self.src_url_str = src_url_str
382    self.dst_url_str = dst_url_str
383    self.diff_action = diff_action
384
385
386def _DiffToApplyArgChecker(command_instance, diff_to_apply):
387  """Arg checker that skips symlinks if -e flag specified."""
388  if (diff_to_apply.diff_action == _DiffAction.REMOVE
389      or not command_instance.exclude_symlinks):
390    # No src URL is populated for REMOVE actions.
391    return True
392  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
393  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
394    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
395    return False
396  return True
397
398
399def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
400                                src_md5, dst_url_str, dst_size, dst_crc32c,
401                                dst_md5):
402  """Computes any file checksums needed by _ObjectsMatch.
403
404  Args:
405    logger: logging.logger for outputting log messages.
406    src_url_str: Source URL string.
407    src_size: Source size
408    src_crc32c: Source CRC32c.
409    src_md5: Source MD5.
410    dst_url_str: Destination URL string.
411    dst_size: Destination size
412    dst_crc32c: Destination CRC32c.
413    dst_md5: Destination MD5.
414
415  Returns:
416    (src_crc32c, src_md5, dst_crc32c, dst_md5)
417  """
418  src_url = StorageUrlFromString(src_url_str)
419  dst_url = StorageUrlFromString(dst_url_str)
420  if src_url.IsFileUrl():
421    if dst_crc32c != _NA or dst_url.IsFileUrl():
422      if src_size > TEN_MIB:
423        logger.info('Computing MD5 for %s...', src_url_str)
424      with open(src_url.object_name, 'rb') as fp:
425        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
426    elif dst_md5 != _NA or dst_url.IsFileUrl():
427      if dst_size > TEN_MIB:
428        logger.info('Computing MD5 for %s...', dst_url_str)
429      with open(src_url.object_name, 'rb') as fp:
430        src_md5 = CalculateB64EncodedMd5FromContents(fp)
431  if dst_url.IsFileUrl():
432    if src_crc32c != _NA:
433      if src_size > TEN_MIB:
434        logger.info('Computing CRC32C for %s...', src_url_str)
435      with open(dst_url.object_name, 'rb') as fp:
436        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
437    elif src_md5 != _NA:
438      if dst_size > TEN_MIB:
439        logger.info('Computing CRC32C for %s...', dst_url_str)
440      with open(dst_url.object_name, 'rb') as fp:
441        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
442  return (src_crc32c, src_md5, dst_crc32c, dst_md5)
443
444
445def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
446  """Worker function for listing files/objects under to be sync'd.
447
448  Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
449  sort the listed URLs because we don't want to depend on consistent sort
450  order across file systems and cloud providers.
451
452  Args:
453    cls: Command instance.
454    args_tuple: (base_url_str, out_file_name, desc), where base_url_str is
455                top-level URL string to list; out_filename is name of file to
456                which sorted output should be written; desc is 'source' or
457                'destination'.
458    thread_state: gsutil Cloud API instance to use.
459  """
460  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
461  (base_url_str, out_filename, desc) = args_tuple
462  # We sort while iterating over base_url_str, allowing parallelism of batched
463  # sorting with collecting the listing.
464  out_file = io.open(out_filename, mode='w', encoding=UTF8)
465  try:
466    _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
467               out_file)
468  except Exception as e:  # pylint: disable=broad-except
469    # Abandon rsync if an exception percolates up to this layer - retryable
470    # exceptions are handled in the lower layers, so we got a non-retryable
471    # exception (like 404 bucket not found) and proceeding would either be
472    # futile or could result in data loss - for example:
473    #     gsutil rsync -d gs://non-existent-bucket ./localdir
474    # would delete files from localdir.
475    cls.logger.error(
476        'Caught non-retryable exception while listing %s: %s' %
477        (base_url_str, e))
478    cls.non_retryable_listing_failures = 1
479  out_file.close()
480
481
482def _LocalDirIterator(base_url):
483  """A generator that yields a BLR for each file in a local directory.
484
485     We use this function instead of WildcardIterator for listing a local
486     directory without recursion, because the glob.globi implementation called
487     by WildcardIterator skips "dot" files (which we don't want to do when
488     synchronizing to or from a local directory).
489
490  Args:
491    base_url: URL for the directory over which to iterate.
492
493  Yields:
494    BucketListingObject for each file in the directory.
495  """
496  for filename in os.listdir(base_url.object_name):
497    filename = os.path.join(base_url.object_name, filename)
498    if os.path.isfile(filename):
499      yield BucketListingObject(StorageUrlFromString(filename), None)
500
501
502def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
503  """Iterator over base_url_str formatting output per _BuildTmpOutputLine.
504
505  Args:
506    cls: Command instance.
507    gsutil_api: gsutil Cloud API instance to use for bucket listing.
508    base_url_str: The top-level URL string over which to iterate.
509    desc: 'source' or 'destination'.
510
511  Yields:
512    Output line formatted per _BuildTmpOutputLine.
513  """
514  base_url = StorageUrlFromString(base_url_str)
515  if base_url.scheme == 'file' and not cls.recursion_requested:
516    iterator = _LocalDirIterator(base_url)
517  else:
518    if cls.recursion_requested:
519      wildcard = '%s/**' % base_url_str.rstrip('/\\')
520    else:
521      wildcard = '%s/*' % base_url_str.rstrip('/\\')
522    iterator = CreateWildcardIterator(
523        wildcard, gsutil_api, debug=cls.debug,
524        project_id=cls.project_id).IterObjects(
525            # Request just the needed fields, to reduce bandwidth usage.
526            bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])
527
528  i = 0
529  for blr in iterator:
530    # Various GUI tools (like the GCS web console) create placeholder objects
531    # ending with '/' when the user creates an empty directory. Normally these
532    # tools should delete those placeholders once objects have been written
533    # "under" the directory, but sometimes the placeholders are left around.
534    # We need to filter them out here, otherwise if the user tries to rsync
535    # from GCS to a local directory it will result in a directory/file
536    # conflict (e.g., trying to download an object called "mydata/" where the
537    # local directory "mydata" exists).
538    url = blr.storage_url
539    if IsCloudSubdirPlaceholder(url, blr=blr):
540      # We used to output the message 'Skipping cloud sub-directory placeholder
541      # object...' but we no longer do so because it caused customer confusion.
542      continue
543    if (cls.exclude_symlinks and url.IsFileUrl()
544        and os.path.islink(url.object_name)):
545      continue
546    if cls.exclude_pattern:
547      str_to_check = url.url_string[len(base_url_str):]
548      if str_to_check.startswith(url.delim):
549        str_to_check = str_to_check[1:]
550      if cls.exclude_pattern.match(str_to_check):
551        continue
552    i += 1
553    if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
554      cls.logger.info('At %s listing %d...', desc, i)
555    yield _BuildTmpOutputLine(blr)
556
557
558def _BuildTmpOutputLine(blr):
559  """Builds line to output to temp file for given BucketListingRef.
560
561  Args:
562    blr: The BucketListingRef.
563
564  Returns:
565    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
566    where crc32c will only be present for GCS URLs, and md5 will only be
567    present for cloud URLs that aren't composite objects. A missing field is
568    populated with '-'.
569  """
570  crc32c = _NA
571  md5 = _NA
572  url = blr.storage_url
573  if url.IsFileUrl():
574    size = os.path.getsize(url.object_name)
575  elif url.IsCloudUrl():
576    size = blr.root_object.size
577    crc32c = blr.root_object.crc32c or _NA
578    md5 = blr.root_object.md5Hash or _NA
579  else:
580    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
581  return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)
582
583
584def _EncodeUrl(url_string):
585  """Encodes url_str with quote plus encoding and UTF8 character encoding.
586
587  We use this for all URL encodings.
588
589  Args:
590    url_string: String URL to encode.
591
592  Returns:
593    encoded URL.
594  """
595  return urllib.quote_plus(url_string.encode(UTF8))
596
597
598def _DecodeUrl(enc_url_string):
599  """Inverts encoding from EncodeUrl.
600
601  Args:
602    enc_url_string: String URL to decode.
603
604  Returns:
605    decoded URL.
606  """
607  return urllib.unquote_plus(enc_url_string).decode(UTF8)
608
609
610# pylint: disable=bare-except
611def _BatchSort(in_iter, out_file):
612  """Sorts input lines from in_iter and outputs to out_file.
613
614  Sorts in batches as input arrives, so input file does not need to be loaded
615  into memory all at once. Derived from Python Recipe 466302: Sorting big
616  files the Python 2.4 way by Nicolas Lehuen.
617
618  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
619  when we could just sort on the first record (URL); but the sort order is
620  identical either way.
621
622  Args:
623    in_iter: Input iterator.
624    out_file: Output file.
625  """
626  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
627  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
628  # doesn't suffice (e.g., for someone synchronizing with a really large
629  # bucket), an option would be to make gsutil merge in passes, never
630  # opening all chunk files simultaneously.
631  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
632  chunk_files = []
633  try:
634    while True:
635      current_chunk = sorted(islice(in_iter, buffer_size))
636      if not current_chunk:
637        break
638      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
639                             mode='w+', encoding=UTF8)
640      chunk_files.append(output_chunk)
641      output_chunk.writelines(unicode(''.join(current_chunk)))
642      output_chunk.flush()
643      output_chunk.seek(0)
644    out_file.writelines(heapq.merge(*chunk_files))
645  except IOError as e:
646    if e.errno == errno.EMFILE:
647      raise CommandException('\n'.join(textwrap.wrap(
648          'Synchronization failed because too many open file handles were '
649          'needed while building synchronization state. Please see the '
650          'comments about rsync_buffer_lines in your .boto config file for a '
651          'possible way to address this problem.')))
652    raise
653  finally:
654    for chunk_file in chunk_files:
655      try:
656        chunk_file.close()
657        os.remove(chunk_file.name)
658      except:
659        pass
660
661
662class _DiffIterator(object):
663  """Iterator yielding sequence of _DiffToApply objects."""
664
665  def __init__(self, command_obj, base_src_url, base_dst_url):
666    self.command_obj = command_obj
667    self.compute_file_checksums = command_obj.compute_file_checksums
668    self.delete_extras = command_obj.delete_extras
669    self.recursion_requested = command_obj.recursion_requested
670    self.logger = self.command_obj.logger
671    self.base_src_url = base_src_url
672    self.base_dst_url = base_dst_url
673    self.logger.info('Building synchronization state...')
674
675    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
676        prefix='gsutil-rsync-src-')
677    _tmp_files.append(self.sorted_list_src_file_name)
678    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
679        prefix='gsutil-rsync-dst-')
680    _tmp_files.append(self.sorted_list_dst_file_name)
681    # Close the file handles; the file will be opened in write mode by
682    # _ListUrlRootFunc.
683    os.close(src_fh)
684    os.close(dst_fh)
685
686    # Build sorted lists of src and dst URLs in parallel. To do this, pass args
687    # to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
688    # where base_url_str is the starting URL string for listing.
689    args_iter = iter([
690        (self.base_src_url.url_string, self.sorted_list_src_file_name,
691         'source'),
692        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
693         'destination')
694    ])
695
696    # Contains error message from non-retryable listing failure.
697    command_obj.non_retryable_listing_failures = 0
698    shared_attrs = ['non_retryable_listing_failures']
699    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
700                      shared_attrs, arg_checker=DummyArgChecker,
701                      parallel_operations_override=True,
702                      fail_on_error=True)
703
704    if command_obj.non_retryable_listing_failures:
705      raise CommandException('Caught non-retryable exception - aborting rsync')
706
707    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
708    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')
709
710    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
711    self.sorted_src_urls_it = PluralityCheckableIterator(
712        iter(self.sorted_list_src_file))
713    self.sorted_dst_urls_it = PluralityCheckableIterator(
714        iter(self.sorted_list_dst_file))
715
716  def _ParseTmpFileLine(self, line):
717    """Parses output from _BuildTmpOutputLine.
718
719    Parses into tuple:
720      (URL, size, crc32c, md5)
721    where crc32c and/or md5 can be _NA.
722
723    Args:
724      line: The line to parse.
725
726    Returns:
727      Parsed tuple: (url, size, crc32c, md5)
728    """
729    (encoded_url, size, crc32c, md5) = line.split()
730    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())
731
732  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
733    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.
734
735    Args:
736      url_str: Destination URL string.
737      crc32c: Destination CRC32c.
738      md5: Destination MD5.
739
740    Returns:
741      True if issued warning.
742    """
743    # One known way this can currently happen is when rsync'ing objects larger
744    # than 5 GB from S3 (for which the etag is not an MD5).
745    if (StorageUrlFromString(url_str).IsCloudUrl()
746        and crc32c == _NA and md5 == _NA):
747      self.logger.warn(
748          'Found no hashes to validate %s. Integrity cannot be assured without '
749          'hashes.', url_str)
750      return True
751    return False
752
753  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
754                    dst_url_str, dst_size, dst_crc32c, dst_md5):
755    """Returns True if src and dst objects are the same.
756
757    Uses size plus whatever checksums are available.
758
759    Args:
760      src_url_str: Source URL string.
761      src_size: Source size
762      src_crc32c: Source CRC32c.
763      src_md5: Source MD5.
764      dst_url_str: Destination URL string.
765      dst_size: Destination size
766      dst_crc32c: Destination CRC32c.
767      dst_md5: Destination MD5.
768
769    Returns:
770      True/False.
771    """
772    # Note: This function is called from __iter__, which is called from the
773    # Command.Apply driver. Thus, all checksum computation will be run in a
774    # single thread, which is good (having multiple threads concurrently
775    # computing checksums would thrash the disk).
776    if src_size != dst_size:
777      return False
778    if self.compute_file_checksums:
779      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
780          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
781          dst_size, dst_crc32c, dst_md5)
782    if src_md5 != _NA and dst_md5 != _NA:
783      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
784      return src_md5 == dst_md5
785    if src_crc32c != _NA and dst_crc32c != _NA:
786      self.logger.debug(
787          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
788      return src_crc32c == dst_crc32c
789    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
790      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
791    # Without checksums to compare we depend only on basic size comparison.
792    return True
793
794  def __iter__(self):
795    """Iterates over src/dst URLs and produces a _DiffToApply sequence.
796
797    Yields:
798      The _DiffToApply.
799    """
800    # Strip trailing slashes, if any, so we compute tail length against
801    # consistent position regardless of whether trailing slashes were included
802    # or not in URL.
803    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
804    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
805    src_url_str = dst_url_str = None
806    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
807    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
808    # processed. Each time we encounter None in src_url_str or dst_url_str we
809    # populate from the respective iterator, and we reset one or the other value
810    # to None after yielding an action that disposes of that URL.
811    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
812      if src_url_str is None:
813        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
814            self.sorted_src_urls_it.next())
815        # Skip past base URL and normalize slashes so we can compare across
816        # clouds/file systems (including Windows).
817        src_url_str_to_check = _EncodeUrl(
818            src_url_str[base_src_url_len:].replace('\\', '/'))
819        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
820            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
821            self.base_dst_url, False, self.recursion_requested).url_string
822      if self.sorted_dst_urls_it.IsEmpty():
823        # We've reached end of dst URLs, so copy src to dst.
824        yield _DiffToApply(
825            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
826        src_url_str = None
827        continue
828      if not dst_url_str:
829        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
830            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
831        # Skip past base URL and normalize slashes so we can compare acros
832        # clouds/file systems (including Windows).
833        dst_url_str_to_check = _EncodeUrl(
834            dst_url_str[base_dst_url_len:].replace('\\', '/'))
835
836      if src_url_str_to_check < dst_url_str_to_check:
837        # There's no dst object corresponding to src object, so copy src to dst.
838        yield _DiffToApply(
839            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
840        src_url_str = None
841      elif src_url_str_to_check > dst_url_str_to_check:
842        # dst object without a corresponding src object, so remove dst if -d
843        # option was specified.
844        if self.delete_extras:
845          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
846        dst_url_str = None
847      else:
848        # There is a dst object corresponding to src object, so check if objects
849        # match.
850        if self._ObjectsMatch(
851            src_url_str, src_size, src_crc32c, src_md5,
852            dst_url_str, dst_size, dst_crc32c, dst_md5):
853          # Continue iterating without yielding a _DiffToApply.
854          pass
855        else:
856          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
857        src_url_str = None
858        dst_url_str = None
859
860    # If -d option specified any files/objects left in dst iteration should be
861    # removed.
862    if not self.delete_extras:
863      return
864    if dst_url_str:
865      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
866      dst_url_str = None
867    for line in self.sorted_dst_urls_it:
868      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
869      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
870
871
872def _RsyncFunc(cls, diff_to_apply, thread_state=None):
873  """Worker function for performing the actual copy and remove operations."""
874  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
875  dst_url_str = diff_to_apply.dst_url_str
876  dst_url = StorageUrlFromString(dst_url_str)
877  if diff_to_apply.diff_action == _DiffAction.REMOVE:
878    if cls.dryrun:
879      cls.logger.info('Would remove %s', dst_url)
880    else:
881      cls.logger.info('Removing %s', dst_url)
882      if dst_url.IsFileUrl():
883        os.unlink(dst_url.object_name)
884      else:
885        try:
886          gsutil_api.DeleteObject(
887              dst_url.bucket_name, dst_url.object_name,
888              generation=dst_url.generation, provider=dst_url.scheme)
889        except NotFoundException:
890          # If the object happened to be deleted by an external process, this
891          # is fine because it moves us closer to the desired state.
892          pass
893  elif diff_to_apply.diff_action == _DiffAction.COPY:
894    src_url_str = diff_to_apply.src_url_str
895    src_url = StorageUrlFromString(src_url_str)
896    if cls.dryrun:
897      cls.logger.info('Would copy %s to %s', src_url, dst_url)
898    else:
899      try:
900        copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
901                                _RsyncExceptionHandler,
902                                headers=cls.headers)
903      except SkipUnsupportedObjectError, e:
904        cls.logger.info('Skipping item %s with unsupported object type %s',
905                        src_url, e.unsupported_type)
906
907  else:
908    raise CommandException('Got unexpected DiffAction (%d)'
909                           % diff_to_apply.diff_action)
910
911
912def _RootListingExceptionHandler(cls, e):
913  """Simple exception handler for exceptions during listing URLs to sync."""
914  cls.logger.error(str(e))
915
916
917def _RsyncExceptionHandler(cls, e):
918  """Simple exception handler to allow post-completion status."""
919  cls.logger.error(str(e))
920  cls.op_failure_count += 1
921  cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
922                   traceback.format_exc())
923
924
925class RsyncCommand(Command):
926  """Implementation of gsutil rsync command."""
927
928  # Command specification. See base class for documentation.
929  command_spec = Command.CreateCommandSpec(
930      'rsync',
931      command_name_aliases=[],
932      usage_synopsis=_SYNOPSIS,
933      min_args=2,
934      max_args=2,
935      supported_sub_args='cCdenprRUx:',
936      file_url_ok=True,
937      provider_url_ok=False,
938      urls_start_arg=0,
939      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
940      gs_default_api=ApiSelector.JSON,
941      argparse_arguments=[
942          CommandArgument.MakeNCloudOrFileURLsArgument(2)
943      ]
944  )
945  # Help specification. See help_provider.py for documentation.
946  help_spec = Command.HelpSpec(
947      help_name='rsync',
948      help_name_aliases=['sync', 'synchronize'],
949      help_type='command_help',
950      help_one_line_summary='Synchronize content of two buckets/directories',
951      help_text=_DETAILED_HELP_TEXT,
952      subcommand_help_text={},
953  )
954  total_bytes_transferred = 0
955
956  def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
957    """Sanity checks that URL names an existing container.
958
959    Args:
960      url_str: URL string to check.
961      treat_nonexistent_object_as_subdir: indicates if should treat a
962                                          non-existent object as a subdir.
963
964    Returns:
965      URL for checked string.
966
967    Raises:
968      CommandException if url_str doesn't name an existing container.
969    """
970    (url, have_existing_container) = (
971        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
972                                         self.project_id,
973                                         treat_nonexistent_object_as_subdir))
974    if not have_existing_container:
975      raise CommandException(
976          'arg (%s) does not name a directory, bucket, or bucket subdir.'
977          % url_str)
978    return url
979
980  def RunCommand(self):
981    """Command entry point for the rsync command."""
982    self._ParseOpts()
983    if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
984      self.logger.warn(SLOW_CRCMOD_WARNING)
985
986    src_url = self._InsistContainer(self.args[0], False)
987    dst_url = self._InsistContainer(self.args[1], True)
988
989    # Tracks if any copy or rm operations failed.
990    self.op_failure_count = 0
991
992    # List of attributes to share/manage across multiple processes in
993    # parallel (-m) mode.
994    shared_attrs = ['op_failure_count']
995
996    for signal_num in GetCaughtSignals():
997      RegisterSignalHandler(signal_num, _HandleSignals)
998
999    # Perform sync requests in parallel (-m) mode, if requested, using
1000    # configured number of parallel processes and threads. Otherwise,
1001    # perform requests with sequential function calls in current process.
1002    diff_iterator = _DiffIterator(self, src_url, dst_url)
1003    self.logger.info('Starting synchronization')
1004    try:
1005      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
1006                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
1007                 fail_on_error=True)
1008    finally:
1009      CleanUpTempFiles()
1010
1011    if self.op_failure_count:
1012      plural_str = 's' if self.op_failure_count else ''
1013      raise CommandException(
1014          '%d file%s/object%s could not be copied/removed.' %
1015          (self.op_failure_count, plural_str, plural_str))
1016
1017  def _ParseOpts(self):
1018    # exclude_symlinks is handled by Command parent class, so save in Command
1019    # state rather than CopyHelperOpts.
1020    self.exclude_symlinks = False
1021    # continue_on_error is handled by Command parent class, so save in Command
1022    # state rather than CopyHelperOpts.
1023    self.continue_on_error = False
1024    self.delete_extras = False
1025    preserve_acl = False
1026    self.compute_file_checksums = False
1027    self.dryrun = False
1028    self.exclude_pattern = None
1029    self.skip_unsupported_objects = False
1030    # self.recursion_requested is initialized in command.py (so it can be
1031    # checked in parent class for all commands).
1032
1033    if self.sub_opts:
1034      for o, a in self.sub_opts:
1035        if o == '-c':
1036          self.compute_file_checksums = True
1037        # Note: In gsutil cp command this is specified using -c but here we use
1038        # -C so we can use -c for checksum arg (to be consistent with Unix rsync
1039        # command options).
1040        elif o == '-C':
1041          self.continue_on_error = True
1042        elif o == '-d':
1043          self.delete_extras = True
1044        elif o == '-e':
1045          self.exclude_symlinks = True
1046        elif o == '-n':
1047          self.dryrun = True
1048        elif o == '-p':
1049          preserve_acl = True
1050        elif o == '-r' or o == '-R':
1051          self.recursion_requested = True
1052        elif o == '-U':
1053          self.skip_unsupported_objects = True
1054        elif o == '-x':
1055          if not a:
1056            raise CommandException('Invalid blank exclude filter')
1057          try:
1058            self.exclude_pattern = re.compile(a)
1059          except re.error:
1060            raise CommandException('Invalid exclude filter (%s)' % a)
1061    return CreateCopyHelperOpts(
1062        preserve_acl=preserve_acl,
1063        skip_unsupported_objects=self.skip_unsupported_objects)
1064