cron_servlet.py revision 1e9bf3e0803691d0a228da41fc608347b6db4340
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7import traceback
8
9from app_yaml_helper import AppYamlHelper
10from appengine_wrappers import (
11    GetAppVersion, IsDeadlineExceededError, IsDevServer, logservice)
12from branch_utility import BranchUtility
13from compiled_file_system import CompiledFileSystem
14from data_source_registry import CreateDataSources
15from empty_dir_file_system import EmptyDirFileSystem
16from file_system_util import CreateURLsFromPaths
17from github_file_system_provider import GithubFileSystemProvider
18from host_file_system_provider import HostFileSystemProvider
19from object_store_creator import ObjectStoreCreator
20from render_servlet import RenderServlet
21from server_instance import ServerInstance
22from servlet import Servlet, Request, Response
23import svn_constants
24
class _SingletonRenderServletDelegate(RenderServlet.Delegate):
  '''RenderServlet delegate which always answers CreateServerInstance() with
  the single ServerInstance it was constructed with, rather than building a
  new one per request.
  '''
  def __init__(self, server_instance):
    # The one instance to hand out for every CreateServerInstance() call.
    self._server_instance = server_instance

  def CreateServerInstance(self):
    '''Returns the ServerInstance given to the constructor.
    '''
    return self._server_instance
31
class _CronLogger(object):
  '''Front for the logging.* functions which prefixes every message with
  'cron: ' and flushes the log service straight afterwards. Flushing on every
  call matters because cron runs often time out, and anything still buffered
  at that point is lost.
  '''
  def _log(self, logfn, msg, args):
    # |args| is the tuple of lazy format arguments collected by the public
    # methods; it is forwarded untouched so that |logfn| does the formatting.
    # The flush lives in a finally so it happens even if |logfn| raises.
    try:
      logfn('cron: %s' % msg, *args)
    finally:
      logservice.flush()

  def info(self, msg, *args):
    self._log(logging.info, msg, args)

  def warning(self, msg, *args):
    self._log(logging.warning, msg, args)

  def error(self, msg, *args):
    self._log(logging.error, msg, args)


# Module-wide logger shared by everything in this file.
_cronlog = _CronLogger()
48
def _RequestEachItem(title, items, request_callback):
  '''Calls |request_callback| once per entry of |items|, logging progress
  under the name |title|.

  |request_callback| takes a single item and returns a servlet response (an
  object with a |status| attribute). An item counts as a success when its
  response status is 200; any other status, or any exception, counts as a
  failure and is logged. Deadline-exceeded errors are re-raised after being
  logged so that the run stops; a summary line is logged either way.

  Returns True if every item succeeded, False otherwise.
  '''
  def describe_failure(position, item, detail):
    # |position| is the 1-based index of |item| within |items|.
    return '%s: error rendering %s (%s of %s): %s' % (
        title, item, position, len(items), detail)

  _cronlog.info('%s: starting', title)
  succeeded = 0
  failed = 0
  began_at = time.time()
  try:
    for position, item in enumerate(items, 1):
      try:
        response = request_callback(item)
        if response.status != 200:
          _cronlog.error(describe_failure(
              position, item, 'response status %s' % response.status))
          failed += 1
          continue
        succeeded += 1
      except Exception as e:
        _cronlog.error(
            describe_failure(position, item, traceback.format_exc()))
        failed += 1
        # Out of time: there is no point attempting the remaining items.
        if IsDeadlineExceededError(e):
          raise
  finally:
    # Always report the tally, including when bailing out on a deadline.
    _cronlog.info('%s: rendered %s of %s with %s failures in %s seconds',
        title, succeeded, len(items), failed, time.time() - began_at)
  return succeeded == len(items)
