1#!/usr/bin/env python
2"""Parameters to control Mapreduce."""
3
4__all__ = ["CONFIG_NAMESPACE",
5           "config"]
6
7import pickle
8
9
10# To break circular dependency and more.
11# pylint: disable=g-import-not-at-top
12
13
14# For the mapreduce in python 25 runtime, this import will fail.
15# TODO(user): Remove all pipeline import protections after 25 mr defunct.
16try:
17  from pipeline import util as pipeline_util
18except ImportError:
19  pipeline_util = None
20
21from google.appengine.api import lib_config
22
23CONFIG_NAMESPACE = "mapreduce"
24
25
26# pylint: disable=protected-access
27# pylint: disable=invalid-name
28
29
class _JobConfigMeta(type):
  """Metaclass that controls class creation."""

  # Names of the two class attributes this metaclass synthesizes onto every
  # class it creates.
  _OPTIONS = "_options"
  _REQUIRED = "_required"

  def __new__(mcs, classname, bases, class_dict):
    """Creates a _Config class and modifies its class dict.

    Args:
      classname: name of the class.
      bases: a list of base classes.
      class_dict: original class dict.

    Returns:
      A new _Config class. The modified class will have two fields.
      _options field is a dict from option name to _Option objects.
      _required field is a set of required option names.
    """
    # Collect every _Option declared directly in this class's body.
    options = {}
    required = set()
    for name, option in class_dict.iteritems():
      if isinstance(option, _Option):
        options[name] = option
        if option.required:
          required.add(name)

    # Strip the _Option declarations off the class so instances can later
    # carry plain values under the same attribute names.
    for name in options:
      class_dict.pop(name)
    class_dict[mcs._OPTIONS] = options
    class_dict[mcs._REQUIRED] = required
    cls = type.__new__(mcs, classname, bases, class_dict)

    # Handle inheritance of _Config.
    if object not in bases:
      parent_options = {}
      # Update options from the root down.
      for c in reversed(cls.__mro__):
        if mcs._OPTIONS in c.__dict__:
          # Children override parent.
          parent_options.update(c.__dict__[mcs._OPTIONS])
        if mcs._REQUIRED in c.__dict__:
          # NOTE: mutates the same set object stored into class_dict above,
          # so ancestors' required options are folded into the new class.
          required.update(c.__dict__[mcs._REQUIRED])
      for k, v in parent_options.iteritems():
        if k not in options:
          # Likewise mutates the dict stored as the new class's _options:
          # inherit any parent option the child did not redeclare.
          options[k] = v
    return cls
77
78
79class _Option(object):
80  """An option for _Config."""
81
82  def __init__(self, kind, required=False, default_factory=None,
83               can_be_none=False):
84    """Init.
85
86    Args:
87      kind: type of the option.
88      required: whether user is required to supply a value.
89      default_factory: a factory, when called, returns the default value.
90      can_be_none: whether value can be None.
91
92    Raises:
93      ValueError: if arguments aren't compatible.
94    """
95    if required and default_factory is not None:
96      raise ValueError("No default_factory value when option is required.")
97    self.kind = kind
98    self.required = required
99    self.default_factory = default_factory
100    self.can_be_none = can_be_none
101
102
class _Config(object):
  """Root class for all per job configuration."""

  __metaclass__ = _JobConfigMeta

  def __init__(self, _lenient=False, **kwds):
    """Init.

    Args:
      _lenient: When true, no option is required and values are not
        type checked.
      **kwds: keyword arguments for options and their values.

    Raises:
      ValueError: if an unsupported option is supplied, or (unless
        _lenient) a required option is missing.
      TypeError: unless _lenient, if a value doesn't match its option's kind.
    """
    self._verify_keys(kwds, _lenient)
    self._set_values(kwds, _lenient)

  def _verify_keys(self, kwds, _lenient):
    """Validates user supplied option names against declared options.

    Args:
      kwds: dict of option name to value.
      _lenient: when true, missing required options are tolerated.

    Raises:
      ValueError: on an unsupported option or a missing required option.
    """
    keys = set()
    for k in kwds:
      if k not in self._options:
        raise ValueError("Option %s is not supported." % (k))
      keys.add(k)
    if not _lenient:
      missing = self._required - keys
      if missing:
        # Wrap in a 1-tuple: "%" with a bare multi-element tuple consumes
        # one element per "%s" and raised TypeError (not the intended
        # ValueError) whenever more than one required option was missing.
        raise ValueError("Options %s are required." % (tuple(missing),))

  def _set_values(self, kwds, _lenient):
    """Sets every declared option, applying defaults and type checks.

    Args:
      kwds: dict of option name to value.
      _lenient: when true, all type checking is skipped.

    Raises:
      TypeError: if a value doesn't match its option's declared kind.
    """
    for k, option in self._options.iteritems():
      v = kwds.get(k)
      if v is None and option.default_factory:
        v = option.default_factory()
      setattr(self, k, v)
      if _lenient:
        continue
      if v is None and option.can_be_none:
        continue
      # A class-valued option must be a subclass of the declared kind;
      # any other value must be an instance of it.
      if isinstance(v, type) and not issubclass(v, option.kind):
        raise TypeError(
            "Expect subclass of %r for option %s. Got %r" % (
                option.kind, k, v))
      if not isinstance(v, type) and not isinstance(v, option.kind):
        raise TypeError("Expect type %r for option %s. Got %r" % (
            option.kind, k, v))

  def __eq__(self, other):
    if not isinstance(other, self.__class__):
      return False
    return other.__dict__ == self.__dict__

  def __ne__(self, other):
    # Python 2 does not derive "!=" from __eq__; without this, "!=" would
    # compare identity even when __eq__ reports the objects as equal.
    return not self.__eq__(other)

  def __repr__(self):
    return str(self.__dict__)

  def to_json(self):
    """Serializes this config into a json-compatible dict via pickle."""
    return {"config": pickle.dumps(self)}

  @classmethod
  def from_json(cls, json):
    """Restores a config from to_json's output.

    NOTE: pickle.loads must only ever see trusted data; "json" here is
    expected to be this app's own to_json output, never external input.
    """
    return pickle.loads(json["config"])
