1#!/usr/bin/env python
2# Copyright 2011 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Pipelines for mapreduce library."""
17
18
19
# Names exported by `from <this module> import *`.
__all__ = ["MapperPipeline"]
23
24
25from mapreduce import control
26from mapreduce import model
27from mapreduce import parameters
28from mapreduce import pipeline_base
29
30# pylint: disable=g-bad-name
31# pylint: disable=protected-access
32
33
34class MapperPipeline(pipeline_base._OutputSlotsMixin,
35                     pipeline_base.PipelineBase):
36  """Pipeline wrapper for mapper job.
37
38  Args:
39    job_name: mapper job name as string
40    handler_spec: mapper handler specification as string.
41    input_reader_spec: input reader specification as string.
42    output_writer_spec: output writer specification as string.
43    params: mapper parameters for input reader and output writer as dict.
44    shards: number of shards in the job as int.
45
46  Returns:
47    default: the list of filenames produced by the mapper if there was any
48      output and the map was completed successfully.
49    result_status: one of model.MapreduceState._RESULTS.
50    job_id: mr id that can be used to query model.MapreduceState. Available
51      immediately after run returns.
52  """
53  async = True
54
55  # TODO(user): we probably want to output counters too.
56  # Might also need to double filenames as named output.
57  output_names = [
58      # Job ID. MapreduceState.get_by_job_id can be used to load
59      # mapreduce state.
60      "job_id",
61      # Dictionary of final counter values. Filled when job is completed.
62      "counters"] + pipeline_base._OutputSlotsMixin.output_names
63
64  def run(self,
65          job_name,
66          handler_spec,
67          input_reader_spec,
68          output_writer_spec=None,
69          params=None,
70          shards=None):
71    """Start a mapreduce job.
72
73    Args:
74      job_name: mapreduce name. Only for display purpose.
75      handler_spec: fully qualified name to your map function/class.
76      input_reader_spec: fully qualified name to input reader class.
77      output_writer_spec: fully qualified name to output writer class.
78      params: a dictionary of parameters for input reader and output writer
79        initialization.
80      shards: number of shards. This provides a guide to mapreduce. The real
81        number of shards is determined by how input are splited.
82    """
83    if shards is None:
84      shards = parameters.config.SHARD_COUNT
85
86    mapreduce_id = control.start_map(
87        job_name,
88        handler_spec,
89        input_reader_spec,
90        params or {},
91        mapreduce_parameters={
92            "done_callback": self.get_callback_url(),
93            "done_callback_method": "GET",
94            "pipeline_id": self.pipeline_id,
95        },
96        shard_count=shards,
97        output_writer_spec=output_writer_spec,
98        queue_name=self.queue_name,
99        )
100    self.fill(self.outputs.job_id, mapreduce_id)
101    self.set_status(console_url="%s/detail?mapreduce_id=%s" % (
102        (parameters.config.BASE_PATH, mapreduce_id)))
103
104  def try_cancel(self):
105    """Always allow mappers to be canceled and retried."""
106    return True
107
108  def callback(self):
109    """Callback after this async pipeline finishes."""
110    if self.was_aborted:
111      return
112
113    mapreduce_id = self.outputs.job_id.value
114    mapreduce_state = model.MapreduceState.get_by_job_id(mapreduce_id)
115    if mapreduce_state.result_status != model.MapreduceState.RESULT_SUCCESS:
116      self.retry("Job %s had status %s" % (
117          mapreduce_id, mapreduce_state.result_status))
118      return
119
120    mapper_spec = mapreduce_state.mapreduce_spec.mapper
121    outputs = []
122    output_writer_class = mapper_spec.output_writer_class()
123    if (output_writer_class and
124        mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
125      outputs = output_writer_class.get_filenames(mapreduce_state)
126
127    self.fill(self.outputs.result_status, mapreduce_state.result_status)
128    self.fill(self.outputs.counters, mapreduce_state.counters_map.to_dict())
129    self.complete(outputs)
130