1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import math
6import threading
7
8import common
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros import retry
11from autotest_lib.frontend.afe.json_rpc import proxy
12from autotest_lib.server import frontend
13from chromite.lib import retry_util
14from chromite.lib import timeout_util
15
16
17def convert_timeout_to_retry(backoff, timeout_min, delay_sec):
18    """Compute the number of retry attempts for use with chromite.retry_util.
19
20    @param backoff: The exponential backoff factor.
21    @param timeout_min: The maximum amount of time (in minutes) to sleep.
22    @param delay_sec: The amount to sleep (in seconds) between each attempt.
23
24    @return: The number of retry attempts in the case of exponential backoff.
25    """
26    # Estimate the max_retry in the case of exponential backoff:
27    # => total_sleep = sleep*sum(r=0..max_retry-1, backoff^r)
28    # => total_sleep = sleep( (1-backoff^max_retry) / (1-backoff) )
29    # => max_retry*ln(backoff) = ln(1-(total_sleep/sleep)*(1-backoff))
30    # => max_retry = ln(1-(total_sleep/sleep)*(1-backoff))/ln(backoff)
31    total_sleep = timeout_min * 60
32    numerator = math.log10(1-(total_sleep/delay_sec)*(1-backoff))
33    denominator = math.log10(backoff)
34    return int(math.ceil(numerator/denominator))
35
36
37class RetryingAFE(frontend.AFE):
38    """Wrapper around frontend.AFE that retries all RPCs.
39
40    Timeout for retries and delay between retries are configurable.
41    """
42    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
43        """Constructor
44
45        @param timeout_min: timeout in minutes until giving up.
46        @param delay_sec: pre-jittered delay between retries in seconds.
47        """
48        self.timeout_min = timeout_min
49        self.delay_sec = delay_sec
50        super(RetryingAFE, self).__init__(**dargs)
51
52
53    def run(self, call, **dargs):
54        # exc_retry: We retry if this exception is raised.
55        # blacklist: Exceptions that we raise immediately if caught.
56        exc_retry = Exception
57        blacklist = (ImportError, error.RPCException, proxy.JSONRPCException,
58                     timeout_util.TimeoutError)
59        backoff = 2
60        max_retry = convert_timeout_to_retry(backoff, self.timeout_min,
61                                             self.delay_sec)
62
63        def _run(self, call, **dargs):
64            return super(RetryingAFE, self).run(call, **dargs)
65
66        def handler(exc):
67            """Check if exc is an exc_retry or if it's blacklisted.
68
69            @param exc: An exception.
70
71            @return: True if exc is an exc_retry and is not
72                     blacklisted. False otherwise.
73            """
74            is_exc_to_check = isinstance(exc, exc_retry)
75            is_blacklisted = isinstance(exc, blacklist)
76            return is_exc_to_check and not is_blacklisted
77
78        # If the call is not in main thread, signal can't be used to abort the
79        # call. In that case, use a basic retry which does not enforce timeout
80        # if the process hangs.
81        @retry.retry(Exception, timeout_min=self.timeout_min,
82                     delay_sec=self.delay_sec,
83                     blacklist=[ImportError, error.RPCException,
84                                proxy.ValidationError])
85        def _run_in_child_thread(self, call, **dargs):
86            return super(RetryingAFE, self).run(call, **dargs)
87
88        if isinstance(threading.current_thread(), threading._MainThread):
89            # Set the keyword argument for GenericRetry
90            dargs['sleep'] = self.delay_sec
91            dargs['backoff_factor'] = backoff
92            with timeout_util.Timeout(self.timeout_min * 60):
93                return retry_util.GenericRetry(handler, max_retry, _run,
94                                               self, call, **dargs)
95        else:
96            return _run_in_child_thread(self, call, **dargs)
97
98
99class RetryingTKO(frontend.TKO):
100    """Wrapper around frontend.TKO that retries all RPCs.
101
102    Timeout for retries and delay between retries are configurable.
103    """
104    def __init__(self, timeout_min=30, delay_sec=10, **dargs):
105        """Constructor
106
107        @param timeout_min: timeout in minutes until giving up.
108        @param delay_sec: pre-jittered delay between retries in seconds.
109        """
110        self.timeout_min = timeout_min
111        self.delay_sec = delay_sec
112        super(RetryingTKO, self).__init__(**dargs)
113
114
115    def run(self, call, **dargs):
116        @retry.retry(Exception, timeout_min=self.timeout_min,
117                     delay_sec=self.delay_sec,
118                     blacklist=[ImportError, error.RPCException,
119                                proxy.ValidationError])
120        def _run(self, call, **dargs):
121            return super(RetryingTKO, self).run(call, **dargs)
122        return _run(self, call, **dargs)
123