utils.py revision e9be8c38ab90271133c8b3989055a9ef6aece480
1#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
6import socket, sys, time, textwrap, urllib, urlparse
7import error, barrier
8
9
def read_one_line(filename):
    """Read the first line of filename and return it with surrounding
    whitespace stripped.

    The file handle is closed explicitly instead of being leaked until
    garbage collection.
    """
    f = open(filename, 'r')
    try:
        return f.readline().strip()
    finally:
        f.close()
12
13
def write_one_line(filename, str):
    """Write str to filename as a single line, replacing any trailing
    whitespace with exactly one newline. Existing contents are truncated.

    NOTE: the second parameter is named 'str' (shadowing the builtin)
    and is kept that way for backward compatibility with callers.
    """
    f = open(filename, 'w')
    try:
        # normalize the line ending, then close the handle promptly
        f.write(str.rstrip() + "\n")
    finally:
        f.close()
16
17
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Lines have the form key=value; '#' starts a comment running to the
    end of the line. Values that look like integers or floats are
    converted to those types; everything else stays a string.

    Raises ValueError for any non-blank line that is not a key=value
    pair.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    f = open(path)
    try:
        for line in f:
            # drop trailing whitespace, then strip comments
            line = re.sub('#.*', '', line.rstrip())
            if not line:
                # skip blank and comment-only lines rather than
                # rejecting them as malformed
                continue
            if not re.search(r'^[-\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    finally:
        f.close()
    return keyval
38
39
def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type-tag is not
    null then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".

    Raises ValueError for an invalid type_tag or an invalid key.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')

    # Validate type_tag and build the key regex *before* opening the
    # file, so an invalid tag cannot create or touch the file.
    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)

    keyval = open(path, 'a')
    try:
        # items() behaves identically to iteritems() here and also
        # works on newer Python versions
        for key, value in dictionary.items():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
69
70
def is_url(path):
    """Return true if path looks like a URL"""
    # only http and ftp schemes are recognized for now
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
76
77
def urlopen(url, data=None, proxies=None, timeout=300):
    """Wrapper to urllib.urlopen with timeout addition."""
    # Install the requested default socket timeout for the duration of
    # the call, restoring the previous value even on failure.
    previous = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(previous)
88
89
def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    # Swap in the desired default socket timeout for the download and
    # always restore the old value afterwards.
    previous = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(previous)
99
100
101def get_file(src, dest, permissions=None):
102    """Get a file from src, which can be local or a remote URL"""
103    if (src == dest):
104        return
105    if (is_url(src)):
106        print 'PWD: ' + os.getcwd()
107        print 'Fetching \n\t', src, '\n\t->', dest
108        try:
109            urllib.urlretrieve(src, dest)
110        except IOError, e:
111            raise error.AutotestError('Unable to retrieve %s (to %s)'
112                                % (src, dest), e)
113    else:
114        shutil.copyfile(src, dest)
115    if permissions:
116        os.chmod(dest, permissions)
117    return dest
118
119
def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    returns either the path to the local file, or the fetched URL

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if not is_url(src):
        # a plain local path: anchor it under srcdir
        return os.path.join(srcdir, src)
    # a URL: download it into destdir under its basename
    filename = os.path.basename(urlparse.urlparse(src)[2])
    return get_file(src, os.path.join(destdir, filename))
138
139
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir

    The installed version is tracked in a pickled '.version' file in
    srcdir; *args and **dargs are passed through to install().
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # binary mode is required by pickle; close the handle promptly
        f = open(versionfile, 'rb')
        try:
            old_version = pickle.load(f)
        finally:
            f.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        # record the new version only if install() actually produced
        # (or kept) the source directory
        if os.path.exists(srcdir):
            f = open(versionfile, 'wb')
            try:
                pickle.dump(new_version, f)
            finally:
                f.close()
164
165
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Run a command on the host and wait for it to finish.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    # launch in the background, then join with the current thread
    bg_job = run_bg(command)
    return join_bg_job(bg_job, command, timeout, ignore_status,
                       stdout_tee, stderr_tee)
