1#!/usr/bin/env python
2# Copyright 2010 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Base handler class for all mapreduce handlers."""
17
18
19
20# pylint: disable=protected-access
21# pylint: disable=g-bad-name
22# pylint: disable=g-import-not-at-top
23
24import httplib
25import logging
26
27try:
28  import json
29except ImportError:
30  import simplejson as json
31
32try:
33  from mapreduce import pipeline_base
34except ImportError:
35  pipeline_base = None
36try:
37  # Check if the full cloudstorage package exists. The stub part is in runtime.
38  import cloudstorage
39  if hasattr(cloudstorage, "_STUB"):
40    cloudstorage = None
41except ImportError:
42  cloudstorage = None
43
44from google.appengine.ext import webapp
45from mapreduce import errors
46from mapreduce import json_util
47from mapreduce import model
48from mapreduce import parameters
49
50
class Error(Exception):
  """Common base class for the exceptions defined in this module."""
53
54
class BadRequestPathError(Error):
  """Raised when a handler's request path does not have the expected layout."""
57
58
class TaskQueueHandler(webapp.RequestHandler):
  """Base class for handlers intended to be run only from the task queue.

  Sub-classes should implement
  1. the 'handle' method for all POST requests.
  2. the '_preprocess' method for decoding or validation before 'handle'.
  3. the '_drop_gracefully' method, invoked when the task has to be dropped
     (either '_preprocess' failed or the task exceeded its attempt budget).

  In Python27 runtime, webapp2 will automatically replace webapp.
  """

  _DEFAULT_USER_AGENT = "App Engine Python MR"

  def __init__(self, *args, **kwargs):
    # webapp framework invokes initialize after __init__.
    # webapp2 framework invokes initialize within __init__.
    # Python27 runtime swap webapp with webapp2 underneath us.
    # Since initialize will conditionally change this field,
    # it needs to be set before calling super's __init__.
    self._preprocess_success = False
    super(TaskQueueHandler, self).__init__(*args, **kwargs)
    # cloudstorage is None unless the full GCS client library is importable
    # (the runtime-provided stub is rejected by the guarded module import).
    if cloudstorage:
      cloudstorage.set_default_retry_params(
          cloudstorage.RetryParams(
              min_retries=5,
              max_retries=10,
              urlfetch_timeout=parameters._GCS_URLFETCH_TIMEOUT_SEC,
              save_access_token=True,
              _user_agent=self._DEFAULT_USER_AGENT))

  def initialize(self, request, response):
    """Initialize.

    1. call webapp init.
    2. check request is indeed from taskqueue.
    3. check the task has not been retried too many times.
    4. run handler specific processing logic.
    5. run error handling logic if processing failed.

    Only when step 4 succeeds is '_preprocess_success' set, which gates
    'post' from calling 'handle'.

    Args:
      request: a webapp.Request instance.
      response: a webapp.Response instance.
    """
    super(TaskQueueHandler, self).initialize(request, response)

    # Check request is from taskqueue.
    if "X-AppEngine-QueueName" not in self.request.headers:
      logging.error(self.request.headers)
      logging.error("Task queue handler received non-task queue request")
      self.response.set_status(
          403, message="Task queue handler received non-task queue request")
      return

    # Read with .get so a missing header cannot raise KeyError while we are
    # in the middle of logging and dropping a task.
    task_name = self.request.headers.get("X-AppEngine-TaskName")

    # Check task has not been retried too many times.
    attempt = self.task_retry_count() + 1
    if attempt > parameters.config.TASK_MAX_ATTEMPTS:
      logging.error(
          "Task %s has been attempted %s times. Dropping it permanently.",
          task_name, attempt)
      self._drop_gracefully()
      return

    try:
      self._preprocess()
      self._preprocess_success = True
    # pylint: disable=bare-except
    except:
      self._preprocess_success = False
      # logging.exception records the traceback; the task is dropped for
      # good, so this log entry is the only record of why.
      logging.exception(
          "Preprocess task %s failed. Dropping it permanently.", task_name)
      self._drop_gracefully()

  def post(self):
    """Dispatch the POST to 'handle' only if preprocessing succeeded."""
    if self._preprocess_success:
      self.handle()

  def handle(self):
    """To be implemented by subclasses."""
    raise NotImplementedError()

  def _preprocess(self):
    """Preprocess.

    This method is called after webapp initialization code has been run
    successfully. It can thus access self.request, self.response and so on.
    Any exception raised here causes the task to be dropped via
    '_drop_gracefully'.
    """
    pass

  def _drop_gracefully(self):
    """Drop task gracefully.

    Called before the task is dropped, either because '_preprocess' failed
    or because the task exceeded TASK_MAX_ATTEMPTS.
    """
    pass

  def task_retry_count(self):
    """Number of times this task has been retried.

    Read from the X-AppEngine-TaskExecutionCount header; 0 when absent.
    """
    return int(self.request.headers.get("X-AppEngine-TaskExecutionCount", 0))

  def retry_task(self):
    """Ask taskqueue to retry this task.

    Even though raising an exception can cause a task retry, it
    will flood logs with highly visible ERROR logs. Handlers should use
    this method to perform controlled task retries. Only raise exceptions
    for failures that deserve ERROR log entries.
    """
    self.response.set_status(httplib.SERVICE_UNAVAILABLE, "Retry task")
    self.response.clear()