161
162
163# TODO(user): Make more of these private.
164class _ConfigDefaults(object):
165  """Default configs.
166
167  Do not change parameters whose names begin with _.
168
169  SHARD_MAX_ATTEMPTS: Max attempts to execute a shard before giving up.
170
171  TASK_MAX_ATTEMPTS: Max attempts to execute a task before dropping it. Task
172    is any taskqueue task created by MR framework. A task is dropped
173    when its X-AppEngine-TaskExecutionCount is bigger than this number.
174    Dropping a task will cause abort on the entire MR job.
175
176  TASK_MAX_DATA_PROCESSING_ATTEMPTS:
177    Max times to execute a task when previous task attempts failed during
178    data processing stage. An MR work task has three major stages:
179    initial setup, data processing, and final checkpoint.
180    Setup stage should be allowed to be retried more times than data processing
181    stage: setup failures are caused by unavailable GAE services while
182    data processing failures are mostly due to user function error out on
183    certain input data. Thus, set TASK_MAX_ATTEMPTS higher than this parameter.
184
185  QUEUE_NAME: Default queue for MR.
186
187  SHARD_COUNT: Default shard count.
188
189  PROCESSING_RATE_PER_SEC: Default rate of processed entities per second.
190
191  BASE_PATH : Base path of mapreduce and pipeline handlers.
192  """
193
194  SHARD_MAX_ATTEMPTS = 4
195
196  # Arbitrary big number.
197  TASK_MAX_ATTEMPTS = 31
198
199  TASK_MAX_DATA_PROCESSING_ATTEMPTS = 11
200
201  QUEUE_NAME = "default"
202
203  SHARD_COUNT = 8
204
205  # Maximum number of mapper calls per second.
206  # This parameter is useful for testing to force short slices.
207  # Maybe make this a private constant instead.
208  # If people want to rate limit their jobs, they can reduce shard count.
209  PROCESSING_RATE_PER_SEC = 1000000
210
211  # This path will be changed by build process when this is a part of SDK.
212  BASE_PATH = "/mapreduce"
213
214  # TODO(user): find a proper value for this.
215  # The amount of time to perform scanning in one slice. New slice will be
216  # scheduled as soon as current one takes this long.
217  _SLICE_DURATION_SEC = 15
218
219  # Delay between consecutive controller callback invocations.
220  _CONTROLLER_PERIOD_SEC = 2
221
222
# TODO(user): change this name to app_config.
# Resolves effective config values: _ConfigDefaults attributes may be
# overridden by the app via appengine_config.py under the "mapreduce"
# namespace.
config = lib_config.register(CONFIG_NAMESPACE, _ConfigDefaults.__dict__)


# The following are constants that depend on the value of config.
# They are constants because config is completely initialized on the first
# request of an instance and will never change until the user deploys a new
# version.
_DEFAULT_PIPELINE_BASE_PATH = config.BASE_PATH + "/pipeline"
# See b/11341023 for context.
_GCS_URLFETCH_TIMEOUT_SEC = 30
# If a lock has been held longer than this value, mapreduce will start to use
# the logs API to check if the request has ended.
_LEASE_DURATION_SEC = config._SLICE_DURATION_SEC * 1.1
# On rare occasions, the Logs API misses log entries. Thus
# if a lock has been held longer than this timeout, mapreduce assumes the
# request holding the lock has died, regardless of the Logs API.
# 10 mins is the taskqueue task timeout on a frontend instance.
_MAX_LEASE_DURATION_SEC = max(10 * 60 + 30, config._SLICE_DURATION_SEC * 1.5)
241