1#!/usr/bin/env python
2"""Upload and download support for apitools."""
3from __future__ import print_function
4
5import email.generator as email_generator
6import email.mime.multipart as mime_multipart
7import email.mime.nonmultipart as mime_nonmultipart
8import io
9import json
10import mimetypes
11import os
12import threading
13
14import six
15from six.moves import http_client
16
17from apitools.base.py import buffered_stream
18from apitools.base.py import exceptions
19from apitools.base.py import http_wrapper
20from apitools.base.py import stream_slice
21from apitools.base.py import util
22
# Public names exported by this module.
__all__ = [
    'Download',
    'Upload',
    'RESUMABLE_UPLOAD',
    'SIMPLE_UPLOAD',
    'DownloadProgressPrinter',
    'DownloadCompletePrinter',
    'UploadProgressPrinter',
    'UploadCompletePrinter',
]

# Size in bytes (5 << 20 == 5 MiB). An upload whose known total size is
# larger than this defaults to the resumable strategy.
_RESUMABLE_UPLOAD_THRESHOLD = 5 << 20
# The two values accepted by the Upload.strategy setter.
SIMPLE_UPLOAD = 'simple'
RESUMABLE_UPLOAD = 'resumable'
37
38
def DownloadProgressPrinter(response, unused_download):
    """Print how much of a download has been received.

    The response's content-range header is printed when present; otherwise
    the length of the response is printed.

    Args:
      response: Response for the latest chunk; its `info` mapping and
          `length` attribute are read.
      unused_download: Ignored.
    """
    headers = response.info
    if 'content-range' not in headers:
        print('Received %d bytes' % response.length)
        return
    print('Received %s' % headers['content-range'])
45
46
def DownloadCompletePrinter(unused_response, unused_download):
    """Announce on stdout that a download has finished.

    Args:
      unused_response: Ignored.
      unused_download: Ignored.
    """
    message = 'Download complete'
    print(message)
50
51
def UploadProgressPrinter(response, unused_upload):
    """Print the byte range reported by an upload response.

    Args:
      response: Response for the latest chunk; the 'range' entry of its
          `info` mapping is printed.
      unused_upload: Ignored.
    """
    sent_range = response.info['range']
    print('Sent %s' % sent_range)
55
56
def UploadCompletePrinter(unused_response, unused_upload):
    """Announce on stdout that an upload has finished.

    Args:
      unused_response: Ignored.
      unused_upload: Ignored.
    """
    message = 'Upload complete'
    print(message)
60
61
class _Transfer(object):

    """Generic bits common to Uploads and Downloads.

    Tracks the stream being transferred, the http object(s) used to move
    bytes, the transfer url and the retry configuration.

    Public attributes:
      auto_transfer: value given to the constructor, kept for subclasses
          and callers to consult.
      chunksize: chunk size for transfers; 1048576 when none is supplied.
      retry_func: retry handler; defaults to
          http_wrapper.HandleExceptionsAndRebuildHttpConnections.
    """

    def __init__(self, stream, close_stream=False, chunksize=None,
                 auto_transfer=True, http=None, num_retries=5):
        """Create a transfer around `stream`.

        Args:
          stream: The stream used for this transfer.
          close_stream: (default: False) If True, `stream` is closed when
              this object is finalized.
          chunksize: (default: None) Chunk size; any falsy value selects
              1048576.
          auto_transfer: (default: True) Stored on self.auto_transfer.
          http: (default: None) http object to use. When None, one is
              chosen in _Initialize.
          num_retries: (default: 5) Non-negative integer retry count.

        Raises:
          exceptions.InvalidDataError: if num_retries is negative.
        """
        self.__bytes_http = None
        self.__close_stream = close_stream
        self.__http = http
        self.__stream = stream
        self.__url = None

        self.__num_retries = 5
        # Let the @property do validation
        self.num_retries = num_retries

        self.retry_func = (
            http_wrapper.HandleExceptionsAndRebuildHttpConnections)
        self.auto_transfer = auto_transfer
        self.chunksize = chunksize or 1048576

    def __repr__(self):
        # Delegates to __str__, which the concrete transfer classes in this
        # module (Download, Upload) define.
        return str(self)

    @property
    def close_stream(self):
        """Whether the stream is closed when this object is finalized."""
        return self.__close_stream

    @property
    def http(self):
        """The http object for this transfer, or None if not yet set."""
        return self.__http

    @property
    def bytes_http(self):
        """The http object used for moving bytes; falls back to self.http."""
        return self.__bytes_http or self.http

    @bytes_http.setter
    def bytes_http(self, value):
        self.__bytes_http = value

    @property
    def num_retries(self):
        """Retry count for this transfer (a non-negative integer)."""
        return self.__num_retries

    @num_retries.setter
    def num_retries(self, value):
        util.Typecheck(value, six.integer_types)
        if value < 0:
            raise exceptions.InvalidDataError(
                'Cannot have negative value for num_retries')
        self.__num_retries = value

    @property
    def stream(self):
        """The stream given to the constructor."""
        return self.__stream

    @property
    def url(self):
        """The transfer url, or None before _Initialize has been called."""
        return self.__url

    def _Initialize(self, http, url):
        """Initialize this download by setting self.http and self.url.

        We want the user to be able to override self.http by having set
        the value in the constructor; in that case, we ignore the provided
        http.

        Args:
          http: An httplib2.Http instance or None.
          url: The url for this transfer.

        Returns:
          None. Initializes self.

        Raises:
          exceptions.TransferInvalidError: if already initialized.
        """
        self.EnsureUninitialized()
        if self.http is None:
            self.__http = http or http_wrapper.GetHttp()
        self.__url = url

    @property
    def initialized(self):
        """True once both the url and the http object have been set."""
        return self.url is not None and self.http is not None

    @property
    def _type_name(self):
        """Class name of this object, used in error messages."""
        return type(self).__name__

    def EnsureInitialized(self):
        """Raise TransferInvalidError unless this transfer is initialized."""
        if not self.initialized:
            # Interpolate the type name into the message; passing it as a
            # second exception argument would leave a literal '%s' behind.
            raise exceptions.TransferInvalidError(
                'Cannot use uninitialized %s' % self._type_name)

    def EnsureUninitialized(self):
        """Raise TransferInvalidError if this transfer is initialized."""
        if self.initialized:
            raise exceptions.TransferInvalidError(
                'Cannot re-initialize %s' % self._type_name)

    def __del__(self):
        # The finalizer also runs for objects whose __init__ never got as
        # far as setting these attributes (for example when a constructor
        # call fails on bad arguments), so look them up defensively instead
        # of raising from __del__.
        if getattr(self, '_Transfer__close_stream', False):
            stream = getattr(self, '_Transfer__stream', None)
            if stream is not None:
                stream.close()

    def _ExecuteCallback(self, callback, response):
        """Run callback(response, self) on a new thread, if callback is set."""
        # TODO(craigcitro): Push these into a queue.
        if callback is not None:
            threading.Thread(target=callback, args=(response, self)).start()
