#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Status page handler for mapreduce framework."""

__author__ = ("aizatsky@google.com (Mike Aizatsky)",
              "bslatkin@google.com (Brett Slatkin)")

import os
import pkgutil
import time
import zipfile

from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_errors
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.ext import db
from google.appengine.ext import webapp
from mapreduce import base_handler
from mapreduce import errors
from mapreduce import model


# TODO(user): a list of features we'd like to have in status page:
# - show sparklet of entities/sec on index page
# - shard bar chart should color finished shards differently

# Candidate file names for the mapreduce configuration file, in the order
# they are probed for by _find_mapreduce_yaml().
MR_YAML_NAMES = ["mapreduce.yaml", "mapreduce.yml"]


class BadStatusParameterError(Exception):
  """A parameter passed to a status handler was invalid."""


class UserParam(validation.Validated):
  """A user-supplied parameter to a mapreduce job.

  Attributes:
    name: parameter name; letters, digits, underscore and dot only.
    default: optional default value shown/used when the user supplies none.
    value: optional fixed value for the parameter.
  """

  ATTRIBUTES = {
      "name": r"[a-zA-Z0-9_\.]+",
      "default": validation.Optional(r".*"),
      "value": validation.Optional(r".*"),
  }


class MapperInfo(validation.Validated):
  """Configuration parameters for the mapper part of the job.

  Attributes:
    handler: full dotted path of the mapper handler function/class.
    input_reader: full dotted path of the InputReader subclass to use.
    output_writer: optional full dotted path of an OutputWriter subclass.
    params: optional list of UserParam entries for the mapper.
    params_validator: optional dotted path of a params-validation callable.
  """

  ATTRIBUTES = {
      "handler": r".+",
      "input_reader": r".+",
      "output_writer": validation.Optional(r".+"),
      "params": validation.Optional(validation.Repeated(UserParam)),
      "params_validator": validation.Optional(r".+"),
  }


class MapreduceInfo(validation.Validated):
  """Mapreduce description in mapreduce.yaml.

  Attributes:
    name: display name of the mapreduce job (used for UI purposes).
    mapper: the MapperInfo configuration for this job.
    params: optional list of job-level UserParam entries.
    params_validator: optional dotted path of a params-validation callable.
  """

  ATTRIBUTES = {
      "name": r".+",
      "mapper": MapperInfo,
      "params": validation.Optional(validation.Repeated(UserParam)),
      "params_validator": validation.Optional(r".+"),
  }


class MapReduceYaml(validation.Validated):
  """Root class for mapreduce.yaml.

  File format:

  mapreduce:
  - name: <mapreduce_name>
    mapper:
      - input_reader: google.appengine.ext.mapreduce.DatastoreInputReader
      - handler: path_to_my.MapperFunction
      - params:
        - name: foo
          default: bar
        - name: blah
          default: stuff
      - params_validator: path_to_my.ValidatorFunction

  Where
    mapreduce_name: The name of the mapreduce. Used for UI purposes.
    mapper_handler_spec: Full <module_name>.<function_name/class_name> of
      mapper handler. See MapreduceSpec class documentation for full handler
      specification.
    input_reader: Full <module_name>.<function_name/class_name> of the
      InputReader sub-class to use for the mapper job.
    params: A list of optional parameter names and optional default values
      that may be supplied or overridden by the user running the job.
    params_validator is full <module_name>.<function_name/class_name> of
      a callable to validate the mapper_params after they are input by the
      user running the job.
  """

  ATTRIBUTES = {
      "mapreduce": validation.Optional(validation.Repeated(MapreduceInfo))
  }

  @staticmethod
  def to_dict(mapreduce_yaml):
    """Converts a MapReduceYaml file into a JSON-encodable dictionary.

    For use in user-visible UI and internal methods for interfacing with
    user code (like param validation).

    Args:
      mapreduce_yaml: The Python representation of the mapreduce.yaml
        document.

    Returns:
      A list of configuration dictionaries, one per configured job.
    """
    all_configs = []
    for config in mapreduce_yaml.mapreduce:
      out = {
          "name": config.name,
          "mapper_input_reader": config.mapper.input_reader,
          "mapper_handler": config.mapper.handler,
      }
      if config.mapper.params_validator:
        out["mapper_params_validator"] = config.mapper.params_validator
      if config.mapper.params:
        param_defaults = {}
        for param in config.mapper.params:
          # A param carries either a default or a fixed value; prefer the
          # default when both are present (matches original precedence).
          param_defaults[param.name] = param.default or param.value
        out["mapper_params"] = param_defaults
      if config.params:
        param_defaults = {}
        for param in config.params:
          param_defaults[param.name] = param.default or param.value
        out["params"] = param_defaults
      if config.mapper.output_writer:
        out["mapper_output_writer"] = config.mapper.output_writer
      all_configs.append(out)

    return all_configs


