1#!/usr/bin/python
2
3from __future__ import print_function
4
5import argparse
6import logging
7import multiprocessing
8import os
9import subprocess
10import sys
11import time
12
13import common
14from autotest_lib.server import frontend
15from autotest_lib.site_utils.lib import infra
16
# Command run on each remote server to perform the actual update.
DEPLOY_SERVER_LOCAL = '/usr/local/autotest/site_utils/deploy_server_local.py'

# Number of worker threads used to update one group of servers in parallel.
POOL_SIZE = 124

# Maps a server role to its push group. A server is placed in the lowest
# numbered group among its roles, and lower numbered groups are updated first.
PUSH_ORDER = {
    'database': 0,
    'database_slave': 0,
    'drone': 1,
    'shard': 1,
    'golo_proxy': 1,
    'afe': 2,
    'scheduler': 2,
    'host_scheduler': 2,
    'suite_scheduler': 2,
}
28
29
def discover_servers(afe, server_filter=None):
    """Discover the in-production servers to update.

    @param afe: Server to contact with RPC requests.
    @param server_filter: An optional collection of server hostnames to get
                          status for. None or an empty collection means all
                          servers are considered.

    @returns: A list of a list of tuple of (server_name, server_status, roles).
              The list is sorted by the order to be updated. Servers in the same
              sublist can be pushed together. A sublist may be empty when no
              server maps to that push group.

    """
    # Use a None default rather than a shared mutable set(), and normalize to
    # a set so that any iterable of hostnames is accepted.
    server_filter = set(server_filter or ())

    # Example server details....
    # {
    #     'hostname': 'server1',
    #     'status': 'backup',
    #     'roles': ['drone', 'scheduler'],
    #     'attributes': {'max_processes': 300}
    # }
    rpc = frontend.AFE(server=afe)
    servers = rpc.run('get_servers')

    # Do not update servers that need repair, filter the server list by the
    # given server_filter if needed, and do not update reserve, devserver or
    # crash_server (not YET supported).
    unsupported_roles = ('devserver', 'crash_server', 'reserve')
    servers = [s for s in servers
               if s['status'] != 'repair_required'
               and (not server_filter or s['hostname'] in server_filter)
               and not any(role in s['roles'] for role in unsupported_roles)]

    # One (possibly empty) group per push order value.
    sorted_servers = [[] for _ in range(max(PUSH_ORDER.values()) + 1)]
    servers_with_unknown_order = []
    for server in servers:
        info = (server['hostname'], server['status'], server['roles'])
        orders = [PUSH_ORDER[r] for r in server['roles'] if r in PUSH_ORDER]
        if orders:
            # A server with several roles is pushed with its earliest group.
            sorted_servers[min(orders)].append(info)
        else:
            # None of the roles is indexed in PUSH_ORDER.
            servers_with_unknown_order.append(info)

    # Push all servers with unknown roles together.
    if servers_with_unknown_order:
        sorted_servers.append(servers_with_unknown_order)

    found_servers = set(s['hostname'] for s in servers)
    # Inject the servers passed in by user but not found in server database.
    # Sorted so that the resulting order is deterministic.
    # NOTE(review): a server named in server_filter that was dropped by the
    # filtering above (repair_required or an unsupported role) is also missing
    # from found_servers, so it is re-added here as 'unknown' - confirm that
    # this is intended.
    extra_servers = [(hostname, 'unknown', ['unknown'])
                     for hostname in sorted(server_filter - found_servers)]
    if extra_servers:
        sorted_servers.append(extra_servers)

    return sorted_servers