167
168
class Download(_Transfer):

    """Data for a single download.

    Public attributes:
      chunksize: default chunksize to use for transfers.
      progress_callback: default callback run after each chunk.
      finish_callback: default callback run when streaming completes.
    """
    _ACCEPTABLE_STATUSES = set((
        http_client.OK,
        http_client.NO_CONTENT,
        http_client.PARTIAL_CONTENT,
        http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
    ))
    _REQUIRED_SERIALIZATION_KEYS = set((
        'auto_transfer', 'progress', 'total_size', 'url'))

    def __init__(self, stream, progress_callback=None, finish_callback=None,
                 **kwds):
        """Create a download that writes into `stream`.

        Args:
          stream: Stream that downloaded bytes are written to.
          progress_callback: (default: None) Called with (response,
              download) after each chunk is processed.
          finish_callback: (default: None) Called with (response,
              download) once streaming finishes.
          **kwds: `total_size` is consumed here; everything else is passed
              to the _Transfer constructor.
        """
        total_size = kwds.pop('total_size', None)
        super(Download, self).__init__(stream, **kwds)
        self.__initial_response = None
        self.__progress = 0
        self.__total_size = total_size
        self.__encoding = None

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback

    @property
    def progress(self):
        """Number of bytes written to the stream so far."""
        return self.__progress

    @property
    def encoding(self):
        """Last content-encoding header seen on a response, or None."""
        return self.__encoding

    @classmethod
    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
        """Create a new download object from a filename.

        The file is opened for binary writing and is closed when the
        download object is finalized.

        Raises:
          exceptions.InvalidUserInputError: if the file exists and
              overwrite is False.
        """
        path = os.path.expanduser(filename)
        if os.path.exists(path) and not overwrite:
            raise exceptions.InvalidUserInputError(
                'File %s exists and overwrite not specified' % path)
        return cls(open(path, 'wb'), close_stream=True,
                   auto_transfer=auto_transfer, **kwds)

    @classmethod
    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
        """Create a new Download object from a stream."""
        return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
                   **kwds)

    @classmethod
    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
                 **kwds):
        """Create a new Download object from a stream and serialized data.

        Args:
          stream: Stream that downloaded bytes are written to.
          json_data: JSON text holding at least the keys 'auto_transfer',
              'progress', 'total_size' and 'url'.
          http: (default: None) http object passed to _Initialize.
          auto_transfer: (default: None) If not None, overrides the
              serialized auto_transfer value.
          **kwds: Passed to FromStream.

        Raises:
          exceptions.InvalidDataError: if required keys are missing.
        """
        info = json.loads(json_data)
        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
        if missing_keys:
            # Sort so the message does not depend on set iteration order.
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(sorted(missing_keys))))
        download = cls.FromStream(stream, **kwds)
        if auto_transfer is not None:
            download.auto_transfer = auto_transfer
        else:
            download.auto_transfer = info['auto_transfer']
        setattr(download, '_Download__progress', info['progress'])
        setattr(download, '_Download__total_size', info['total_size'])
        download._Initialize(  # pylint: disable=protected-access
            http, info['url'])
        return download

    @property
    def serialization_data(self):
        """Dict with the keys FromData requires to rebuild this download."""
        self.EnsureInitialized()
        return {
            'auto_transfer': self.auto_transfer,
            'progress': self.progress,
            'total_size': self.total_size,
            'url': self.url,
        }

    @property
    def total_size(self):
        """Total size of the download in bytes, or None if not yet known."""
        return self.__total_size

    def __str__(self):
        if not self.initialized:
            return 'Download (uninitialized)'
        else:
            return 'Download with %d/%s bytes transferred from url %s' % (
                self.progress, self.total_size, self.url)

    def ConfigureRequest(self, http_request, url_builder):
        """Mark the request as a media download of the first chunk."""
        url_builder.query_params['alt'] = 'media'
        # TODO(craigcitro): We need to send range requests because by
        # default httplib2 stores entire responses in memory. Override
        # httplib2's download method (as gsutil does) so that this is not
        # necessary.
        http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)

    def __SetTotal(self, info):
        """Set the total size from a response's content-range header."""
        if 'content-range' in info:
            _, _, total = info['content-range'].rpartition('/')
            if total != '*':
                self.__total_size = int(total)
        # Note "total_size is None" means we don't know it; if no size
        # info was returned on our initial range request, that means we
        # have a 0-byte file. (That last statement has been verified
        # empirically, but is not clearly documented anywhere.)
        if self.total_size is None:
            self.__total_size = 0

    def InitializeDownload(self, http_request, http=None, client=None):
        """Initialize this download by making a request.

        Args:
          http_request: The HttpRequest to use to initialize this download.
          http: The httplib2.Http instance for this request.
          client: If provided, let this client process the final URL before
              sending any additional requests. If client is provided and
              http is not, client.http will be used instead.

        Raises:
          exceptions.UserError: if neither http nor client is given.
          exceptions.HttpError: if the initial request returns a status
              that is not acceptable for a download.
        """
        self.EnsureUninitialized()
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        url = http_request.url
        if self.auto_transfer:
            end_byte = self.__ComputeEndByte(0)
            self.__SetRangeHeader(http_request, 0, end_byte)
            response = http_wrapper.MakeRequest(
                self.bytes_http or http, http_request)
            if response.status_code not in self._ACCEPTABLE_STATUSES:
                raise exceptions.HttpError.FromResponse(response)
            # Keep the first chunk so StreamMedia can consume it instead of
            # fetching it again.
            self.__initial_response = response
            self.__SetTotal(response.info)
            url = response.info.get('content-location', response.request_url)
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)
        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            self.StreamInChunks()

    def __NormalizeStartEnd(self, start, end=None):
        """Validate a requested range against self.total_size.

        Args:
          start: First byte requested; may be negative only when end is
              None, meaning "the last -start bytes".
          end: (default: None) Last byte requested, inclusive.

        Returns:
          (start, end) tuple of inclusive byte offsets, with end clamped to
          the last byte of the file.

        Raises:
          exceptions.TransferInvalidError: if the range is not valid.
        """
        if end is not None:
            if start < 0:
                raise exceptions.TransferInvalidError(
                    'Cannot have end index with negative start index')
            elif start >= self.total_size:
                raise exceptions.TransferInvalidError(
                    'Cannot have start index greater than total size')
            end = min(end, self.total_size - 1)
            if end < start:
                raise exceptions.TransferInvalidError(
                    'Range requested with end[%s] < start[%s]' % (end, start))
            return start, end
        else:
            if start < 0:
                start = max(0, start + self.total_size)
            return start, self.total_size - 1

    def __SetRangeHeader(self, request, start, end=None):
        """Set the 'range' header on request for bytes start..end.

        A negative start produces the suffix form ('bytes=-N'); a missing
        end produces the open-ended form ('bytes=N-').
        """
        if start < 0:
            request.headers['range'] = 'bytes=%d' % start
        elif end is None:
            request.headers['range'] = 'bytes=%d-' % start
        else:
            request.headers['range'] = 'bytes=%d-%d' % (start, end)

    def __ComputeEndByte(self, start, end=None, use_chunks=True):
        """Compute the last byte to fetch for this request.

        This is all based on the HTTP spec for Range and
        Content-Range.

        Note that this is potentially confusing in several ways:
          * the value for the last byte is 0-based, eg "fetch 10 bytes
            from the beginning" would return 9 here.
          * if we have no information about size, and don't want to
            use the chunksize, we'll return None.
        See the tests for more examples.

        Args:
          start: byte to start at.
          end: (int or None, default: None) Suggested last byte.
          use_chunks: (bool, default: True) If False, ignore self.chunksize.

        Returns:
          Last byte to use in a Range header, or None.

        """
        end_byte = end

        if start < 0 and not self.total_size:
            return end_byte

        if use_chunks:
            alternate = start + self.chunksize - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        if self.total_size:
            alternate = self.total_size - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        return end_byte

    def __GetChunk(self, start, end, additional_headers=None):
        """Retrieve a chunk, and return the full response."""
        self.EnsureInitialized()
        request = http_wrapper.Request(url=self.url)
        self.__SetRangeHeader(request, start, end=end)
        if additional_headers is not None:
            request.headers.update(additional_headers)
        return http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)

    def __ProcessResponse(self, response):
        """Process response (by updating self and writing to self.stream).

        Raises:
          exceptions.HttpError: on a FORBIDDEN or NOT_FOUND status.
          exceptions.TransferRetryError: on any other unacceptable status.
        """
        if response.status_code not in self._ACCEPTABLE_STATUSES:
            # We distinguish errors that mean we made a mistake in setting
            # up the transfer versus something we should attempt again.
            if response.status_code in (http_client.FORBIDDEN,
                                        http_client.NOT_FOUND):
                raise exceptions.HttpError.FromResponse(response)
            else:
                raise exceptions.TransferRetryError(response.content)
        if response.status_code in (http_client.OK,
                                    http_client.PARTIAL_CONTENT):
            self.stream.write(response.content)
            self.__progress += response.length
            if response.info and 'content-encoding' in response.info:
                # TODO(craigcitro): Handle the case where this changes over a
                # download.
                self.__encoding = response.info['content-encoding']
        elif response.status_code == http_client.NO_CONTENT:
            # It's important to write something to the stream for the case
            # of a 0-byte download to a file, as otherwise python won't
            # create the file.
            # FromFile opens its stream in binary mode, where writing a
            # text '' raises TypeError on Python 3, so try bytes first and
            # fall back to text for streams that only accept text.
            try:
                self.stream.write(b'')
            except TypeError:
                self.stream.write('')
        return response

    def GetRange(self, start, end=None, additional_headers=None,
                 use_chunks=True):
        """Retrieve a given byte range from this download, inclusive.

        Range must be of one of these three forms:
        * 0 <= start, end = None: Fetch from start to the end of the file.
        * 0 <= start <= end: Fetch the bytes from start to end.
        * start < 0, end = None: Fetch the last -start bytes of the file.

        (These variations correspond to those described in the HTTP 1.1
        protocol for range headers in RFC 2616, sec. 14.35.1.)

        Args:
          start: (int) Where to start fetching bytes. (See above.)
          end: (int, optional) Where to stop fetching bytes. (See above.)
          additional_headers: (dict, optional) Any additional headers to
              pass with the request.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and fetch this range in a single request.

        Returns:
          None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferInvalidError: if the range is not valid.
          exceptions.TransferRetryError: if a response carries zero bytes.
        """
        self.EnsureInitialized()
        progress_end_normalized = False
        if self.total_size is not None:
            progress, end_byte = self.__NormalizeStartEnd(start, end)
            progress_end_normalized = True
        else:
            progress = start
            end_byte = end
        while (not progress_end_normalized or end_byte is None or
               progress <= end_byte):
            end_byte = self.__ComputeEndByte(progress, end=end_byte,
                                             use_chunks=use_chunks)
            response = self.__GetChunk(progress, end_byte,
                                       additional_headers=additional_headers)
            if not progress_end_normalized:
                # The total size was unknown until this first response;
                # now that it is known, validate the requested range.
                self.__SetTotal(response.info)
                progress, end_byte = self.__NormalizeStartEnd(start, end)
                progress_end_normalized = True
            response = self.__ProcessResponse(response)
            progress += response.length
            if response.length == 0:
                raise exceptions.TransferRetryError(
                    'Zero bytes unexpectedly returned in download response')

    def StreamInChunks(self, callback=None, finish_callback=None,
                       additional_headers=None):
        """Stream the entire download in chunks."""
        self.StreamMedia(callback=callback, finish_callback=finish_callback,
                         additional_headers=additional_headers,
                         use_chunks=True)

    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None, use_chunks=True):
        """Stream the entire download.

        Args:
          callback: (default: None) Callback to call as each chunk is
              completed.
          finish_callback: (default: None) Callback to call when the
              download is complete.
          additional_headers: (default: None) Additional headers to
              include in fetching bytes.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and stream this download in a single request.

        Returns:
            None. Streams bytes into self.stream.
        """
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback

        self.EnsureInitialized()
        while True:
            if self.__initial_response is not None:
                # Consume the chunk already fetched by InitializeDownload.
                response = self.__initial_response
                self.__initial_response = None
            else:
                end_byte = self.__ComputeEndByte(self.progress,
                                                 use_chunks=use_chunks)
                response = self.__GetChunk(
                    self.progress, end_byte,
                    additional_headers=additional_headers)
            if self.total_size is None:
                self.__SetTotal(response.info)
            response = self.__ProcessResponse(response)
            self._ExecuteCallback(callback, response)
            if (response.status_code == http_client.OK or
                    self.progress >= self.total_size):
                break
        self._ExecuteCallback(finish_callback, response)
