1#!/usr/bin/env python 2# Copyright 2011 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Pipelines for mapreduce library.""" 17 18 19 20__all__ = [ 21 "MapperPipeline", 22 ] 23 24 25from mapreduce import control 26from mapreduce import model 27from mapreduce import parameters 28from mapreduce import pipeline_base 29 30# pylint: disable=g-bad-name 31# pylint: disable=protected-access 32 33 34class MapperPipeline(pipeline_base._OutputSlotsMixin, 35 pipeline_base.PipelineBase): 36 """Pipeline wrapper for mapper job. 37 38 Args: 39 job_name: mapper job name as string 40 handler_spec: mapper handler specification as string. 41 input_reader_spec: input reader specification as string. 42 output_writer_spec: output writer specification as string. 43 params: mapper parameters for input reader and output writer as dict. 44 shards: number of shards in the job as int. 45 46 Returns: 47 default: the list of filenames produced by the mapper if there was any 48 output and the map was completed successfully. 49 result_status: one of model.MapreduceState._RESULTS. 50 job_id: mr id that can be used to query model.MapreduceState. Available 51 immediately after run returns. 52 """ 53 async = True 54 55 # TODO(user): we probably want to output counters too. 56 # Might also need to double filenames as named output. 57 output_names = [ 58 # Job ID. MapreduceState.get_by_job_id can be used to load 59 # mapreduce state. 60 "job_id", 61 # Dictionary of final counter values. Filled when job is completed. 62 "counters"] + pipeline_base._OutputSlotsMixin.output_names 63 64 def run(self, 65 job_name, 66 handler_spec, 67 input_reader_spec, 68 output_writer_spec=None, 69 params=None, 70 shards=None): 71 """Start a mapreduce job. 72 73 Args: 74 job_name: mapreduce name. Only for display purpose. 75 handler_spec: fully qualified name to your map function/class. 76 input_reader_spec: fully qualified name to input reader class. 77 output_writer_spec: fully qualified name to output writer class. 78 params: a dictionary of parameters for input reader and output writer 79 initialization. 80 shards: number of shards. This provides a guide to mapreduce. The real 81 number of shards is determined by how input are splited. 82 """ 83 if shards is None: 84 shards = parameters.config.SHARD_COUNT 85 86 mapreduce_id = control.start_map( 87 job_name, 88 handler_spec, 89 input_reader_spec, 90 params or {}, 91 mapreduce_parameters={ 92 "done_callback": self.get_callback_url(), 93 "done_callback_method": "GET", 94 "pipeline_id": self.pipeline_id, 95 }, 96 shard_count=shards, 97 output_writer_spec=output_writer_spec, 98 queue_name=self.queue_name, 99 ) 100 self.fill(self.outputs.job_id, mapreduce_id) 101 self.set_status(console_url="%s/detail?mapreduce_id=%s" % ( 102 (parameters.config.BASE_PATH, mapreduce_id))) 103 104 def try_cancel(self): 105 """Always allow mappers to be canceled and retried.""" 106 return True 107 108 def callback(self): 109 """Callback after this async pipeline finishes.""" 110 if self.was_aborted: 111 return 112 113 mapreduce_id = self.outputs.job_id.value 114 mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id) 115 if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS: 116 self.retry("Job %s had status %s" % ( 117 mapreduce_id, mapreduce_state.result_status)) 118 return 119 120 mapper_spec = mapreduce_state.mapreduce_spec.mapper 121 outputs = [] 122 output_writer_class = mapper_spec.output_writer_class() 123 if (output_writer_class and 124 mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS): 125 outputs = output_writer_class.get_filenames(mapreduce_state) 126 127 self.fill(self.outputs.result_status, mapreduce_state.result_status) 128 self.fill(self.outputs.counters, mapreduce_state.counters_map.to_dict()) 129 self.complete(outputs) 130