193
194
def run_bg(command):
    """Run the command in a subprocess and return the subprocess."""
    def restore_default_sigpipe():
        # let the child die on SIGPIPE instead of inheriting SIG_IGN
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)

    proc = subprocess.Popen(command, shell=True, executable="/bin/bash",
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            preexec_fn=restore_default_sigpipe)
    return proc, CmdResult(command)
204
205
def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """Join the subprocess with the current thread. See run description.

    bg_job is the (subprocess, CmdResult) pair returned by run_bg().
    Waits for the process (killing it if timeout expires), fills in the
    CmdResult's exit_status, duration, stdout and stderr, and returns
    it. Raises error.CmdError on timeout or (unless ignore_status) on a
    nonzero exit status.
    """
    sp, result = bg_job
    stdout_file = StringIO.StringIO()
    stderr_file = StringIO.StringIO()
    (ret, timeouterr) = (0, False)

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        (ret, timeouterr) = _wait_for_command(sp, start_time,
                                timeout, stdout_file, stderr_file,
                                stdout_tee, stderr_tee)
        result.exit_status = ret
        result.duration = time.time() - start_time
        # don't use os.read now, so we get all the rest of the output
        _process_output(sp.stdout, stdout_file, stdout_tee,
                        use_os_read=False)
        _process_output(sp.stderr, stderr_file, stderr_tee,
                        use_os_read=False)
    finally:
        # close our ends of the pipes to the sp no matter what
        sp.stdout.close()
        sp.stderr.close()

    result.stdout = stdout_file.getvalue()
    result.stderr = stderr_file.getvalue()

    if result.exit_status != 0:
        if timeouterr:
            # NOTE(review): a timeout raises even when ignore_status is
            # True -- confirm that is the intended contract
            raise error.CmdError(command, result, "Command did not "
                                 "complete within %d seconds" % timeout)
        elif not ignore_status:
            raise error.CmdError(command, result,
                                 "Command returned non-zero exit status")

    return result
245
# this returns a tuple with the return code and a flag to specify if the error
# is due to the process not terminating within timeout
def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
                      stdout_tee, stderr_tee):
    """Pump subproc's stdout/stderr until it exits or timeout expires.

    Returns (exit_status, timed_out), where timed_out is True when the
    process had to be killed for outliving the timeout.
    """
    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    # Initialized before the loop so the post-loop code cannot hit a
    # NameError when the loop body never executes (e.g. time_left is
    # already <= 0 on entry).
    exit_status_indication = None
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select([subproc.stdout, subproc.stderr],
                                     [], [], time_left)
        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        if subproc.stdout in ready:
            _process_output(subproc.stdout, stdout_file,
                            stdout_tee)
        if subproc.stderr in ready:
            _process_output(subproc.stderr, stderr_file,
                            stderr_tee)

        exit_status_indication = subproc.poll()

        if exit_status_indication is not None:
            return (exit_status_indication, False)

        if timeout:
            time_left = stop_time - time.time()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    if exit_status_indication is None:
        exit_status_indication = nuke_subprocess(subproc)

    return (exit_status_indication, True)
283
284
def nuke_subprocess(subproc):
    """Kill subproc with an escalating series of signals (SIGTERM then
    SIGKILL), polling for up to ~5 seconds after each.

    Returns the exit status from poll(), or None if the process never
    reported termination.
    """
    for sig in (signal.SIGTERM, signal.SIGKILL):
        try:
            os.kill(subproc.pid, sig)
        except OSError:
            # the process may have died before we could kill it
            pass

        # give it a few seconds to react before escalating
        for _ in range(5):
            exit_code = subproc.poll()
            if exit_code is not None:
                return exit_code
            time.sleep(1)
301
302
def nuke_pid(pid):
    """Kill the process with the given pid via an escalating series of
    signals (SIGTERM then SIGKILL), waiting up to ~5 seconds after each
    for it to be reaped with waitpid.

    Raises error.AutoservRunError if the process survives a signal's
    grace period.
    """
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)

            # NOTE(review): this raises after the *first* signal's grace
            # period, so SIGKILL is never attempted when SIGTERM fails
            # -- confirm that is intended
            if status != pid:
                raise error.AutoservRunError('Could not kill %d'
                        % pid, None)

        # the process died before we join it.
        except OSError:
            pass