516
517
518class Upload(_Transfer):
519
520    """Data for a single Upload.
521
522    Fields:
523      stream: The stream to upload.
524      mime_type: MIME type of the upload.
525      total_size: (optional) Total upload size for the stream.
526      close_stream: (default: False) Whether or not we should close the
527          stream when finished with the upload.
528      auto_transfer: (default: True) If True, stream all bytes as soon as
529          the upload is created.
530    """
531    _REQUIRED_SERIALIZATION_KEYS = set((
532        'auto_transfer', 'mime_type', 'total_size', 'url'))
533
534    def __init__(self, stream, mime_type, total_size=None, http=None,
535                 close_stream=False, chunksize=None, auto_transfer=True,
536                 progress_callback=None, finish_callback=None,
537                 **kwds):
538        super(Upload, self).__init__(
539            stream, close_stream=close_stream, chunksize=chunksize,
540            auto_transfer=auto_transfer, http=http, **kwds)
541        self.__complete = False
542        self.__final_response = None
543        self.__mime_type = mime_type
544        self.__progress = 0
545        self.__server_chunk_granularity = None
546        self.__strategy = None
547        self.__total_size = None
548
549        self.progress_callback = progress_callback
550        self.finish_callback = finish_callback
551        self.total_size = total_size
552
    @property
    def progress(self):
        """Upload progress counter; starts at 0."""
        return self.__progress
