utils.py revision c5ddfd1f71caef9ec0c84c53ef7db42fcdc33e1c
1#!/usr/bin/python
2#
3# Copyright 2008 Google Inc. Released under the GPL v2
4
5import os, pickle, random, re, select, shutil, signal, StringIO, subprocess
6import socket, sys, time, textwrap, urllib, urlparse
7import error, barrier
8
9
def read_one_line(filename):
    """Read and return the first line of filename, without its trailing newline.

    Args:
            filename: path of the file to read.

    Returns:
            The first line of the file, with any trailing '\\n' stripped.
    """
    f = open(filename, 'r')
    try:
        return f.readline().rstrip('\n')
    finally:
        # close explicitly rather than leaking the handle to the GC
        f.close()
12
13
def write_one_line(filename, str):
    """Overwrite filename with str as a single line (one trailing newline).

    Args:
            filename: path of the file to (re)write.
            str: the line to write; trailing newlines are normalized to
                    exactly one. (Parameter name kept for backward
                    compatibility even though it shadows the builtin.)
    """
    f = open(filename, 'w')
    try:
        f.write(str.rstrip('\n') + '\n')
    finally:
        # close explicitly rather than leaking the handle to the GC
        f.close()
16
17
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.

    Lines are of the form key=value, where the key may contain word
    characters and dashes. '#' starts a comment running to end of line;
    blank and comment-only lines are skipped. Values that look like
    integers or floats are converted to int/float respectively.

    Raises:
            ValueError: if a non-blank line is not of the form key=value.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    keyval_file = open(path)
    try:
        lines = keyval_file.readlines()
    finally:
        # close explicitly rather than leaking the handle to the GC
        keyval_file.close()
    for line in lines:
        # strip comments and trailing whitespace
        line = re.sub(r'#.*', '', line).rstrip()
        if not line:
            # skip blank and comment-only lines
            continue
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval
38
39
def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".

    Raises:
            ValueError: if type_tag is invalid, or if any key does not
            match the required pattern.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')

    # validate the tag before touching the filesystem so that an invalid
    # tag does not create (and leak a handle to) an empty keyval file
    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)

    keyval = open(path, 'a')
    try:
        for key, value in dictionary.items():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
69
70
def is_url(path):
    """Return True if path looks like a URL (http or ftp only, for now)."""
    scheme = urlparse.urlparse(path)[0]
    return scheme in ('http', 'ftp')
76
77
def urlopen(url, data=None, proxies=None, timeout=300):
    """Wrapper to urllib.urlopen with timeout addition.

    Applies the timeout by temporarily overriding the process-wide
    socket default timeout, restoring the previous value afterwards.
    """
    saved_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        # always restore the global default, even if the open failed
        socket.setdefaulttimeout(saved_timeout)
88
89
def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition.

    Applies the timeout by temporarily overriding the process-wide
    socket default timeout, restoring the previous value afterwards.
    """
    saved_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        # always restore the global default, even if the fetch failed
        socket.setdefaulttimeout(saved_timeout)
99
100
def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL.

    Args:
            src: local path or http/ftp URL to fetch from.
            dest: local path to place the file at.
            permissions: optional mode bits to chmod dest to.

    Returns:
            dest (also when src == dest and nothing needed copying).

    Raises:
            error.AutotestError: if fetching a URL fails.
    """
    if src == dest:
        # nothing to copy; still return dest so callers can rely on it
        return dest
    if is_url(src):
        print('PWD: ' + os.getcwd())
        print('Fetching \n\t %s \n\t-> %s' % (src, dest))
        try:
            # NOTE(review): uses urllib.urlretrieve directly, so the
            # module's timeout wrapper above is bypassed here
            urllib.urlretrieve(src, dest)
        except IOError as e:
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest
118
119
def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    returns either the path to the local file, or the fetched URL

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if not is_url(src):
        # plain local file: resolve relative to srcdir
        return os.path.join(srcdir, src)
    # URL: fetch it into destdir under the URL path's basename
    url_path = urlparse.urlparse(src)[2]
    dest = os.path.join(destdir, os.path.basename(url_path))
    return get_file(src, dest)