89
90
def parse_arguments(args):
    """Parse command line arguments.

    Besides the declared options, the positional remainder is split at the
    first '--': everything before it is stored as the list of server names
    (`servers`), everything after it is stored as the list of arguments to
    forward to deploy_server_local.py (`args`).

    @param args: The command line arguments to parse. (usually sys.argv[1:])

    @returns An argparse.Namespace populated with argument values.
    """
    usage_examples = ('Update all servers:\n'
                      '  deploy_server.py\n'
                      '\n'
                      'Update one server:\n'
                      '  deploy_server.py <server>\n'
                      '\n'
                      'Send arguments to remote deploy_server_local.py:\n'
                      '  deploy_server.py -- --dryrun\n'
                      '\n'
                      'See what arguments would be run on specified servers:\n'
                      '  deploy_server.py --dryrun <server_a> <server_b> --'
                      ' --skip-update\n')
    parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description='Command to update an entire autotest installation.',
            epilog=usage_examples)

    parser.add_argument(
            '-v', '--verbose', action='store_true', dest='verbose',
            help='Log all deploy script output.')
    parser.add_argument(
            '--continue', action='store_true', dest='cont',
            help='Continue to the next server on failure.')
    parser.add_argument(
            '--afe', required=True,
            help='What is the main server for this installation? (cautotest).')
    parser.add_argument(
            '--update_push_servers', action='store_true',
            help='Indicate to update test_push servers.')
    parser.add_argument(
            '--force_update', action='store_true',
            help='Force to run update commands for afe, tko, build_externals')
    parser.add_argument(
            '--dryrun', action='store_true',
            help='Don\'t actually run remote commands.')
    parser.add_argument(
            '--logfile', action='store',
            default='/tmp/deployment.log',
            help='Path to the file to save the deployment log to. Default is '
                 '/tmp/deployment.log')
    parser.add_argument(
            'args', nargs=argparse.REMAINDER,
            help=('<server>, <server> ... -- <remote_arg>, <remote_arg> ...'))

    parsed = parser.parse_args(args)

    # Split the positional remainder at the first '--'.
    #
    # This:
    #   server_a, server_b -- --dryrun --skip-report
    #
    # Becomes:
    #   parsed.servers == ['server_a', 'server_b']
    #   parsed.args == ['--dryrun', '--skip-report']
    remainder = parsed.args
    if '--' in remainder:
        separator = remainder.index('--')
        parsed.servers = remainder[:separator]
        parsed.args = remainder[separator + 1:]
    else:
        # Without a '--' separator every positional is a server name.
        parsed.servers = remainder
        parsed.args = []

    return parsed
157
158
def update_server(inputs):
    """Deploy for given server.

    The remote deploy command is attempted up to 5 times; the update counts
    as successful as soon as one attempt succeeds. In dryrun mode no command
    is run and the command line that would have been used is returned as the
    output.

    @param inputs: Inputs for the update action, including:
                   server: Name of the server to update.
                   status: Status of the server.
                   finished_servers: Shared list to which the server name is
                                     appended once it is updated successfully.
                   options: Options for the update.

    @return: A tuple of (server, success, output), where:
             server: Name of the server to be updated.
             success: True if update succeeds, False otherwise.
             output: A string of the deploy_server_local script output
                     including any errors.

    """
    max_attempts = 5
    start = time.time()
    server = inputs['server']
    status = inputs['status']
    # Shared list to record the finished server.
    finished_servers = inputs['finished_servers']
    options = inputs['options']
    print('Updating server %s...' % server)
    # Servers in 'backup' status get an extra flag for the remote script.
    extra_args = ['--skip-service-status'] if status == 'backup' else []

    cmd = ('%s %s' %
           (DEPLOY_SERVER_LOCAL, ' '.join(options.args + extra_args)))
    output = '%s: %s' % (server, cmd)
    success = True
    if not options.dryrun:
        for attempt in range(1, max_attempts + 1):
            try:
                print('[%d/%d] Try to update server %s' %
                      (attempt, max_attempts, server))
                output = infra.execute_command(server, cmd)
            except subprocess.CalledProcessError as e:
                print('%s: Command failed with error: %s' % (server, e))
                success = False
                # e.output may be None when no output was captured; fall back
                # to the error text so callers always receive a string.
                output = e.output or str(e)
            else:
                # A retry that succeeds must clear the failure recorded by an
                # earlier attempt.
                success = True
                finished_servers.append(server)
                break

    print('Time used to update server %s: %s' % (server, time.time()-start))
    return server, success, output