329
330
331def _process_output(pipe, fbuffer, teefile=None, use_os_read=True):
332    if use_os_read:
333        data = os.read(pipe.fileno(), 1024)
334    else:
335        data = pipe.read()
336    fbuffer.write(data)
337    if teefile:
338        teefile.write(data)
339        teefile.flush()
340
341
def system(command, timeout=None, ignore_status=False):
    """Run command, teeing its output to this process's stdout/stderr,
    and return the exit status. See run() for parameter semantics."""
    result = run(command, timeout, ignore_status,
                 stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return result.exit_status
345
346
def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    """Run command and return its stdout, minus one trailing newline.

    When retain_output is set, output is also teed to this process's
    stdout/stderr as it is produced.
    """
    if retain_output:
        tees = dict(stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    else:
        tees = {}
    out = run(command, timeout, ignore_status, **tees).stdout
    # strip at most one trailing newline
    if out[-1:] == '\n':
        out = out[:-1]
    return out
356
357"""
358This function is used when there is a need to run more than one
359job simultaneously starting exactly at the same time. It basically returns
360a modified control file (containing the synchronization code prepended)
361whenever it is ready to run the control file. The synchronization
362is done using barriers to make sure that the jobs start at the same time.
363
364Here is how the synchronization is done to make sure that the tests
365start at exactly the same time on the client.
366sc_bar is a server barrier and s_bar, c_bar are the normal barriers
367
368                  Job1              Job2         ......      JobN
369 Server:   |                        sc_bar
370 Server:   |                        s_bar        ......      s_bar
371 Server:   |      at.run()         at.run()      ......      at.run()
372 ----------|------------------------------------------------------
373 Client    |      sc_bar
374 Client    |      c_bar             c_bar        ......      c_bar
375 Client    |    <run test>         <run test>    ......     <run test>
376
377
378PARAMS:
   control_file : The control file to which the above synchronization
                  code will be prepended
381   host_name    : The host name on which the job is going to run
382   host_num (non negative) : A number to identify the machine so that we have
383                  different sets of s_bar_ports for each of the machines.
384   instance     : The number of the job
385   num_jobs     : Total number of jobs that are going to run in parallel with
386                  this job starting at the same time
387   port_base    : Port number that is used to derive the actual barrier ports.
388
389RETURN VALUE:
390    The modified control file.
391
392"""
def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """Return control with barrier-synchronization code prepended so
    that num_jobs jobs all start their tests at the same moment.

    See the module-level comment above for the full description of the
    barrier scheme and parameters. Returns None if host_num is negative.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print "Please provide a non negative number for the host"
        return None
    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
                                          # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is called finally.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    rendv = []
    # rendvstr is a temp holder for the rendezvous list of the processes
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null"
                           "2> /dev/null' % job.tmpdir,"
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
        'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
         % jobid)

    elif instance == 1:
        # Wait at the server barrier to wait for instance=0
        # process to complete setup
        # NOTE(review): these barriers run *here*, server-side, rather
        # than being appended to the generated control file -- confirm
        # that blocking inside this function is intended
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                     port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if(num_jobs > 2):
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                         port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                    % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
461
462
class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        # wrap the command line at 78 columns, indented under the label
        wrapper = textwrap.TextWrapper(width = 78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        sections = ["* Command: %s\n" % wrapper.fill(self.command),
                    "Exit status: %s\n" % self.exit_status,
                    "Duration: %s\n" % self.duration]

        out = self.stdout.rstrip()
        if out:
            sections.append("\nstdout:\n%s" % out)

        err = self.stderr.rstrip()
        if err:
            sections.append("\nstderr:\n%s" % err)

        return "".join(sections)
504
505
class run_randomly:
    """Collects (args, kwargs) test invocations via add() and replays
    them through run() in random order. With run_sequentially=True the
    insertion order is kept instead (useful for debugging)."""

    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        self.test_list.append((args, dargs))


    def run(self, fn):
        while self.test_list:
            # draw a random index (kept unconditional so the RNG state
            # advances identically in both modes), then override it in
            # sequential mode
            index = random.randint(0, len(self.test_list) - 1)
            if self.run_sequentially:
                index = 0
            args, dargs = self.test_list.pop(index)
            fn(*args, **dargs)
525