utils.py revision ed91ba902a885505433827b2586117371632a76b
#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
import warnings
from autotest_lib.client.common_lib import error, barrier

def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func


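# Illustrative usage sketch (not from the original file): wrapping a helper
# with @deprecated leaves its behavior intact but emits a DeprecationWarning
# on every call. "old_sleep" here is a hypothetical example function.
#
#     @deprecated
#     def old_sleep(seconds):
#         time.sleep(seconds)
#
#     old_sleep(1)   # warns: "Call to deprecated function old_sleep."

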
class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True):
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        if verbose:
            print "running: %s" % command
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file

    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


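# Illustrative sketch (not from the original file): BgJob objects are
# normally driven through join_bg_jobs() below, which wires up StringIO
# buffers, pumps the pipes, and fills in each job's CmdResult.
#
#     job = BgJob('ls /tmp')
#     join_bg_jobs([job], timeout=30)
#     print job.result.exit_status, job.result.stdout

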
def ip_to_long(ip):
    # !L is an unsigned long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    # ~ on Python ints yields negative numbers, so build the
    # 32-bit mask arithmetically instead
    return (1 << 32) - (1 << 32-bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)


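# Worked example (not from the original file), tying the four helpers
# together:
#
#     >>> format_ip_with_mask('192.168.1.37', 24)
#     '192.168.1.0/24'
#
# ip_to_long('192.168.1.37') is 0xC0A80125; ANDed with the /24 mask
# 0xFFFFFF00 it becomes 0xC0A80100, which long_to_ip renders as
# '192.168.1.0'.

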
def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, line):
    open(filename, 'w').write(line.rstrip('\n') + '\n')


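# Illustrative sketch (not from the original file): these helpers treat
# one-line /proc and /sys entries as simple values. Widening the local
# port range, for instance:
#
#     lower, upper = get_ip_local_port_range()
#     if upper - lower < 28000:
#         set_ip_local_port_range(32768, 61000)

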
def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        line = re.sub('#.*', '', line).rstrip()
        if not line:
            continue
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the keys must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
    try:
        for key, value in dictionary.iteritems():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()


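# Illustrative sketch (not from the original file). With type_tag='perf'
# each key needs a '{perf}' suffix; with no type_tag, plain keys:
#
#     write_keyval('/tmp/results', {'iterations': 10})
#     write_keyval('/tmp/results', {'throughput{perf}': 217.5}, 'perf')
#
# produces a keyval file containing:
#
#     iterations=10
#     throughput{perf}=217.5
#
# read_keyval('/tmp/results') parses plain lines back, converting '10'
# to an int (note its key regex does not accept the '{perf}' suffix,
# so tagged lines are rejected by this version).

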
def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, proxies=None, timeout=5):
    """Wrapper to urllib.urlopen with timeout addition."""

    # Save old timeout
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


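# Design note and sketch (not from the original file): the wrappers set
# the process-wide socket default timeout around the call, then restore
# it, because urllib in Python 2 offers no per-call timeout argument.
#
#     listing = urlopen('http://example.com/index.html').read()
#     urlretrieve('http://example.com/big.tar.bz2', '/tmp/big.tar.bz2')

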
def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return
    if is_url(src):
        print 'PWD: ' + os.getcwd()
        print 'Fetching \n\t', src, '\n\t->', dest
        try:
            urllib.urlretrieve(src, dest)
        except IOError, e:
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the fetched URL.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check that it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))


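# Illustrative sketch (not from the original file): install is any
# callable that builds the tree; extra args are passed through to it.
# The names below are hypothetical.
#
#     def install_kernel(tarball, srcdir):
#         ...extract tarball into srcdir and build...
#
#     update_version('/usr/local/src/linux', False, '2.6.18',
#                    install_kernel, '/tmp/linux-2.6.18.tar.bz2',
#                    '/usr/local/src/linux')

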
def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
                          timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result


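# Illustrative sketch (not from the original file):
#
#     result = run('uname -r', timeout=60)
#     print result.stdout.rstrip()
#
#     # tolerate failure and inspect the status instead
#     result = run('grep foo /etc/passwd', ignore_status=True)
#     if result.exit_status != 0:
#         print 'no match'

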
def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run() with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    Returns a list of CmdResult objects.
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]


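# Illustrative sketch (not from the original file): both sleeps run
# concurrently, so the pair completes in roughly 2 seconds, not 4.
#
#     results = run_parallel(['sleep 2; echo a', 'sleep 2; echo b'])
#     print [r.stdout.rstrip() for r in results]   # ['a', 'b']

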
@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result


def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to the stdout and stderr pipes of the
        # subprocesses, so we need to be sure to close those fds no
        # matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backwards compatible, so it
        # will do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs


def _wait_for_commands(bg_jobs, start_time, timeout):
    # This returns True if it must return due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for file_obj in ready:
            bg_job, stdout = reverse_dict[file_obj]
            bg_job.process_output(stdout)

        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        nuke_subprocess(bg_job.sp)

    return True


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)


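# Illustrative sketch (not from the original file): nuke_subprocess
# waits up to 5 seconds after SIGTERM before escalating to SIGKILL,
# and returns the exit status once the process is gone.
#
#     proc = subprocess.Popen('sleep 1000', shell=True)
#     rc = nuke_subprocess(proc)   # typically -signal.SIGTERM here

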
def nuke_pid(pid):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)

        # the process died before we could join it
        except OSError:
            return

    # no signal managed to terminate the process, so give up
    raise error.AutoservRunError('Could not kill %d' % pid, None)


def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [bg_job.exit_status for bg_job in
            run_parallel(commands, timeout, ignore_status,
                         stdout_tee=sys.stdout, stderr_tee=sys.stderr)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


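# Illustrative sketch (not from the original file): system() mirrors
# os.system but with run()'s timeout handling, while system_output()
# captures stdout with the trailing newline stripped.
#
#     if system('which gcc', ignore_status=True) == 0:
#         version = system_output('gcc -dumpversion')

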
def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status,
                                                        stdout_tee=sys.stdout,
                                                        stderr_tee=sys.stderr)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status)]
    # strip the trailing newline from each captured stdout string
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage; the first two fields of a getrusage
    # result are ru_utime (user time) and ru_stime (system time)
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return


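# Illustrative sketch (not from the original file): a CPU-bound callable
# should score close to 1.0, an I/O-bound one close to 0.0.
#
#     def spin(n):
#         for i in xrange(n):
#             pass
#
#     cpu_percent, _ = get_cpu_percentage(spin, 10000000)    # ~1.0
#     cpu_percent, _ = get_cpu_percentage(time.sleep, 5)     # ~0.0

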
523"""
524This function is used when there is a need to run more than one
525job simultaneously starting exactly at the same time. It basically returns
526a modified control file (containing the synchronization code prepended)
527whenever it is ready to run the control file. The synchronization
528is done using barriers to make sure that the jobs start at the same time.
529
530Here is how the synchronization is done to make sure that the tests
531start at exactly the same time on the client.
532sc_bar is a server barrier and s_bar, c_bar are the normal barriers
533
534                  Job1              Job2         ......      JobN
535 Server:   |                        sc_bar
536 Server:   |                        s_bar        ......      s_bar
537 Server:   |      at.run()         at.run()      ......      at.run()
538 ----------|------------------------------------------------------
539 Client    |      sc_bar
540 Client    |      c_bar             c_bar        ......      c_bar
541 Client    |    <run test>         <run test>    ......     <run test>
542
543
544PARAMS:
545   control_file : The control file which to which the above synchronization
546                  code would be prepended to
547   host_name    : The host name on which the job is going to run
548   host_num (non negative) : A number to identify the machine so that we have
549                  different sets of s_bar_ports for each of the machines.
550   instance     : The number of the job
551   num_jobs     : Total number of jobs that are going to run in parallel with
552                  this job starting at the same time
553   port_base    : Port number that is used to derive the actual barrier ports.
554
555RETURN VALUE:
556    The modified control file.
557
558"""
559def get_sync_control_file(control, host_name, host_num,
560                          instance, num_jobs, port_base=63100):
561    sc_bar_port = port_base
562    c_bar_port = port_base
563    if host_num < 0:
564        print "Please provide a non negative number for the host"
565        return None
566    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
567                                          # the same for a given machine
568
569    sc_bar_timeout = 180
570    s_bar_timeout = c_bar_timeout = 120
571
    # The barrier code snippet is prepended into the control file
    # dynamically, before at.run() is finally called.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    rendv = []
    # rendvstr is a temp holder for the rendezvous list of the processes
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir,"
                           " ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")'
            % jobid)

    elif instance == 1:
        # Wait at the server barrier for the instance == 0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)


def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should have the
    same schema as utils.run: it should return a CmdResult object and
    raise a CmdError exception.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


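# Illustrative sketch (not from the original file): run_function makes
# the helper host-agnostic; passing a remote host's run method (assuming
# it follows the utils.run schema) queries that host instead.
#
#     local_arch = get_arch()
#     remote_arch = get_arch(run_function=host.run)  # hypothetical host

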
class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    def __init__(self, run_sequentially=False):
        # Run sequentially is for debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list)-1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)
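

# Illustrative sketch (not from the original file): queue tests, then
# drain them in random order through a runner callable. job.run_test
# below is the usual autotest control-file runner, assumed from context.
#
#     r = run_randomly()
#     r.add('dbench', iterations=3)
#     r.add('bonnie')
#     r.run(job.run_test)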
705