1#!/usr/bin/env python 2# 3# Copyright 2010 Google Inc. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17"""Common Pipelines for easy reuse.""" 18 19import cgi 20import logging 21import random 22 23from google.appengine.api import mail 24from google.appengine.api import taskqueue 25 26import pipeline 27 28 29class Return(pipeline.Pipeline): 30 """Causes calling generator to have the supplied default output value. 31 32 Only works when yielded last! 33 """ 34 35 def run(self, return_value=None): 36 return return_value 37 38 39class Ignore(pipeline.Pipeline): 40 """Mark the supplied parameters as unused outputs of sibling pipelines.""" 41 42 def run(self, *args): 43 pass 44 45 46class Dict(pipeline.Pipeline): 47 """Returns a dictionary with the supplied keyword arguments.""" 48 49 def run(self, **kwargs): 50 return dict(**kwargs) 51 52 53class List(pipeline.Pipeline): 54 """Returns a list with the supplied positional arguments.""" 55 56 def run(self, *args): 57 return list(args) 58 59 60class AbortIfTrue(pipeline.Pipeline): 61 """Aborts the entire pipeline if the supplied argument is True.""" 62 63 def run(self, value, message=''): 64 if value: 65 raise pipeline.Abort(message) 66 67 68class All(pipeline.Pipeline): 69 """Returns True if all of the values are True. 70 71 Returns False if there are no values present. 72 """ 73 74 def run(self, *args): 75 if len(args) == 0: 76 return False 77 for value in args: 78 if not value: 79 return False 80 return True 81 82 83class Any(pipeline.Pipeline): 84 """Returns True if any of the values are True.""" 85 86 def run(self, *args): 87 for value in args: 88 if value: 89 return True 90 return False 91 92 93class Complement(pipeline.Pipeline): 94 """Returns the boolean complement of the values.""" 95 96 def run(self, *args): 97 if len(args) == 1: 98 return not args[0] 99 else: 100 return [not value for value in args] 101 102 103class Max(pipeline.Pipeline): 104 """Returns the max value.""" 105 106 def __init__(self, *args): 107 if len(args) == 0: 108 raise TypeError('max expected at least 1 argument, got 0') 109 pipeline.Pipeline.__init__(self, *args) 110 111 def run(self, *args): 112 return max(args) 113 114 115class Min(pipeline.Pipeline): 116 """Returns the min value.""" 117 118 def __init__(self, *args): 119 if len(args) == 0: 120 raise TypeError('min expected at least 1 argument, got 0') 121 pipeline.Pipeline.__init__(self, *args) 122 123 def run(self, *args): 124 return min(args) 125 126 127class Sum(pipeline.Pipeline): 128 """Returns the sum of all values.""" 129 130 def __init__(self, *args): 131 if len(args) == 0: 132 raise TypeError('sum expected at least 1 argument, got 0') 133 pipeline.Pipeline.__init__(self, *args) 134 135 def run(self, *args): 136 return sum(args) 137 138 139class Multiply(pipeline.Pipeline): 140 """Returns all values multiplied together.""" 141 142 def __init__(self, *args): 143 if len(args) == 0: 144 raise TypeError('multiply expected at least 1 argument, got 0') 145 pipeline.Pipeline.__init__(self, *args) 146 147 def run(self, *args): 148 total = 1 149 for value in args: 150 total *= value 151 return total 152 153 154class Negate(pipeline.Pipeline): 155 """Returns each value supplied multiplied by -1.""" 156 157 def __init__(self, *args): 158 if len(args) == 0: 159 raise TypeError('negate expected at least 1 argument, got 0') 160 pipeline.Pipeline.__init__(self, *args) 161 162 def run(self, *args): 163 if len(args) == 1: 164 return -1 * args[0] 165 else: 166 return [-1 * x for x in args] 167 168 169class Extend(pipeline.Pipeline): 170 """Combine together lists and tuples into a single list. 171 172 Args: 173 *args: One or more lists or tuples. 174 175 Returns: 176 A single list of all supplied lists merged together in order. Length of 177 the output list is the sum of the lengths of all input lists. 178 """ 179 180 def run(self, *args): 181 combined = [] 182 for value in args: 183 combined.extend(value) 184 return combined 185 186 187class Append(pipeline.Pipeline): 188 """Combine together values into a list. 189 190 Args: 191 *args: One or more values. 192 193 Returns: 194 A single list of all values appended to the same list. Length of the 195 output list matches the length of the input list. 196 """ 197 198 def run(self, *args): 199 combined = [] 200 for value in args: 201 combined.append(value) 202 return combined 203 204 205class Concat(pipeline.Pipeline): 206 """Concatenates strings together using a join character. 207 208 Args: 209 *args: One or more strings. 210 separator: Keyword argument only; the string to use to join the args. 211 212 Returns: 213 The joined string. 214 """ 215 216 def run(self, *args, **kwargs): 217 separator = kwargs.get('separator', '') 218 return separator.join(args) 219 220 221class Union(pipeline.Pipeline): 222 """Like Extend, but the resulting list has all unique elements.""" 223 224 def run(self, *args): 225 combined = set() 226 for value in args: 227 combined.update(value) 228 return list(combined) 229 230 231class Intersection(pipeline.Pipeline): 232 """Returns only those items belonging to all of the supplied lists. 233 234 Each argument must be a list. No individual items are permitted. 235 """ 236 237 def run(self, *args): 238 if not args: 239 return [] 240 result = set(args[0]) 241 for value in args[1:]: 242 result.intersection_update(set(value)) 243 return list(result) 244 245 246class Uniquify(pipeline.Pipeline): 247 """Returns a list of unique items from the list of items supplied.""" 248 249 def run(self, *args): 250 return list(set(args)) 251 252 253class Format(pipeline.Pipeline): 254 """Formats a string with formatting arguments.""" 255 256 @classmethod 257 def dict(cls, message, **format_dict): 258 """Formats a dictionary. 259 260 Args: 261 message: The format string. 262 **format_dict: Keyword arguments of format parameters to use for 263 formatting the string. 264 265 Returns: 266 The formatted string. 267 """ 268 return cls('dict', message, format_dict) 269 270 @classmethod 271 def tuple(cls, message, *params): 272 """Formats a tuple. 273 274 Args: 275 message: The format string. 276 *params: The formatting positional parameters. 277 278 Returns: 279 The formatted string. 280 """ 281 return cls('tuple', message, *params) 282 283 def run(self, format_type, message, *params): 284 if format_type == 'dict': 285 return message % params[0] 286 elif format_type == 'tuple': 287 return message % params 288 else: 289 raise pipeline.Abort('Invalid format type: %s' % format_type) 290 291 292class Log(pipeline.Pipeline): 293 """Logs a message, just like the Python logging module.""" 294 295 # TODO: Hack the call stack of the logging message to use the file and line 296 # context from when it was first scheduled, not when it actually ran. 297 298 _log_method = logging.log 299 300 @classmethod 301 def log(cls, *args, **kwargs): 302 return Log(*args, **kwargs) 303 304 @classmethod 305 def debug(cls, *args, **kwargs): 306 return Log(logging.DEBUG, *args, **kwargs) 307 308 @classmethod 309 def info(cls, *args, **kwargs): 310 return Log(logging.INFO, *args, **kwargs) 311 312 @classmethod 313 def warning(cls, *args, **kwargs): 314 return Log(logging.WARNING, *args, **kwargs) 315 316 @classmethod 317 def error(cls, *args, **kwargs): 318 return Log(logging.ERROR, *args, **kwargs) 319 320 @classmethod 321 def critical(cls, *args, **kwargs): 322 return Log(logging.CRITICAL, *args, **kwargs) 323 324 def run(self, level, message, *args): 325 Log._log_method.im_func(level, message, *args) 326 327 328class Delay(pipeline.Pipeline): 329 """Waits N seconds before completion. 330 331 Args: 332 seconds: Keyword argument only. The number of seconds to wait. Will be 333 rounded to the nearest whole second. 334 335 Returns: 336 How long this delay waited. 337 """ 338 339 async = True 340 341 def __init__(self, *args, **kwargs): 342 if len(args) != 0 or len(kwargs) != 1 or kwargs.keys()[0] != 'seconds': 343 raise TypeError('Delay takes one keyword parameter, "seconds".') 344 pipeline.Pipeline.__init__(self, *args, **kwargs) 345 346 def run(self, seconds=None): 347 task = self.get_callback_task( 348 countdown=seconds, 349 name='ae-pipeline-delay-' + self.pipeline_id) 350 try: 351 task.add(self.queue_name) 352 except (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError): 353 pass 354 355 def run_test(self, seconds=None): 356 logging.debug('Delay pipeline pretending to sleep %0.2f seconds', seconds) 357 self.complete(seconds) 358 359 def callback(self): 360 self.complete(self.kwargs['seconds']) 361 362 363class EmailToContinue(pipeline.Pipeline): 364 """Emails someone asking if the pipeline should continue. 365 366 When the user clicks "Approve", the pipeline will return True. When the 367 user clicks "Disapprove", the pipeline will return False. 368 369 Supply normal mail.EmailMessage parameters, plus two additional parameters: 370 371 approve_html: HTML to show to the user after clicking approve. 372 disapprove_html: HTML to show to the user after clicking disapprove. 373 374 Additionally, the 'body' and 'html' keyword arguments are treated as Python 375 dictionary templates with the keywords 'approval_url' and 'disapprove_url', 376 which let you place those links in your email however you want (as long 377 as clicking the links results in a GET request). The approve/disapprove URLs 378 are relative paths (e.g., '/relative/foo/bar'), so you must connect them to 379 whatever hostname you actually want users to access the callback on with an 380 absolute URL. 381 382 A random token is used to secure the asynchronous action. 383 """ 384 385 async = True 386 public_callbacks = True 387 388 _email_message = mail.EmailMessage 389 390 def __init__(self, **kwargs): 391 if 'random_token' not in kwargs: 392 kwargs['random_token'] = '%x' % random.randint(0, 2**64) 393 if 'approve_html' not in kwargs: 394 kwargs['approve_html'] = '<h1>Approved!</h1>' 395 if 'disapprove_html' not in kwargs: 396 kwargs['disapprove_html'] = '<h1>Not Approved!</h1>' 397 pipeline.Pipeline.__init__(self, **kwargs) 398 399 def run(self, **kwargs): 400 random_token = kwargs.pop('random_token') 401 kwargs.pop('approve_html', '') 402 kwargs.pop('disapprove_html', '') 403 404 approve_url = self.get_callback_url( 405 random_token=random_token, choice='approve') 406 disapprove_url = self.get_callback_url( 407 random_token=random_token, choice='disapprove') 408 409 mail_args = kwargs.copy() 410 mail_args['body'] = mail_args['body'] % { 411 'approve_url': approve_url, 412 'disapprove_url': disapprove_url, 413 } 414 if 'html' in mail_args: 415 mail_args['html'] = mail_args['html'] % { 416 'approve_url': cgi.escape(approve_url), 417 'disapprove_url': cgi.escape(disapprove_url), 418 } 419 EmailToContinue._email_message.im_func(**mail_args).send() 420 421 def run_test(self, **kwargs): 422 self.run(**kwargs) 423 self.complete(True) 424 425 def callback(self, random_token=None, choice=None): 426 if random_token != self.kwargs['random_token']: 427 return (403, 'text/html', '<h1>Invalid security token.</h1>') 428 429 if choice == 'approve': 430 self.complete(True) 431 return (200, 'text/html', self.kwargs['approve_html']) 432 elif choice == 'disapprove': 433 self.complete(False) 434 return (200, 'text/html', self.kwargs['disapprove_html']) 435 else: 436 return (400, 'text/html', '<h1>Invalid "choice" value.</h1>') 437