556
557    @classmethod
558    def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
559        """Create a new Upload object from a filename."""
560        path = os.path.expanduser(filename)
561        if not os.path.exists(path):
562            raise exceptions.NotFoundError('Could not find file %s' % path)
563        if not mime_type:
564            mime_type, _ = mimetypes.guess_type(path)
565            if mime_type is None:
566                raise exceptions.InvalidUserInputError(
567                    'Could not guess mime type for %s' % path)
568        size = os.stat(path).st_size
569        return cls(open(path, 'rb'), mime_type, total_size=size,
570                   close_stream=True, auto_transfer=auto_transfer, **kwds)
571
572    @classmethod
573    def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
574                   **kwds):
575        """Create a new Upload object from a stream."""
576        if mime_type is None:
577            raise exceptions.InvalidUserInputError(
578                'No mime_type specified for stream')
579        return cls(stream, mime_type, total_size=total_size,
580                   close_stream=False, auto_transfer=auto_transfer, **kwds)
581
582    @classmethod
583    def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
584        """Create a new Upload of stream from serialized json_data and http."""
585        info = json.loads(json_data)
586        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
587        if missing_keys:
588            raise exceptions.InvalidDataError(
589                'Invalid serialization data, missing keys: %s' % (
590                    ', '.join(missing_keys)))
591        if 'total_size' in kwds:
592            raise exceptions.InvalidUserInputError(
593                'Cannot override total_size on serialized Upload')
594        upload = cls.FromStream(stream, info['mime_type'],
595                                total_size=info.get('total_size'), **kwds)
596        if isinstance(stream, io.IOBase) and not stream.seekable():
597            raise exceptions.InvalidUserInputError(
598                'Cannot restart resumable upload on non-seekable stream')
599        if auto_transfer is not None:
600            upload.auto_transfer = auto_transfer
601        else:
602            upload.auto_transfer = info['auto_transfer']
603        upload.strategy = RESUMABLE_UPLOAD
604        upload._Initialize(  # pylint: disable=protected-access
605            http, info['url'])
606        upload.RefreshResumableUploadState()
607        upload.EnsureInitialized()
608        if upload.auto_transfer:
609            upload.StreamInChunks()
610        return upload
611
612    @property
613    def serialization_data(self):
614        self.EnsureInitialized()
615        if self.strategy != RESUMABLE_UPLOAD:
616            raise exceptions.InvalidDataError(
617                'Serialization only supported for resumable uploads')
618        return {
619            'auto_transfer': self.auto_transfer,
620            'mime_type': self.mime_type,
621            'total_size': self.total_size,
622            'url': self.url,
623        }
624
    @property
    def complete(self):
        """Completion flag for this upload; False until marked complete."""
        return self.__complete
