1# Copyright (c) 2015 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import argparse 5import json 6import logging 7import os 8import shutil 9import tempfile 10 11from perf_insights import cloud_storage 12from perf_insights import map_runner 13from perf_insights import function_handle 14from perf_insights.mre import file_handle as file_handle_module 15from perf_insights.results import json_output_formatter 16 17 18_DEFAULT_PARALLEL_DOWNLOADS = 16 19_DEFAULT_DESCRIPTION = """ 20Entry point for the cloud mapper. Please consider using 21perf_insights/bin/map_traces for normal development.""" 22 23 24def _ReadMapperGCSFile(url): 25 file_handle, file_name = tempfile.mkstemp() 26 try: 27 cloud_storage.Copy(url, file_name) 28 except cloud_storage.CloudStorageError as e: 29 logging.info("Failed to copy: %s" % e) 30 os.close(file_handle) 31 os.unlink(file_name) 32 file_name = None 33 return file_name 34 35 36def _ReadTracesGCSFile(url): 37 file_handle, file_name = tempfile.mkstemp() 38 file_urls = [] 39 try: 40 cloud_storage.Copy(url, file_name) 41 with open(file_name, 'r') as f: 42 file_urls = json.loads(f.read()) 43 except cloud_storage.CloudStorageError as e: 44 logging.info("Failed to copy: %s" % e) 45 finally: 46 os.close(file_handle) 47 os.unlink(file_name) 48 return file_urls 49 50 51def _DownloadTraceHandles(url, temp_directory): 52 trace_urls = _ReadTracesGCSFile(url) 53 54 trace_handles = [] 55 for trace_url in trace_urls: 56 th = file_handle_module.GCSFileHandle(trace_url, temp_directory) 57 trace_handles.append(th) 58 return trace_handles 59 60 61def Main(argv): 62 parser = argparse.ArgumentParser(description=_DEFAULT_DESCRIPTION) 63 parser.add_argument('map_file_url') 64 parser.add_argument('map_function_name') 65 parser.add_argument('input_url') 66 parser.add_argument('output_url') 67 parser.add_argument('--jobs', type=int, default=1) 68 69 args = parser.parse_args(argv[1:]) 70 71 map_file = _ReadMapperGCSFile(args.map_file_url) 72 if not map_file: 73 parser.error('Map does not exist.') 74 75 if not args.map_function_name: 76 parser.error('Must provide map function name.') 77 78 temp_directory = tempfile.mkdtemp() 79 _, file_name = tempfile.mkstemp() 80 ofile = open(file_name, 'w') 81 82 try: 83 output_formatter = json_output_formatter.JSONOutputFormatter(ofile) 84 map_function_module = function_handle.ModuleToLoad( 85 filename=os.path.abspath(map_file)) 86 map_function_handle = function_handle.FunctionHandle( 87 modules_to_load=[map_function_module], 88 function_name=args.map_function_name) 89 90 trace_handles = _DownloadTraceHandles(args.input_url, temp_directory) 91 runner = map_runner.MapRunner(trace_handles, map_function_handle, 92 jobs=args.jobs, 93 output_formatters=[output_formatter]) 94 results = runner.Run() 95 96 if args.map_function_handle: 97 results = runner.RunMapper() 98 elif args.reduce_function_handle: 99 results = runner.RunReducer(trace_handles) 100 101 output_formatter.Format(results) 102 103 return results 104 finally: 105 ofile.close() 106 os.unlink(map_file) 107 shutil.rmtree(temp_directory) 108