1#!/usr/bin/env python
2#
3# Copyright 2010 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Common Pipelines for easy reuse."""
18
19import cgi
20import logging
21import random
22
23from google.appengine.api import mail
24from google.appengine.api import taskqueue
25
26import pipeline
27
28
29class Return(pipeline.Pipeline):
30  """Causes calling generator to have the supplied default output value.
31
32  Only works when yielded last!
33  """
34
35  def run(self, return_value=None):
36    return return_value
37
38
39class Ignore(pipeline.Pipeline):
40  """Mark the supplied parameters as unused outputs of sibling pipelines."""
41
42  def run(self, *args):
43    pass
44
45
46class Dict(pipeline.Pipeline):
47  """Returns a dictionary with the supplied keyword arguments."""
48
49  def run(self, **kwargs):
50    return dict(**kwargs)
51
52
53class List(pipeline.Pipeline):
54  """Returns a list with the supplied positional arguments."""
55
56  def run(self, *args):
57    return list(args)
58
59
60class AbortIfTrue(pipeline.Pipeline):
61  """Aborts the entire pipeline if the supplied argument is True."""
62
63  def run(self, value, message=''):
64    if value:
65      raise pipeline.Abort(message)
66
67
68class All(pipeline.Pipeline):
69  """Returns True if all of the values are True.
70
71  Returns False if there are no values present.
72  """
73
74  def run(self, *args):
75    if len(args) == 0:
76      return False
77    for value in args:
78      if not value:
79        return False
80    return True
81
82
83class Any(pipeline.Pipeline):
84  """Returns True if any of the values are True."""
85
86  def run(self, *args):
87    for value in args:
88      if value:
89        return True
90    return False
91
92
93class Complement(pipeline.Pipeline):
94  """Returns the boolean complement of the values."""
95
96  def run(self, *args):
97    if len(args) == 1:
98      return not args[0]
99    else:
100      return [not value for value in args]
101
102
103class Max(pipeline.Pipeline):
104  """Returns the max value."""
105
106  def __init__(self, *args):
107    if len(args) == 0:
108      raise TypeError('max expected at least 1 argument, got 0')
109    pipeline.Pipeline.__init__(self, *args)
110
111  def run(self, *args):
112    return max(args)
113
114
115class Min(pipeline.Pipeline):
116  """Returns the min value."""
117
118  def __init__(self, *args):
119    if len(args) == 0:
120      raise TypeError('min expected at least 1 argument, got 0')
121    pipeline.Pipeline.__init__(self, *args)
122
123  def run(self, *args):
124    return min(args)
125
126
127class Sum(pipeline.Pipeline):
128  """Returns the sum of all values."""
129
130  def __init__(self, *args):
131    if len(args) == 0:
132      raise TypeError('sum expected at least 1 argument, got 0')
133    pipeline.Pipeline.__init__(self, *args)
134
135  def run(self, *args):
136    return sum(args)
137
138
139class Multiply(pipeline.Pipeline):
140  """Returns all values multiplied together."""
141
142  def __init__(self, *args):
143    if len(args) == 0:
144      raise TypeError('multiply expected at least 1 argument, got 0')
145    pipeline.Pipeline.__init__(self, *args)
146
147  def run(self, *args):
148    total = 1
149    for value in args:
150      total *= value
151    return total
152
153
154class Negate(pipeline.Pipeline):
155  """Returns each value supplied multiplied by -1."""
156
157  def __init__(self, *args):
158    if len(args) == 0:
159      raise TypeError('negate expected at least 1 argument, got 0')
160    pipeline.Pipeline.__init__(self, *args)
161
162  def run(self, *args):
163    if len(args) == 1:
164      return -1 * args[0]
165    else:
166      return [-1 * x for x in args]
167
168
169class Extend(pipeline.Pipeline):
170  """Combine together lists and tuples into a single list.
171
172  Args:
173    *args: One or more lists or tuples.
174
175  Returns:
176    A single list of all supplied lists merged together in order. Length of
177    the output list is the sum of the lengths of all input lists.
178  """
179
180  def run(self, *args):
181    combined = []
182    for value in args:
183      combined.extend(value)
184    return combined
185
186
187class Append(pipeline.Pipeline):
188  """Combine together values into a list.
189
190  Args:
191    *args: One or more values.
192
193  Returns:
194    A single list of all values appended to the same list. Length of the
195    output list matches the length of the input list.
196  """
197
198  def run(self, *args):
199    combined = []
200    for value in args:
201      combined.append(value)
202    return combined
203
204
205class Concat(pipeline.Pipeline):
206  """Concatenates strings together using a join character.
207
208  Args:
209    *args: One or more strings.
210    separator: Keyword argument only; the string to use to join the args.
211
212  Returns:
213    The joined string.
214  """
215
216  def run(self, *args, **kwargs):
217    separator = kwargs.get('separator', '')
218    return separator.join(args)
219
220
221class Union(pipeline.Pipeline):
222  """Like Extend, but the resulting list has all unique elements."""
223
224  def run(self, *args):
225    combined = set()
226    for value in args:
227      combined.update(value)
228    return list(combined)
229
230
231class Intersection(pipeline.Pipeline):
232  """Returns only those items belonging to all of the supplied lists.
233
234  Each argument must be a list. No individual items are permitted.
235  """
236
237  def run(self, *args):
238    if not args:
239      return []
240    result = set(args[0])
241    for value in args[1:]:
242      result.intersection_update(set(value))
243    return list(result)
244
245
246class Uniquify(pipeline.Pipeline):
247  """Returns a list of unique items from the list of items supplied."""
248
249  def run(self, *args):
250    return list(set(args))
251
252
253class Format(pipeline.Pipeline):
254  """Formats a string with formatting arguments."""
255
256  @classmethod
257  def dict(cls, message, **format_dict):
258    """Formats a dictionary.
259
260    Args:
261      message: The format string.
262      **format_dict: Keyword arguments of format parameters to use for
263        formatting the string.
264
265    Returns:
266      The formatted string.
267    """
268    return cls('dict', message, format_dict)
269
270  @classmethod
271  def tuple(cls, message, *params):
272    """Formats a tuple.
273
274    Args:
275      message: The format string.
276      *params: The formatting positional parameters.
277
278    Returns:
279      The formatted string.
280    """
281    return cls('tuple', message, *params)
282
283  def run(self, format_type, message, *params):
284    if format_type == 'dict':
285      return message % params[0]
286    elif format_type == 'tuple':
287      return message % params
288    else:
289      raise pipeline.Abort('Invalid format type: %s' % format_type)
290
291
292class Log(pipeline.Pipeline):
293  """Logs a message, just like the Python logging module."""
294
295  # TODO: Hack the call stack of the logging message to use the file and line
296  # context from when it was first scheduled, not when it actually ran.
297
298  _log_method = logging.log
299
300  @classmethod
301  def log(cls, *args, **kwargs):
302    return Log(*args, **kwargs)
303
304  @classmethod
305  def debug(cls, *args, **kwargs):
306    return Log(logging.DEBUG, *args, **kwargs)
307
308  @classmethod
309  def info(cls, *args, **kwargs):
310    return Log(logging.INFO, *args, **kwargs)
311
312  @classmethod
313  def warning(cls, *args, **kwargs):
314    return Log(logging.WARNING, *args, **kwargs)
315
316  @classmethod
317  def error(cls, *args, **kwargs):
318    return Log(logging.ERROR, *args, **kwargs)
319
320  @classmethod
321  def critical(cls, *args, **kwargs):
322    return Log(logging.CRITICAL, *args, **kwargs)
323
324  def run(self, level, message, *args):
325    Log._log_method.im_func(level, message, *args)
326
327
328class Delay(pipeline.Pipeline):
329  """Waits N seconds before completion.
330
331  Args:
332    seconds: Keyword argument only. The number of seconds to wait. Will be
333      rounded to the nearest whole second.
334
335  Returns:
336    How long this delay waited.
337  """
338
339  async = True
340
341  def __init__(self, *args, **kwargs):
342    if len(args) != 0 or len(kwargs) != 1 or kwargs.keys()[0] != 'seconds':
343      raise TypeError('Delay takes one keyword parameter, "seconds".')
344    pipeline.Pipeline.__init__(self, *args, **kwargs)
345
346  def run(self, seconds=None):
347    task = self.get_callback_task(
348        countdown=seconds,
349        name='ae-pipeline-delay-' + self.pipeline_id)
350    try:
351      task.add(self.queue_name)
352    except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError):
353      pass
354
355  def run_test(self, seconds=None):
356    logging.debug('Delay pipeline pretending to sleep %0.2f seconds', seconds)
357    self.complete(seconds)
358
359  def callback(self):
360    self.complete(self.kwargs['seconds'])
361
362
363class EmailToContinue(pipeline.Pipeline):
364  """Emails someone asking if the pipeline should continue.
365
366  When the user clicks "Approve", the pipeline will return True. When the
367  user clicks "Disapprove", the pipeline will return False.
368
369  Supply normal mail.EmailMessage parameters, plus two additional parameters:
370
371    approve_html: HTML to show to the user after clicking approve.
372    disapprove_html: HTML to show to the user after clicking disapprove.
373
374  Additionally, the 'body' and 'html' keyword arguments are treated as Python
375  dictionary templates with the keywords 'approval_url' and 'disapprove_url',
376  which let you place those links in your email however you want (as long
377  as clicking the links results in a GET request). The approve/disapprove URLs
378  are relative paths (e.g., '/relative/foo/bar'), so you must connect them to
379  whatever hostname you actually want users to access the callback on with an
380  absolute URL.
381
382  A random token is used to secure the asynchronous action.
383  """
384
385  async = True
386  public_callbacks = True
387
388  _email_message = mail.EmailMessage
389
390  def __init__(self, **kwargs):
391    if 'random_token' not in kwargs:
392      kwargs['random_token'] = '%x' % random.randint(0, 2**64)
393    if 'approve_html' not in kwargs:
394      kwargs['approve_html'] = '<h1>Approved!</h1>'
395    if 'disapprove_html' not in kwargs:
396      kwargs['disapprove_html'] = '<h1>Not Approved!</h1>'
397    pipeline.Pipeline.__init__(self, **kwargs)
398
399  def run(self, **kwargs):
400    random_token = kwargs.pop('random_token')
401    kwargs.pop('approve_html', '')
402    kwargs.pop('disapprove_html', '')
403
404    approve_url = self.get_callback_url(
405        random_token=random_token, choice='approve')
406    disapprove_url = self.get_callback_url(
407        random_token=random_token, choice='disapprove')
408
409    mail_args = kwargs.copy()
410    mail_args['body'] = mail_args['body'] % {
411        'approve_url': approve_url,
412        'disapprove_url': disapprove_url,
413    }
414    if 'html' in mail_args:
415      mail_args['html'] = mail_args['html'] % {
416        'approve_url': cgi.escape(approve_url),
417        'disapprove_url': cgi.escape(disapprove_url),
418      }
419    EmailToContinue._email_message.im_func(**mail_args).send()
420
421  def run_test(self, **kwargs):
422    self.run(**kwargs)
423    self.complete(True)
424
425  def callback(self, random_token=None, choice=None):
426    if random_token != self.kwargs['random_token']:
427      return (403, 'text/html', '<h1>Invalid security token.</h1>')
428
429    if choice == 'approve':
430      self.complete(True)
431      return (200, 'text/html', self.kwargs['approve_html'])
432    elif choice == 'disapprove':
433      self.complete(False)
434      return (200, 'text/html', self.kwargs['disapprove_html'])
435    else:
436      return (400, 'text/html', '<h1>Invalid "choice" value.</h1>')
437