# N.B. Sadly, we currently don't have and ability to determine
# application root dir at run time. We need to walk up the directory structure
# to find it.
def find_mapreduce_yaml(status_file=__file__):
  """Traverse directory trees to find mapreduce.yaml file.

  Begins with the location of status.py and then moves on to check the working
  directory.

  Args:
    status_file: location of status.py, overridable for testing purposes.

  Returns:
    the path of mapreduce.yaml file or None if not found.
  """
  checked = set()
  yaml = _find_mapreduce_yaml(os.path.dirname(status_file), checked)
  if not yaml:
    yaml = _find_mapreduce_yaml(os.getcwd(), checked)
  return yaml


def _find_mapreduce_yaml(start, checked):
  """Traverse the directory tree identified by start until a directory already
  in checked is encountered or the path of mapreduce.yaml is found.

  Checked is present both to make loop termination easy to reason about and so
  that the same directories do not get rechecked.

  Args:
    start: the path to start in and work upward from
    checked: the set of already examined directories

  Returns:
    the path of mapreduce.yaml file or None if not found.
  """
  # Renamed from "dir" to avoid shadowing the builtin.
  current_dir = start
  while current_dir not in checked:
    checked.add(current_dir)
    for mr_yaml_name in MR_YAML_NAMES:
      yaml_path = os.path.join(current_dir, mr_yaml_name)
      if os.path.exists(yaml_path):
        return yaml_path
    # os.path.dirname of the filesystem root returns the root itself,
    # which is then found in checked and terminates the loop.
    current_dir = os.path.dirname(current_dir)
  return None


def parse_mapreduce_yaml(contents):
  """Parses mapreduce.yaml file contents.

  Args:
    contents: mapreduce.yaml file contents.

  Returns:
    MapReduceYaml object with all the data from original file.

  Raises:
    errors.BadYamlError: when contents is not a valid mapreduce.yaml file.
  """
  try:
    builder = yaml_object.ObjectBuilder(MapReduceYaml)
    handler = yaml_builder.BuilderHandler(builder)
    listener = yaml_listener.EventListener(handler)
    listener.Parse(contents)

    mr_info = handler.GetResults()
  # "except E as e" replaces the legacy "except E, e" comma form; it is
  # equivalent on Python 2.6+ and also valid Python 3 syntax.
  except (ValueError, yaml_errors.EventError) as e:
    raise errors.BadYamlError(e)

  if len(mr_info) < 1:
    raise errors.BadYamlError("No configs found in mapreduce.yaml")
  if len(mr_info) > 1:
    # A YAML stream may contain multiple documents; we only allow one.
    raise errors.MultipleDocumentsInMrYaml("Found %d YAML documents" %
                                           len(mr_info))

  jobs = mr_info[0]
  job_names = set(j.name for j in jobs.mapreduce)
  if len(jobs.mapreduce) != len(job_names):
    raise errors.BadYamlError(
        "Overlapping mapreduce names; names must be unique")

  return jobs


def get_mapreduce_yaml(parse=parse_mapreduce_yaml):
  """Locates mapreduce.yaml, loads and parses its info.

  Args:
    parse: Used for testing.

  Returns:
    MapReduceYaml object.

  Raises:
    errors.BadYamlError: when contents is not a valid mapreduce.yaml file or
      the file is missing.
  """
  mr_yaml_path = find_mapreduce_yaml()
  if not mr_yaml_path:
    raise errors.MissingYamlError()
  mr_yaml_file = open(mr_yaml_path)
  try:
    return parse(mr_yaml_file.read())
  finally:
    mr_yaml_file.close()