628
    @property
    def mime_type(self):
        """MIME type given to the constructor."""
        return self.__mime_type
632
633    def __str__(self):
634        if not self.initialized:
635            return 'Upload (uninitialized)'
636        else:
637            return 'Upload with %d/%s bytes transferred for url %s' % (
638                self.progress, self.total_size or '???', self.url)
639
640    @property
641    def strategy(self):
642        return self.__strategy
643
644    @strategy.setter
645    def strategy(self, value):
646        if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
647            raise exceptions.UserError((
648                'Invalid value "%s" for upload strategy, must be one of '
649                '"simple" or "resumable".') % value)
650        self.__strategy = value
651
    @property
    def total_size(self):
        """Total upload size for the stream, or None if not set."""
        return self.__total_size

    @total_size.setter
    def total_size(self, value):
        # The size may only be changed before the upload is initialized;
        # EnsureUninitialized raises otherwise.
        self.EnsureUninitialized()
        self.__total_size = value
660
661    def __SetDefaultUploadStrategy(self, upload_config, http_request):
662        """Determine and set the default upload strategy for this upload.
663
664        We generally prefer simple or multipart, unless we're forced to
665        use resumable. This happens when any of (1) the upload is too
666        large, (2) the simple endpoint doesn't support multipart requests
667        and we have metadata, or (3) there is no simple upload endpoint.
668
669        Args:
670          upload_config: Configuration for the upload endpoint.
671          http_request: The associated http request.
672
673        Returns:
674          None.
675        """
676        if upload_config.resumable_path is None:
677            self.strategy = SIMPLE_UPLOAD
678        if self.strategy is not None:
679            return
680        strategy = SIMPLE_UPLOAD
681        if (self.total_size is not None and
682                self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
683            strategy = RESUMABLE_UPLOAD
684        if http_request.body and not upload_config.simple_multipart:
685            strategy = RESUMABLE_UPLOAD
686        if not upload_config.simple_path:
687            strategy = RESUMABLE_UPLOAD
688        self.strategy = strategy
689
690    def ConfigureRequest(self, upload_config, http_request, url_builder):
691        """Configure the request and url for this upload."""
692        # Validate total_size vs. max_size
693        if (self.total_size and upload_config.max_size and
694                self.total_size > upload_config.max_size):
695            raise exceptions.InvalidUserInputError(
696                'Upload too big: %s larger than max size %s' % (
697                    self.total_size, upload_config.max_size))
698        # Validate mime type
699        if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
700            raise exceptions.InvalidUserInputError(
701                'MIME type %s does not match any accepted MIME ranges %s' % (
702                    self.mime_type, upload_config.accept))
703
704        self.__SetDefaultUploadStrategy(upload_config, http_request)
705        if self.strategy == SIMPLE_UPLOAD:
706            url_builder.relative_path = upload_config.simple_path
707            if http_request.body:
708                url_builder.query_params['uploadType'] = 'multipart'
709                self.__ConfigureMultipartRequest(http_request)
710            else:
711                url_builder.query_params['uploadType'] = 'media'
712                self.__ConfigureMediaRequest(http_request)
713        else:
714            url_builder.relative_path = upload_config.resumable_path
715            url_builder.query_params['uploadType'] = 'resumable'
716            self.__ConfigureResumableRequest(http_request)
717
    def __ConfigureMediaRequest(self, http_request):
        """Configure http_request as a simple request for this upload.

        The request's content-type header is set to this upload's MIME
        type and the entire remaining stream is read into the request body.
        """
        http_request.headers['content-type'] = self.mime_type
        http_request.body = self.stream.read()
        # Log a placeholder rather than the media bytes.
        # NOTE(review): assigned after body, presumably so that setting the
        # body cannot overwrite it -- confirm against http_wrapper.Request.
        http_request.loggable_body = '<media body>'
723
    def __ConfigureMultipartRequest(self, http_request):
        """Configure http_request as a multipart request for this upload.

        The request's existing body becomes the first MIME part, typed by
        the request's current content-type header, and the contents of
        self.stream become the second part. The request's body,
        content-type header and loggable_body are all replaced.
        """
        # This is a multipart/related upload.
        msg_root = mime_multipart.MIMEMultipart('related')
        # msg_root should not write out its own headers
        setattr(msg_root, '_write_headers', lambda self: None)

        # attach the body as one part
        msg = mime_nonmultipart.MIMENonMultipart(
            *http_request.headers['content-type'].split('/'))
        msg.set_payload(http_request.body)
        msg_root.attach(msg)

        # attach the media as the second part
        msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
        msg['Content-Transfer-Encoding'] = 'binary'
        msg.set_payload(self.stream.read())
        msg_root.attach(msg)

        # NOTE: We encode the body, but can't use
        #       `email.message.Message.as_string` because it prepends
        #       `> ` to `From ` lines.
        # NOTE: We must use six.StringIO() instead of io.StringIO() since the
        #       `email` library uses cStringIO in Py2 and io.StringIO in Py3.
        fp = six.StringIO()
        g = email_generator.Generator(fp, mangle_from_=False)
        g.flatten(msg_root, unixfrom=False)
        http_request.body = fp.getvalue()

        # The boundary is only read after flattening the message above.
        multipart_boundary = msg_root.get_boundary()
        http_request.headers['content-type'] = (
            'multipart/related; boundary=%r' % multipart_boundary)

        # Build the loggable body: split on the boundary, keep the headers
        # of the second-to-last piece (the media part) and replace its
        # payload with a placeholder so the media bytes are not logged.
        body_components = http_request.body.split(multipart_boundary)
        headers, _, _ = body_components[-2].partition('\n\n')
        body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
        http_request.loggable_body = multipart_boundary.join(body_components)
761
762    def __ConfigureResumableRequest(self, http_request):
763        http_request.headers['X-Upload-Content-Type'] = self.mime_type
764        if self.total_size is not None:
765            http_request.headers[
766                'X-Upload-Content-Length'] = str(self.total_size)
767
768    def RefreshResumableUploadState(self):
769        """Talk to the server and refresh the state of this resumable upload.
770
771        Returns:
772          Response if the upload is complete.
773        """
774        if self.strategy != RESUMABLE_UPLOAD:
775            return
776        self.EnsureInitialized()
777        refresh_request = http_wrapper.Request(
778            url=self.url, http_method='PUT',
779            headers={'Content-Range': 'bytes */*'})
780        refresh_response = http_wrapper.MakeRequest(
781            self.http, refresh_request, redirections=0,
782            retries=self.num_retries)
783        range_header = self._GetRangeHeaderFromResponse(refresh_response)
784        if refresh_response.status_code in (http_client.OK,
785                                            http_client.CREATED):
786            self.__complete = True
787            self.__progress = self.total_size
788            self.stream.seek(self.progress)
789            # If we're finished, the refresh response will contain the metadata
790            # originally requested. Cache it so it can be returned in
791            # StreamInChunks.
792            self.__final_response = refresh_response
793        elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
794            if range_header is None:
795                self.__progress = 0
796            else:
797                self.__progress = self.__GetLastByte(range_header) + 1
798            self.stream.seek(self.progress)
799        else:
800            raise exceptions.HttpError.FromResponse(refresh_response)
801
802    def _GetRangeHeaderFromResponse(self, response):
803        return response.info.get('Range', response.info.get('range'))
804
805    def InitializeUpload(self, http_request, http=None, client=None):
806        """Initialize this upload from the given http_request."""
807        if self.strategy is None:
808            raise exceptions.UserError(
809                'No upload strategy set; did you call ConfigureRequest?')
810        if http is None and client is None:
811            raise exceptions.UserError('Must provide client or http.')
812        if self.strategy != RESUMABLE_UPLOAD:
813            return
814        http = http or client.http
815        if client is not None:
816            http_request.url = client.FinalizeTransferUrl(http_request.url)
817        self.EnsureUninitialized()
818        http_response = http_wrapper.MakeRequest(http, http_request,
819                                                 retries=self.num_retries)
820        if http_response.status_code != http_client.OK:
821            raise exceptions.HttpError.FromResponse(http_response)
822
823        self.__server_chunk_granularity = http_response.info.get(
824            'X-Goog-Upload-Chunk-Granularity')
825        url = http_response.info['location']
826        if client is not None:
827            url = client.FinalizeTransferUrl(url)
828        self._Initialize(http, url)
829
830        # Unless the user has requested otherwise, we want to just
831        # go ahead and pump the bytes now.
832        if self.auto_transfer:
833            return self.StreamInChunks()
834
835    def __GetLastByte(self, range_header):
836        _, _, end = range_header.partition('-')
837        # TODO(craigcitro): Validate start == 0?
838        return int(end)
839
840    def __ValidateChunksize(self, chunksize=None):
841        if self.__server_chunk_granularity is None:
842            return
843        chunksize = chunksize or self.chunksize
844        if chunksize % self.__server_chunk_granularity:
845            raise exceptions.ConfigurationValueError(
846                'Server requires chunksize to be a multiple of %d',
847                self.__server_chunk_granularity)
848
849    def __StreamMedia(self, callback=None, finish_callback=None,
850                      additional_headers=None, use_chunks=True):
851        """Helper function for StreamMedia / StreamInChunks."""
852        if self.strategy != RESUMABLE_UPLOAD:
853            raise exceptions.InvalidUserInputError(
854                'Cannot stream non-resumable upload')
855        callback = callback or self.progress_callback
856        finish_callback = finish_callback or self.finish_callback
857        # final_response is set if we resumed an already-completed upload.
858        response = self.__final_response
859        send_func = self.__SendChunk if use_chunks else self.__SendMediaBody
860        if use_chunks:
861            self.__ValidateChunksize(self.chunksize)
862        self.EnsureInitialized()
863        while not self.complete:
864            response = send_func(self.stream.tell(),
865                                 additional_headers=additional_headers)
866            if response.status_code in (http_client.OK, http_client.CREATED):
867                self.__complete = True
868                break
869            self.__progress = self.__GetLastByte(response.info['range'])
870            if self.progress + 1 != self.stream.tell():
871                # TODO(craigcitro): Add a better way to recover here.
872                raise exceptions.CommunicationError(
873                    'Failed to transfer all bytes in chunk, upload paused at '
874                    'byte %d' % self.progress)
875            self._ExecuteCallback(callback, response)
876        if self.__complete and hasattr(self.stream, 'seek'):
877            current_pos = self.stream.tell()
878            self.stream.seek(0, os.SEEK_END)
879            end_pos = self.stream.tell()
880            self.stream.seek(current_pos)
881            if current_pos != end_pos:
882                raise exceptions.TransferInvalidError(
883                    'Upload complete with %s additional bytes left in stream' %
884                    (int(end_pos) - int(current_pos)))
885        self._ExecuteCallback(finish_callback, response)
886        return response
887
888    def StreamMedia(self, callback=None, finish_callback=None,
889                    additional_headers=None):
890        """Send this resumable upload in a single request.
891
892        Args:
893          callback: Progress callback function with inputs
894              (http_wrapper.Response, transfer.Upload)
895          finish_callback: Final callback function with inputs
896              (http_wrapper.Response, transfer.Upload)
897          additional_headers: Dict of headers to include with the upload
898              http_wrapper.Request.
899
900        Returns:
901          http_wrapper.Response of final response.
902        """
903        return self.__StreamMedia(
904            callback=callback, finish_callback=finish_callback,
905            additional_headers=additional_headers, use_chunks=False)
906
907    def StreamInChunks(self, callback=None, finish_callback=None,
908                       additional_headers=None):
909        """Send this (resumable) upload in chunks."""
910        return self.__StreamMedia(
911            callback=callback, finish_callback=finish_callback,
912            additional_headers=additional_headers)
913
914    def __SendMediaRequest(self, request, end):
915        """Request helper function for SendMediaBody & SendChunk."""
916        response = http_wrapper.MakeRequest(
917            self.bytes_http, request, retry_func=self.retry_func,
918            retries=self.num_retries)
919        if response.status_code not in (http_client.OK, http_client.CREATED,
920                                        http_wrapper.RESUME_INCOMPLETE):
921            # We want to reset our state to wherever the server left us
922            # before this failed request, and then raise.
923            self.RefreshResumableUploadState()
924            raise exceptions.HttpError.FromResponse(response)
925        if response.status_code == http_wrapper.RESUME_INCOMPLETE:
926            last_byte = self.__GetLastByte(
927                self._GetRangeHeaderFromResponse(response))
928            if last_byte + 1 != end:
929                self.stream.seek(last_byte)
930        return response
931
932    def __SendMediaBody(self, start, additional_headers=None):
933        """Send the entire media stream in a single request."""
934        self.EnsureInitialized()
935        if self.total_size is None:
936            raise exceptions.TransferInvalidError(
937                'Total size must be known for SendMediaBody')
938        body_stream = stream_slice.StreamSlice(
939            self.stream, self.total_size - start)
940
941        request = http_wrapper.Request(url=self.url, http_method='PUT',
942                                       body=body_stream)
943        request.headers['Content-Type'] = self.mime_type
944        if start == self.total_size:
945            # End of an upload with 0 bytes left to send; just finalize.
946            range_string = 'bytes */%s' % self.total_size
947        else:
948            range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1,
949                                               self.total_size)
950
951        request.headers['Content-Range'] = range_string
952        if additional_headers:
953            request.headers.update(additional_headers)
954
955        return self.__SendMediaRequest(request, self.total_size)
956
957    def __SendChunk(self, start, additional_headers=None):
958        """Send the specified chunk."""
959        self.EnsureInitialized()
960        no_log_body = self.total_size is None
961        if self.total_size is None:
962            # For the streaming resumable case, we need to detect when
963            # we're at the end of the stream.
964            body_stream = buffered_stream.BufferedStream(
965                self.stream, start, self.chunksize)
966            end = body_stream.stream_end_position
967            if body_stream.stream_exhausted:
968                self.__total_size = end
969            # TODO: Here, change body_stream from a stream to a string object,
970            # which means reading a chunk into memory.  This works around
971            # https://code.google.com/p/httplib2/issues/detail?id=176 which can
972            # cause httplib2 to skip bytes on 401's for file objects.
973            # Rework this solution to be more general.
974            body_stream = body_stream.read(self.chunksize)
975        else:
976            end = min(start + self.chunksize, self.total_size)
977            body_stream = stream_slice.StreamSlice(self.stream, end - start)
978        # TODO(craigcitro): Think about clearer errors on "no data in
979        # stream".
980        request = http_wrapper.Request(url=self.url, http_method='PUT',
981                                       body=body_stream)
982        request.headers['Content-Type'] = self.mime_type
983        if no_log_body:
984            # Disable logging of streaming body.
985            # TODO: Remove no_log_body and rework as part of a larger logs
986            # refactor.
987            request.loggable_body = '<media body>'
988        if self.total_size is None:
989            # Streaming resumable upload case, unknown total size.
990            range_string = 'bytes %s-%s/*' % (start, end - 1)
991        elif end == start:
992            # End of an upload with 0 bytes left to send; just finalize.
993            range_string = 'bytes */%s' % self.total_size
994        else:
995            # Normal resumable upload case with known sizes.
996            range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size)
997
998        request.headers['Content-Range'] = range_string
999        if additional_headers:
1000            request.headers.update(additional_headers)
1001
1002        return self.__SendMediaRequest(request, end)
1003