138
139
def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir

    Args:
            srcdir: directory whose '.version' stamp file is checked.
            preserve_srcdir: if true, never delete srcdir before reinstall.
            new_version: the version value expected/recorded (pickled).
            install: callable run to (re)install; receives *args/**dargs.
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        # pickle streams must use binary mode; also close the handle
        # instead of leaking it
        f = open(versionfile, 'rb')
        try:
            old_version = pickle.load(f)
        finally:
            f.close()
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        # only stamp the version if the install actually produced srcdir
        if os.path.exists(srcdir):
            f = open(versionfile, 'wb')
            try:
                pickle.dump(new_version, f)
            finally:
                f.close()
164
165
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    # launch in the background, then immediately join with this thread
    bg_job = run_bg(command)
    return join_bg_job(bg_job, command, timeout, ignore_status,
                       stdout_tee, stderr_tee)
193
194
def run_bg(command):
    """Launch command in a bash subprocess; return a (Popen, CmdResult) pair."""
    def reset_sigpipe():
        # restore default SIGPIPE handling in the child so shell
        # pipelines terminate normally
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
    proc = subprocess.Popen(command, shell=True, executable="/bin/bash",
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            preexec_fn=reset_sigpipe)
    return proc, CmdResult(command)
204
205
def join_bg_job(bg_job, command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None):
    """Join the subprocess with the current thread. See run description.

    Args:
            bg_job: the (subprocess.Popen, CmdResult) pair from run_bg().
            command: the command line string, used for error reporting.
            timeout: max seconds to wait; the process is killed beyond it.
            ignore_status: if true, a non-zero exit status does not raise
                    (a timeout still raises regardless).
            stdout_tee/stderr_tee: optional file-like objects that output
                    is mirrored to as it arrives.

    Returns:
            the CmdResult from bg_job, filled in with exit_status,
            duration, stdout and stderr.

    Raises:
            error.CmdError: on timeout, or on a non-zero exit status
                    when ignore_status is false.
    """
    sp, result = bg_job
    stdout_file = StringIO.StringIO()
    stderr_file = StringIO.StringIO()
    # defaults in case _wait_for_command raises before assigning
    (ret, timeouterr) = (0, False)

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        (ret, timeouterr) = _wait_for_command(sp, start_time,
                                timeout, stdout_file, stderr_file,
                                stdout_tee, stderr_tee)
        result.exit_status = ret
        result.duration = time.time() - start_time
        # final reads: drain whatever output is still buffered in the
        # pipes now that the process has exited (or been killed)
        _process_output(sp.stdout, stdout_file, stdout_tee, final_read=True)
        _process_output(sp.stderr, stderr_file, stderr_tee, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        sp.stdout.close()
        sp.stderr.close()

    result.stdout = stdout_file.getvalue()
    result.stderr = stderr_file.getvalue()

    if result.exit_status != 0:
        # a timeout always raises, even with ignore_status
        if timeouterr:
            raise error.CmdError(command, result, "Command did not "
                                 "complete within %d seconds" % timeout)
        elif not ignore_status:
            raise error.CmdError(command, result,
                                 "Command returned non-zero exit status")

    return result
243
# this returns a tuple with the return code and a flag to specify if the error
# is due to the process not terminating within timeout
def _wait_for_command(subproc, start_time, timeout, stdout_file, stderr_file,
                      stdout_tee, stderr_tee):
    """Poll subproc, draining its output, until it exits or times out.

    Args:
            subproc: the subprocess.Popen to wait on.
            start_time: time.time() value when the command was started.
            timeout: max seconds to wait, or None/0 to wait forever.
            stdout_file/stderr_file: buffers output accumulates into.
            stdout_tee/stderr_tee: optional file-likes output is teed to.

    Returns:
            (exit_status, timed_out) tuple; exit_status may be None if
            the process could not be confirmed dead after killing it.
    """
    # must be initialized before the loop: with a zero/negative
    # effective timeout the loop body never runs, and the original
    # code then hit an unbound local below
    exit_status_indication = None
    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select([subproc.stdout, subproc.stderr],
                                     [], [], time_left)
        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        if subproc.stdout in ready:
            _process_output(subproc.stdout, stdout_file, stdout_tee)
        if subproc.stderr in ready:
            _process_output(subproc.stderr, stderr_file, stderr_tee)

        exit_status_indication = subproc.poll()

        if exit_status_indication is not None:
            return (exit_status_indication, False)

        if timeout:
            time_left = stop_time - time.time()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    if exit_status_indication is None:
        exit_status_indication = nuke_subprocess(subproc)

    return (exit_status_indication, True)
279
280
def nuke_subprocess(subproc):
    """Kill subproc with SIGTERM, escalating to SIGKILL if needed.

    After each signal, polls for up to ~5 seconds for the process to
    exit.

    Returns:
            the process's exit status, or None if it could not be
            confirmed dead even after SIGKILL.
    """
    # kill the process via an escalating series of signals
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)
    # process is still not observably dead; make the failure explicit
    return None
298
def nuke_pid(pid):
    """Kill pid with SIGTERM, escalating to SIGKILL, reaping via waitpid.

    Returns as soon as the process is reaped (or is found to be already
    gone). In the original code the AutoservRunError was raised inside
    the SIGTERM polling loop, so the SIGKILL escalation was unreachable;
    it is now raised only after all signals have been exhausted.

    Raises:
            error.AutoservRunError: if the process is still alive after
                    both SIGTERM and SIGKILL.
    """
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            # poll for up to ~5 seconds for the process to be reaped
            for i in range(5):
                if os.waitpid(pid, os.WNOHANG)[0] == pid:
                    return
                time.sleep(1)

        # the process died (or is not our child) before we could join it
        except OSError:
            return

    # neither SIGTERM nor SIGKILL got the process reaped
    raise error.AutoservRunError('Could not kill %d' % pid, None)
325
326
327def _process_output(pipe, fbuffer, teefile=None, final_read=False):
328    if final_read:
329        # read in all the data we can from pipe and then stop
330        data = []
331        while select.select([pipe], [], [], 0)[0]:
332            data.append(os.read(pipe.fileno(), 1024))
333            if len(data[-1]) == 0:
334                break
335        data = "".join(data)
336    else:
337        # perform a single read
338        data = os.read(pipe.fileno(), 1024)
339    fbuffer.write(data)
340    if teefile:
341        teefile.write(data)
342        teefile.flush()
343
344
def system(command, timeout=None, ignore_status=False):
    """Run command, teeing its output to this process's stdout/stderr.

    Returns the command's exit status. See run() for the timeout and
    ignore_status semantics.
    """
    result = run(command, timeout, ignore_status,
                 stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    return result.exit_status
348
349
def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    """Run command and return its stdout, minus a single trailing newline.

    If retain_output is true, the output is additionally teed to this
    process's stdout/stderr as it is produced. See run() for the
    timeout and ignore_status semantics.
    """
    if retain_output:
        result = run(command, timeout, ignore_status,
                     stdout_tee=sys.stdout, stderr_tee=sys.stderr)
    else:
        result = run(command, timeout, ignore_status)
    out = result.stdout
    if out.endswith('\n'):
        out = out[:-1]
    return out
359
360"""
361This function is used when there is a need to run more than one
362job simultaneously starting exactly at the same time. It basically returns
363a modified control file (containing the synchronization code prepended)
364whenever it is ready to run the control file. The synchronization
365is done using barriers to make sure that the jobs start at the same time.
366
367Here is how the synchronization is done to make sure that the tests
368start at exactly the same time on the client.
369sc_bar is a server barrier and s_bar, c_bar are the normal barriers
370
371                  Job1              Job2         ......      JobN
372 Server:   |                        sc_bar
373 Server:   |                        s_bar        ......      s_bar
374 Server:   |      at.run()         at.run()      ......      at.run()
375 ----------|------------------------------------------------------
376 Client    |      sc_bar
377 Client    |      c_bar             c_bar        ......      c_bar
378 Client    |    <run test>         <run test>    ......     <run test>
379
380
381PARAMS:
382   control_file : The control file which to which the above synchronization
383                  code would be prepended to
384   host_name    : The host name on which the job is going to run
385   host_num (non negative) : A number to identify the machine so that we have
386                  different sets of s_bar_ports for each of the machines.
387   instance     : The number of the job
388   num_jobs     : Total number of jobs that are going to run in parallel with
389                  this job starting at the same time
390   port_base    : Port number that is used to derive the actual barrier ports.
391
392RETURN VALUE:
393    The modified control file.
394
395"""
396def get_sync_control_file(control, host_name, host_num,
397                          instance, num_jobs, port_base=63100):
398    sc_bar_port = port_base
399    c_bar_port = port_base
400    if host_num < 0:
401        print "Please provide a non negative number for the host"
402        return None
403    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
404                                          # the same for a given machine
405
406    sc_bar_timeout = 180
407    s_bar_timeout = c_bar_timeout = 120
408
409    # The barrier code snippet is prepended into the conrol file
410    # dynamically before at.run() is called finally.
411    control_new = []
412
413    # jobid is the unique name used to identify the processes
414    # trying to reach the barriers
415    jobid = "%s#%d" % (host_name, instance)
416
417    rendv = []
418    # rendvstr is a temp holder for the rendezvous list of the processes
419    for n in range(num_jobs):
420        rendv.append("'%s#%d'" % (host_name, n))
421    rendvstr = ",".join(rendv)
422
423    if instance == 0:
424        # Do the setup and wait at the server barrier
425        # Clean up the tmp and the control dirs for the first instance
426        control_new.append('if os.path.exists(job.tmpdir):')
427        control_new.append("\t system('umount -f %s > /dev/null"
428                           "2> /dev/null' % job.tmpdir,"
429                           "ignore_status=True)")
430        control_new.append("\t system('rm -rf ' + job.tmpdir)")
431        control_new.append(
432            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
433            % (jobid, sc_bar_timeout, sc_bar_port))
434        control_new.append(
435        'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
436         % jobid)
437
438    elif instance == 1:
439        # Wait at the server barrier to wait for instance=0
440        # process to complete setup
441        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
442                     port=sc_bar_port)
443        b0.rendevous_servers("PARALLEL_MASTER", jobid)
444
445        if(num_jobs > 2):
446            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
447                         port=s_bar_port)
448            b1.rendevous(rendvstr)
449
450    else:
451        # For the rest of the clients
452        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
453        b2.rendevous(rendvstr)
454
455    # Client side barrier for all the tests to start at the same time
456    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
457                    % (jobid, c_bar_timeout, c_bar_port))
458    control_new.append("b1.rendevous(%s)" % rendvstr)
459
460    # Stick in the rest of the control file
461    control_new.append(control)
462
463    return "\n".join(control_new)
464
465
def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    machine = run_function('/bin/uname -m').stdout.rstrip()
    # fold every ia32 flavour (i386/i486/i586/i686) into plain i386
    if re.match(r'i\d86$', machine):
        return 'i386'
    return machine
478
479
class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        # lay the (possibly long) command out over indented 78-column lines
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        text = ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration))

        out = self.stdout.rstrip()
        if out:
            text += "\nstdout:\n%s" % out

        err = self.stderr.rstrip()
        if err:
            text += "\nstderr:\n%s" % err

        return text
521
522
class run_randomly(object):
    """Collect (args, kwargs) test invocations and run them in random order.

    If run_sequentially is true, tests run in insertion order instead
    (useful for debugging control files). New-style class for
    consistency with CmdResult.
    """

    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        """Queue one test invocation; arguments are passed through to fn."""
        self.test_list.append((args, dargs))


    def run(self, fn):
        """Pop and invoke every queued test in random (or insertion) order."""
        while self.test_list:
            if self.run_sequentially:
                test_index = 0
            else:
                # pick uniformly among the remaining tests
                test_index = random.randint(0, len(self.test_list) - 1)
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)
542