204
205
def update_in_parallel(servers, options):
    """Update a group of servers in parallel.

    Progress is printed once a minute while updates are running. Unless this
    is a dry run, one result entry per server is appended to options.logfile.

    @param servers: A list of tuple of (server_name, server_status, roles).
    @param options: Options for the push.

    @returns A list of servers that failed to update.
    """
    # Nothing to do for an empty group; avoid starting a manager process and
    # a thread pool for it.
    if not servers:
        return []

    # Imported explicitly: `import multiprocessing` alone does not guarantee
    # that the `multiprocessing.pool` submodule attribute is available.
    from multiprocessing.pool import ThreadPool

    # Create a list to record all the finished servers.
    manager = multiprocessing.Manager()
    finished_servers = manager.list()

    args = [{'server': server,
             'status': status,
             'finished_servers': finished_servers,
             'options': options}
            for server, status, _ in servers]

    failed_servers = []
    # The update actions run in parallel. If any update failed, we should wait
    # for other running updates being finished. Abort in the middle of an update
    # may leave the server in a bad state.
    # No more threads than servers are ever needed.
    pool = ThreadPool(min(POOL_SIZE, len(servers)))
    try:
        results = pool.map_async(update_server, args)
        pool.close()

        # Track the updating progress for current group of servers.
        server_names = set(s[0] for s in servers)
        while not results.ready():
            incomplete_servers = server_names - set(finished_servers)
            print('Not finished yet. %d servers in this group. '
                '%d servers are still running:\n%s\n' %
                (len(servers), len(incomplete_servers), incomplete_servers))
            # Check the progress every 1 mins
            results.wait(60)

        # After update finished, parse the result.
        outcomes = results.get()
        if options.dryrun:
            for server, _, _ in outcomes:
                print('Dry run, updating server %s is skipped.' % server)
        else:
            # Write the results into logfile, one entry per server.
            with open(options.logfile, 'a') as f:
                for server, success, output in outcomes:
                    if success:
                        msg = 'Successfully updated server %s.' % server
                        if options.verbose:
                            print(output)
                            print()
                    else:
                        # Guard against a missing output so that reporting a
                        # failure can never itself raise.
                        msg = ('Failed to update server %s.\nError: %s' %
                               (server, (output or '').strip()))
                        print(msg)
                        failed_servers.append(server)
                    # Terminate every entry with a newline so that entries do
                    # not run into each other in the log.
                    f.write(msg + '\n')
    finally:
        pool.terminate()
        pool.join()

    return failed_servers
267
def main(args):
    """Main routine that drives all the real work.

    Discovers the servers to update, prints the planned push order, then
    updates the groups one after another. Once a group has failures, the
    remaining groups are skipped unless --continue was given.

    @param args: The command line arguments to parse. (usually sys.argv)

    @returns The system exit code: 0 if every update succeeded, 1 otherwise.
    """
    options = parse_arguments(args[1:])
    # Remove all the handlers from the root logger to get rid of the handlers
    # introduced by the import packages.
    logging.getLogger().handlers = []
    logging.basicConfig(level=logging.DEBUG
                        if options.verbose else logging.INFO)

    print('Retrieving server status...')
    sorted_servers = discover_servers(options.afe, set(options.servers or []))

    # Display what we plan to update.
    print('Will update (in this order):')
    for group_number, servers in enumerate(sorted_servers, 1):
        print('%s Group %d (%d servers) %s' %
              ('='*30, group_number, len(servers), '='*30))
        for server, status, roles in servers:
            print('\t%-36s:\t%s\t%s' % (server, status, roles))
    print()

    # Start from a fresh log file; the per-server results are appended to it.
    if os.path.exists(options.logfile):
        os.remove(options.logfile)
    print('Start updating, push logs of every server will be saved '
          'at %s' % options.logfile)
    failed = []
    skipped = []
    for servers in sorted_servers:
        if not failed or options.cont:
            failed += update_in_parallel(servers, options)
        else:
            skipped.extend(s[0] for s in servers)  # Only include server name.

    if not failed:
        return 0

    print('Errors updating:')
    for server in failed:
        print('  %s' % server)
    print()
    # Both the failed and the skipped servers still need to be updated.
    print('To retry:')
    print('  %s <options> %s' %
          (str(args[0]), str(' '.join(failed + skipped))))
    # Exit with error.
    return 1
317
318
# Run the deployment when executed as a script; the value returned by main()
# is passed to sys.exit() as the exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
321