1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import time
6import traceback
7
8from app_yaml_helper import AppYamlHelper
9from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue
10from branch_utility import BranchUtility
11from compiled_file_system import CompiledFileSystem
12from custom_logger import CustomLogger
13from data_source_registry import CreateDataSources
14from environment import GetAppVersion
15from gcs_file_system_provider import CloudStorageFileSystemProvider
16from github_file_system_provider import GithubFileSystemProvider
17from host_file_system_provider import HostFileSystemProvider
18from object_store_creator import ObjectStoreCreator
19from render_refresher import RenderRefresher
20from server_instance import ServerInstance
21from servlet import Servlet, Request, Response
22from timer import Timer
23
24
# Logger used throughout this file. CronServlet.Get disables log autoflush,
# and (per the comment there) this logger is relied on to flush the log every
# time it is used, so messages survive a cron run that gets killed.
_log = CustomLogger('cron')
26
27
class CronServlet(Servlet):
  '''Servlet which runs the server's cron job.

  A run pins the server to a safe commit, purges the default taskqueue, and
  enqueues one /_refresh/<target>/<path> task for every refresh path of every
  DataSource, the ContentProviders and the PlatformBundle.
  '''
  def __init__(self, request, delegate_for_test=None):
    Servlet.__init__(self, request)
    # Tests inject a fake Delegate; otherwise use the real dependencies.
    self._delegate = delegate_for_test or CronServlet.Delegate()

  class Delegate(object):
    '''CronServlet's runtime dependencies. Override for testing.
    '''
    def CreateBranchUtility(self, object_store_creator):
      return BranchUtility.Create(object_store_creator)

    def CreateHostFileSystemProvider(self,
                                     object_store_creator,
                                     pinned_commit=None):
      return HostFileSystemProvider(object_store_creator,
                                    pinned_commit=pinned_commit)

    def CreateGithubFileSystemProvider(self, object_store_creator):
      return GithubFileSystemProvider(object_store_creator)

    def CreateGCSFileSystemProvider(self, object_store_creator):
      return CloudStorageFileSystemProvider(object_store_creator)

    def GetAppVersion(self):
      return GetAppVersion()

  def Get(self):
    '''Runs the cron job and returns its Response.

    Any exception escaping the job (BaseException included) is logged and
    reported as Response.InternalError('Failure'), so the caller always
    receives a Response. Logs are flushed on every exit path.
    '''
    # Refreshes may time out, and if they do we need to make sure to flush the
    # logs before the process gets killed (Python gives us a couple of
    # seconds).
    #
    # So, manually flush logs at the end of the cron run. However, sometimes
    # even that isn't enough, which is why in this file we use _log and
    # make it flush the log every time it's used.
    logservice.AUTOFLUSH_ENABLED = False
    try:
      return self._GetImpl()
    except BaseException:
      _log.error('Caught top-level exception! %s', traceback.format_exc())
      # Previously this fell through and returned None, which is not a
      # Response; report the failure explicitly instead.
      return Response.InternalError('Failure')
    finally:
      logservice.flush()

  def _GetImpl(self):
    '''Queues refresh tasks for all cron targets.

    Returns Response.Ok('Success') if every task was queued, otherwise
    Response.InternalError('Failure'). Exceptions raised before task queuing
    starts (e.g. while creating the ServerInstance) propagate to the caller.
    '''
    # Cron strategy:
    #
    # Collect all DataSources, the PlatformBundle, the ContentProviders, and
    # any other statically rendered contents (e.g. examples content),
    # and spin up taskqueue tasks which will refresh any cached data relevant
    # to these assets.
    #
    # TODO(rockot/kalman): At the moment examples are not actually refreshed
    # because they're too slow.

    _log.info('starting')

    server_instance = self._GetSafeServerInstance()
    master_fs = server_instance.host_file_system_provider.GetMaster()
    master_commit = master_fs.GetCommitID().Get()

    # This is the guy that would be responsible for refreshing the cache of
    # examples. Here for posterity, hopefully it will be added to the targets
    # below someday. (Currently constructed but unused.)
    render_refresher = RenderRefresher(server_instance, self._request)

    # Get the default taskqueue
    queue = taskqueue.Queue()

    # GAE documentation specifies that it's bad to add tasks to a queue
    # within one second of purging. We wait 2 seconds, because we like
    # to go the extra mile.
    queue.purge()
    time.sleep(2)

    success = False
    try:
      self._QueueRefreshTasks(server_instance, queue, master_commit)
      success = True
    except BaseException:
      # This should never actually happen (each cron step does its own
      # conservative error checking). Log it and report failure as a 500.
      # Note: the old code had `raise` here plus a `return` in `finally`,
      # which silently discarded the re-raised exception; this makes that
      # effective behavior explicit.
      _log.error('uncaught error: %s', traceback.format_exc())
    finally:
      _log.info('finished (%s)', 'success' if success else 'FAILED')
    return (Response.Ok('Success') if success else
            Response.InternalError('Failure'))

  def _QueueRefreshTasks(self, server_instance, queue, commit):
    '''Adds one task to |queue| per refresh path of every refresh target.

    Targets are all DataSources plus the ContentProviders and PlatformBundle
    of |server_instance|. Each task hits /_refresh/<target>/<path> with the
    pinned |commit| as a parameter.
    '''
    data_sources = CreateDataSources(server_instance)
    # list() so this also works where dict.items() returns a view.
    targets = (list(data_sources.items()) +
               [('content_providers', server_instance.content_providers),
                ('platform_bundle', server_instance.platform_bundle)])
    title = 'initializing %s parallel targets' % len(targets)
    _log.info(title)
    timer = Timer()
    for name, target in targets:
      for path in target.GetRefreshPaths():
        queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
                                 params={'commit': commit}))
    _log.info('%s took %s', title, timer.Stop().FormatElapsed())

  def _GetSafeServerInstance(self):
    '''Returns a ServerInstance whose host file system is pinned to a safe
    commit: the most recent commit at which the currently running app
    version was still current (i.e. app.yaml had not yet been bumped past it).
    '''
    delegate = self._delegate

    # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
    # HEAD. These cron jobs take a while and run very frequently such that
    # there is usually one running at any given time, and eventually a file
    # that we're dealing with will change underneath it, putting the server in
    # an undefined state.
    server_instance_near_head = self._CreateServerInstance(
        self._GetMostRecentCommit())

    app_yaml_handler = AppYamlHelper(
        server_instance_near_head.object_store_creator,
        server_instance_near_head.host_file_system_provider)

    # Read the running version once so every use below sees the same value.
    app_version = delegate.GetAppVersion()

    if app_yaml_handler.IsUpToDate(app_version):
      return server_instance_near_head

    # The version in app.yaml is greater than the currently running app's.
    # The safe version is the one before it changed.
    # NOTE(review): the `- 1` assumes revisions are consecutive integers;
    # confirm against AppYamlHelper.GetFirstRevisionGreaterThan.
    safe_revision = (
        app_yaml_handler.GetFirstRevisionGreaterThan(app_version) - 1)

    _log.info('app version %s is out of date, safe is %s',
              app_version, safe_revision)

    return self._CreateServerInstance(safe_revision)

  def _GetMostRecentCommit(self):
    '''Gets the commit of the most recent patch submitted to the host file
    system. This is similar to HEAD but it's a concrete commit so won't
    change as the cron runs.
    '''
    head_fs = (
        self._CreateServerInstance(None).host_file_system_provider.GetMaster())
    return head_fs.GetCommitID().Get()

  def _CreateServerInstance(self, commit):
    '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
    NOTE: If passed None it's likely that during the cron run patches will be
    submitted at HEAD, which may change data underneath the cron run.
    '''
    # A fresh, empty object store so no cached state leaks into the cron run.
    object_store_creator = ObjectStoreCreator(start_empty=True)
    branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
        object_store_creator, pinned_commit=commit)
    github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
        object_store_creator)
    gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(
        object_store_creator)
    return ServerInstance(object_store_creator,
                          CompiledFileSystem.Factory(object_store_creator),
                          branch_utility,
                          host_file_system_provider,
                          github_file_system_provider,
                          gcs_file_system_provider)
189