79
class CronServlet(Servlet):
  '''Servlet which runs a cron job.

  The job renders the public templates, the static files and (when not on the
  dev server) the example files and example zips through RenderServlet, then
  calls Cron() on every data source and on the redirector. Get() answers with
  an OK response if every step succeeded and an internal-error response
  otherwise.
  '''
  def __init__(self, request, delegate_for_test=None):
    '''|delegate_for_test| replaces the default CronServlet.Delegate; pass one
    to stub out the runtime dependencies.
    '''
    Servlet.__init__(self, request)
    self._delegate = delegate_for_test or CronServlet.Delegate()

  class Delegate(object):
    '''CronServlet's runtime dependencies. Override for testing.
    '''
    def CreateBranchUtility(self, object_store_creator):
      return BranchUtility.Create(object_store_creator)

    def CreateHostFileSystemProvider(self,
                                     object_store_creator,
                                     max_trunk_revision=None):
      return HostFileSystemProvider(object_store_creator,
                                    max_trunk_revision=max_trunk_revision)

    def CreateGithubFileSystemProvider(self, object_store_creator):
      return GithubFileSystemProvider(object_store_creator)

    def GetAppVersion(self):
      return GetAppVersion()

  def Get(self):
    '''Runs the cron job and returns its servlet response.

    Any exception escaping the job is logged and reported as an internal
    error response; the logs are flushed before returning in every case.
    '''
    # Crons often time out, and if they do we need to make sure to flush the
    # logs before the process gets killed (Python gives us a couple of
    # seconds).
    #
    # So, manually flush logs at the end of the cron run. However, sometimes
    # even that isn't enough, which is why in this file we use _cronlog and
    # make it flush the log every time it's used.
    logservice.AUTOFLUSH_ENABLED = False
    try:
      return self._GetImpl()
    except BaseException:
      _cronlog.error('Caught top-level exception! %s', traceback.format_exc())
      # Report the failure explicitly. Falling off the end here would hand
      # None back to the caller instead of a response.
      return Response.InternalError('Failure')
    finally:
      logservice.flush()

  def _GetImpl(self):
    '''Does the actual cron work; see the class comment for the steps.

    Returns Response.Ok if every step succeeded, Response.InternalError if
    any step failed or raised. Exceptions raised while creating the server
    instance (before any step runs) propagate to the caller.
    '''
    # Cron strategy:
    #
    # Find all public template files and static files, and render them. Most of
    # the time these won't have changed since the last cron run, so it's a
    # little wasteful, but hopefully rendering is really fast (if it isn't we
    # have a problem).
    _cronlog.info('starting')

    # This is returned every time RenderServlet wants to create a new
    # ServerInstance.
    #
    # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking
    # everything. Need retry logic at the fetcher level.
    server_instance = self._GetSafeServerInstance()
    trunk_fs = server_instance.host_file_system_provider.GetTrunk()

    def render(path):
      # Re-use the incoming cron request's host and headers for each render.
      request = Request(path, self._request.host, self._request.headers)
      delegate = _SingletonRenderServletDelegate(server_instance)
      return RenderServlet(request, delegate).Get()

    def request_files_in_dir(path, prefix=''):
      '''Requests every file found under |path| in this host file system, with
      a request prefix of |prefix|.
      '''
      files = [name for name, _ in CreateURLsFromPaths(trunk_fs, path, prefix)]
      return _RequestEachItem(path, files, render)

    # One entry per completed step; a single False marks the run as failed.
    results = []

    def run_cron(data_source):
      '''Runs |data_source|.Cron(), timing it and recording a failure in
      |results| if it raises. Deadline-exceeded errors are re-raised.
      '''
      title = data_source.__class__.__name__
      _cronlog.info('%s: starting', title)
      start_time = time.time()
      try:
        data_source.Cron()
      except Exception as e:
        _cronlog.error('%s: error %s', title, traceback.format_exc())
        results.append(False)
        if IsDeadlineExceededError(e):
          raise
      finally:
        _cronlog.info('%s: took %s seconds', title, time.time() - start_time)

    try:
      # Rendering the public templates will also pull in all of the private
      # templates.
      results.append(request_files_in_dir(svn_constants.PUBLIC_TEMPLATE_PATH))

      # Rendering the public templates will have pulled in the .js and
      # manifest.json files (for listing examples on the API reference pages),
      # but there are still images, CSS, etc.
      results.append(request_files_in_dir(svn_constants.STATIC_PATH,
                                          prefix='static/'))

      # Samples are too expensive to run on the dev server, where there is no
      # parallel fetch.
      if not IsDevServer():
        # Fetch each individual sample file.
        results.append(request_files_in_dir(svn_constants.EXAMPLES_PATH,
                                            prefix='extensions/examples/'))

        # Fetch the zip file of each example (contains all the individual
        # files). A directory is an example if it holds a manifest.json.
        example_zips = []
        for root, _, files in trunk_fs.Walk(svn_constants.EXAMPLES_PATH):
          example_zips.extend(
              root + '.zip' for name in files if name == 'manifest.json')
        results.append(_RequestEachItem(
            'example zips',
            example_zips,
            lambda path: render('extensions/examples/' + path)))

      for data_source in CreateDataSources(server_instance).values():
        run_cron(data_source)

      run_cron(server_instance.redirector)

    except BaseException:
      # Each cron step does its own conservative error checking, so getting
      # here is unexpected (typically a re-raised deadline error). Log it and
      # mark the run as failed; the failure response below reports it to the
      # caller. Previously this re-raised, but the exception was then silently
      # discarded by a `return` inside `finally`, so nothing ever propagated.
      results.append(False)
      _cronlog.error('uncaught error: %s', traceback.format_exc())

    success = all(results)
    _cronlog.info('finished (%s)', 'success' if success else 'FAILED')
    return (Response.Ok('Success') if success else
            Response.InternalError('Failure'))

  def _GetSafeServerInstance(self):
    '''Returns a ServerInstance with a host file system at a safe revision,
    meaning the last revision that the current running version of the server
    existed.
    '''
    # The running app's version; fetched once and used for every check below.
    app_version = self._delegate.GetAppVersion()

    # IMPORTANT: Get a ServerInstance pinned to the most recent revision, not
    # HEAD. These cron jobs take a while and run very frequently such that
    # there is usually one running at any given time, and eventually a file
    # that we're dealing with will change underneath it, putting the server in
    # an undefined state.
    server_instance_near_head = self._CreateServerInstance(
        self._GetMostRecentRevision())

    app_yaml_handler = AppYamlHelper(
        svn_constants.APP_YAML_PATH,
        server_instance_near_head.object_store_creator,
        server_instance_near_head.host_file_system_provider)

    if app_yaml_handler.IsUpToDate(app_version):
      return server_instance_near_head

    # The version in app.yaml is greater than the currently running app's.
    # The safe version is the one before it changed.
    safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
        app_version) - 1

    _cronlog.info('app version %s is out of date, safe is %s',
        app_version, safe_revision)

    return self._CreateServerInstance(safe_revision)

  def _GetMostRecentRevision(self):
    '''Gets the revision of the most recent patch submitted to the host file
    system. This is similar to HEAD but it's a concrete revision so won't
    change as the cron runs.
    '''
    head_fs = (
        self._CreateServerInstance(None).host_file_system_provider.GetTrunk())
    return head_fs.Stat('/').version

  def _CreateServerInstance(self, revision):
    '''Creates a ServerInstance pinned to |revision|, or HEAD if None.
    NOTE: If passed None it's likely that during the cron run patches will be
    submitted at HEAD, which may change data underneath the cron run.
    '''
    # All collaborators come from the delegate so that tests can replace them.
    object_store_creator = ObjectStoreCreator(start_empty=True)
    branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
        object_store_creator, max_trunk_revision=revision)
    github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
        object_store_creator)
    return ServerInstance(object_store_creator,
                          CompiledFileSystem.Factory(object_store_creator),
                          branch_utility,
                          host_file_system_provider,
                          github_file_system_provider)
270