utils.py revision f989e58f218b3f2597d6320f65a5c37f4a9a6ebd
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urlparse
import warnings, smtplib, logging, urllib2
try:
    import hashlib
except ImportError:
    import md5, sha
from autotest_lib.client.common_lib import error, barrier, logging_manager

def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func
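
# Illustrative use of @deprecated (sketch; old_api is a made-up name):
#     @deprecated
#     def old_api():
#         return 42
#     old_api()  # returns 42 after issuing a DeprecationWarning (subject
#                # to the active warning filters)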


class _NullStream(object):
    def write(self, data):
        pass


    def flush(self):
        pass


TEE_TO_LOGS = object()
_the_null_stream = _NullStream()

DEFAULT_STDOUT_LEVEL = logging.DEBUG
DEFAULT_STDERR_LEVEL = logging.ERROR

# prefixes for logging stdout/stderr of commands
STDOUT_PREFIX = '[stdout] '
STDERR_PREFIX = '[stderr] '


def get_stream_tee_file(stream, level, prefix=''):
    if stream is None:
        return _the_null_stream
    if stream is TEE_TO_LOGS:
        return logging_manager.LoggingFile(level=level, prefix=prefix)
    return stream


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
                 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL):
        self.command = command
        self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL,
                                              prefix=STDOUT_PREFIX)
        self.stderr_tee = get_stream_tee_file(stderr_tee, stderr_level,
                                              prefix=STDERR_PREFIX)
        self.result = CmdResult(command)

        # allow for easy stdin input by string; we'll let subprocess create
        # a pipe for stdin input and we'll write to it in the wait loop
        if isinstance(stdin, basestring):
            self.string_stdin = stdin
            stdin = subprocess.PIPE
        else:
            self.string_stdin = None

        if verbose:
            logging.debug("Running '%s'" % command)
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash",
                                   stdin=stdin)


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        tee.write(data)


    def cleanup(self):
        self.stdout_tee.flush()
        self.stderr_tee.flush()
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def ip_to_long(ip):
    # !L is an unsigned long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    return (1 << 32) - (1 << (32 - bits))


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)
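
# Worked examples for the helpers above (values can be checked by hand):
#     ip_to_long('127.0.0.1')              -> 2130706433 (0x7F000001)
#     long_to_ip(2130706433)               -> '127.0.0.1'
#     create_subnet_mask(24)               -> 4294967040 (255.255.255.0)
#     format_ip_with_mask('10.1.2.3', 24)  -> '10.1.2.0/24'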


def normalize_hostname(alias):
    ip = socket.gethostbyname(alias)
    return socket.gethostbyaddr(ip)[0]


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def send_email(mail_from, mail_to, subject, body):
    """
    Sends an email via SMTP.

    mail_from: string with email address of sender
    mail_to: string or list with email address(es) of recipients
    subject: string with subject of email
    body: (multi-line) string with body of email
    """
    if isinstance(mail_to, str):
        mail_to = [mail_to]
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (mail_from, ','.join(mail_to),
                                                   subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(mail_from, mail_to, msg)
        finally:
            mailer.quit()
    except Exception, e:
        # Email sending is non-critical, so report failures without raising
        print "Sending email failed. Reason: %s" % repr(e)


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def read_file(filename):
    f = open(filename)
    try:
        return f.read()
    finally:
        f.close()


def write_one_line(filename, line):
    open_write_close(filename, line.rstrip('\n') + '\n')


def open_write_close(filename, data):
    f = open(filename, 'w')
    try:
        f.write(data)
    finally:
        f.close()


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    if os.path.exists(path):
        for line in open(path):
            line = re.sub('#.*', '', line).rstrip()
            if not re.search(r'^[-\.\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    return keyval
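
# Sketch of a keyval file and what read_keyval() makes of it ('/results'
# is a made-up path):
#     iterations=3            -> keyval['iterations'] == 3       (int)
#     throughput=12.5         -> keyval['throughput'] == 12.5    (float)
#     kernel-version=2.6.18   -> kept as the string '2.6.18' (two dots)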


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the keys must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\.\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\.\w]+\{%s\}$' % escaped_tag)
    try:
        for key in sorted(dictionary.keys()):
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, dictionary[key]))
    finally:
        keyval.close()
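
# e.g. write_keyval('/results', {'throughput{perf}': 12.5}, type_tag='perf')
# appends the line "throughput{perf}=12.5" to /results/keyval ('/results'
# is a made-up path).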


def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, timeout=5):
    """Wrapper to urllib2.urlopen with timeout addition."""

    # Save old timeout
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib2.urlopen(url, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename, data=None, timeout=300):
    """Retrieve a file from given url."""
    logging.debug('Fetching %s -> %s', url, filename)

    src_file = urlopen(url, data=data, timeout=timeout)
    try:
        dest_file = open(filename, 'wb')
        try:
            shutil.copyfileobj(src_file, dest_file)
        finally:
            dest_file.close()
    finally:
        src_file.close()


def hash(type, input=None):
    """
    Returns a hash object of type md5 or sha1. This function is implemented
    in order to encapsulate hash objects in a way that is compatible with
    python 2.4 and python 2.6 without warnings.

    Note that even though python 2.6 hashlib supports hash types other than
    md5 and sha1, we are artificially limiting the input values in order to
    make the function behave exactly the same under both python versions.

    @param type: Hash type, either 'md5' or 'sha1'.
    @param input: Optional input string that will be used to update the hash.
    """
    if type not in ['md5', 'sha1']:
        raise ValueError("Unsupported hash type: %s" % type)

    try:
        hash = hashlib.new(type)
    except NameError:
        if type == 'md5':
            hash = md5.new()
        elif type == 'sha1':
            hash = sha.new()

    if input:
        hash.update(input)

    return hash
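
# Quick check (the md5 digest of the empty string is a well-known constant):
#     hash('md5', '').hexdigest() -> 'd41d8cd98f00b204e9800998ecf8427e'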


def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return dest

    if is_url(src):
        urlretrieve(src, dest)
    else:
        shutil.copyfile(src, dest)

    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the fetched URL.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))


def get_stderr_level(stderr_is_expected):
    if stderr_is_expected:
        return DEFAULT_STDOUT_LEVEL
    return DEFAULT_STDERR_LEVEL


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True, stdin=None,
        stderr_is_expected=None, args=()):
    """
    Run a command on the host.

    @param command: the command line string.
    @param timeout: time limit in seconds before attempting to kill the
            running process. The run() function will take a few seconds
            longer than 'timeout' to complete if it has to kill the process.
    @param ignore_status: do not raise an exception, no matter what the exit
            code of the command is.
    @param stdout_tee: optional file-like object to which stdout data
            will be written as it is generated (data will still be stored
            in result.stdout).
    @param stderr_tee: likewise for stderr.
    @param verbose: if True, log the command being run.
    @param stdin: stdin to pass to the executed process (can be a file
            descriptor, a file object of a real file or a string).
    @param args: sequence of strings to pass to the command as extra
            arguments; each element is escaped, wrapped in double quotes
            and appended to the command line as a separate argument.

    @return a CmdResult object

    @raise CmdError: the exit code of the command execution was not 0
    """
    if isinstance(args, basestring):
        raise TypeError('Got a string for the "args" keyword argument, '
                        'need a sequence.')

    for arg in args:
        command += ' "%s"' % sh_escape(arg)
    if stderr_is_expected is None:
        stderr_is_expected = ignore_status

    bg_job = join_bg_jobs(
        (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin,
               stderr_level=get_stderr_level(stderr_is_expected)),),
        timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result
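
# Illustrative call (sketch; assumes a POSIX host where 'ls' exists):
#     result = run('ls', args=('/tmp',), timeout=30, ignore_status=True)
#     if result.exit_status == 0:
#         print result.stdout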


def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """
    Behaves the same as run() with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    @return: a list of CmdResult objects
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee,
                             stderr_level=get_stderr_level(ignore_status)))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]
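
# e.g. run_parallel(['sleep 1', 'sleep 2'], timeout=10) starts both commands
# at once and returns their CmdResult objects in the order given.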


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result


def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    timeout_error = False
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backward compatible, so it will
        # do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs
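
# BgJob plus join_bg_jobs is what run() uses internally; a minimal sketch:
#     job = BgJob('echo hello')
#     join_bg_jobs([job], timeout=10)
#     print job.result.stdout   # -> 'hello\n'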


def _wait_for_commands(bg_jobs, start_time, timeout):
    # Returns True if the wait ended due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    read_list = []
    write_list = []
    reverse_dict = {}

    for bg_job in bg_jobs:
        read_list.append(bg_job.sp.stdout)
        read_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)
        if bg_job.string_stdin is not None:
            write_list.append(bg_job.sp.stdin)
            reverse_dict[bg_job.sp.stdin] = bg_job

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None  # no timeout; loop exits only when all jobs finish

    while not timeout or time_left > 0:
        # select will return when we may write to stdin or when there is
        # stdout/stderr output we can read (including when it is
        # EOF, that is the process has terminated).
        read_ready, write_ready, _ = select.select(read_list, write_list, [],
                                                   SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for file_obj in read_ready:
            bg_job, is_stdout = reverse_dict[file_obj]
            bg_job.process_output(is_stdout)

        for file_obj in write_ready:
            # we can write PIPE_BUF bytes without blocking
            # POSIX requires PIPE_BUF is >= 512
            bg_job = reverse_dict[file_obj]
            file_obj.write(bg_job.string_stdin[:512])
            bg_job.string_stdin = bg_job.string_stdin[512:]
            # no more input data, close stdin, remove it from the select set
            if not bg_job.string_stdin:
                file_obj.close()
                write_list.remove(file_obj)
                del reverse_dict[file_obj]

        all_jobs_finished = True
        for bg_job in bg_jobs:
            if bg_job.result.exit_status is not None:
                continue

            bg_job.result.exit_status = bg_job.sp.poll()
            if bg_job.result.exit_status is not None:
                # process exited, remove its stdout/stderr from the select set
                read_list.remove(bg_job.sp.stdout)
                read_list.remove(bg_job.sp.stderr)
                del reverse_dict[bg_job.sp.stdout]
                del reverse_dict[bg_job.sp.stderr]
            else:
                all_jobs_finished = False

        if all_jobs_finished:
            return False

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in bg_jobs:
        if bg_job.result.exit_status is not None:
            continue

        logging.warn('run process timeout (%s) fired on: %s', timeout,
                     bg_job.command)
        nuke_subprocess(bg_job.sp)
        bg_job.result.exit_status = bg_job.sp.poll()

    return True


def pid_is_alive(pid):
    """
    True if process pid exists and is not yet stuck in Zombie state.
    Zombies are impossible to move between cgroups, etc.
    pid can be integer, or text of integer.
    """
    path = '/proc/%s/stat' % pid

    try:
        stat = read_one_line(path)
    except IOError:
        if not os.path.exists(path):
            # file went away
            return False
        raise

    return stat.split()[2] != 'Z'


def signal_pid(pid, sig):
    """
    Sends a signal to a process id. Returns True if the process terminated
    successfully, False otherwise.
    """
    try:
        os.kill(pid, sig)
    except OSError:
        # The process may have died before we could kill it.
        pass

    for i in range(5):
        if not pid_is_alive(pid):
            return True
        time.sleep(1)

    # The process is still alive
    return False


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        signal_pid(subproc.pid, sig)
        if subproc.poll() is not None:
            return subproc.poll()


def nuke_pid(pid, signal_queue=(signal.SIGTERM, signal.SIGKILL)):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    for sig in signal_queue:
        if signal_pid(pid, sig):
            return

    # no signal successfully terminated the process
    raise error.AutoservRunError('Could not kill %d' % pid, None)


def system(command, timeout=None, ignore_status=False):
    """
    Run a command.

    @param timeout: timeout in seconds
    @param ignore_status: if ignore_status=False, throw an exception if the
            command's exit code is non-zero;
            if ignore_status=True, return the exit code.

    @return exit status of command
            (note, this will always be zero unless ignore_status=True)
    """
    return run(command, timeout=timeout, ignore_status=ignore_status,
               stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [result.exit_status for result in
            run_parallel(commands, timeout=timeout, ignore_status=ignore_status,
                         stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False, args=()):
    """
    Run a command and return the stdout output.

    @param command: command string to execute.
    @param timeout: time limit in seconds before attempting to kill the
            running process. The function will take a few seconds longer
            than 'timeout' to complete if it has to kill the process.
    @param ignore_status: do not raise an exception, no matter what the exit
            code of the command is.
    @param retain_output: set to True to also send the command's stdout and
            stderr to the logging system.
    @param args: sequence of strings to pass to the command as extra
            arguments; each element is escaped, wrapped in double quotes
            and appended to the command line as a separate argument.

    @return a string with the stdout output of the command.
    """
    if retain_output:
        out = run(command, timeout=timeout, ignore_status=ignore_status,
                  stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS,
                  args=args).stdout
    else:
        out = run(command, timeout=timeout, ignore_status=ignore_status,
                  args=args).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out
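
# e.g. system_output('echo hello') returns 'hello' (trailing newline
# stripped), while system('true') returns the exit status 0.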


def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [bg_job.stdout for bg_job
               in run_parallel(commands, timeout=timeout,
                               ignore_status=ignore_status,
                               stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands,
                                  timeout=timeout, ignore_status=ignore_status)]
    # strip one trailing newline from each command's output
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def strip_unicode(input):
    if type(input) == list:
        return [strip_unicode(i) for i in input]
    elif type(input) == dict:
        output = {}
        for key in input.keys():
            output[str(key)] = strip_unicode(input[key])
        return output
    elif type(input) == unicode:
        return str(input)
    else:
        return input


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
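
# Illustrative use (sketch; busy_loop is a made-up CPU-bound function):
#     cpu_pct, ret = get_cpu_percentage(busy_loop, 10**7)
# A single-threaded pure-Python busy loop should report cpu_pct near 1.0.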


def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically returns
    a modified control file (containing the synchronization code prepended)
    whenever it is ready to run the control file. The synchronization
    is done using barriers to make sure that the jobs start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers

                      Job1              Job2         ......      JobN
     Server:   |                        sc_bar
     Server:   |                        s_bar        ......      s_bar
     Server:   |      at.run()         at.run()      ......      at.run()
     ----------|------------------------------------------------------
     Client    |      sc_bar
     Client    |      c_bar             c_bar        ......      c_bar
     Client    |    <run test>         <run test>    ......     <run test>

    PARAMS:
       control      : The control file to which the above synchronization
                      code will be prepended
       host_name    : The host name on which the job is going to run
       host_num (non-negative) : A number to identify the machine so that
                      we have different sets of s_bar_ports for each of the
                      machines.
       instance     : The number of the job
       num_jobs     : Total number of jobs that are going to run in parallel
                      with this job starting at the same time
       port_base    : Port number that is used to derive the actual barrier
                      ports.

    RETURN VALUE:
        The modified control file.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print "Please provide a non-negative number for the host"
        return None
    s_bar_port = port_base + 1 + host_num  # The set of s_bar_ports are
                                           # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is finally called.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    rendv = []
    # rendvstr is a temp holder for the rendezvous list of the processes
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir, "
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier for the instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendezvous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendezvous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendezvous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendezvous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)


def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


def get_num_logical_cpus_per_socket(run_function=run):
    """
    Get the number of logical CPUs (including hyperthread siblings) per
    CPU socket.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    siblings = run_function('grep "^siblings" /proc/cpuinfo').stdout.rstrip()
    num_siblings = map(int,
                       re.findall(r'^siblings\s*:\s*(\d+)\s*$',
                                  siblings, re.M))
    if len(num_siblings) == 0:
        raise error.TestError('Unable to find siblings info in /proc/cpuinfo')
    if min(num_siblings) != max(num_siblings):
        raise error.TestError('Numbers of siblings differ: %r' %
                              num_siblings)
    return num_siblings[0]


def merge_trees(src, dest):
    """
    Merges a source directory tree at 'src' into a destination tree at
    'dest'. If a path is a file in both trees then the file in the source
    tree is APPENDED to the one in the destination tree. If a path is
    a directory in both trees then the directories are recursively merged
    with this function. In any other case, the function will skip the
    paths that cannot be merged (instead of failing).
    """
    if not os.path.exists(src):
        return  # exists only in dest
    elif not os.path.exists(dest):
        if os.path.isfile(src):
            shutil.copy2(src, dest)  # file only in src
        else:
            shutil.copytree(src, dest, symlinks=True)  # dir only in src
        return
    elif os.path.isfile(src) and os.path.isfile(dest):
        # src & dest are files in both trees, append src to dest
        destfile = open(dest, "a")
        try:
            srcfile = open(src)
            try:
                destfile.write(srcfile.read())
            finally:
                srcfile.close()
        finally:
            destfile.close()
    elif os.path.isdir(src) and os.path.isdir(dest):
        # src & dest are directories in both trees, so recursively merge
        for name in os.listdir(src):
            merge_trees(os.path.join(src, name), os.path.join(dest, name))
    else:
        # src & dest both exist, but are incompatible
        return


class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command="", stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list)-1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)


def import_site_module(path, module, dummy=None, modulefile=None):
    """
    Try to import the site specific module if it exists.

    @param path full filename of the source file calling this (i.e. __file__)
    @param module full module name
    @param dummy dummy value to return in case there is no symbol to import
    @param modulefile module filename

    @return site specific module or dummy

    @raises ImportError if the site file exists but the import fails
    """
    short_module = module[module.rfind(".") + 1:]

    if not modulefile:
        modulefile = short_module + ".py"

    if os.path.exists(os.path.join(os.path.dirname(path), modulefile)):
        return __import__(module, {}, {}, [short_module])
    return dummy


def import_site_symbol(path, module, name, dummy=None, modulefile=None):
    """
    Try to import site specific symbol from site specific file if it exists

    @param path full filename of the source file calling this (i.e. __file__)
    @param module full module name
    @param name symbol name to be imported from the site file
    @param dummy dummy value to return in case there is no symbol to import
    @param modulefile module filename

    @return site specific symbol or dummy

    @raises ImportError if the site file exists but the import fails
    """
    module = import_site_module(path, module, modulefile=modulefile)
    if not module:
        return dummy

    # special unique value to tell us if the symbol can't be imported
    cant_import = object()

    obj = getattr(module, name, cant_import)
    if obj is cant_import:
        logging.debug("unable to import site symbol '%s', using non-site "
                      "implementation", name)
        return dummy

    return obj


def import_site_class(path, module, classname, baseclass, modulefile=None):
    """
    Try to import site specific class from site specific file if it exists

    Args:
        path: full filename of the source file calling this (i.e. __file__)
        module: full module name
        classname: class name to be loaded from site file
        baseclass: base class object to return when no site file present or
            to mixin when site class exists but is not inherited from baseclass
        modulefile: module filename

    Returns: baseclass if site specific class does not exist, the site specific
        class if it exists and is inherited from baseclass or a mixin of the
        site specific class and baseclass when the site specific class exists
        and is not inherited from baseclass

    Raises: ImportError if the site file exists but the import fails
    """

    res = import_site_symbol(path, module, classname, None, modulefile)
    if res:
        if not issubclass(res, baseclass):
            # if not a subclass of baseclass then mix in baseclass with the
            # site specific class object and return the result
            res = type(classname, (res, baseclass), {})
    else:
        res = baseclass

    return res


def import_site_function(path, module, funcname, dummy, modulefile=None):
    """
    Try to import site specific function from site specific file if it exists

    Args:
        path: full filename of the source file calling this (i.e. __file__)
        module: full module name
        funcname: function name to be imported from site file
        dummy: dummy function to return in case there is no function to import
        modulefile: module filename

    Returns: site specific function object or dummy

    Raises: ImportError if the site file exists but the import fails
    """

    return import_site_symbol(path, module, funcname, dummy, modulefile)
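
# Typical call pattern (sketch; the site module name here is a placeholder):
#     get_hostname = import_site_function(
#         __file__, 'autotest_lib.client.common_lib.site_utils',
#         'get_hostname', lambda: socket.gethostname())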


def _get_pid_path(program_name):
    my_path = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(my_path, "..", "..",
                                        "%s.pid" % program_name))


def write_pid(program_name):
    """
    Try to drop <program_name>.pid in the main autotest directory.

    Args:
      program_name: prefix for file name
    """
    pidfile = open(_get_pid_path(program_name), "w")
    try:
        pidfile.write("%s\n" % os.getpid())
    finally:
        pidfile.close()


def delete_pid_file_if_exists(program_name):
    """
    Tries to remove <program_name>.pid from the main autotest directory.
    """
    pidfile_path = _get_pid_path(program_name)

    try:
        os.remove(pidfile_path)
    except OSError:
        if not os.path.exists(pidfile_path):
            return
        raise


def get_pid_from_file(program_name):
    """
    Reads the pid from <program_name>.pid in the autotest directory.

    @param program_name the name of the program
    @return the pid if the file exists, None otherwise.
    """
    pidfile_path = _get_pid_path(program_name)
    if not os.path.exists(pidfile_path):
        return None

    pidfile = open(pidfile_path, 'r')

    try:
        try:
            pid = int(pidfile.readline())
        except IOError:
            if not os.path.exists(pidfile_path):
                return None
            raise
    finally:
        pidfile.close()

    return pid


def program_is_alive(program_name):
    """
    Checks if the process is alive and not in Zombie state.

    @param program_name the name of the program
    @return True if still alive, False otherwise
    """
    pid = get_pid_from_file(program_name)
    if pid is None:
        return False
    return pid_is_alive(pid)


def signal_program(program_name, sig=signal.SIGTERM):
    """
    Sends a signal to the process listed in <program_name>.pid

    @param program_name the name of the program
    @param sig signal to send
    """
    pid = get_pid_from_file(program_name)
    if pid:
        signal_pid(pid, sig)


def get_relative_path(path, reference):
    """Given 2 absolute paths "path" and "reference", compute the path of
    "path" as relative to the directory "reference".

    @param path the absolute path to convert to a relative path
    @param reference an absolute directory path to which the relative
        path will be computed
    """
    # normalize the paths (remove double slashes, etc)
    assert(os.path.isabs(path))
    assert(os.path.isabs(reference))

    path = os.path.normpath(path)
    reference = os.path.normpath(reference)

    # we could use os.path.split() but it splits from the end
    path_list = path.split(os.path.sep)[1:]
    ref_list = reference.split(os.path.sep)[1:]

    # find the longest leading common path
    for i in xrange(min(len(path_list), len(ref_list))):
        if path_list[i] != ref_list[i]:
            # decrement i so when exiting this loop either by no match or by
            # end of range we are one step behind
            i -= 1
            break
    i += 1
    # drop the common part of the paths, not interested in that anymore
    del path_list[:i]

    # for each uncommon component in the reference prepend a ".."
    path_list[:0] = ['..'] * (len(ref_list) - i)

    return os.path.join(*path_list)
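
# e.g. get_relative_path('/usr/local/bin', '/usr/share') -> '../local/bin'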


def sh_escape(command):
    """
    Escape special characters from a command so that it can be passed
    as a double quoted (" ") string in a (ba)sh command.

    Args:
            command: the command string to escape.

    Returns:
            The escaped command string. The required enclosing double
            quotes are NOT added and so should be added at some point by
            the caller.

    See also: http://www.tldp.org/LDP/abs/html/escapingsection.html
    """
    command = command.replace("\\", "\\\\")
    command = command.replace("$", r'\$')
    command = command.replace('"', r'\"')
    command = command.replace('`', r'\`')
    return command
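
# e.g. sh_escape('say "hi" to $USER') -> r'say \"hi\" to \$USER', which the
# caller then wraps in double quotes (as run() does for its args elements).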
1250