class ResourceHandler(webapp.RequestHandler):
  """Handler for static resources."""

  # Maps the URL-visible resource name to (file under static/, MIME type).
  _RESOURCE_MAP = {
      "status": ("overview.html", "text/html"),
      "detail": ("detail.html", "text/html"),
      "base.css": ("base.css", "text/css"),
      "jquery.js": ("jquery-1.6.1.min.js", "text/javascript"),
      "jquery-json.js": ("jquery.json-2.2.min.js", "text/javascript"),
      "jquery-url.js": ("jquery.url.js", "text/javascript"),
      "status.js": ("status.js", "text/javascript"),
  }

  def get(self, relative):
    """Serves one whitelisted static file from the package's static/ dir."""
    if relative not in self._RESOURCE_MAP:
      self.response.set_status(404)
      self.response.out.write("Resource not found.")
      return

    real_path, content_type = self._RESOURCE_MAP[relative]
    path = os.path.join(os.path.dirname(__file__), "static", real_path)

    # It's possible we're inside a zipfile (zipimport). If so, path
    # will include 'something.zip'.
    if ('.zip' + os.sep) in path:
      (zip_file, zip_path) = os.path.relpath(path).split('.zip' + os.sep, 1)
      content = zipfile.ZipFile(zip_file + '.zip').read(zip_path)
    else:
      try:
        data = pkgutil.get_data(__name__, "static/" + real_path)
      except AttributeError:  # Python < 2.6.
        data = None
      if not data:
        # Fall back to reading straight from disk. Close the handle
        # explicitly instead of leaking it to the garbage collector.
        static_file = open(path, 'rb')
        try:
          data = static_file.read()
        finally:
          static_file.close()
      content = data

    # Cache-Control directives are comma-separated per the HTTP caching
    # spec (RFC 7234); the previous "public; max-age=300" used a semicolon,
    # which makes max-age unrecognizable to conforming caches.
    self.response.headers["Cache-Control"] = "public, max-age=300"
    self.response.headers["Content-Type"] = content_type
    self.response.out.write(content)


class ListConfigsHandler(base_handler.GetJsonHandler):
  """Lists mapreduce configs as JSON for users to start jobs."""

  def handle(self):
    self.json_response["configs"] = MapReduceYaml.to_dict(get_mapreduce_yaml())


class ListJobsHandler(base_handler.GetJsonHandler):
  """Lists running and completed mapreduce jobs for an overview as JSON."""

  def handle(self):
    cursor = self.request.get("cursor")
    count = int(self.request.get("count", "50"))

    query = model.MapreduceState.all()
    if cursor:
      # Key-based paging: resume at (and including) the cursor key.
      query.filter("__key__ >=", db.Key(cursor))
    query.order("__key__")

    # Fetch one extra entity to detect whether another page exists.
    jobs_list = query.fetch(count + 1)
    if len(jobs_list) == (count + 1):
      self.json_response["cursor"] = str(jobs_list[-1].key())
      jobs_list = jobs_list[:-1]

    all_jobs = []
    for job in jobs_list:
      out = {
          # Data shared between overview and detail pages.
          "name": job.mapreduce_spec.name,
          "mapreduce_id": job.mapreduce_spec.mapreduce_id,
          "active": job.active,
          "start_timestamp_ms":
              int(time.mktime(job.start_time.utctimetuple()) * 1000),
          "updated_timestamp_ms":
              int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),

          # Specific to overview page.
          "chart_url": job.sparkline_url,
          "chart_width": job.chart_width,
          "active_shards": job.active_shards,
          "shards": job.mapreduce_spec.mapper.shard_count,
      }
      if job.result_status:
        out["result_status"] = job.result_status
      all_jobs.append(out)

    self.json_response["jobs"] = all_jobs


class GetJobDetailHandler(base_handler.GetJsonHandler):
  """Retrieves the details of a mapreduce job as JSON."""

  def handle(self):
    mapreduce_id = self.request.get("mapreduce_id")
    if not mapreduce_id:
      raise BadStatusParameterError("'mapreduce_id' was invalid")
    job = model.MapreduceState.get_by_key_name(mapreduce_id)
    if job is None:
      raise KeyError("Could not find job with ID %r" % mapreduce_id)

    self.json_response.update(job.mapreduce_spec.to_json())
    self.json_response.update(job.counters_map.to_json())
    self.json_response.update({
        # Shared with overview page.
        "active": job.active,
        "start_timestamp_ms":
            int(time.mktime(job.start_time.utctimetuple()) * 1000),
        "updated_timestamp_ms":
            int(time.mktime(job.last_poll_time.utctimetuple()) * 1000),

        # Specific to detail page.
        "chart_url": job.chart_url,
        "chart_width": job.chart_width,
    })
    self.json_response["result_status"] = job.result_status

    all_shards = []
    for shard in model.ShardState.find_all_by_mapreduce_state(job):
      out = {
          "active": shard.active,
          "result_status": shard.result_status,
          "shard_number": shard.shard_number,
          "shard_id": shard.shard_id,
          "updated_timestamp_ms":
              int(time.mktime(shard.update_time.utctimetuple()) * 1000),
          "shard_description": shard.shard_description,
          "last_work_item": shard.last_work_item,
      }
      out.update(shard.counters_map.to_json())
      all_shards.append(out)
    all_shards.sort(key=lambda x: x["shard_number"])
    self.json_response["shards"] = all_shards