utils.py revision 549afada66cc95a35fb0be76357f335016a1b122
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urlparse
import warnings, smtplib, logging, urllib2
from autotest_lib.client.common_lib import error, barrier, logging_manager

def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func


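# Usage sketch for the @deprecated decorator above (the function name is
# illustrative, not part of this module):
#
#   @deprecated
#   def old_api():
#       pass
#
# Calling old_api() emits a DeprecationWarning and then runs the function.
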
class _NullStream(object):
    def write(self, data):
        pass


    def flush(self):
        pass


TEE_TO_LOGS = object()
_the_null_stream = _NullStream()

DEFAULT_STDOUT_LEVEL = logging.DEBUG
DEFAULT_STDERR_LEVEL = logging.ERROR

def get_stream_tee_file(stream, level):
    if stream is None:
        return _the_null_stream
    if stream is TEE_TO_LOGS:
        return logging_manager.LoggingFile(level=level)
    return stream


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True,
                 stdin=None, stderr_level=DEFAULT_STDERR_LEVEL):
        self.command = command
        self.stdout_tee = get_stream_tee_file(stdout_tee, DEFAULT_STDOUT_LEVEL)
        self.stderr_tee = get_stream_tee_file(stderr_tee, stderr_level)
        self.result = CmdResult(command)
        if verbose:
            logging.debug("Running '%s'" % command)
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash",
                                   stdin=stdin)


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        tee.write(data)


    def cleanup(self):
        self.stdout_tee.flush()
        self.stderr_tee.flush()
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        # Python installs a SIG_IGN handler for SIGPIPE at startup; restore
        # the default handler so the spawned shell gets normal behavior.
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def ip_to_long(ip):
    # !L is a long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    return (1 << 32) - (1 << 32-bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)


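# Worked example for the IP helpers above (illustrative values):
#
#   create_subnet_mask(24)                   # 4294967040 == 0xFFFFFF00
#   format_ip_with_mask('192.168.5.77', 24)  # '192.168.5.0/24'
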
def normalize_hostname(alias):
    ip = socket.gethostbyname(alias)
    return socket.gethostbyaddr(ip)[0]


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def send_email(mail_from, mail_to, subject, body):
    """
    Sends an email via smtp

    mail_from: string with email address of sender
    mail_to: string or list with email address(es) of recipients
    subject: string with subject of email
    body: (multi-line) string with body of email
    """
    if isinstance(mail_to, str):
        mail_to = [mail_to]
    msg = "From: %s\nTo: %s\nSubject: %s\n\n%s" % (mail_from, ','.join(mail_to),
                                                   subject, body)
    try:
        mailer = smtplib.SMTP('localhost')
        try:
            mailer.sendmail(mail_from, mail_to, msg)
        finally:
            mailer.quit()
    except Exception, e:
        # Sending email is non-critical, so log the failure instead of
        # raising an exception.
        print "Sending email failed. Reason: %s" % repr(e)


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, line):
    open_write_close(filename, line.rstrip('\n') + '\n')


def open_write_close(filename, data):
    f = open(filename, 'w')
    try:
        f.write(data)
    finally:
        f.close()


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    if os.path.exists(path):
        for line in open(path):
            line = re.sub('#.*', '', line).rstrip()
            if not re.search(r'^[-\.\w]+=', line):
                raise ValueError('Invalid format line: %s' % line)
            key, value = line.split('=', 1)
            if re.search(r'^\d+$', value):
                value = int(value)
            elif re.search(r'^(\d+\.)?\d+$', value):
                value = float(value)
            keyval[key] = value
    return keyval


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\.\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\.\w]+\{%s\}$' % escaped_tag)
    try:
        for key in sorted(dictionary.keys()):
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, dictionary[key]))
    finally:
        keyval.close()


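# Example keyval round trip (illustrative values; assumes /tmp/results is an
# existing results directory):
#
#   write_keyval('/tmp/results', {'iterations': 3, 'throughput': 12.5})
#   # appends to /tmp/results/keyval:
#   #   iterations=3
#   #   throughput=12.5
#   read_keyval('/tmp/results')  # -> {'iterations': 3, 'throughput': 12.5}
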
def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, timeout=5):
    """Wrapper to urllib2.urlopen with timeout addition."""

    # Save old timeout
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib2.urlopen(url, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename, data=None, timeout=300):
    """Retrieve a file from given url."""
    logging.debug('Fetching %s -> %s', url, filename)

    src_file = urlopen(url, data=data, timeout=timeout)
    try:
        dest_file = open(filename, 'wb')
        try:
            shutil.copyfileobj(src_file, dest_file)
        finally:
            dest_file.close()
    finally:
        src_file.close()


def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return dest

    if is_url(src):
        urlretrieve(src, dest)
    else:
        shutil.copyfile(src, dest)

    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    returns either the path to the local file, or the fetched URL

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))


def get_stderr_level(stderr_is_expected):
    if stderr_is_expected:
        return DEFAULT_STDOUT_LEVEL
    return DEFAULT_STDERR_LEVEL


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True, stdin=None,
        stderr_is_expected=None):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr
            verbose: if True, log the command being run
            stdin: stdin to pass to the executed process

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    if stderr_is_expected is None:
        stderr_is_expected = ignore_status
    bg_job = join_bg_jobs(
        (BgJob(command, stdout_tee, stderr_tee, verbose, stdin=stdin,
               stderr_level=get_stderr_level(stderr_is_expected)),),
        timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result


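# Example use of run() (the command and timeout are illustrative):
#
#   result = run('uname -r', timeout=30, ignore_status=True)
#   print result.exit_status, result.stdout
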
def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    returns a list of CmdResult objects
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee,
                             stderr_level=get_stderr_level(ignore_status)))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            # use bg_job.command here; the loop variable 'command' above
            # would always name the last command launched
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result


def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    timeout_error = False
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backwards compatible, so it
        # will do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs


def _wait_for_commands(bg_jobs, start_time, timeout):
    # This returns True if it must return due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for file_obj in ready:
            # select returns the pipe file objects themselves, which are
            # the keys of reverse_dict
            bg_job, is_stdout = reverse_dict[file_obj]
            bg_job.process_output(is_stdout)

        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        print '* Warning: run process timeout (%s) fired' % timeout
        nuke_subprocess(bg_job.sp)
        bg_job.result.exit_status = bg_job.sp.poll()

    return True


def pid_is_alive(pid):
    """
    True if process pid exists and is not yet stuck in Zombie state.
    Zombies are impossible to move between cgroups, etc.
    pid can be integer, or text of integer.
    """
    path = '/proc/%s/stat' % pid

    try:
        stat = read_one_line(path)
    except IOError:
        if not os.path.exists(path):
            # file went away
            return False
        raise

    return stat.split()[2] != 'Z'


def signal_pid(pid, sig):
    """
    Sends a signal to a process id. Returns True if the process is gone
    within roughly five seconds of sending the signal, False otherwise.
    """
    try:
        os.kill(pid, sig)
    except OSError:
        # The process may have died before we could kill it.
        pass

    for i in range(5):
        if not pid_is_alive(pid):
            return True
        time.sleep(1)

    # The process is still alive
    return False


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated on its own,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        signal_pid(subproc.pid, sig)
        if subproc.poll() is not None:
            return subproc.poll()


def nuke_pid(pid):
    # kill the process via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        if signal_pid(pid, sig):
            return

    # no signal successfully terminated the process
    raise error.AutoservRunError('Could not kill %d' % pid, None)


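# Example (illustrative): forcibly stop a background job's subprocess,
# trying SIGTERM first and escalating to SIGKILL:
#
#   bg_job = BgJob('sleep 1000')
#   nuke_subprocess(bg_job.sp)
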
def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout=timeout, ignore_status=ignore_status,
               stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [result.exit_status for result in
            run_parallel(commands, timeout=timeout, ignore_status=ignore_status,
                         stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout=timeout, ignore_status=ignore_status,
                  stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS).stdout
    else:
        out = run(command, timeout=timeout, ignore_status=ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


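# Example (illustrative): capture a command's output with the trailing
# newline stripped:
#
#   kernel = system_output('uname -r')   # e.g. '2.6.18-something'
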
def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [bg_job.stdout for bg_job
               in run_parallel(commands, timeout=timeout,
                               ignore_status=ignore_status,
                               stdout_tee=TEE_TO_LOGS, stderr_tee=TEE_TO_LOGS)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands,
                                  timeout=timeout, ignore_status=ignore_status)]
    # strip the trailing newline from each command's output
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def strip_unicode(input):
    if type(input) == list:
        return [strip_unicode(i) for i in input]
    elif type(input) == dict:
        output = {}
        for key in input.keys():
            output[str(key)] = strip_unicode(input[key])
        return output
    elif type(input) == unicode:
        return str(input)
    else:
        return input


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return


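# Example (illustrative command): measure the CPU utilisation of a command
# run via system(); cpu_percent is CPU time as a fraction of elapsed
# wall-clock time (it can exceed 1.0 on multiple CPUs):
#
#   cpu_percent, status = get_cpu_percentage(
#       system, 'gzip -c /etc/services > /dev/null')
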
632"""
633This function is used when there is a need to run more than one
634job simultaneously starting exactly at the same time. It basically returns
635a modified control file (containing the synchronization code prepended)
636whenever it is ready to run the control file. The synchronization
637is done using barriers to make sure that the jobs start at the same time.
638
639Here is how the synchronization is done to make sure that the tests
640start at exactly the same time on the client.
641sc_bar is a server barrier and s_bar, c_bar are the normal barriers
642
643                  Job1              Job2         ......      JobN
644 Server:   |                        sc_bar
645 Server:   |                        s_bar        ......      s_bar
646 Server:   |      at.run()         at.run()      ......      at.run()
647 ----------|------------------------------------------------------
648 Client    |      sc_bar
649 Client    |      c_bar             c_bar        ......      c_bar
650 Client    |    <run test>         <run test>    ......     <run test>
651
652
653PARAMS:
654   control_file : The control file which to which the above synchronization
655                  code would be prepended to
656   host_name    : The host name on which the job is going to run
657   host_num (non negative) : A number to identify the machine so that we have
658                  different sets of s_bar_ports for each of the machines.
659   instance     : The number of the job
660   num_jobs     : Total number of jobs that are going to run in parallel with
661                  this job starting at the same time
662   port_base    : Port number that is used to derive the actual barrier ports.
663
664RETURN VALUE:
665    The modified control file.
666
667"""
668def get_sync_control_file(control, host_name, host_num,
669                          instance, num_jobs, port_base=63100):
670    sc_bar_port = port_base
671    c_bar_port = port_base
672    if host_num < 0:
673        print "Please provide a non negative number for the host"
674        return None
675    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
676                                          # the same for a given machine
677
678    sc_bar_timeout = 180
679    s_bar_timeout = c_bar_timeout = 120
680
681    # The barrier code snippet is prepended into the conrol file
682    # dynamically before at.run() is called finally.
683    control_new = []
684
685    # jobid is the unique name used to identify the processes
686    # trying to reach the barriers
687    jobid = "%s#%d" % (host_name, instance)
688
689    rendv = []
690    # rendvstr is a temp holder for the rendezvous list of the processes
691    for n in range(num_jobs):
692        rendv.append("'%s#%d'" % (host_name, n))
693    rendvstr = ",".join(rendv)
694
695    if instance == 0:
696        # Do the setup and wait at the server barrier
697        # Clean up the tmp and the control dirs for the first instance
698        control_new.append('if os.path.exists(job.tmpdir):')
699        control_new.append("\t system('umount -f %s > /dev/null"
700                           "2> /dev/null' % job.tmpdir,"
701                           "ignore_status=True)")
702        control_new.append("\t system('rm -rf ' + job.tmpdir)")
703        control_new.append(
704            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
705            % (jobid, sc_bar_timeout, sc_bar_port))
706        control_new.append(
707        'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
708         % jobid)
709
710    elif instance == 1:
711        # Wait at the server barrier to wait for instance=0
712        # process to complete setup
713        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
714                     port=sc_bar_port)
715        b0.rendezvous_servers("PARALLEL_MASTER", jobid)
716
717        if(num_jobs > 2):
718            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
719                         port=s_bar_port)
720            b1.rendezvous(rendvstr)
721
722    else:
723        # For the rest of the clients
724        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
725        b2.rendezvous(rendvstr)
726
727    # Client side barrier for all the tests to start at the same time
728    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
729                    % (jobid, c_bar_timeout, c_bar_port))
730    control_new.append("b1.rendezvous(%s)" % rendvstr)
731
732    # Stick in the rest of the control file
733    control_new.append(control)
734
735    return "\n".join(control_new)
736
737
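# Example use of get_sync_control_file() (illustrative values; control_text
# stands for the original control file contents): prepend barrier
# synchronization so two jobs on 'client1' start their tests together:
#
#   control = get_sync_control_file(control_text, 'client1', host_num=0,
#                                   instance=0, num_jobs=2)
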
def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


def get_num_logical_cpus_per_socket(run_function=run):
    """
    Get the number of cores (including hyperthreading) per CPU socket.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    throw a CmdError exception.
    """
    siblings = run_function('grep "^siblings" /proc/cpuinfo').stdout.rstrip()
    num_siblings = map(int,
                       re.findall(r'^siblings\s*:\s*(\d+)\s*$',
                                  siblings, re.M))
    if len(num_siblings) == 0:
        raise error.TestError('Unable to find siblings info in /proc/cpuinfo')
    if min(num_siblings) != max(num_siblings):
        raise error.TestError('Number of siblings differ %r' %
                              num_siblings)
    return num_siblings[0]


def merge_trees(src, dest):
    """
    Merges a source directory tree at 'src' into a destination tree at
    'dest'. If a path is a file in both trees then the file in the source
    tree is APPENDED to the one in the destination tree. If a path is
    a directory in both trees then the directories are recursively merged
    with this function. In any other case, the function will skip the
    paths that cannot be merged (instead of failing).
    """
    if not os.path.exists(src):
        return # exists only in dest
    elif not os.path.exists(dest):
        if os.path.isfile(src):
            shutil.copy2(src, dest) # file only in src
        else:
            shutil.copytree(src, dest, symlinks=True) # dir only in src
        return
    elif os.path.isfile(src) and os.path.isfile(dest):
        # src & dest are files in both trees, append src to dest
        destfile = open(dest, "a")
        try:
            srcfile = open(src)
            try:
                destfile.write(srcfile.read())
            finally:
                srcfile.close()
        finally:
            destfile.close()
    elif os.path.isdir(src) and os.path.isdir(dest):
        # src & dest are directories in both trees, so recursively merge
        for name in os.listdir(src):
            merge_trees(os.path.join(src, name), os.path.join(dest, name))
    else:
        # src & dest both exist, but are incompatible
        return


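# Example use of merge_trees() (illustrative paths): fold newly produced
# results into an existing results tree; files present in both trees are
# appended, directories are merged recursively:
#
#   merge_trees('/tmp/results.new', '/tmp/results')
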
class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command="", stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list)-1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)


def import_site_symbol(path, module, name, dummy=None, modulefile=None):
    """
    Try to import site specific symbol from site specific file if it exists

    @param path full filename of the source file calling this (ie __file__)
    @param module full module name
    @param name symbol name to be imported from the site file
    @param dummy dummy value to return in case there is no symbol to import
    @param modulefile module filename

    @return site specific symbol or dummy

    @exception ImportError if the site file exists but imports fails
    """
    short_module = module[module.rfind(".") + 1:]

    if not modulefile:
        modulefile = short_module + ".py"

    try:
        site_exists = os.path.getsize(os.path.join(os.path.dirname(path),
                                                   modulefile))
    except os.error:
        site_exists = False

    if site_exists:
        # special unique value to tell us if the symbol can't be imported
        cant_import = object()

        # return the object from the imported module
        obj = getattr(__import__(module, {}, {}, [short_module]), name,
                      cant_import)
        if obj is cant_import:
            # only log and fall back to dummy when the import actually
            # failed; the original code did this unconditionally
            msg = ("unable to import site symbol '%s', using non-site "
                   "implementation") % name
            logging.error(msg)
            obj = dummy
    else:
        obj = dummy

    return obj


def import_site_class(path, module, classname, baseclass, modulefile=None):
    """
    Try to import site specific class from site specific file if it exists

    Args:
        path: full filename of the source file calling this (ie __file__)
        module: full module name
        classname: class name to be loaded from site file
        baseclass: base class object to return when no site file present or
            to mixin when site class exists but is not inherited from baseclass
        modulefile: module filename

    Returns: baseclass if site specific class does not exist, the site specific
        class if it exists and is inherited from baseclass or a mixin of the
        site specific class and baseclass when the site specific class exists
        and is not inherited from baseclass

    Raises: ImportError if the site file exists but imports fails
    """

    res = import_site_symbol(path, module, classname, None, modulefile)
    if res:
        if not issubclass(res, baseclass):
            # if not a subclass of baseclass then mix in baseclass with the
            # site specific class object and return the result
            res = type(classname, (res, baseclass), {})
    else:
        res = baseclass

    return res


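# Example use of import_site_class() (module and class names below are
# illustrative, not real autotest modules):
#
#   host_class = import_site_class(__file__,
#                                  'autotest_lib.server.site_host',
#                                  'SiteHost', BaseHost)
#
# This returns BaseHost when no site_host.py exists alongside the caller.
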
def import_site_function(path, module, funcname, dummy, modulefile=None):
    """
    Try to import site specific function from site specific file if it exists

    Args:
        path: full filename of the source file calling this (ie __file__)
        module: full module name
        funcname: function name to be imported from site file
        dummy: dummy function to return in case there is no function to import
        modulefile: module filename

    Returns: site specific function object or dummy

    Raises: ImportError if the site file exists but imports fails
    """

    return import_site_symbol(path, module, funcname, dummy, modulefile)


def _get_pid_path(program_name):
    my_path = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(my_path, "..", "..",
                                        "%s.pid" % program_name))


def write_pid(program_name):
    """
    Try to drop <program_name>.pid in the main autotest directory.

    Args:
      program_name: prefix for file name
    """
    pidfile = open(_get_pid_path(program_name), "w")
    try:
        pidfile.write("%s\n" % os.getpid())
    finally:
        pidfile.close()


def delete_pid_file_if_exists(program_name):
    """
    Tries to remove <program_name>.pid from the main autotest directory.
    """
    pidfile_path = _get_pid_path(program_name)

    try:
        os.remove(pidfile_path)
    except OSError:
        if not os.path.exists(pidfile_path):
            return
        raise


def get_pid_from_file(program_name):
    """
    Reads the pid from <program_name>.pid in the autotest directory.

    @param program_name the name of the program
    @return the pid if the file exists, None otherwise.
    """
    pidfile_path = _get_pid_path(program_name)
    if not os.path.exists(pidfile_path):
        return None

    pidfile = open(pidfile_path, 'r')

    try:
        try:
            pid = int(pidfile.readline())
        except IOError:
            if not os.path.exists(pidfile_path):
                return None
            raise
    finally:
        pidfile.close()

    return pid


def process_is_alive(program_name):
    """
    Checks if the process is alive and not in Zombie state.

    @param program_name the name of the program
    @return True if still alive, False otherwise
    """
    pid = get_pid_from_file(program_name)
    if pid is None:
        return False
    return pid_is_alive(pid)


def signal_process(program_name, sig=signal.SIGTERM):
    """
    Sends a signal to the process listed in <program_name>.pid

    @param program_name the name of the program
    @param sig signal to send
    """
    pid = get_pid_from_file(program_name)
    if pid:
        signal_pid(pid, sig)


def get_relative_path(path, reference):
    """Given 2 absolute paths "path" and "reference", compute the path of
    "path" as relative to the directory "reference".

    @param path the absolute path to convert to a relative path
    @param reference an absolute directory path to which the relative
        path will be computed
    """
    # normalize the paths (remove double slashes, etc)
    assert(os.path.isabs(path))
    assert(os.path.isabs(reference))

    path = os.path.normpath(path)
    reference = os.path.normpath(reference)

    # we could use os.path.split() but it splits from the end
    path_list = path.split(os.path.sep)[1:]
    ref_list = reference.split(os.path.sep)[1:]

    # find the longest leading common path
    for i in xrange(min(len(path_list), len(ref_list))):
        if path_list[i] != ref_list[i]:
            # decrement i so when exiting this loop either by no match or by
            # end of range we are one step behind
            i -= 1
            break
    i += 1
    # drop the common part of the paths, not interested in that anymore
    del path_list[:i]

    # for each uncommon component in the reference prepend a ".."
    path_list[:0] = ['..'] * (len(ref_list) - i)

    return os.path.join(*path_list)


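# Worked example for get_relative_path() (illustrative paths):
#
#   get_relative_path('/usr/local/bin', '/usr/share/doc')
#   # -> '../../local/bin'
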
def sh_escape(command):
    """
    Escape special characters from a command so that it can be passed
    as a double quoted (" ") string in a (ba)sh command.

    Args:
            command: the command string to escape.

    Returns:
            The escaped command string. The required enclosing double
            quotes are NOT added and so should be added at some point by
            the caller.

    See also: http://www.tldp.org/LDP/abs/html/escapingsection.html
    """
    command = command.replace("\\", "\\\\")
    command = command.replace("$", r'\$')
    command = command.replace('"', r'\"')
    command = command.replace('`', r'\`')
    return command
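# Worked example for sh_escape() (illustrative string):
#
#   sh_escape('cost: $5 `date`')   # returns: cost: \$5 \`date\`
#
# The result is then safe to embed between double quotes in a shell command,
# e.g. run('echo "%s"' % sh_escape(user_string)).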
1109