#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Status page handler for the mapreduce framework."""

__author__ = ("aizatsky@google.com (Mike Aizatsky)",
              "bslatkin@google.com (Brett Slatkin)")

import os
import pkgutil
import time
import zipfile

from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_errors
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.ext import db
from google.appengine.ext import webapp
from mapreduce import base_handler
from mapreduce import errors
from mapreduce import model


# TODO(user): a list of features we'd like to have on the status page:
# - show sparklet of entities/sec on index page
# - shard bar chart should color finished shards differently

# mapreduce.yaml file names
MR_YAML_NAMES = ["mapreduce.yaml", "mapreduce.yml"]


class BadStatusParameterError(Exception):
  """A parameter passed to a status handler was invalid."""


class UserParam(validation.Validated):
  """A user-supplied parameter to a mapreduce job."""

  ATTRIBUTES = {
      "name": r"[a-zA-Z0-9_\.]+",
      "default": validation.Optional(r".*"),
      "value": validation.Optional(r".*"),
  }


class MapperInfo(validation.Validated):
  """Configuration parameters for the mapper part of the job."""

  ATTRIBUTES = {
      "handler": r".+",
      "input_reader": r".+",
      "output_writer": validation.Optional(r".+"),
      "params": validation.Optional(validation.Repeated(UserParam)),
      "params_validator": validation.Optional(r".+"),
  }


class MapreduceInfo(validation.Validated):
  """Mapreduce description in mapreduce.yaml."""

  ATTRIBUTES = {
      "name": r".+",
      "mapper": MapperInfo,
      "params": validation.Optional(validation.Repeated(UserParam)),
      "params_validator": validation.Optional(r".+"),
  }


class MapReduceYaml(validation.Validated):
  """Root class for mapreduce.yaml.

  File format:

  mapreduce:
  - name: <mapreduce_name>
    mapper:
      input_reader: google.appengine.ext.mapreduce.DatastoreInputReader
      handler: path_to_my.MapperFunction
      params:
      - name: foo
        default: bar
      - name: blah
        default: stuff
      params_validator: path_to_my.ValidatorFunction

  Where
    mapreduce_name: The name of the mapreduce. Used for UI purposes.
    handler: Full <module_name>.<function_name/class_name> of the mapper
      handler. See the MapreduceSpec class documentation for the full handler
      specification.
    input_reader: Full <module_name>.<function_name/class_name> of the
      InputReader sub-class to use for the mapper job.
    params: A list of optional parameter names and optional default values
      that may be supplied or overridden by the user running the job.
    params_validator: Full <module_name>.<function_name/class_name> of a
      callable to validate the mapper_params after they are input by the
      user running the job.
  """

  ATTRIBUTES = {
      "mapreduce": validation.Optional(validation.Repeated(MapreduceInfo))
  }

  @staticmethod
  def to_dict(mapreduce_yaml):
    """Converts a MapReduceYaml object into a list of JSON-encodable dicts.

    The result is used in the user-visible UI and in internal methods that
    interface with user code (such as param validation).

    Args:
      mapreduce_yaml: The Python representation of the mapreduce.yaml
        document.

    Returns:
      A list of configuration dictionaries, one per mapreduce entry.
    """
    all_configs = []
    for config in mapreduce_yaml.mapreduce:
      out = {
          "name": config.name,
          "mapper_input_reader": config.mapper.input_reader,
          "mapper_handler": config.mapper.handler,
      }
      if config.mapper.params_validator:
        out["mapper_params_validator"] = config.mapper.params_validator
      if config.mapper.params:
        param_defaults = {}
        for param in config.mapper.params:
          param_defaults[param.name] = param.default or param.value
        out["mapper_params"] = param_defaults
      if config.params:
        param_defaults = {}
        for param in config.params:
          param_defaults[param.name] = param.default or param.value
        out["params"] = param_defaults
      if config.mapper.output_writer:
        out["mapper_output_writer"] = config.mapper.output_writer
      all_configs.append(out)

    return all_configs
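# For the example document in the MapReduceYaml docstring above, to_dict()
# returns roughly the following (an illustrative sketch; the values come from
# that example, not from a real application):
#
#   [{"name": "<mapreduce_name>",
#     "mapper_input_reader":
#         "google.appengine.ext.mapreduce.DatastoreInputReader",
#     "mapper_handler": "path_to_my.MapperFunction",
#     "mapper_params_validator": "path_to_my.ValidatorFunction",
#     "mapper_params": {"foo": "bar", "blah": "stuff"}}]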


# N.B. Sadly, we currently have no way to determine the application root
# directory at run time, so we walk up the directory structure to find it.
def find_mapreduce_yaml(status_file=__file__):
  """Traverse directory trees to find the mapreduce.yaml file.

  Begins with the location of status.py and then moves on to check the working
  directory.

  Args:
    status_file: location of status.py, overridable for testing purposes.

  Returns:
    The path of the mapreduce.yaml file, or None if not found.
  """
  checked = set()
  yaml = _find_mapreduce_yaml(os.path.dirname(status_file), checked)
  if not yaml:
    yaml = _find_mapreduce_yaml(os.getcwd(), checked)
  return yaml


def _find_mapreduce_yaml(start, checked):
  """Traverse the directory tree upward from start to find mapreduce.yaml.

  The walk stops when a directory already in checked is encountered or when a
  mapreduce.yaml file is found. checked exists both to make loop termination
  easy to reason about and to ensure the same directories are not rechecked.

  Args:
    start: the path to start in and work upward from.
    checked: the set of directories that have already been examined.

  Returns:
    The path of the mapreduce.yaml file, or None if not found.
  """
  dir_path = start
  while dir_path not in checked:
    checked.add(dir_path)
    for mr_yaml_name in MR_YAML_NAMES:
      yaml_path = os.path.join(dir_path, mr_yaml_name)
      if os.path.exists(yaml_path):
        return yaml_path
    dir_path = os.path.dirname(dir_path)
  return None
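# A minimal usage sketch (the path below is hypothetical):
#
#   yaml_path = find_mapreduce_yaml("/srv/app/lib/mapreduce/status.py")
#
# checks /srv/app/lib/mapreduce and each parent directory up to / for
# mapreduce.yaml or mapreduce.yml, then falls back to the current working
# directory, and returns the first match (or None).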


def parse_mapreduce_yaml(contents):
  """Parses mapreduce.yaml file contents.

  Args:
    contents: mapreduce.yaml file contents.

  Returns:
    MapReduceYaml object with all the data from the original file.

  Raises:
    errors.BadYamlError: when contents is not a valid mapreduce.yaml file.
    errors.MultipleDocumentsInMrYaml: when contents contains more than one
      YAML document.
  """
  try:
    builder = yaml_object.ObjectBuilder(MapReduceYaml)
    handler = yaml_builder.BuilderHandler(builder)
    listener = yaml_listener.EventListener(handler)
    listener.Parse(contents)

    mr_info = handler.GetResults()
  except (ValueError, yaml_errors.EventError), e:
    raise errors.BadYamlError(e)

  if len(mr_info) < 1:
    raise errors.BadYamlError("No configs found in mapreduce.yaml")
  if len(mr_info) > 1:
    raise errors.MultipleDocumentsInMrYaml("Found %d YAML documents" %
                                           len(mr_info))

  jobs = mr_info[0]
  job_names = set(j.name for j in jobs.mapreduce)
  if len(jobs.mapreduce) != len(job_names):
    raise errors.BadYamlError(
        "Overlapping mapreduce names; names must be unique")

  return jobs
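# A minimal usage sketch (the YAML below is illustrative, not from a real
# application):
#
#   mr_yaml = parse_mapreduce_yaml(
#       "mapreduce:\n"
#       "- name: Wordcount\n"
#       "  mapper:\n"
#       "    handler: main.word_count_map\n"
#       "    input_reader: mapreduce.input_readers.DatastoreInputReader\n")
#   assert mr_yaml.mapreduce[0].name == "Wordcount"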


def get_mapreduce_yaml(parse=parse_mapreduce_yaml):
  """Locates mapreduce.yaml, then loads and parses its contents.

  Args:
    parse: the parsing function to use; overridable for testing purposes.

  Returns:
    MapReduceYaml object.

  Raises:
    errors.BadYamlError: when the file is missing or its contents are not a
      valid mapreduce.yaml document.
  """
  mr_yaml_path = find_mapreduce_yaml()
  if not mr_yaml_path:
    raise errors.MissingYamlError()
  mr_yaml_file = open(mr_yaml_path)
  try:
    return parse(mr_yaml_file.read())
  finally:
    mr_yaml_file.close()
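# Typical use from a status handler (ListConfigsHandler below does exactly
# this):
#
#   configs = MapReduceYaml.to_dict(get_mapreduce_yaml())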


class ResourceHandler(webapp.RequestHandler):
  """Handler for static resources."""

  _RESOURCE_MAP = {
      "status": ("overview.html", "text/html"),
      "detail": ("detail.html", "text/html"),
      "base.css": ("base.css", "text/css"),
      "jquery.js": ("jquery-1.6.1.min.js", "text/javascript"),
      "jquery-json.js": ("jquery.json-2.2.min.js", "text/javascript"),
      "jquery-url.js": ("jquery.url.js", "text/javascript"),
      "status.js": ("status.js", "text/javascript"),
  }

  def get(self, relative):
    if relative not in self._RESOURCE_MAP:
      self.response.set_status(404)
      self.response.out.write("Resource not found.")
      return

    real_path, content_type = self._RESOURCE_MAP[relative]
    path = os.path.join(os.path.dirname(__file__), "static", real_path)

    # It's possible we're inside a zipfile (zipimport).  If so, path
    # will include 'something.zip'.
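    # For example, a hypothetical path ending in
    # 'mapreduce.zip/mapreduce/static/base.css' is made relative to the
    # current directory and split at '.zip' + os.sep into the archive
    # ('mapreduce.zip') and the member path ('mapreduce/static/base.css').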
    if ('.zip' + os.sep) in path:
      (zip_file, zip_path) = os.path.relpath(path).split('.zip' + os.sep, 1)
      content = zipfile.ZipFile(zip_file + '.zip').read(zip_path)
    else:
      try:
        data = pkgutil.get_data(__name__, "static/" + real_path)
      except AttributeError:  # Python < 2.6.
        data = None
      content = data or open(path, 'rb').read()

    self.response.headers["Cache-Control"] = "public, max-age=300"
    self.response.headers["Content-Type"] = content_type
    self.response.out.write(content)


class ListConfigsHandler(base_handler.GetJsonHandler):
  """Lists mapreduce configs as JSON for users to start jobs."""

  def handle(self):
    self.json_response["configs"] = MapReduceYaml.to_dict(get_mapreduce_yaml())


class ListJobsHandler(base_handler.GetJsonHandler):
  """Lists running and completed mapreduce jobs for an overview as JSON."""

  def handle(self):
    cursor = self.request.get("cursor")
    count = int(self.request.get("count", "50"))

    query = model.MapreduceState.all()
    if cursor:
      query.filter("__key__ >=", db.Key(cursor))
    query.order("__key__")

    jobs_list = query.fetch(count + 1)
    if len(jobs_list) == (count + 1):
      self.json_response["cursor"] = str(jobs_list[-1].key())
      jobs_list = jobs_list[:-1]

    all_jobs = []
    for job in jobs_list:
      out = {
          # Data shared between overview and detail pages.
          "name": job.mapreduce_spec.name,
          "mapreduce_id": job.mapreduce_spec.mapreduce_id,
          "active": job.active,
          "start_timestamp_ms":
              int(time.mktime(job.start_time.utctimetuple()) * 1000),
          "updated_timestamp_ms":
              int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),

          # Specific to overview page.
          "chart_url": job.sparkline_url,
          "chart_width": job.chart_width,
          "active_shards": job.active_shards,
          "shards": job.mapreduce_spec.mapper.shard_count,
      }
      if job.result_status:
        out["result_status"] = job.result_status
      all_jobs.append(out)

    self.json_response["jobs"] = all_jobs
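    # The JSON payload ends up looking roughly like this (illustrative values;
    # "cursor" appears only when there are more results and "result_status"
    # only once a job has finished):
    #
    #   {"cursor": "<key of the first job on the next page>",
    #    "jobs": [{"name": "Wordcount", "mapreduce_id": "158291233277023001",
    #              "active": False, "start_timestamp_ms": 1280000000000,
    #              "updated_timestamp_ms": 1280000060000,
    #              "chart_url": "<sparkline chart URL>", "chart_width": 300,
    #              "active_shards": 0, "shards": 8,
    #              "result_status": "success"}]}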


class GetJobDetailHandler(base_handler.GetJsonHandler):
  """Retrieves the details of a mapreduce job as JSON."""

  def handle(self):
    mapreduce_id = self.request.get("mapreduce_id")
    if not mapreduce_id:
      raise BadStatusParameterError("'mapreduce_id' was missing or empty")
    job = model.MapreduceState.get_by_key_name(mapreduce_id)
    if job is None:
      raise KeyError("Could not find job with ID %r" % mapreduce_id)

    self.json_response.update(job.mapreduce_spec.to_json())
    self.json_response.update(job.counters_map.to_json())
    self.json_response.update({
        # Shared with overview page.
        "active": job.active,
        "start_timestamp_ms":
            int(time.mktime(job.start_time.utctimetuple()) * 1000),
        "updated_timestamp_ms":
            int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),

        # Specific to detail page.
        "chart_url": job.chart_url,
        "chart_width": job.chart_width,
    })
    self.json_response["result_status"] = job.result_status

    all_shards = []
    for shard in model.ShardState.find_all_by_mapreduce_state(job):
      out = {
          "active": shard.active,
          "result_status": shard.result_status,
          "shard_number": shard.shard_number,
          "shard_id": shard.shard_id,
          "updated_timestamp_ms":
              int(time.mktime(shard.update_time.utctimetuple()) * 1000),
          "shard_description": shard.shard_description,
          "last_work_item": shard.last_work_item,
      }
      out.update(shard.counters_map.to_json())
      all_shards.append(out)
    all_shards.sort(key=lambda x: x["shard_number"])
    self.json_response["shards"] = all_shards
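    # The response is a merge of the serialized MapreduceSpec and job-level
    # counters (their to_json() methods in model.py define the exact keys)
    # with the fields set above, plus one "shards" entry per shard, roughly
    # (illustrative, abbreviated):
    #
    #   {..., "active": False, "result_status": "success",
    #    "chart_url": "...", "chart_width": 300,
    #    "shards": [{"shard_number": 0, "shard_id": "...", "active": False,
    #                "result_status": "success",
    #                "updated_timestamp_ms": 1280000060000,
    #                "shard_description": "...", "last_work_item": "...",
    #                ...}, ...]}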