1# Copyright (c) 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import argparse
5import json
6import logging
7import os
8import shutil
9import tempfile
10
11from perf_insights import cloud_storage
12from perf_insights import map_runner
13from perf_insights import function_handle
14from perf_insights.mre import file_handle as file_handle_module
15from perf_insights.results import json_output_formatter
16
17
18_DEFAULT_PARALLEL_DOWNLOADS = 16
19_DEFAULT_DESCRIPTION = """
20Entry point for the cloud mapper. Please consider using
21perf_insights/bin/map_traces for normal development."""
22
23
24def _ReadMapperGCSFile(url):
25  file_handle, file_name = tempfile.mkstemp()
26  try:
27    cloud_storage.Copy(url, file_name)
28  except cloud_storage.CloudStorageError as e:
29    logging.info("Failed to copy: %s" % e)
30    os.close(file_handle)
31    os.unlink(file_name)
32    file_name = None
33  return file_name
34
35
36def _ReadTracesGCSFile(url):
37  file_handle, file_name = tempfile.mkstemp()
38  file_urls = []
39  try:
40    cloud_storage.Copy(url, file_name)
41    with open(file_name, 'r') as f:
42      file_urls = json.loads(f.read())
43  except cloud_storage.CloudStorageError as e:
44    logging.info("Failed to copy: %s" % e)
45  finally:
46    os.close(file_handle)
47    os.unlink(file_name)
48  return file_urls
49
50
51def _DownloadTraceHandles(url, temp_directory):
52  trace_urls = _ReadTracesGCSFile(url)
53
54  trace_handles = []
55  for trace_url in trace_urls:
56    th = file_handle_module.GCSFileHandle(trace_url, temp_directory)
57    trace_handles.append(th)
58  return trace_handles
59
60
61def Main(argv):
62  parser = argparse.ArgumentParser(description=_DEFAULT_DESCRIPTION)
63  parser.add_argument('map_file_url')
64  parser.add_argument('map_function_name')
65  parser.add_argument('input_url')
66  parser.add_argument('output_url')
67  parser.add_argument('--jobs', type=int, default=1)
68
69  args = parser.parse_args(argv[1:])
70
71  map_file = _ReadMapperGCSFile(args.map_file_url)
72  if not map_file:
73    parser.error('Map does not exist.')
74
75  if not args.map_function_name:
76    parser.error('Must provide map function name.')
77
78  temp_directory = tempfile.mkdtemp()
79  _, file_name = tempfile.mkstemp()
80  ofile = open(file_name, 'w')
81
82  try:
83    output_formatter = json_output_formatter.JSONOutputFormatter(ofile)
84    map_function_module = function_handle.ModuleToLoad(
85        filename=os.path.abspath(map_file))
86    map_function_handle = function_handle.FunctionHandle(
87        modules_to_load=[map_function_module],
88        function_name=args.map_function_name)
89
90    trace_handles = _DownloadTraceHandles(args.input_url, temp_directory)
91    runner = map_runner.MapRunner(trace_handles, map_function_handle,
92                                  jobs=args.jobs,
93                                  output_formatters=[output_formatter])
94    results = runner.Run()
95
96    if args.map_function_handle:
97      results = runner.RunMapper()
98    elif args.reduce_function_handle:
99      results = runner.RunReducer(trace_handles)
100
101    output_formatter.Format(results)
102
103    return results
104  finally:
105    ofile.close()
106    os.unlink(map_file)
107    shutil.rmtree(temp_directory)
108