1# -*- coding: utf-8 -*-
2# Copyright 2012 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Contains the perfdiag gsutil command."""
16
17from __future__ import absolute_import
18
19import calendar
20from collections import defaultdict
21from collections import namedtuple
22import contextlib
23import cStringIO
24import datetime
25import httplib
26import json
27import logging
28import math
29import multiprocessing
30import os
31import random
32import re
33import socket
34import string
35import subprocess
36import tempfile
37import time
38
39import boto
40import boto.gs.connection
41
42import gslib
43from gslib.cloud_api import NotFoundException
44from gslib.cloud_api import ServiceException
45from gslib.cloud_api_helper import GetDownloadSerializationData
46from gslib.command import Command
47from gslib.command import DummyArgChecker
48from gslib.command_argument import CommandArgument
49from gslib.commands import config
50from gslib.cs_api_map import ApiSelector
51from gslib.exception import CommandException
52from gslib.file_part import FilePart
53from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
54from gslib.storage_url import StorageUrlFromString
55from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
56from gslib.util import CheckFreeSpace
57from gslib.util import DivideAndCeil
58from gslib.util import GetCloudApiInstance
59from gslib.util import GetFileSize
60from gslib.util import GetMaxRetryDelay
61from gslib.util import HumanReadableToBytes
62from gslib.util import IS_LINUX
63from gslib.util import MakeBitsHumanReadable
64from gslib.util import MakeHumanReadable
65from gslib.util import Percentile
66from gslib.util import ResumableThreshold
67
68_SYNOPSIS = """
69  gsutil perfdiag [-i in.json]
70  gsutil perfdiag [-o out.json] [-n objects] [-c processes]
71      [-k threads] [-p parallelism type] [-y slices] [-s size] [-d directory]
72      [-t tests] url...
73"""
74
75_DETAILED_HELP_TEXT = ("""
76<B>SYNOPSIS</B>
77""" + _SYNOPSIS + """
78
79
80<B>DESCRIPTION</B>
81  The perfdiag command runs a suite of diagnostic tests for a given Google
82  Storage bucket.
83
84  The 'url' parameter must name an existing bucket (e.g. gs://foo) to which
85  the user has write permission. Several test files will be uploaded to and
86  downloaded from this bucket. All test files will be deleted at the completion
87  of the diagnostic if it finishes successfully.
88
89  gsutil performance can be impacted by many factors at the client, server,
90  and in-between, such as: CPU speed; available memory; the access path to the
91  local disk; network bandwidth; contention and error rates along the path
92  between gsutil and Google; operating system buffering configuration; and
93  firewalls and other network elements. The perfdiag command is provided so
94  that customers can run a known measurement suite when troubleshooting
95  performance problems.
96
97
98<B>PROVIDING DIAGNOSTIC OUTPUT TO GOOGLE CLOUD STORAGE TEAM</B>
99  If the Google Cloud Storage Team asks you to run a performance diagnostic
100  please use the following command, and email the output file (output.json)
101  to gs-team@google.com:
102
103    gsutil perfdiag -o output.json gs://your-bucket
104
105
106<B>OPTIONS</B>
107  -n          Sets the number of objects to use when downloading and uploading
108              files during tests. Defaults to 5.
109
110  -c          Sets the number of processes to use while running throughput
111              experiments. The default value is 1.
112
113  -k          Sets the number of threads per process to use while running
114              throughput experiments. Each process will receive an equal number
115              of threads. The default value is 1.
116
117              Note: All specified threads and processes will be created, but may
118              not by saturated with work if too few objects (specified with -n)
119              and too few components (specified with -y) are specified.
120
121  -p          Sets the type of parallelism to be used (only applicable when
122              threads or processes are specified and threads * processes > 1).
123              The default is to use fan. Must be one of the following:
124
125              fan
126                 Use one thread per object. This is akin to using gsutil -m cp,
127                 with sliced object download / parallel composite upload
128                 disabled.
129
130              slice
131                 Use Y (specified with -y) threads for each object, transferring
132                 one object at a time. This is akin to using parallel object
133                 download / parallel composite upload, without -m. Sliced
134                 uploads not supported for s3.
135
136              both
137                 Use Y (specified with -y) threads for each object, transferring
138                 multiple objects at a time. This is akin to simultaneously
139                 using sliced object download / parallel composite upload and
140                 gsutil -m cp. Sliced uploads not supported for s3.
141
142  -y          Sets the number of slices to divide each file/object into while
143              transferring data. Only applicable with the slice (or both)
144              parallelism type. The default is 4 slices.
145
146  -s          Sets the size (in bytes) for each of the N (set with -n) objects
147              used in the read and write throughput tests. The default is 1 MiB.
148              This can also be specified using byte suffixes such as 500K or 1M.
149              Note: these values are interpreted as multiples of 1024 (K=1024,
150              M=1024*1024, etc.)
151              Note: If rthru_file or wthru_file are performed, N (set with -n)
152              times as much disk space as specified will be required for the
153              operation.
154
155  -d          Sets the directory to store temporary local files in. If not
156              specified, a default temporary directory will be used.
157
158  -t          Sets the list of diagnostic tests to perform. The default is to
159              run the lat, rthru, and wthru diagnostic tests. Must be a
160              comma-separated list containing one or more of the following:
161
162              lat
163                 For N (set with -n) objects, write the object, retrieve its
164                 metadata, read the object, and finally delete the object.
165                 Record the latency of each operation.
166
167              list
168                 Write N (set with -n) objects to the bucket, record how long
169                 it takes for the eventually consistent listing call to return
170                 the N objects in its result, delete the N objects, then record
171                 how long it takes listing to stop returning the N objects.
172                 This test is off by default.
173
174              rthru
175                 Runs N (set with -n) read operations, with at most C
176                 (set with -c) reads outstanding at any given time.
177
178              rthru_file
179                 The same as rthru, but simultaneously writes data to the disk,
180                 to gauge the performance impact of the local disk on downloads.
181
182              wthru
183                 Runs N (set with -n) write operations, with at most C
184                 (set with -c) writes outstanding at any given time.
185
186              wthru_file
187                 The same as wthru, but simultaneously reads data from the disk,
188                 to gauge the performance impact of the local disk on uploads.
189
190  -m          Adds metadata to the result JSON file. Multiple -m values can be
191              specified. Example:
192
193                  gsutil perfdiag -m "key1:val1" -m "key2:val2" gs://bucketname
194
195              Each metadata key will be added to the top-level "metadata"
196              dictionary in the output JSON file.
197
198  -o          Writes the results of the diagnostic to an output file. The output
199              is a JSON file containing system information and performance
200              diagnostic results. The file can be read and reported later using
201              the -i option.
202
203  -i          Reads the JSON output file created using the -o command and prints
204              a formatted description of the results.
205
206
207<B>MEASURING AVAILABILITY</B>
208  The perfdiag command ignores the boto num_retries configuration parameter.
209  Instead, it always retries on HTTP errors in the 500 range and keeps track of
210  how many 500 errors were encountered during the test. The availability
211  measurement is reported at the end of the test.
212
213  Note that HTTP responses are only recorded when the request was made in a
214  single process. When using multiple processes or threads, read and write
215  throughput measurements are performed in an external process, so the
216  availability numbers reported won't include the throughput measurements.
217
218
219<B>NOTE</B>
220  The perfdiag command collects system information. It collects your IP address,
221  executes DNS queries to Google servers and collects the results, and collects
222  network statistics information from the output of netstat -s. It will also
223  attempt to connect to your proxy server if you have one configured. None of
224  this information will be sent to Google unless you choose to send it.
225""")
226
227FileDataTuple = namedtuple(
228    'FileDataTuple',
229    'size md5 data')
230
231# Describes one object in a fanned download. If need_to_slice is specified as
232# True, the object should be downloaded with the slice strategy. Other field
233# names are the same as documented in PerfDiagCommand.Download.
234FanDownloadTuple = namedtuple(
235    'FanDownloadTuple',
236    'need_to_slice object_name file_name serialization_data')
237
238# Describes one slice in a sliced download.
239# Field names are the same as documented in PerfDiagCommand.Download.
240SliceDownloadTuple = namedtuple(
241    'SliceDownloadTuple',
242    'object_name file_name serialization_data start_byte end_byte')
243
244# Describes one file in a fanned upload. If need_to_slice is specified as
245# True, the file should be uploaded with the slice strategy. Other field
246# names are the same as documented in PerfDiagCommand.Upload.
247FanUploadTuple = namedtuple(
248    'FanUploadTuple',
249    'need_to_slice file_name object_name use_file')
250
251# Describes one slice in a sliced upload.
252# Field names are the same as documented in PerfDiagCommand.Upload.
253SliceUploadTuple = namedtuple(
254    'SliceUploadTuple',
255    'file_name object_name use_file file_start file_size')
256
257# Dict storing file_path:FileDataTuple for each temporary file used by
258# perfdiag. This data should be kept outside of the PerfDiagCommand class
259# since calls to Apply will make copies of all member data.
260temp_file_dict = {}
261
262
263class Error(Exception):
264  """Base exception class for this module."""
265  pass
266
267
268class InvalidArgument(Error):
269  """Raised on invalid arguments to functions."""
270  pass
271
272
273def _DownloadObject(cls, args, thread_state=None):
274  """Function argument to apply for performing fanned parallel downloads.
275
276  Args:
277    cls: The calling PerfDiagCommand class instance.
278    args: A FanDownloadTuple object describing this download.
279    thread_state: gsutil Cloud API instance to use for the operation.
280  """
281  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
282  if args.need_to_slice:
283    cls.PerformSlicedDownload(args.object_name, args.file_name,
284                              args.serialization_data)
285  else:
286    cls.Download(args.object_name, args.file_name, args.serialization_data)
287
288
289def _DownloadSlice(cls, args, thread_state=None):
290  """Function argument to apply for performing sliced downloads.
291
292  Args:
293    cls: The calling PerfDiagCommand class instance.
294    args: A SliceDownloadTuple object describing this download.
295    thread_state: gsutil Cloud API instance to use for the operation.
296  """
297  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
298  cls.Download(args.object_name, args.file_name, args.serialization_data,
299               args.start_byte, args.end_byte)
300
301
302def _UploadObject(cls, args, thread_state=None):
303  """Function argument to apply for performing fanned parallel uploads.
304
305  Args:
306    cls: The calling PerfDiagCommand class instance.
307    args: A FanUploadTuple object describing this upload.
308    thread_state: gsutil Cloud API instance to use for the operation.
309  """
310  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
311  if args.need_to_slice:
312    cls.PerformSlicedUpload(args.file_name, args.object_name, args.use_file)
313  else:
314    cls.Upload(args.file_name, args.object_name, args.use_file)
315
316
317def _UploadSlice(cls, args, thread_state=None):
318  """Function argument to apply for performing sliced parallel uploads.
319
320  Args:
321    cls: The calling PerfDiagCommand class instance.
322    args: A SliceUploadTuple object describing this upload.
323    thread_state: gsutil Cloud API instance to use for the operation.
324  """
325  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
326  cls.Upload(args.file_name, args.object_name, args.use_file,
327             args.file_start, args.file_size)
328
329
330def _DeleteWrapper(cls, object_name, thread_state=None):
331  """Function argument to apply for performing parallel object deletions.
332
333  Args:
334    cls: The calling PerfDiagCommand class instance.
335    object_name: The object name to delete from the test bucket.
336    thread_state: gsutil Cloud API instance to use for the operation.
337  """
338  cls.gsutil_api = GetCloudApiInstance(cls, thread_state)
339  cls.Delete(object_name)
340
341
342def _PerfdiagExceptionHandler(cls, e):
343  """Simple exception handler to allow post-completion status."""
344  cls.logger.error(str(e))
345
346
347def _DummyTrackerCallback(_):
348  pass
349
350
351class DummyFile(object):
352  """A dummy, file-like object that throws away everything written to it."""
353
354  def write(self, *args, **kwargs):  # pylint: disable=invalid-name
355    pass
356
357  def close(self):  # pylint: disable=invalid-name
358    pass
359
360
361# Many functions in perfdiag re-define a temporary function based on a
362# variable from a loop, resulting in a false positive from the linter.
363# pylint: disable=cell-var-from-loop
364class PerfDiagCommand(Command):
365  """Implementation of gsutil perfdiag command."""
366
367  # Command specification. See base class for documentation.
368  command_spec = Command.CreateCommandSpec(
369      'perfdiag',
370      command_name_aliases=['diag', 'diagnostic', 'perf', 'performance'],
371      usage_synopsis=_SYNOPSIS,
372      min_args=0,
373      max_args=1,
374      supported_sub_args='n:c:k:p:y:s:d:t:m:i:o:',
375      file_url_ok=False,
376      provider_url_ok=False,
377      urls_start_arg=0,
378      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
379      gs_default_api=ApiSelector.JSON,
380      argparse_arguments=[
381          CommandArgument.MakeNCloudBucketURLsArgument(1)
382      ]
383  )
384  # Help specification. See help_provider.py for documentation.
385  help_spec = Command.HelpSpec(
386      help_name='perfdiag',
387      help_name_aliases=[],
388      help_type='command_help',
389      help_one_line_summary='Run performance diagnostic',
390      help_text=_DETAILED_HELP_TEXT,
391      subcommand_help_text={},
392  )
393
394  # Byte sizes to use for latency testing files.
395  # TODO: Consider letting the user specify these sizes with a configuration
396  # parameter.
397  test_lat_file_sizes = (
398      0,  # 0 bytes
399      1024,  # 1 KiB
400      102400,  # 100 KiB
401      1048576,  # 1 MiB
402  )
403
404  # Test names.
405  RTHRU = 'rthru'
406  RTHRU_FILE = 'rthru_file'
407  WTHRU = 'wthru'
408  WTHRU_FILE = 'wthru_file'
409  LAT = 'lat'
410  LIST = 'list'
411
412  # Parallelism strategies.
413  FAN = 'fan'
414  SLICE = 'slice'
415  BOTH = 'both'
416
417  # List of all diagnostic tests.
418  ALL_DIAG_TESTS = (RTHRU, RTHRU_FILE, WTHRU, WTHRU_FILE, LAT, LIST)
419
420  # List of diagnostic tests to run by default.
421  DEFAULT_DIAG_TESTS = (RTHRU, WTHRU, LAT)
422
423  # List of parallelism strategies.
424  PARALLEL_STRATEGIES = (FAN, SLICE, BOTH)
425
426  # Google Cloud Storage XML API endpoint host.
427  XML_API_HOST = boto.config.get(
428      'Credentials', 'gs_host', boto.gs.connection.GSConnection.DefaultHost)
429  # Google Cloud Storage XML API endpoint port.
430  XML_API_PORT = boto.config.get('Credentials', 'gs_port', 80)
431
432  # Maximum number of times to retry requests on 5xx errors.
433  MAX_SERVER_ERROR_RETRIES = 5
434  # Maximum number of times to retry requests on more serious errors like
435  # the socket breaking.
436  MAX_TOTAL_RETRIES = 10
437
438  # The default buffer size in boto's Key object is set to 8 KiB. This becomes a
439  # bottleneck at high throughput rates, so we increase it.
440  KEY_BUFFER_SIZE = 16384
441
442  # The maximum number of bytes to generate pseudo-randomly before beginning
443  # to repeat bytes. This number was chosen as the next prime larger than 5 MiB.
444  MAX_UNIQUE_RANDOM_BYTES = 5242883
445
446  # Maximum amount of time, in seconds, we will wait for object listings to
447  # reflect what we expect in the listing tests.
448  MAX_LISTING_WAIT_TIME = 60.0
449
450  def _Exec(self, cmd, raise_on_error=True, return_output=False,
451            mute_stderr=False):
452    """Executes a command in a subprocess.
453
454    Args:
455      cmd: List containing the command to execute.
456      raise_on_error: Whether or not to raise an exception when a process exits
457          with a non-zero return code.
458      return_output: If set to True, the return value of the function is the
459          stdout of the process.
460      mute_stderr: If set to True, the stderr of the process is not printed to
461          the console.
462
463    Returns:
464      The return code of the process or the stdout if return_output is set.
465
466    Raises:
467      Exception: If raise_on_error is set to True and any process exits with a
468      non-zero return code.
469    """
470    self.logger.debug('Running command: %s', cmd)
471    stderr = subprocess.PIPE if mute_stderr else None
472    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr)
473    (stdoutdata, _) = p.communicate()
474    if raise_on_error and p.returncode:
475      raise CommandException("Received non-zero return code (%d) from "
476                             "subprocess '%s'." % (p.returncode, ' '.join(cmd)))
477    return stdoutdata if return_output else p.returncode
478
479  def _WarnIfLargeData(self):
480    """Outputs a warning message if a large amount of data is being used."""
481    if self.num_objects * self.thru_filesize > HumanReadableToBytes('2GiB'):
482      self.logger.info('This is a large operation, and could take a while.')
483
484  def _MakeTempFile(self, file_size=0, mem_metadata=False,
485                    mem_data=False, prefix='gsutil_test_file'):
486    """Creates a temporary file of the given size and returns its path.
487
488    Args:
489      file_size: The size of the temporary file to create.
490      mem_metadata: If true, store md5 and file size in memory at
491                    temp_file_dict[fpath].md5, tempfile_data[fpath].file_size.
492      mem_data: If true, store the file data in memory at
493                temp_file_dict[fpath].data
494      prefix: The prefix to use for the temporary file. Defaults to
495              gsutil_test_file.
496
497    Returns:
498      The file path of the created temporary file.
499    """
500    fd, fpath = tempfile.mkstemp(suffix='.bin', prefix=prefix,
501                                 dir=self.directory, text=False)
502    with os.fdopen(fd, 'wb') as fp:
503      random_bytes = os.urandom(min(file_size,
504                                    self.MAX_UNIQUE_RANDOM_BYTES))
505      total_bytes_written = 0
506      while total_bytes_written < file_size:
507        num_bytes = min(self.MAX_UNIQUE_RANDOM_BYTES,
508                        file_size - total_bytes_written)
509        fp.write(random_bytes[:num_bytes])
510        total_bytes_written += num_bytes
511
512    if mem_metadata or mem_data:
513      with open(fpath, 'rb') as fp:
514        file_size = GetFileSize(fp) if mem_metadata else None
515        md5 = CalculateB64EncodedMd5FromContents(fp) if mem_metadata else None
516        data = fp.read() if mem_data else None
517        temp_file_dict[fpath] = FileDataTuple(file_size, md5, data)
518
519    self.temporary_files.add(fpath)
520    return fpath
521
522  def _SetUp(self):
523    """Performs setup operations needed before diagnostics can be run."""
524
525    # Stores test result data.
526    self.results = {}
527    # Set of file paths for local temporary files.
528    self.temporary_files = set()
529    # Set of names for test objects that exist in the test bucket.
530    self.temporary_objects = set()
531    # Total number of HTTP requests made.
532    self.total_requests = 0
533    # Total number of HTTP 5xx errors.
534    self.request_errors = 0
535    # Number of responses, keyed by response code.
536    self.error_responses_by_code = defaultdict(int)
537    # Total number of socket errors.
538    self.connection_breaks = 0
539    # Boolean to prevent doing cleanup twice.
540    self.teardown_completed = False
541
542    # Create files for latency test.
543    if self.LAT in self.diag_tests:
544      self.latency_files = []
545      for file_size in self.test_lat_file_sizes:
546        fpath = self._MakeTempFile(file_size, mem_metadata=True, mem_data=True)
547        self.latency_files.append(fpath)
548
549    # Create files for throughput tests.
550    if self.diag_tests.intersection(
551        (self.RTHRU, self.WTHRU, self.RTHRU_FILE, self.WTHRU_FILE)):
552      # Create a file for warming up the TCP connection.
553      self.tcp_warmup_file = self._MakeTempFile(
554          5 * 1024 * 1024, mem_metadata=True, mem_data=True)
555
556      # For in memory tests, throughput tests transfer the same object N times
557      # instead of creating N objects, in order to avoid excessive memory usage.
558      if self.diag_tests.intersection((self.RTHRU, self.WTHRU)):
559        self.mem_thru_file_name = self._MakeTempFile(
560            self.thru_filesize, mem_metadata=True, mem_data=True)
561        self.mem_thru_object_name = os.path.basename(self.mem_thru_file_name)
562
563      # For tests that use disk I/O, it is necessary to create N objects in
564      # in order to properly measure the performance impact of seeks.
565      if self.diag_tests.intersection((self.RTHRU_FILE, self.WTHRU_FILE)):
566        # List of file names and corresponding object names to use for file
567        # throughput tests.
568        self.thru_file_names = []
569        self.thru_object_names = []
570
571        free_disk_space = CheckFreeSpace(self.directory)
572        if free_disk_space >= self.thru_filesize * self.num_objects:
573          self.logger.info('\nCreating %d local files each of size %s.'
574                           % (self.num_objects,
575                              MakeHumanReadable(self.thru_filesize)))
576          self._WarnIfLargeData()
577          for _ in range(self.num_objects):
578            file_name = self._MakeTempFile(self.thru_filesize,
579                                           mem_metadata=True)
580            self.thru_file_names.append(file_name)
581            self.thru_object_names.append(os.path.basename(file_name))
582        else:
583          raise CommandException(
584              'Not enough free disk space for throughput files: '
585              '%s of disk space required, but only %s available.'
586              % (MakeHumanReadable(self.thru_filesize * self.num_objects),
587                 MakeHumanReadable(free_disk_space)))
588
589    # Dummy file buffer to use for downloading that goes nowhere.
590    self.discard_sink = DummyFile()
591
592    # Filter out misleading progress callback output and the incorrect
593    # suggestion to use gsutil -m perfdiag.
594    self.logger.addFilter(self._PerfdiagFilter())
595
596  def _TearDown(self):
597    """Performs operations to clean things up after performing diagnostics."""
598    if not self.teardown_completed:
599      temp_file_dict.clear()
600
601      try:
602        for fpath in self.temporary_files:
603          os.remove(fpath)
604        if self.delete_directory:
605          os.rmdir(self.directory)
606      except OSError:
607        pass
608
609      if self.threads > 1 or self.processes > 1:
610        args = [obj for obj in self.temporary_objects]
611        self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
612                   arg_checker=DummyArgChecker,
613                   parallel_operations_override=True,
614                   process_count=self.processes, thread_count=self.threads)
615      else:
616        for object_name in self.temporary_objects:
617          self.Delete(object_name)
618    self.teardown_completed = True
619
620  @contextlib.contextmanager
621  def _Time(self, key, bucket):
622    """A context manager that measures time.
623
624    A context manager that prints a status message before and after executing
625    the inner command and times how long the inner command takes. Keeps track of
626    the timing, aggregated by the given key.
627
628    Args:
629      key: The key to insert the timing value into a dictionary bucket.
630      bucket: A dictionary to place the timing value in.
631
632    Yields:
633      For the context manager.
634    """
635    self.logger.info('%s starting...', key)
636    t0 = time.time()
637    yield
638    t1 = time.time()
639    bucket[key].append(t1 - t0)
640    self.logger.info('%s done.', key)
641
642  def _RunOperation(self, func):
643    """Runs an operation with retry logic.
644
645    Args:
646      func: The function to run.
647
648    Returns:
649      True if the operation succeeds, False if aborted.
650    """
651    # We retry on httplib exceptions that can happen if the socket was closed
652    # by the remote party or the connection broke because of network issues.
653    # Only the BotoServerError is counted as a 5xx error towards the retry
654    # limit.
655    success = False
656    server_error_retried = 0
657    total_retried = 0
658    i = 0
659    return_val = None
660    while not success:
661      next_sleep = min(random.random() * (2 ** i) + 1, GetMaxRetryDelay())
662      try:
663        return_val = func()
664        self.total_requests += 1
665        success = True
666      except tuple(self.exceptions) as e:
667        total_retried += 1
668        if total_retried > self.MAX_TOTAL_RETRIES:
669          self.logger.info('Reached maximum total retries. Not retrying.')
670          break
671        if isinstance(e, ServiceException):
672          if e.status >= 500:
673            self.error_responses_by_code[e.status] += 1
674            self.total_requests += 1
675            self.request_errors += 1
676            server_error_retried += 1
677            time.sleep(next_sleep)
678          else:
679            raise
680          if server_error_retried > self.MAX_SERVER_ERROR_RETRIES:
681            self.logger.info(
682                'Reached maximum server error retries. Not retrying.')
683            break
684        else:
685          self.connection_breaks += 1
686    return return_val
687
688  def _RunLatencyTests(self):
689    """Runs latency tests."""
690    # Stores timing information for each category of operation.
691    self.results['latency'] = defaultdict(list)
692
693    for i in range(self.num_objects):
694      self.logger.info('\nRunning latency iteration %d...', i+1)
695      for fpath in self.latency_files:
696        file_data = temp_file_dict[fpath]
697        url = self.bucket_url.Clone()
698        url.object_name = os.path.basename(fpath)
699        file_size = file_data.size
700        readable_file_size = MakeHumanReadable(file_size)
701
702        self.logger.info(
703            "\nFile of size %s located on disk at '%s' being diagnosed in the "
704            "cloud at '%s'.", readable_file_size, fpath, url)
705
706        upload_target = StorageUrlToUploadObjectMetadata(url)
707
708        def _Upload():
709          io_fp = cStringIO.StringIO(file_data.data)
710          with self._Time('UPLOAD_%d' % file_size, self.results['latency']):
711            self.gsutil_api.UploadObject(
712                io_fp, upload_target, size=file_size, provider=self.provider,
713                fields=['name'])
714        self._RunOperation(_Upload)
715
716        def _Metadata():
717          with self._Time('METADATA_%d' % file_size, self.results['latency']):
718            return self.gsutil_api.GetObjectMetadata(
719                url.bucket_name, url.object_name,
720                provider=self.provider, fields=['name', 'contentType',
721                                                'mediaLink', 'size'])
722        # Download will get the metadata first if we don't pass it in.
723        download_metadata = self._RunOperation(_Metadata)
724        serialization_data = GetDownloadSerializationData(download_metadata)
725
726        def _Download():
727          with self._Time('DOWNLOAD_%d' % file_size, self.results['latency']):
728            self.gsutil_api.GetObjectMedia(
729                url.bucket_name, url.object_name, self.discard_sink,
730                provider=self.provider, serialization_data=serialization_data)
731        self._RunOperation(_Download)
732
733        def _Delete():
734          with self._Time('DELETE_%d' % file_size, self.results['latency']):
735            self.gsutil_api.DeleteObject(url.bucket_name, url.object_name,
736                                         provider=self.provider)
737        self._RunOperation(_Delete)
738
739  class _PerfdiagFilter(logging.Filter):
740
741    def filter(self, record):
742      # Used to prevent unnecessary output when using multiprocessing.
743      msg = record.getMessage()
744      return not (('Copying file:///' in msg) or ('Copying gs://' in msg) or
745                  ('Computing CRC' in msg) or ('gsutil -m perfdiag' in msg))
746
747  def _PerfdiagExceptionHandler(self, e):
748    """Simple exception handler to allow post-completion status."""
749    self.logger.error(str(e))
750
751  def PerformFannedDownload(self, need_to_slice, object_names, file_names,
752                            serialization_data):
753    """Performs a parallel download of multiple objects using the fan strategy.
754
755    Args:
756      need_to_slice: If True, additionally apply the slice strategy to each
757                     object in object_names.
758      object_names: A list of object names to be downloaded. Each object must
759                    already exist in the test bucket.
760      file_names: A list, corresponding by index to object_names, of file names
761                  for downloaded data. If None, discard downloaded data.
762      serialization_data: A list, corresponding by index to object_names,
763                          of serialization data for each object.
764    """
765    args = []
766    for i in range(len(object_names)):
767      file_name = file_names[i] if file_names else None
768      args.append(FanDownloadTuple(
769          need_to_slice, object_names[i], file_name,
770          serialization_data[i]))
771    self.Apply(_DownloadObject, args, _PerfdiagExceptionHandler,
772               ('total_requests', 'request_errors'),
773               arg_checker=DummyArgChecker, parallel_operations_override=True,
774               process_count=self.processes, thread_count=self.threads)
775
776  def PerformSlicedDownload(self, object_name, file_name, serialization_data):
777    """Performs a download of an object using the slice strategy.
778
779    Args:
780      object_name: The name of the object to download.
781      file_name: The name of the file to download data to, or None if data
782                 should be discarded.
783      serialization_data: The serialization data for the object.
784    """
785    if file_name:
786      with open(file_name, 'ab') as fp:
787        fp.truncate(self.thru_filesize)
788    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
789    args = []
790    for i in range(self.num_slices):
791      start_byte = i * component_size
792      end_byte = min((i + 1) * (component_size) - 1, self.thru_filesize - 1)
793      args.append(SliceDownloadTuple(object_name, file_name, serialization_data,
794                                     start_byte, end_byte))
795    self.Apply(_DownloadSlice, args, _PerfdiagExceptionHandler,
796               ('total_requests', 'request_errors'),
797               arg_checker=DummyArgChecker, parallel_operations_override=True,
798               process_count=self.processes, thread_count=self.threads)
799
800  def PerformFannedUpload(self, need_to_slice, file_names, object_names,
801                          use_file):
802    """Performs a parallel upload of multiple files using the fan strategy.
803
804    The metadata for file_name should be present in temp_file_dict prior
805    to calling. Also, the data for file_name should be present in temp_file_dict
806    if use_file is specified as False.
807
808    Args:
809      need_to_slice: If True, additionally apply the slice strategy to each
810                     file in file_names.
811      file_names: A list of file names to be uploaded.
812      object_names: A list, corresponding by by index to file_names, of object
813                    names to upload data to.
814      use_file: If true, use disk I/O, otherwise read upload data from memory.
815    """
816    args = []
817    for i in range(len(file_names)):
818      args.append(FanUploadTuple(
819          need_to_slice, file_names[i], object_names[i], use_file))
820    self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
821               ('total_requests', 'request_errors'),
822               arg_checker=DummyArgChecker, parallel_operations_override=True,
823               process_count=self.processes, thread_count=self.threads)
824
825  def PerformSlicedUpload(self, file_name, object_name, use_file):
826    """Performs a parallel upload of a file using the slice strategy.
827
828    The metadata for file_name should be present in temp_file_dict prior
829    to calling. Also, the data from for file_name should be present in
830    temp_file_dict if use_file is specified as False.
831
832    Args:
833      file_name: The name of the file to upload.
834      object_name: The name of the object to upload to.
835      use_file: If true, use disk I/O, otherwise read upload data from memory.
836    """
837    # Divide the file into components.
838    component_size = DivideAndCeil(self.thru_filesize, self.num_slices)
839    component_object_names = (
840        [object_name + str(i) for i in range(self.num_slices)])
841
842    args = []
843    for i in range(self.num_slices):
844      component_start = i * component_size
845      component_size = min(component_size,
846                           temp_file_dict[file_name].size - component_start)
847      args.append(SliceUploadTuple(file_name, component_object_names[i],
848                                   use_file, component_start, component_size))
849
850    # Upload the components in parallel.
851    try:
852      self.Apply(_UploadSlice, args, _PerfdiagExceptionHandler,
853                 ('total_requests', 'request_errors'),
854                 arg_checker=DummyArgChecker, parallel_operations_override=True,
855                 process_count=self.processes, thread_count=self.threads)
856
857      # Compose the components into an object.
858      request_components = []
859      for i in range(self.num_slices):
860        src_obj_metadata = (
861            apitools_messages.ComposeRequest.SourceObjectsValueListEntry(
862                name=component_object_names[i]))
863        request_components.append(src_obj_metadata)
864
865      dst_obj_metadata = apitools_messages.Object()
866      dst_obj_metadata.name = object_name
867      dst_obj_metadata.bucket = self.bucket_url.bucket_name
868      def _Compose():
869        self.gsutil_api.ComposeObject(request_components, dst_obj_metadata,
870                                      provider=self.provider)
871      self._RunOperation(_Compose)
872    finally:
873      # Delete the temporary components.
874      self.Apply(_DeleteWrapper, component_object_names,
875                 _PerfdiagExceptionHandler,
876                 ('total_requests', 'request_errors'),
877                 arg_checker=DummyArgChecker, parallel_operations_override=True,
878                 process_count=self.processes, thread_count=self.threads)
879
880  def _RunReadThruTests(self, use_file=False):
881    """Runs read throughput tests."""
882    test_name = 'read_throughput_file' if use_file else 'read_throughput'
883    file_io_string = 'with file I/O' if use_file else ''
884    self.logger.info(
885        '\nRunning read throughput tests %s (%s objects of size %s)' %
886        (file_io_string, self.num_objects,
887         MakeHumanReadable(self.thru_filesize)))
888    self._WarnIfLargeData()
889
890    self.results[test_name] = {'file_size': self.thru_filesize,
891                               'processes': self.processes,
892                               'threads': self.threads,
893                               'parallelism': self.parallel_strategy
894                              }
895
896    # Copy the file(s) to the test bucket, and also get the serialization data
897    # so that we can pass it to download.
898    if use_file:
899      # For test with file I/O use N files on disk to preserve seek performance.
900      file_names = self.thru_file_names
901      object_names = self.thru_object_names
902      serialization_data = []
903      for i in range(self.num_objects):
904        self.temporary_objects.add(self.thru_object_names[i])
905        if self.WTHRU_FILE in self.diag_tests:
906          # If we ran the WTHRU_FILE test, then the objects already exist.
907          obj_metadata = self.gsutil_api.GetObjectMetadata(
908              self.bucket_url.bucket_name, self.thru_object_names[i],
909              fields=['size', 'mediaLink'], provider=self.bucket_url.scheme)
910        else:
911          obj_metadata = self.Upload(self.thru_file_names[i],
912                                     self.thru_object_names[i], use_file)
913
914        # File overwrite causes performance issues with sliced downloads.
915        # Delete the file and reopen it for download. This matches what a real
916        # download would look like.
917        os.unlink(self.thru_file_names[i])
918        open(self.thru_file_names[i], 'ab').close()
919        serialization_data.append(GetDownloadSerializationData(obj_metadata))
920    else:
921      # For in-memory test only use one file but copy it num_objects times, to
922      # allow scalability in num_objects.
923      self.temporary_objects.add(self.mem_thru_object_name)
924      obj_metadata = self.Upload(self.mem_thru_file_name,
925                                 self.mem_thru_object_name, use_file)
926      file_names = None
927      object_names = [self.mem_thru_object_name] * self.num_objects
928      serialization_data = (
929          [GetDownloadSerializationData(obj_metadata)] * self.num_objects)
930
931    # Warmup the TCP connection.
932    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
933    self.temporary_objects.add(warmup_obj_name)
934    self.Upload(self.tcp_warmup_file, warmup_obj_name)
935    self.Download(warmup_obj_name)
936
937    t0 = time.time()
938    if self.processes == 1 and self.threads == 1:
939      for i in range(self.num_objects):
940        file_name = file_names[i] if use_file else None
941        self.Download(object_names[i], file_name, serialization_data[i])
942    else:
943      if self.parallel_strategy in (self.FAN, self.BOTH):
944        need_to_slice = (self.parallel_strategy == self.BOTH)
945        self.PerformFannedDownload(need_to_slice, object_names, file_names,
946                                   serialization_data)
947      elif self.parallel_strategy == self.SLICE:
948        for i in range(self.num_objects):
949          file_name = file_names[i] if use_file else None
950          self.PerformSlicedDownload(
951              object_names[i], file_name, serialization_data[i])
952    t1 = time.time()
953
954    time_took = t1 - t0
955    total_bytes_copied = self.thru_filesize * self.num_objects
956    bytes_per_second = total_bytes_copied / time_took
957
958    self.results[test_name]['time_took'] = time_took
959    self.results[test_name]['total_bytes_copied'] = total_bytes_copied
960    self.results[test_name]['bytes_per_second'] = bytes_per_second
961
962  def _RunWriteThruTests(self, use_file=False):
963    """Runs write throughput tests."""
964    test_name = 'write_throughput_file' if use_file else 'write_throughput'
965    file_io_string = 'with file I/O' if use_file else ''
966    self.logger.info(
967        '\nRunning write throughput tests %s (%s objects of size %s)' %
968        (file_io_string, self.num_objects,
969         MakeHumanReadable(self.thru_filesize)))
970    self._WarnIfLargeData()
971
972    self.results[test_name] = {'file_size': self.thru_filesize,
973                               'processes': self.processes,
974                               'threads': self.threads,
975                               'parallelism': self.parallel_strategy}
976
977    # Warmup the TCP connection.
978    warmup_obj_name = os.path.basename(self.tcp_warmup_file)
979    self.temporary_objects.add(warmup_obj_name)
980    self.Upload(self.tcp_warmup_file, warmup_obj_name)
981
982    if use_file:
983      # For test with file I/O use N files on disk to preserve seek performance.
984      file_names = self.thru_file_names
985      object_names = self.thru_object_names
986    else:
987      # For in-memory test only use one file but copy it num_objects times, to
988      # allow for scalability in num_objects.
989      file_names = [self.mem_thru_file_name] * self.num_objects
990      object_names = (
991          [self.mem_thru_object_name + str(i) for i in range(self.num_objects)])
992
993    for object_name in object_names:
994      self.temporary_objects.add(object_name)
995
996    t0 = time.time()
997    if self.processes == 1 and self.threads == 1:
998      for i in range(self.num_objects):
999        self.Upload(file_names[i], object_names[i], use_file)
1000    else:
1001      if self.parallel_strategy in (self.FAN, self.BOTH):
1002        need_to_slice = (self.parallel_strategy == self.BOTH)
1003        self.PerformFannedUpload(need_to_slice, file_names, object_names,
1004                                 use_file)
1005      elif self.parallel_strategy == self.SLICE:
1006        for i in range(self.num_objects):
1007          self.PerformSlicedUpload(file_names[i], object_names[i], use_file)
1008    t1 = time.time()
1009
1010    time_took = t1 - t0
1011    total_bytes_copied = self.thru_filesize * self.num_objects
1012    bytes_per_second = total_bytes_copied / time_took
1013
1014    self.results[test_name]['time_took'] = time_took
1015    self.results[test_name]['total_bytes_copied'] = total_bytes_copied
1016    self.results[test_name]['bytes_per_second'] = bytes_per_second
1017
1018  def _RunListTests(self):
1019    """Runs eventual consistency listing latency tests."""
1020    self.results['listing'] = {'num_files': self.num_objects}
1021
1022    # Generate N random objects to put into the bucket.
1023    list_prefix = 'gsutil-perfdiag-list-'
1024    list_fpaths = []
1025    list_objects = []
1026    args = []
1027    for _ in xrange(self.num_objects):
1028      fpath = self._MakeTempFile(0, mem_data=True, mem_metadata=True,
1029                                 prefix=list_prefix)
1030      list_fpaths.append(fpath)
1031      object_name = os.path.basename(fpath)
1032      list_objects.append(object_name)
1033      args.append(FanUploadTuple(False, fpath, object_name, False))
1034      self.temporary_objects.add(object_name)
1035
1036    # Add the objects to the bucket.
1037    self.logger.info(
1038        '\nWriting %s objects for listing test...', self.num_objects)
1039
1040    self.Apply(_UploadObject, args, _PerfdiagExceptionHandler,
1041               arg_checker=DummyArgChecker)
1042
1043    list_latencies = []
1044    files_seen = []
1045    total_start_time = time.time()
1046    expected_objects = set(list_objects)
1047    found_objects = set()
1048
1049    def _List():
1050      """Lists and returns objects in the bucket. Also records latency."""
1051      t0 = time.time()
1052      objects = list(self.gsutil_api.ListObjects(
1053          self.bucket_url.bucket_name, delimiter='/',
1054          provider=self.provider, fields=['items/name']))
1055      t1 = time.time()
1056      list_latencies.append(t1 - t0)
1057      return set([obj.data.name for obj in objects])
1058
1059    self.logger.info(
1060        'Listing bucket %s waiting for %s objects to appear...',
1061        self.bucket_url.bucket_name, self.num_objects)
1062    while expected_objects - found_objects:
1063      def _ListAfterUpload():
1064        names = _List()
1065        found_objects.update(names & expected_objects)
1066        files_seen.append(len(found_objects))
1067      self._RunOperation(_ListAfterUpload)
1068      if expected_objects - found_objects:
1069        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
1070          self.logger.warning('Maximum time reached waiting for listing.')
1071          break
1072    total_end_time = time.time()
1073
1074    self.results['listing']['insert'] = {
1075        'num_listing_calls': len(list_latencies),
1076        'list_latencies': list_latencies,
1077        'files_seen_after_listing': files_seen,
1078        'time_took': total_end_time - total_start_time,
1079    }
1080
1081    args = [object_name for object_name in list_objects]
1082    self.logger.info(
1083        'Deleting %s objects for listing test...', self.num_objects)
1084    self.Apply(_DeleteWrapper, args, _PerfdiagExceptionHandler,
1085               arg_checker=DummyArgChecker)
1086
1087    self.logger.info(
1088        'Listing bucket %s waiting for %s objects to disappear...',
1089        self.bucket_url.bucket_name, self.num_objects)
1090    list_latencies = []
1091    files_seen = []
1092    total_start_time = time.time()
1093    found_objects = set(list_objects)
1094    while found_objects:
1095      def _ListAfterDelete():
1096        names = _List()
1097        found_objects.intersection_update(names)
1098        files_seen.append(len(found_objects))
1099      self._RunOperation(_ListAfterDelete)
1100      if found_objects:
1101        if time.time() - total_start_time > self.MAX_LISTING_WAIT_TIME:
1102          self.logger.warning('Maximum time reached waiting for listing.')
1103          break
1104    total_end_time = time.time()
1105
1106    self.results['listing']['delete'] = {
1107        'num_listing_calls': len(list_latencies),
1108        'list_latencies': list_latencies,
1109        'files_seen_after_listing': files_seen,
1110        'time_took': total_end_time - total_start_time,
1111    }
1112
1113  def Upload(self, file_name, object_name, use_file=False, file_start=0,
1114             file_size=None):
1115    """Performs an upload to the test bucket.
1116
1117    The file is uploaded to the bucket referred to by self.bucket_url, and has
1118    name object_name.
1119
1120    Args:
1121      file_name: The path to the local file, and the key to its entry in
1122                 temp_file_dict.
1123      object_name: The name of the remote object.
1124      use_file: If true, use disk I/O, otherwise read everything from memory.
1125      file_start: The first byte in the file to upload to the object.
1126                  (only should be specified for sliced uploads)
1127      file_size: The size of the file to upload.
1128                 (only should be specified for sliced uploads)
1129
1130    Returns:
1131      Uploaded Object Metadata.
1132    """
1133    fp = None
1134    if file_size is None:
1135      file_size = temp_file_dict[file_name].size
1136
1137    upload_url = self.bucket_url.Clone()
1138    upload_url.object_name = object_name
1139    upload_target = StorageUrlToUploadObjectMetadata(upload_url)
1140
1141    try:
1142      if use_file:
1143        fp = FilePart(file_name, file_start, file_size)
1144      else:
1145        data = temp_file_dict[file_name].data[file_start:file_start+file_size]
1146        fp = cStringIO.StringIO(data)
1147
1148      def _InnerUpload():
1149        if file_size < ResumableThreshold():
1150          return self.gsutil_api.UploadObject(
1151              fp, upload_target, provider=self.provider, size=file_size,
1152              fields=['name', 'mediaLink', 'size'])
1153        else:
1154          return self.gsutil_api.UploadObjectResumable(
1155              fp, upload_target, provider=self.provider, size=file_size,
1156              fields=['name', 'mediaLink', 'size'],
1157              tracker_callback=_DummyTrackerCallback)
1158      return self._RunOperation(_InnerUpload)
1159    finally:
1160      if fp:
1161        fp.close()
1162
1163  def Download(self, object_name, file_name=None, serialization_data=None,
1164               start_byte=0, end_byte=None):
1165    """Downloads an object from the test bucket.
1166
1167    Args:
1168      object_name: The name of the object (in the test bucket) to download.
1169      file_name: Optional file name to write downloaded data to. If None,
1170                 downloaded data is discarded immediately.
1171      serialization_data: Optional serialization data, used so that we don't
1172                          have to get the metadata before downloading.
1173      start_byte: The first byte in the object to download.
1174                  (only should be specified for sliced downloads)
1175      end_byte: The last byte in the object to download.
1176                (only should be specified for sliced downloads)
1177    """
1178    fp = None
1179    try:
1180      if file_name is not None:
1181        fp = open(file_name, 'r+b')
1182        fp.seek(start_byte)
1183      else:
1184        fp = self.discard_sink
1185
1186      def _InnerDownload():
1187        self.gsutil_api.GetObjectMedia(
1188            self.bucket_url.bucket_name, object_name, fp,
1189            provider=self.provider, start_byte=start_byte, end_byte=end_byte,
1190            serialization_data=serialization_data)
1191      self._RunOperation(_InnerDownload)
1192    finally:
1193      if fp:
1194        fp.close()
1195
1196  def Delete(self, object_name):
1197    """Deletes an object from the test bucket.
1198
1199    Args:
1200      object_name: The name of the object to delete.
1201    """
1202    try:
1203      def _InnerDelete():
1204        self.gsutil_api.DeleteObject(self.bucket_url.bucket_name, object_name,
1205                                     provider=self.provider)
1206      self._RunOperation(_InnerDelete)
1207    except NotFoundException:
1208      pass
1209
1210  def _GetDiskCounters(self):
1211    """Retrieves disk I/O statistics for all disks.
1212
1213    Adapted from the psutil module's psutil._pslinux.disk_io_counters:
1214      http://code.google.com/p/psutil/source/browse/trunk/psutil/_pslinux.py
1215
1216    Originally distributed under under a BSD license.
1217    Original Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola.
1218
1219    Returns:
1220      A dictionary containing disk names mapped to the disk counters from
1221      /disk/diskstats.
1222    """
1223    # iostat documentation states that sectors are equivalent with blocks and
1224    # have a size of 512 bytes since 2.4 kernels. This value is needed to
1225    # calculate the amount of disk I/O in bytes.
1226    sector_size = 512
1227
1228    partitions = []
1229    with open('/proc/partitions', 'r') as f:
1230      lines = f.readlines()[2:]
1231      for line in lines:
1232        _, _, _, name = line.split()
1233        if name[-1].isdigit():
1234          partitions.append(name)
1235
1236    retdict = {}
1237    with open('/proc/diskstats', 'r') as f:
1238      for line in f:
1239        values = line.split()[:11]
1240        _, _, name, reads, _, rbytes, rtime, writes, _, wbytes, wtime = values
1241        if name in partitions:
1242          rbytes = int(rbytes) * sector_size
1243          wbytes = int(wbytes) * sector_size
1244          reads = int(reads)
1245          writes = int(writes)
1246          rtime = int(rtime)
1247          wtime = int(wtime)
1248          retdict[name] = (reads, writes, rbytes, wbytes, rtime, wtime)
1249    return retdict
1250
1251  def _GetTcpStats(self):
1252    """Tries to parse out TCP packet information from netstat output.
1253
1254    Returns:
1255       A dictionary containing TCP information, or None if netstat is not
1256       available.
1257    """
1258    # netstat return code is non-zero for -s on Linux, so don't raise on error.
1259    try:
1260      netstat_output = self._Exec(['netstat', '-s'], return_output=True,
1261                                  raise_on_error=False)
1262    except OSError:
1263      self.logger.warning('netstat not found on your system; some measurement '
1264                          'data will be missing')
1265      return None
1266    netstat_output = netstat_output.strip().lower()
1267    found_tcp = False
1268    tcp_retransmit = None
1269    tcp_received = None
1270    tcp_sent = None
1271    for line in netstat_output.split('\n'):
1272      # Header for TCP section is "Tcp:" in Linux/Mac and
1273      # "TCP Statistics for" in Windows.
1274      if 'tcp:' in line or 'tcp statistics' in line:
1275        found_tcp = True
1276
1277      # Linux == "segments retransmited" (sic), Mac == "retransmit timeouts"
1278      # Windows == "segments retransmitted".
1279      if (found_tcp and tcp_retransmit is None and
1280          ('segments retransmited' in line or 'retransmit timeouts' in line or
1281           'segments retransmitted' in line)):
1282        tcp_retransmit = ''.join(c for c in line if c in string.digits)
1283
1284      # Linux+Windows == "segments received", Mac == "packets received".
1285      if (found_tcp and tcp_received is None and
1286          ('segments received' in line or 'packets received' in line)):
1287        tcp_received = ''.join(c for c in line if c in string.digits)
1288
1289      # Linux == "segments send out" (sic), Mac+Windows == "packets sent".
1290      if (found_tcp and tcp_sent is None and
1291          ('segments send out' in line or 'packets sent' in line or
1292           'segments sent' in line)):
1293        tcp_sent = ''.join(c for c in line if c in string.digits)
1294
1295    result = {}
1296    try:
1297      result['tcp_retransmit'] = int(tcp_retransmit)
1298      result['tcp_received'] = int(tcp_received)
1299      result['tcp_sent'] = int(tcp_sent)
1300    except (ValueError, TypeError):
1301      result['tcp_retransmit'] = None
1302      result['tcp_received'] = None
1303      result['tcp_sent'] = None
1304
1305    return result
1306
1307  def _CollectSysInfo(self):
1308    """Collects system information."""
1309    sysinfo = {}
1310
1311    # All exceptions that might be raised from socket module calls.
1312    socket_errors = (
1313        socket.error, socket.herror, socket.gaierror, socket.timeout)
1314
1315    # Find out whether HTTPS is enabled in Boto.
1316    sysinfo['boto_https_enabled'] = boto.config.get('Boto', 'is_secure', True)
1317
1318    # Look up proxy info.
1319    proxy_host = boto.config.get('Boto', 'proxy', None)
1320    proxy_port = boto.config.getint('Boto', 'proxy_port', 0)
1321    sysinfo['using_proxy'] = bool(proxy_host)
1322
1323    if boto.config.get('Boto', 'proxy_rdns', False):
1324      self.logger.info('DNS lookups are disallowed in this environment, so '
1325                       'some information is not included in this perfdiag run.')
1326
1327    # Get the local IP address from socket lib.
1328    try:
1329      sysinfo['ip_address'] = socket.gethostbyname(socket.gethostname())
1330    except socket_errors:
1331      sysinfo['ip_address'] = ''
1332    # Record the temporary directory used since it can affect performance, e.g.
1333    # when on a networked filesystem.
1334    sysinfo['tempdir'] = self.directory
1335
1336    # Produces an RFC 2822 compliant GMT timestamp.
1337    sysinfo['gmt_timestamp'] = time.strftime('%a, %d %b %Y %H:%M:%S +0000',
1338                                             time.gmtime())
1339
1340    # Execute a CNAME lookup on Google DNS to find what Google server
1341    # it's routing to.
1342    cmd = ['nslookup', '-type=CNAME', self.XML_API_HOST]
1343    try:
1344      nslookup_cname_output = self._Exec(cmd, return_output=True)
1345      m = re.search(r' = (?P<googserv>[^.]+)\.', nslookup_cname_output)
1346      sysinfo['googserv_route'] = m.group('googserv') if m else None
1347    except (CommandException, OSError):
1348      sysinfo['googserv_route'] = ''
1349
1350    # Try to determine the latency of a DNS lookup for the Google hostname
1351    # endpoint. Note: we don't piggyback on gethostbyname_ex below because
1352    # the _ex version requires an extra RTT.
1353    try:
1354      t0 = time.time()
1355      socket.gethostbyname(self.XML_API_HOST)
1356      t1 = time.time()
1357      sysinfo['google_host_dns_latency'] = t1 - t0
1358    except socket_errors:
1359      pass
1360
1361    # Look up IP addresses for Google Server.
1362    try:
1363      (hostname, _, ipaddrlist) = socket.gethostbyname_ex(self.XML_API_HOST)
1364      sysinfo['googserv_ips'] = ipaddrlist
1365    except socket_errors:
1366      ipaddrlist = []
1367      sysinfo['googserv_ips'] = []
1368
1369    # Reverse lookup the hostnames for the Google Server IPs.
1370    sysinfo['googserv_hostnames'] = []
1371    for googserv_ip in ipaddrlist:
1372      try:
1373        (hostname, _, ipaddrlist) = socket.gethostbyaddr(googserv_ip)
1374        sysinfo['googserv_hostnames'].append(hostname)
1375      except socket_errors:
1376        pass
1377
1378    # Query o-o to find out what the Google DNS thinks is the user's IP.
1379    try:
1380      cmd = ['nslookup', '-type=TXT', 'o-o.myaddr.google.com.']
1381      nslookup_txt_output = self._Exec(cmd, return_output=True)
1382      m = re.search(r'text\s+=\s+"(?P<dnsip>[\.\d]+)"', nslookup_txt_output)
1383      sysinfo['dns_o-o_ip'] = m.group('dnsip') if m else None
1384    except (CommandException, OSError):
1385      sysinfo['dns_o-o_ip'] = ''
1386
1387    # Try to determine the latency of connecting to the Google hostname
1388    # endpoint.
1389    sysinfo['google_host_connect_latencies'] = {}
1390    for googserv_ip in ipaddrlist:
1391      try:
1392        sock = socket.socket()
1393        t0 = time.time()
1394        sock.connect((googserv_ip, self.XML_API_PORT))
1395        t1 = time.time()
1396        sysinfo['google_host_connect_latencies'][googserv_ip] = t1 - t0
1397      except socket_errors:
1398        pass
1399
1400    # If using a proxy, try to determine the latency of a DNS lookup to resolve
1401    # the proxy hostname and the latency of connecting to the proxy.
1402    if proxy_host:
1403      proxy_ip = None
1404      try:
1405        t0 = time.time()
1406        proxy_ip = socket.gethostbyname(proxy_host)
1407        t1 = time.time()
1408        sysinfo['proxy_dns_latency'] = t1 - t0
1409      except socket_errors:
1410        pass
1411
1412      try:
1413        sock = socket.socket()
1414        t0 = time.time()
1415        sock.connect((proxy_ip or proxy_host, proxy_port))
1416        t1 = time.time()
1417        sysinfo['proxy_host_connect_latency'] = t1 - t0
1418      except socket_errors:
1419        pass
1420
1421    # Try and find the number of CPUs in the system if available.
1422    try:
1423      sysinfo['cpu_count'] = multiprocessing.cpu_count()
1424    except NotImplementedError:
1425      sysinfo['cpu_count'] = None
1426
1427    # For *nix platforms, obtain the CPU load.
1428    try:
1429      sysinfo['load_avg'] = list(os.getloadavg())
1430    except (AttributeError, OSError):
1431      sysinfo['load_avg'] = None
1432
1433    # Try and collect memory information from /proc/meminfo if possible.
1434    mem_total = None
1435    mem_free = None
1436    mem_buffers = None
1437    mem_cached = None
1438
1439    try:
1440      with open('/proc/meminfo', 'r') as f:
1441        for line in f:
1442          if line.startswith('MemTotal'):
1443            mem_total = (int(''.join(c for c in line if c in string.digits))
1444                         * 1000)
1445          elif line.startswith('MemFree'):
1446            mem_free = (int(''.join(c for c in line if c in string.digits))
1447                        * 1000)
1448          elif line.startswith('Buffers'):
1449            mem_buffers = (int(''.join(c for c in line if c in string.digits))
1450                           * 1000)
1451          elif line.startswith('Cached'):
1452            mem_cached = (int(''.join(c for c in line if c in string.digits))
1453                          * 1000)
1454    except (IOError, ValueError):
1455      pass
1456
1457    sysinfo['meminfo'] = {'mem_total': mem_total,
1458                          'mem_free': mem_free,
1459                          'mem_buffers': mem_buffers,
1460                          'mem_cached': mem_cached}
1461
1462    # Get configuration attributes from config module.
1463    sysinfo['gsutil_config'] = {}
1464    for attr in dir(config):
1465      attr_value = getattr(config, attr)
1466      # Filter out multiline strings that are not useful.
1467      if attr.isupper() and not (isinstance(attr_value, basestring) and
1468                                 '\n' in attr_value):
1469        sysinfo['gsutil_config'][attr] = attr_value
1470
1471    sysinfo['tcp_proc_values'] = {}
1472    stats_to_check = [
1473        '/proc/sys/net/core/rmem_default',
1474        '/proc/sys/net/core/rmem_max',
1475        '/proc/sys/net/core/wmem_default',
1476        '/proc/sys/net/core/wmem_max',
1477        '/proc/sys/net/ipv4/tcp_timestamps',
1478        '/proc/sys/net/ipv4/tcp_sack',
1479        '/proc/sys/net/ipv4/tcp_window_scaling',
1480    ]
1481    for fname in stats_to_check:
1482      try:
1483        with open(fname, 'r') as f:
1484          value = f.read()
1485        sysinfo['tcp_proc_values'][os.path.basename(fname)] = value.strip()
1486      except IOError:
1487        pass
1488
1489    self.results['sysinfo'] = sysinfo
1490
1491  def _DisplayStats(self, trials):
1492    """Prints out mean, standard deviation, median, and 90th percentile."""
1493    n = len(trials)
1494    mean = float(sum(trials)) / n
1495    stdev = math.sqrt(sum((x - mean)**2 for x in trials) / n)
1496
1497    print str(n).rjust(6), '',
1498    print ('%.1f' % (mean * 1000)).rjust(9), '',
1499    print ('%.1f' % (stdev * 1000)).rjust(12), '',
1500    print ('%.1f' % (Percentile(trials, 0.5) * 1000)).rjust(11), '',
1501    print ('%.1f' % (Percentile(trials, 0.9) * 1000)).rjust(11), ''
1502
1503  def _DisplayResults(self):
1504    """Displays results collected from diagnostic run."""
1505    print
1506    print '=' * 78
1507    print 'DIAGNOSTIC RESULTS'.center(78)
1508    print '=' * 78
1509
1510    if 'latency' in self.results:
1511      print
1512      print '-' * 78
1513      print 'Latency'.center(78)
1514      print '-' * 78
1515      print ('Operation       Size  Trials  Mean (ms)  Std Dev (ms)  '
1516             'Median (ms)  90th % (ms)')
1517      print ('=========  =========  ======  =========  ============  '
1518             '===========  ===========')
1519      for key in sorted(self.results['latency']):
1520        trials = sorted(self.results['latency'][key])
1521        op, numbytes = key.split('_')
1522        numbytes = int(numbytes)
1523        if op == 'METADATA':
1524          print 'Metadata'.rjust(9), '',
1525          print MakeHumanReadable(numbytes).rjust(9), '',
1526          self._DisplayStats(trials)
1527        if op == 'DOWNLOAD':
1528          print 'Download'.rjust(9), '',
1529          print MakeHumanReadable(numbytes).rjust(9), '',
1530          self._DisplayStats(trials)
1531        if op == 'UPLOAD':
1532          print 'Upload'.rjust(9), '',
1533          print MakeHumanReadable(numbytes).rjust(9), '',
1534          self._DisplayStats(trials)
1535        if op == 'DELETE':
1536          print 'Delete'.rjust(9), '',
1537          print MakeHumanReadable(numbytes).rjust(9), '',
1538          self._DisplayStats(trials)
1539
1540    if 'write_throughput' in self.results:
1541      print
1542      print '-' * 78
1543      print 'Write Throughput'.center(78)
1544      print '-' * 78
1545      write_thru = self.results['write_throughput']
1546      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1547          self.num_objects,
1548          MakeHumanReadable(write_thru['file_size']),
1549          MakeHumanReadable(write_thru['total_bytes_copied']))
1550      print 'Write throughput: %s/s.' % (
1551          MakeBitsHumanReadable(write_thru['bytes_per_second'] * 8))
1552      print 'Parallelism strategy: %s' % write_thru['parallelism']
1553
1554    if 'write_throughput_file' in self.results:
1555      print
1556      print '-' * 78
1557      print 'Write Throughput With File I/O'.center(78)
1558      print '-' * 78
1559      write_thru_file = self.results['write_throughput_file']
1560      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1561          self.num_objects,
1562          MakeHumanReadable(write_thru_file['file_size']),
1563          MakeHumanReadable(write_thru_file['total_bytes_copied']))
1564      print 'Write throughput: %s/s.' % (
1565          MakeBitsHumanReadable(write_thru_file['bytes_per_second'] * 8))
1566      print 'Parallelism strategy: %s' % write_thru_file['parallelism']
1567
1568    if 'read_throughput' in self.results:
1569      print
1570      print '-' * 78
1571      print 'Read Throughput'.center(78)
1572      print '-' * 78
1573      read_thru = self.results['read_throughput']
1574      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1575          self.num_objects,
1576          MakeHumanReadable(read_thru['file_size']),
1577          MakeHumanReadable(read_thru['total_bytes_copied']))
1578      print 'Read throughput: %s/s.' % (
1579          MakeBitsHumanReadable(read_thru['bytes_per_second'] * 8))
1580      print 'Parallelism strategy: %s' % read_thru['parallelism']
1581
1582    if 'read_throughput_file' in self.results:
1583      print
1584      print '-' * 78
1585      print 'Read Throughput With File I/O'.center(78)
1586      print '-' * 78
1587      read_thru_file = self.results['read_throughput_file']
1588      print 'Copied %s %s file(s) for a total transfer size of %s.' % (
1589          self.num_objects,
1590          MakeHumanReadable(read_thru_file['file_size']),
1591          MakeHumanReadable(read_thru_file['total_bytes_copied']))
1592      print 'Read throughput: %s/s.' % (
1593          MakeBitsHumanReadable(read_thru_file['bytes_per_second'] * 8))
1594      print 'Parallelism strategy: %s' % read_thru_file['parallelism']
1595
1596    if 'listing' in self.results:
1597      print
1598      print '-' * 78
1599      print 'Listing'.center(78)
1600      print '-' * 78
1601
1602      listing = self.results['listing']
1603      insert = listing['insert']
1604      delete = listing['delete']
1605      print 'After inserting %s objects:' % listing['num_files']
1606      print ('  Total time for objects to appear: %.2g seconds' %
1607             insert['time_took'])
1608      print '  Number of listing calls made: %s' % insert['num_listing_calls']
1609      print ('  Individual listing call latencies: [%s]' %
1610             ', '.join('%.2gs' % lat for lat in insert['list_latencies']))
1611      print ('  Files reflected after each call: [%s]' %
1612             ', '.join(map(str, insert['files_seen_after_listing'])))
1613
1614      print 'After deleting %s objects:' % listing['num_files']
1615      print ('  Total time for objects to appear: %.2g seconds' %
1616             delete['time_took'])
1617      print '  Number of listing calls made: %s' % delete['num_listing_calls']
1618      print ('  Individual listing call latencies: [%s]' %
1619             ', '.join('%.2gs' % lat for lat in delete['list_latencies']))
1620      print ('  Files reflected after each call: [%s]' %
1621             ', '.join(map(str, delete['files_seen_after_listing'])))
1622
1623    if 'sysinfo' in self.results:
1624      print
1625      print '-' * 78
1626      print 'System Information'.center(78)
1627      print '-' * 78
1628      info = self.results['sysinfo']
1629      print 'IP Address: \n  %s' % info['ip_address']
1630      print 'Temporary Directory: \n  %s' % info['tempdir']
1631      print 'Bucket URI: \n  %s' % self.results['bucket_uri']
1632      print 'gsutil Version: \n  %s' % self.results.get('gsutil_version',
1633                                                        'Unknown')
1634      print 'boto Version: \n  %s' % self.results.get('boto_version', 'Unknown')
1635
1636      if 'gmt_timestamp' in info:
1637        ts_string = info['gmt_timestamp']
1638        timetuple = None
1639        try:
1640          # Convert RFC 2822 string to Linux timestamp.
1641          timetuple = time.strptime(ts_string, '%a, %d %b %Y %H:%M:%S +0000')
1642        except ValueError:
1643          pass
1644
1645        if timetuple:
1646          # Converts the GMT time tuple to local Linux timestamp.
1647          localtime = calendar.timegm(timetuple)
1648          localdt = datetime.datetime.fromtimestamp(localtime)
1649          print 'Measurement time: \n %s' % localdt.strftime(
1650              '%Y-%m-%d %I:%M:%S %p %Z')
1651
1652      print 'Google Server: \n  %s' % info['googserv_route']
1653      print ('Google Server IP Addresses: \n  %s' %
1654             ('\n  '.join(info['googserv_ips'])))
1655      print ('Google Server Hostnames: \n  %s' %
1656             ('\n  '.join(info['googserv_hostnames'])))
1657      print 'Google DNS thinks your IP is: \n  %s' % info['dns_o-o_ip']
1658      print 'CPU Count: \n  %s' % info['cpu_count']
1659      print 'CPU Load Average: \n  %s' % info['load_avg']
1660      try:
1661        print ('Total Memory: \n  %s' %
1662               MakeHumanReadable(info['meminfo']['mem_total']))
1663        # Free memory is really MemFree + Buffers + Cached.
1664        print 'Free Memory: \n  %s' % MakeHumanReadable(
1665            info['meminfo']['mem_free'] +
1666            info['meminfo']['mem_buffers'] +
1667            info['meminfo']['mem_cached'])
1668      except TypeError:
1669        pass
1670
1671      if 'netstat_end' in info and 'netstat_start' in info:
1672        netstat_after = info['netstat_end']
1673        netstat_before = info['netstat_start']
1674        for tcp_type in ('sent', 'received', 'retransmit'):
1675          try:
1676            delta = (netstat_after['tcp_%s' % tcp_type] -
1677                     netstat_before['tcp_%s' % tcp_type])
1678            print 'TCP segments %s during test:\n  %d' % (tcp_type, delta)
1679          except TypeError:
1680            pass
1681      else:
1682        print ('TCP segment counts not available because "netstat" was not '
1683               'found during test runs')
1684
1685      if 'disk_counters_end' in info and 'disk_counters_start' in info:
1686        print 'Disk Counter Deltas:\n',
1687        disk_after = info['disk_counters_end']
1688        disk_before = info['disk_counters_start']
1689        print '', 'disk'.rjust(6),
1690        for colname in ['reads', 'writes', 'rbytes', 'wbytes', 'rtime',
1691                        'wtime']:
1692          print colname.rjust(8),
1693        print
1694        for diskname in sorted(disk_after):
1695          before = disk_before[diskname]
1696          after = disk_after[diskname]
1697          (reads1, writes1, rbytes1, wbytes1, rtime1, wtime1) = before
1698          (reads2, writes2, rbytes2, wbytes2, rtime2, wtime2) = after
1699          print '', diskname.rjust(6),
1700          deltas = [reads2-reads1, writes2-writes1, rbytes2-rbytes1,
1701                    wbytes2-wbytes1, rtime2-rtime1, wtime2-wtime1]
1702          for delta in deltas:
1703            print str(delta).rjust(8),
1704          print
1705
1706      if 'tcp_proc_values' in info:
1707        print 'TCP /proc values:\n',
1708        for item in info['tcp_proc_values'].iteritems():
1709          print '   %s = %s' % item
1710
1711      if 'boto_https_enabled' in info:
1712        print 'Boto HTTPS Enabled: \n  %s' % info['boto_https_enabled']
1713
1714      if 'using_proxy' in info:
1715        print 'Requests routed through proxy: \n  %s' % info['using_proxy']
1716
1717      if 'google_host_dns_latency' in info:
1718        print ('Latency of the DNS lookup for Google Storage server (ms): '
1719               '\n  %.1f' % (info['google_host_dns_latency'] * 1000.0))
1720
1721      if 'google_host_connect_latencies' in info:
1722        print 'Latencies connecting to Google Storage server IPs (ms):'
1723        for ip, latency in info['google_host_connect_latencies'].iteritems():
1724          print '  %s = %.1f' % (ip, latency * 1000.0)
1725
1726      if 'proxy_dns_latency' in info:
1727        print ('Latency of the DNS lookup for the configured proxy (ms): '
1728               '\n  %.1f' % (info['proxy_dns_latency'] * 1000.0))
1729
1730      if 'proxy_host_connect_latency' in info:
1731        print ('Latency connecting to the configured proxy (ms): \n  %.1f' %
1732               (info['proxy_host_connect_latency'] * 1000.0))
1733
1734    if 'request_errors' in self.results and 'total_requests' in self.results:
1735      print
1736      print '-' * 78
1737      print 'In-Process HTTP Statistics'.center(78)
1738      print '-' * 78
1739      total = int(self.results['total_requests'])
1740      numerrors = int(self.results['request_errors'])
1741      numbreaks = int(self.results['connection_breaks'])
1742      availability = (((total - numerrors) / float(total)) * 100
1743                      if total > 0 else 100)
1744      print 'Total HTTP requests made: %d' % total
1745      print 'HTTP 5xx errors: %d' % numerrors
1746      print 'HTTP connections broken: %d' % numbreaks
1747      print 'Availability: %.7g%%' % availability
1748      if 'error_responses_by_code' in self.results:
1749        sorted_codes = sorted(
1750            self.results['error_responses_by_code'].iteritems())
1751        if sorted_codes:
1752          print 'Error responses by code:'
1753          print '\n'.join('  %s: %s' % c for c in sorted_codes)
1754
1755    if self.output_file:
1756      with open(self.output_file, 'w') as f:
1757        json.dump(self.results, f, indent=2)
1758      print
1759      print "Output file written to '%s'." % self.output_file
1760
1761    print
1762
1763  def _ParsePositiveInteger(self, val, msg):
1764    """Tries to convert val argument to a positive integer.
1765
1766    Args:
1767      val: The value (as a string) to convert to a positive integer.
1768      msg: The error message to place in the CommandException on an error.
1769
1770    Returns:
1771      A valid positive integer.
1772
1773    Raises:
1774      CommandException: If the supplied value is not a valid positive integer.
1775    """
1776    try:
1777      val = int(val)
1778      if val < 1:
1779        raise CommandException(msg)
1780      return val
1781    except ValueError:
1782      raise CommandException(msg)
1783
1784  def _ParseArgs(self):
1785    """Parses arguments for perfdiag command."""
1786    # From -n.
1787    self.num_objects = 5
1788    # From -c.
1789    self.processes = 1
1790    # From -k.
1791    self.threads = 1
1792    # From -p
1793    self.parallel_strategy = None
1794    # From -y
1795    self.num_slices = 4
1796    # From -s.
1797    self.thru_filesize = 1048576
1798    # From -d.
1799    self.directory = tempfile.gettempdir()
1800    # Keep track of whether or not to delete the directory upon completion.
1801    self.delete_directory = False
1802    # From -t.
1803    self.diag_tests = set(self.DEFAULT_DIAG_TESTS)
1804    # From -o.
1805    self.output_file = None
1806    # From -i.
1807    self.input_file = None
1808    # From -m.
1809    self.metadata_keys = {}
1810
1811    if self.sub_opts:
1812      for o, a in self.sub_opts:
1813        if o == '-n':
1814          self.num_objects = self._ParsePositiveInteger(
1815              a, 'The -n parameter must be a positive integer.')
1816        if o == '-c':
1817          self.processes = self._ParsePositiveInteger(
1818              a, 'The -c parameter must be a positive integer.')
1819        if o == '-k':
1820          self.threads = self._ParsePositiveInteger(
1821              a, 'The -k parameter must be a positive integer.')
1822        if o == '-p':
1823          if a.lower() in self.PARALLEL_STRATEGIES:
1824            self.parallel_strategy = a.lower()
1825          else:
1826            raise CommandException(
1827                "'%s' is not a valid parallelism strategy." % a)
1828        if o == '-y':
1829          self.num_slices = self._ParsePositiveInteger(
1830              a, 'The -y parameter must be a positive integer.')
1831        if o == '-s':
1832          try:
1833            self.thru_filesize = HumanReadableToBytes(a)
1834          except ValueError:
1835            raise CommandException('Invalid -s parameter.')
1836        if o == '-d':
1837          self.directory = a
1838          if not os.path.exists(self.directory):
1839            self.delete_directory = True
1840            os.makedirs(self.directory)
1841        if o == '-t':
1842          self.diag_tests = set()
1843          for test_name in a.strip().split(','):
1844            if test_name.lower() not in self.ALL_DIAG_TESTS:
1845              raise CommandException("List of test names (-t) contains invalid "
1846                                     "test name '%s'." % test_name)
1847            self.diag_tests.add(test_name)
1848        if o == '-m':
1849          pieces = a.split(':')
1850          if len(pieces) != 2:
1851            raise CommandException(
1852                "Invalid metadata key-value combination '%s'." % a)
1853          key, value = pieces
1854          self.metadata_keys[key] = value
1855        if o == '-o':
1856          self.output_file = os.path.abspath(a)
1857        if o == '-i':
1858          self.input_file = os.path.abspath(a)
1859          if not os.path.isfile(self.input_file):
1860            raise CommandException("Invalid input file (-i): '%s'." % a)
1861          try:
1862            with open(self.input_file, 'r') as f:
1863              self.results = json.load(f)
1864              self.logger.info("Read input file: '%s'.", self.input_file)
1865          except ValueError:
1866            raise CommandException("Could not decode input file (-i): '%s'." %
1867                                   a)
1868          return
1869
1870    # If parallelism is specified, default parallelism strategy to fan.
1871    if (self.processes > 1 or self.threads > 1) and not self.parallel_strategy:
1872      self.parallel_strategy = self.FAN
1873    elif self.processes == 1 and self.threads == 1 and self.parallel_strategy:
1874      raise CommandException(
1875          'Cannot specify parallelism strategy (-p) without also specifying '
1876          'multiple threads and/or processes (-c and/or -k).')
1877
1878    if not self.args:
1879      self.RaiseWrongNumberOfArgumentsException()
1880
1881    self.bucket_url = StorageUrlFromString(self.args[0])
1882    self.provider = self.bucket_url.scheme
1883    if not self.bucket_url.IsCloudUrl() and self.bucket_url.IsBucket():
1884      raise CommandException('The perfdiag command requires a URL that '
1885                             'specifies a bucket.\n"%s" is not '
1886                             'valid.' % self.args[0])
1887
1888    if (self.thru_filesize > HumanReadableToBytes('2GiB') and
1889        (self.RTHRU in self.diag_tests or self.WTHRU in self.diag_tests)):
1890      raise CommandException(
1891          'For in-memory tests maximum file size is 2GiB. For larger file '
1892          'sizes, specify rthru_file and/or wthru_file with the -t option.')
1893
1894    perform_slice = self.parallel_strategy in (self.SLICE, self.BOTH)
1895    slice_not_available = (
1896        self.provider == 's3' and self.diag_tests.intersection(self.WTHRU,
1897                                                               self.WTHRU_FILE))
1898    if perform_slice and slice_not_available:
1899      raise CommandException('Sliced uploads are not available for s3. '
1900                             'Use -p fan or sequential uploads for s3.')
1901
1902    # Ensure the bucket exists.
1903    self.gsutil_api.GetBucket(self.bucket_url.bucket_name,
1904                              provider=self.bucket_url.scheme,
1905                              fields=['id'])
1906    self.exceptions = [httplib.HTTPException, socket.error, socket.gaierror,
1907                       socket.timeout, httplib.BadStatusLine,
1908                       ServiceException]
1909
1910  # Command entry point.
1911  def RunCommand(self):
1912    """Called by gsutil when the command is being invoked."""
1913    self._ParseArgs()
1914
1915    if self.input_file:
1916      self._DisplayResults()
1917      return 0
1918
1919    # We turn off retries in the underlying boto library because the
1920    # _RunOperation function handles errors manually so it can count them.
1921    boto.config.set('Boto', 'num_retries', '0')
1922
1923    self.logger.info(
1924        'Number of iterations to run: %d\n'
1925        'Base bucket URI: %s\n'
1926        'Number of processes: %d\n'
1927        'Number of threads: %d\n'
1928        'Parallelism strategy: %s\n'
1929        'Throughput file size: %s\n'
1930        'Diagnostics to run: %s',
1931        self.num_objects,
1932        self.bucket_url,
1933        self.processes,
1934        self.threads,
1935        self.parallel_strategy,
1936        MakeHumanReadable(self.thru_filesize),
1937        (', '.join(self.diag_tests)))
1938
1939    try:
1940      self._SetUp()
1941
1942      # Collect generic system info.
1943      self._CollectSysInfo()
1944      # Collect netstat info and disk counters before tests (and again later).
1945      netstat_output = self._GetTcpStats()
1946      if netstat_output:
1947        self.results['sysinfo']['netstat_start'] = netstat_output
1948      if IS_LINUX:
1949        self.results['sysinfo']['disk_counters_start'] = self._GetDiskCounters()
1950      # Record bucket URL.
1951      self.results['bucket_uri'] = str(self.bucket_url)
1952      self.results['json_format'] = 'perfdiag'
1953      self.results['metadata'] = self.metadata_keys
1954
1955      if self.LAT in self.diag_tests:
1956        self._RunLatencyTests()
1957      if self.RTHRU in self.diag_tests:
1958        self._RunReadThruTests()
1959      # Run WTHRU_FILE before RTHRU_FILE. If data is created in WTHRU_FILE it
1960      # will be used in RTHRU_FILE to save time and bandwidth.
1961      if self.WTHRU_FILE in self.diag_tests:
1962        self._RunWriteThruTests(use_file=True)
1963      if self.RTHRU_FILE in self.diag_tests:
1964        self._RunReadThruTests(use_file=True)
1965      if self.WTHRU in self.diag_tests:
1966        self._RunWriteThruTests()
1967      if self.LIST in self.diag_tests:
1968        self._RunListTests()
1969
1970      # Collect netstat info and disk counters after tests.
1971      netstat_output = self._GetTcpStats()
1972      if netstat_output:
1973        self.results['sysinfo']['netstat_end'] = netstat_output
1974      if IS_LINUX:
1975        self.results['sysinfo']['disk_counters_end'] = self._GetDiskCounters()
1976
1977      self.results['total_requests'] = self.total_requests
1978      self.results['request_errors'] = self.request_errors
1979      self.results['error_responses_by_code'] = self.error_responses_by_code
1980      self.results['connection_breaks'] = self.connection_breaks
1981      self.results['gsutil_version'] = gslib.VERSION
1982      self.results['boto_version'] = boto.__version__
1983
1984      self._TearDown()
1985      self._DisplayResults()
1986    finally:
1987      # TODO: Install signal handlers so this is performed in response to a
1988      # terminating signal; consider multi-threaded object deletes during
1989      # cleanup so it happens quickly.
1990      self._TearDown()
1991
1992    return 0
1993
1994
1995def StorageUrlToUploadObjectMetadata(storage_url):
1996  if storage_url.IsCloudUrl() and storage_url.IsObject():
1997    upload_target = apitools_messages.Object()
1998    upload_target.name = storage_url.object_name
1999    upload_target.bucket = storage_url.bucket_name
2000    return upload_target
2001  else:
2002    raise CommandException('Non-cloud URL upload target %s was created in '
2003                           'perfdiag implemenation.' % storage_url)
2004