170
171
class JsonHandler(webapp.RequestHandler):
  """Base class for JSON handlers for user interface.

  Sub-classes should implement the 'handle' method. They should put their
  response data in the 'self.json_response' dictionary. Any exceptions raised
  by the sub-class implementation will be sent in a JSON response with the
  name of the error_class and the error_message.
  """

  def __init__(self, *args, **kwargs):
    """Initializer.

    Forwards positional and keyword arguments unchanged to the underlying
    webapp/webapp2 RequestHandler, then creates the response dictionary.
    """
    super(JsonHandler, self).__init__(*args, **kwargs)
    self.json_response = {}

  def base_path(self):
    """Base path for all mapreduce-related urls.

    JSON handlers are mapped to /base_path/command/command_name thus they
    require special treatment.

    Raises:
      BadRequestPathError: if the path does not end with "/command".

    Returns:
      The base path.
    """
    path = self.request.path
    base_path = path[:path.rfind("/")]
    if not base_path.endswith("/command"):
      raise BadRequestPathError(
          "Json handlers should have /command path prefix")
    return base_path[:base_path.rfind("/")]

  def _handle_wrapper(self):
    """The helper method for handling JSON Post and Get requests.

    Rejects requests lacking "X-Requested-With: XMLHttpRequest" with a 403,
    runs 'handle', converts any exception into error_class/error_message
    fields, and writes 'self.json_response' serialized as JSON.
    """
    # NOTE(review): presumably an anti-CSRF check, since cross-origin pages
    # cannot set this header on plain form posts -- confirm intent.
    if self.request.headers.get("X-Requested-With") != "XMLHttpRequest":
      logging.error("Got JSON request with no X-Requested-With header")
      self.response.set_status(
          403, message="Got JSON request with no X-Requested-With header")
      return

    self.json_response.clear()
    try:
      self.handle()
    except errors.MissingYamlError:
      logging.debug("Could not find 'mapreduce.yaml' file.")
      self.json_response.clear()
      self.json_response["error_class"] = "Notice"
      self.json_response["error_message"] = "Could not find 'mapreduce.yaml'"
    # pylint: disable=broad-except
    except Exception as e:
      logging.exception("Error in JsonHandler, returning exception.")
      # TODO(user): Include full traceback here for the end-user.
      # Discard any partial data 'handle' produced before failing.
      self.json_response.clear()
      self.json_response["error_class"] = e.__class__.__name__
      self.json_response["error_message"] = str(e)

    self.response.headers["Content-Type"] = "text/javascript"
    try:
      output = json.dumps(self.json_response, cls=json_util.JsonEncoder)
    # pylint: disable=broad-except
    except Exception:
      logging.exception("Could not serialize to JSON")
      self.response.set_status(500, message="Could not serialize to JSON")
    else:
      self.response.out.write(output)

  def handle(self):
    """To be implemented by sub-classes."""
    raise NotImplementedError()
242
243
class PostJsonHandler(JsonHandler):
  """JsonHandler variant that serves HTTP POST requests."""

  def post(self):
    """Serve a POST by delegating to the shared JSON request wrapper."""
    self._handle_wrapper()
249
250
class GetJsonHandler(JsonHandler):
  """JsonHandler variant that serves HTTP GET requests."""

  def get(self):
    """Serve a GET by delegating to the shared JSON request wrapper."""
    self._handle_wrapper()
256
257
class HugeTaskHandler(TaskQueueHandler):
  """Base handler for processing HugeTasks.

  During '_preprocess' the incoming request is replaced by a wrapper whose
  'get'/'set' operate on the parameters decoded from the HugeTask payload,
  while every other attribute lookup falls through to the original request.
  """

  class _RequestWrapper(object):
    """Container of a request and associated parameters."""

    def __init__(self, request):
      self._request = request
      self._params = model.HugeTask.decode_payload(request)

    def get(self, name, default=""):
      """Return decoded parameter 'name', or 'default' if it is absent."""
      return self._params.get(name, default)

    def set(self, name, value):
      """Set (or override) decoded parameter 'name' to 'value'."""
      self._params[name] = value

    def __getattr__(self, name):
      # __getattr__ only runs when normal lookup fails. If the wrapper's own
      # fields are missing (e.g. an instance created without __init__ by
      # copy or pickle), fail with AttributeError instead of recursing
      # forever through self._request.
      if name in ("_request", "_params"):
        raise AttributeError(name)
      return getattr(self._request, name)

  def _preprocess(self):
    """Swap self.request for a wrapper exposing the decoded payload."""
    self.request = self._RequestWrapper(self.request)
282
283
# Backward-compatible alias for code that still reads PipelineBase from this
# module; it is None when the pipeline library could not be imported.
PipelineBase = pipeline_base.PipelineBase if pipeline_base else None
289