utils.py revision e80d47159f8e43d6e0d736554e207833ec116dfa
#!/usr/bin/python
#
# Copyright 2008 Google Inc. Released under the GPL v2

import os, pickle, random, re, resource, select, shutil, signal, StringIO
import socket, struct, subprocess, sys, time, textwrap, urllib, urlparse
import warnings
from autotest_lib.client.common_lib import error, barrier

def deprecated(func):
    """This is a decorator which can be used to mark functions as deprecated.
    It will result in a warning being emitted when the function is used."""
    def new_func(*args, **dargs):
        warnings.warn("Call to deprecated function %s." % func.__name__,
                      category=DeprecationWarning)
        return func(*args, **dargs)
    new_func.__name__ = func.__name__
    new_func.__doc__ = func.__doc__
    new_func.__dict__.update(func.__dict__)
    return new_func
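
# A minimal usage sketch of the decorator above; old_api is a hypothetical
# function used only for illustration:
#
#     @deprecated
#     def old_api():
#         return 42
#
#     old_api()   # triggers: DeprecationWarning("Call to deprecated
#                 #            function old_api.")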


class BgJob(object):
    def __init__(self, command, stdout_tee=None, stderr_tee=None, verbose=True):
        self.command = command
        self.stdout_tee = stdout_tee
        self.stderr_tee = stderr_tee
        self.result = CmdResult(command)
        if verbose:
            print "running: %s" % command
        self.sp = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   preexec_fn=self._reset_sigpipe, shell=True,
                                   executable="/bin/bash")


    def output_prepare(self, stdout_file=None, stderr_file=None):
        self.stdout_file = stdout_file
        self.stderr_file = stderr_file


    def process_output(self, stdout=True, final_read=False):
        """output_prepare must be called prior to calling this"""
        if stdout:
            pipe, buf, tee = self.sp.stdout, self.stdout_file, self.stdout_tee
        else:
            pipe, buf, tee = self.sp.stderr, self.stderr_file, self.stderr_tee

        if final_read:
            # read in all the data we can from pipe and then stop
            data = []
            while select.select([pipe], [], [], 0)[0]:
                data.append(os.read(pipe.fileno(), 1024))
                if len(data[-1]) == 0:
                    break
            data = "".join(data)
        else:
            # perform a single read
            data = os.read(pipe.fileno(), 1024)
        buf.write(data)
        if tee:
            tee.write(data)
            tee.flush()


    def cleanup(self):
        self.sp.stdout.close()
        self.sp.stderr.close()
        self.result.stdout = self.stdout_file.getvalue()
        self.result.stderr = self.stderr_file.getvalue()


    def _reset_sigpipe(self):
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def ip_to_long(ip):
    # !L is an unsigned long in network byte order
    return struct.unpack('!L', socket.inet_aton(ip))[0]


def long_to_ip(number):
    # See above comment.
    return socket.inet_ntoa(struct.pack('!L', number))


def create_subnet_mask(bits):
    # ~ on Python ints yields negative two's-complement values, so build
    # the mask arithmetically instead.
    return (1 << 32) - (1 << 32 - bits)


def format_ip_with_mask(ip, mask_bits):
    masked_ip = ip_to_long(ip) & create_subnet_mask(mask_bits)
    return "%s/%s" % (long_to_ip(masked_ip), mask_bits)
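
# A worked example of the two helpers above (values computed by hand, shown
# for illustration):
#
#     create_subnet_mask(24)                    # 0xFFFFFF00 == 4294967040
#     format_ip_with_mask('192.168.1.37', 24)   # '192.168.1.0/24'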


def normalize_hostname(alias):
    ip = socket.gethostbyname(alias)
    return socket.gethostbyaddr(ip)[0]


def get_ip_local_port_range():
    match = re.match(r'\s*(\d+)\s*(\d+)\s*$',
                     read_one_line('/proc/sys/net/ipv4/ip_local_port_range'))
    return (int(match.group(1)), int(match.group(2)))


def set_ip_local_port_range(lower, upper):
    write_one_line('/proc/sys/net/ipv4/ip_local_port_range',
                   '%d %d\n' % (lower, upper))


def read_one_line(filename):
    return open(filename, 'r').readline().rstrip('\n')


def write_one_line(filename, line):
    open(filename, 'w').write(line.rstrip('\n') + '\n')


def read_keyval(path):
    """
    Read a key-value pair format file into a dictionary, and return it.
    Takes either a filename or directory name as input. If it's a
    directory name, we assume you want the file to be called keyval.
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = {}
    for line in open(path):
        line = re.sub('#.*', '', line).rstrip()
        if not re.search(r'^[-\w]+=', line):
            raise ValueError('Invalid format line: %s' % line)
        key, value = line.split('=', 1)
        if re.search(r'^\d+$', value):
            value = int(value)
        elif re.search(r'^(\d+\.)?\d+$', value):
            value = float(value)
        keyval[key] = value
    return keyval
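
# Illustrative input/output for read_keyval (hypothetical file contents):
# a keyval file containing
#
#     iterations=3        # comments are stripped
#     throughput=12.5
#     kernel-version=2.6.18
#
# would be returned as
#
#     {'iterations': 3, 'throughput': 12.5, 'kernel-version': '2.6.18'}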


def write_keyval(path, dictionary, type_tag=None):
    """
    Write a key-value pair format file out to a file. This uses append
    mode to open the file, so existing text will not be overwritten or
    reparsed.

    If type_tag is None, then the key must be composed of alphanumeric
    characters (or dashes+underscores). However, if type_tag is not
    None then the keys must also have "{type_tag}" as a suffix. At
    the moment the only valid values of type_tag are "attr" and "perf".
    """
    if os.path.isdir(path):
        path = os.path.join(path, 'keyval')
    keyval = open(path, 'a')

    if type_tag is None:
        key_regex = re.compile(r'^[-\w]+$')
    else:
        if type_tag not in ('attr', 'perf'):
            raise ValueError('Invalid type tag: %s' % type_tag)
        escaped_tag = re.escape(type_tag)
        key_regex = re.compile(r'^[-\w]+\{%s\}$' % escaped_tag)
    try:
        for key, value in dictionary.iteritems():
            if not key_regex.search(key):
                raise ValueError('Invalid key: %s' % key)
            keyval.write('%s=%s\n' % (key, value))
    finally:
        keyval.close()
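
# A short usage sketch (hypothetical results dictionary); with type_tag set,
# every key must carry the matching "{...}" suffix:
#
#     write_keyval('/tmp/results', {'throughput{perf}': 12.5}, type_tag='perf')
#
# appends the line "throughput{perf}=12.5" to /tmp/results/keyval if
# /tmp/results is a directory, or to /tmp/results itself otherwise.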


def is_url(path):
    """Return true if path looks like a URL"""
    # for now, just handle http and ftp
    url_parts = urlparse.urlparse(path)
    return (url_parts[0] in ('http', 'ftp'))


def urlopen(url, data=None, proxies=None, timeout=5):
    """Wrapper to urllib.urlopen with timeout addition."""

    # Save the old timeout; setdefaulttimeout is process-global, so it must
    # be restored once the call completes.
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlopen(url, data=data, proxies=proxies)
    finally:
        socket.setdefaulttimeout(old_timeout)


def urlretrieve(url, filename=None, reporthook=None, data=None, timeout=300):
    """Wrapper to urllib.urlretrieve with timeout addition."""
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(timeout)
    try:
        return urllib.urlretrieve(url, filename=filename,
                                  reporthook=reporthook, data=data)
    finally:
        socket.setdefaulttimeout(old_timeout)


def get_file(src, dest, permissions=None):
    """Get a file from src, which can be local or a remote URL"""
    if src == dest:
        return
    if is_url(src):
        print 'PWD: ' + os.getcwd()
        print 'Fetching \n\t', src, '\n\t->', dest
        try:
            urllib.urlretrieve(src, dest)
        except IOError, e:
            raise error.AutotestError('Unable to retrieve %s (to %s)'
                                      % (src, dest), e)
    else:
        shutil.copyfile(src, dest)
    if permissions:
        os.chmod(dest, permissions)
    return dest


def unmap_url(srcdir, src, destdir='.'):
    """
    Receives either a path to a local file or a URL.
    Returns either the path to the local file, or the path the URL
    was fetched to.

    unmap_url('/usr/src', 'foo.tar', '/tmp')
                            = '/usr/src/foo.tar'
    unmap_url('/usr/src', 'http://site/file', '/tmp')
                            = '/tmp/file'
                            (after retrieving it)
    """
    if is_url(src):
        url_parts = urlparse.urlparse(src)
        filename = os.path.basename(url_parts[2])
        dest = os.path.join(destdir, filename)
        return get_file(src, dest)
    else:
        return os.path.join(srcdir, src)


def update_version(srcdir, preserve_srcdir, new_version, install,
                   *args, **dargs):
    """
    Make sure srcdir is version new_version

    If not, delete it and install() the new version.

    In the preserve_srcdir case, we just check it's up to date,
    and if not, we rerun install, without removing srcdir
    """
    versionfile = os.path.join(srcdir, '.version')
    install_needed = True

    if os.path.exists(versionfile):
        old_version = pickle.load(open(versionfile))
        if old_version == new_version:
            install_needed = False

    if install_needed:
        if not preserve_srcdir and os.path.exists(srcdir):
            shutil.rmtree(srcdir)
        install(*args, **dargs)
        if os.path.exists(srcdir):
            pickle.dump(new_version, open(versionfile, 'w'))
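
# A sketch of a typical call (install_foo and its arguments are hypothetical):
#
#     def install_foo(tarball, srcdir):
#         pass  # unpack tarball into srcdir and build it
#
#     update_version('/usr/local/src/foo', False, '1.2',
#                    install_foo, '/tmp/foo-1.2.tar.gz', '/usr/local/src/foo')
#
# reinstalls only when the pickled '.version' file is missing or differs
# from '1.2'.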


def run(command, timeout=None, ignore_status=False,
        stdout_tee=None, stderr_tee=None, verbose=True):
    """
    Run a command on the host.

    Args:
            command: the command line string
            timeout: time limit in seconds before attempting to
                    kill the running process. The run() function
                    will take a few seconds longer than 'timeout'
                    to complete if it has to kill the process.
            ignore_status: do not raise an exception, no matter what
                    the exit code of the command is.
            stdout_tee: optional file-like object to which stdout data
                        will be written as it is generated (data will still
                        be stored in result.stdout)
            stderr_tee: likewise for stderr

    Returns:
            a CmdResult object

    Raises:
            CmdError: the exit code of the command
                    execution was not 0
    """
    bg_job = join_bg_jobs((BgJob(command, stdout_tee, stderr_tee, verbose),),
                          timeout)[0]
    if not ignore_status and bg_job.result.exit_status:
        raise error.CmdError(command, bg_job.result,
                             "Command returned non-zero exit status")

    return bg_job.result
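
# A minimal usage sketch (command and timeout are illustrative):
#
#     result = run('uname -r', timeout=30)
#     print result.stdout         # kernel release, with trailing newline
#     print result.exit_status    # 0 on success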

def run_parallel(commands, timeout=None, ignore_status=False,
                 stdout_tee=None, stderr_tee=None):
    """Behaves the same as run with the following exceptions:

    - commands is a list of commands to run in parallel.
    - ignore_status toggles whether or not an exception should be raised
      on any error.

    returns a list of CmdResult objects
    """
    bg_jobs = []
    for command in commands:
        bg_jobs.append(BgJob(command, stdout_tee, stderr_tee))

    # Updates objects in bg_jobs list with their process information
    join_bg_jobs(bg_jobs, timeout)

    for bg_job in bg_jobs:
        if not ignore_status and bg_job.result.exit_status:
            raise error.CmdError(bg_job.command, bg_job.result,
                                 "Command returned non-zero exit status")

    return [bg_job.result for bg_job in bg_jobs]
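
# Usage sketch: both commands start together and the call returns once both
# have exited (the commands shown are illustrative):
#
#     results = run_parallel(['make -C /tmp/a', 'make -C /tmp/b'], timeout=600)
#     for result in results:
#         print result.command, result.exit_status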


@deprecated
def run_bg(command):
    """Function deprecated. Please use BgJob class instead."""
    bg_job = BgJob(command)
    return bg_job.sp, bg_job.result


def join_bg_jobs(bg_jobs, timeout=None):
    """Joins the bg_jobs with the current thread.

    Returns the same list of bg_jobs objects that was passed in.
    """
    for bg_job in bg_jobs:
        bg_job.output_prepare(StringIO.StringIO(), StringIO.StringIO())

    try:
        # We are holding ends to stdin, stdout pipes
        # hence we need to be sure to close those fds no matter what
        start_time = time.time()
        timeout_error = _wait_for_commands(bg_jobs, start_time, timeout)

        for bg_job in bg_jobs:
            # Process stdout and stderr
            bg_job.process_output(stdout=True, final_read=True)
            bg_job.process_output(stdout=False, final_read=True)
    finally:
        # close our ends of the pipes to the sp no matter what
        for bg_job in bg_jobs:
            bg_job.cleanup()

    if timeout_error:
        # TODO: This needs to be fixed to better represent what happens when
        # running in parallel. However this is backward compatible, so it
        # will do for the time being.
        raise error.CmdError(bg_jobs[0].command, bg_jobs[0].result,
                             "Command(s) did not complete within %d seconds"
                             % timeout)

    return bg_jobs
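
# BgJob plus join_bg_jobs is the building block the run*() helpers above are
# made of; a direct usage sketch (the command is illustrative):
#
#     job = BgJob('sleep 1 && echo done')
#     ...                        # do other work while the job runs
#     join_bg_jobs([job], timeout=10)
#     print job.result.stdout    # 'done\n'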


def _wait_for_commands(bg_jobs, start_time, timeout):
    # This returns True if it must return due to a timeout, otherwise False.

    # To check for processes which terminate without producing any output
    # a 1 second timeout is used in select.
    SELECT_TIMEOUT = 1

    select_list = []
    reverse_dict = {}
    for bg_job in bg_jobs:
        select_list.append(bg_job.sp.stdout)
        select_list.append(bg_job.sp.stderr)
        reverse_dict[bg_job.sp.stdout] = (bg_job, True)
        reverse_dict[bg_job.sp.stderr] = (bg_job, False)

    if timeout:
        stop_time = start_time + timeout
        time_left = stop_time - time.time()
    else:
        time_left = None # so that select never times out
    while not timeout or time_left > 0:
        # select will return when stdout is ready (including when it is
        # EOF, that is the process has terminated).
        ready, _, _ = select.select(select_list, [], [], SELECT_TIMEOUT)

        # os.read() has to be used instead of
        # subproc.stdout.read() which will otherwise block
        for pipe in ready:
            bg_job, stdout = reverse_dict[pipe]
            bg_job.process_output(stdout)

        remaining_jobs = [x for x in bg_jobs if x.result.exit_status is None]
        if len(remaining_jobs) == 0:
            return False
        for bg_job in remaining_jobs:
            bg_job.result.exit_status = bg_job.sp.poll()

        if timeout:
            time_left = stop_time - time.time()

    # Kill all processes which did not complete prior to timeout
    for bg_job in [x for x in bg_jobs if x.result.exit_status is None]:
        nuke_subprocess(bg_job.sp)
        bg_job.result.exit_status = bg_job.sp.poll()

    return True


def nuke_subprocess(subproc):
    # check if the subprocess is still alive, first
    if subproc.poll() is not None:
        return subproc.poll()

    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(subproc.pid, sig)
        # The process may have died before we could kill it.
        except OSError:
            pass

        for i in range(5):
            rc = subproc.poll()
            if rc is not None:
                return rc
            time.sleep(1)


def nuke_pid(pid):
    # the process has not terminated within timeout,
    # kill it via an escalating series of signals.
    signal_queue = [signal.SIGTERM, signal.SIGKILL]
    for sig in signal_queue:
        try:
            os.kill(pid, sig)

        # The process may have died before we could kill it.
        except OSError:
            pass

        try:
            for i in range(5):
                status = os.waitpid(pid, os.WNOHANG)[0]
                if status == pid:
                    return
                time.sleep(1)

        # the process died before we could join it.
        except OSError:
            return

    # Neither signal reaped the process within the wait period.
    raise error.AutoservRunError('Could not kill %d' % pid, None)


def system(command, timeout=None, ignore_status=False):
    """This function returns the exit status of command."""
    return run(command, timeout, ignore_status,
               stdout_tee=sys.stdout, stderr_tee=sys.stderr).exit_status


def system_parallel(commands, timeout=None, ignore_status=False):
    """This function returns a list of exit statuses for the respective
    list of commands."""
    return [result.exit_status for result in
            run_parallel(commands, timeout, ignore_status,
                         stdout_tee=sys.stdout, stderr_tee=sys.stderr)]


def system_output(command, timeout=None, ignore_status=False,
                  retain_output=False):
    if retain_output:
        out = run(command, timeout, ignore_status,
                  stdout_tee=sys.stdout, stderr_tee=sys.stderr).stdout
    else:
        out = run(command, timeout, ignore_status).stdout
    if out[-1:] == '\n':
        out = out[:-1]
    return out


def system_output_parallel(commands, timeout=None, ignore_status=False,
                           retain_output=False):
    if retain_output:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status,
                                                        stdout_tee=sys.stdout,
                                                        stderr_tee=sys.stderr)]
    else:
        out = [bg_job.stdout for bg_job in run_parallel(commands, timeout,
                                                        ignore_status)]
    # Strip one trailing newline from each command's output.
    for i, x in enumerate(out):
        if x[-1:] == '\n':
            out[i] = x[:-1]
    return out


def get_cpu_percentage(function, *args, **dargs):
    """Returns a tuple containing the CPU% and return value from function call.

    This function calculates the usage time by taking the difference of
    the user and system times both before and after the function call.
    """
    child_pre = resource.getrusage(resource.RUSAGE_CHILDREN)
    self_pre = resource.getrusage(resource.RUSAGE_SELF)
    start = time.time()
    to_return = function(*args, **dargs)
    elapsed = time.time() - start
    self_post = resource.getrusage(resource.RUSAGE_SELF)
    child_post = resource.getrusage(resource.RUSAGE_CHILDREN)

    # Calculate CPU Percentage: the first two rusage fields are
    # ru_utime and ru_stime (user and system time).
    s_user, s_system = [a - b for a, b in zip(self_post, self_pre)[:2]]
    c_user, c_system = [a - b for a, b in zip(child_post, child_pre)[:2]]
    cpu_percent = (s_user + c_user + s_system + c_system) / elapsed

    return cpu_percent, to_return
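
# Usage sketch: time a CPU-bound call (busy_loop is hypothetical); a value
# near 1.0 means one full CPU was busy for the wall-clock duration:
#
#     def busy_loop(n):
#         return sum(range(n))
#
#     cpu_fraction, total = get_cpu_percentage(busy_loop, 10000000)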


def get_sync_control_file(control, host_name, host_num,
                          instance, num_jobs, port_base=63100):
    """
    This function is used when there is a need to run more than one
    job simultaneously starting exactly at the same time. It basically
    returns a modified control file (containing the synchronization code
    prepended) whenever it is ready to run the control file. The
    synchronization is done using barriers to make sure that the jobs
    start at the same time.

    Here is how the synchronization is done to make sure that the tests
    start at exactly the same time on the client.
    sc_bar is a server barrier and s_bar, c_bar are the normal barriers

                      Job1              Job2         ......      JobN
     Server:   |                        sc_bar
     Server:   |                        s_bar        ......      s_bar
     Server:   |      at.run()         at.run()      ......      at.run()
     ----------|------------------------------------------------------
     Client    |      sc_bar
     Client    |      c_bar             c_bar        ......      c_bar
     Client    |    <run test>         <run test>    ......     <run test>

    PARAMS:
       control      : The control file to which the above synchronization
                      code will be prepended
       host_name    : The host name on which the job is going to run
       host_num (non-negative) : A number to identify the machine so that
                      we have different sets of s_bar_ports for each of
                      the machines.
       instance     : The number of the job
       num_jobs     : Total number of jobs that are going to run in
                      parallel with this job starting at the same time
       port_base    : Port number that is used to derive the actual
                      barrier ports.

    RETURN VALUE:
        The modified control file.
    """
    sc_bar_port = port_base
    c_bar_port = port_base
    if host_num < 0:
        print "Please provide a non-negative number for the host"
        return None
    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
                                          # the same for a given machine

    sc_bar_timeout = 180
    s_bar_timeout = c_bar_timeout = 120

    # The barrier code snippet is prepended into the control file
    # dynamically before at.run() is finally called.
    control_new = []

    # jobid is the unique name used to identify the processes
    # trying to reach the barriers
    jobid = "%s#%d" % (host_name, instance)

    rendv = []
    # rendvstr is a temp holder for the rendezvous list of the processes
    for n in range(num_jobs):
        rendv.append("'%s#%d'" % (host_name, n))
    rendvstr = ",".join(rendv)

    if instance == 0:
        # Do the setup and wait at the server barrier
        # Clean up the tmp and the control dirs for the first instance
        control_new.append('if os.path.exists(job.tmpdir):')
        control_new.append("\t system('umount -f %s > /dev/null "
                           "2> /dev/null' % job.tmpdir,"
                           "ignore_status=True)")
        control_new.append("\t system('rm -rf ' + job.tmpdir)")
        control_new.append(
            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
            % (jobid, sc_bar_timeout, sc_bar_port))
        control_new.append(
            'b0.rendevous_servers("PARALLEL_MASTER", "%s")' % jobid)

    elif instance == 1:
        # Wait at the server barrier for the instance=0
        # process to complete setup
        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
                             port=sc_bar_port)
        b0.rendevous_servers("PARALLEL_MASTER", jobid)

        if num_jobs > 2:
            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
                                 port=s_bar_port)
            b1.rendevous(rendvstr)

    else:
        # For the rest of the clients
        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
        b2.rendevous(rendvstr)

    # Client side barrier for all the tests to start at the same time
    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
                       % (jobid, c_bar_timeout, c_bar_port))
    control_new.append("b1.rendevous(%s)" % rendvstr)

    # Stick in the rest of the control file
    control_new.append(control)

    return "\n".join(control_new)
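
# Sketch of how a server-side caller might use this (values illustrative):
# for three jobs on host 'client1' all starting together, instance i gets
#
#     control_i = get_sync_control_file(control_text, 'client1', 0, i, 3)
#
# where control_text is the original control file contents.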


def get_arch(run_function=run):
    """
    Get the hardware architecture of the machine.
    run_function is used to execute the commands. It defaults to
    utils.run() but a custom method (if provided) should be of the
    same schema as utils.run. It should return a CmdResult object and
    raise a CmdError exception on failure.
    """
    arch = run_function('/bin/uname -m').stdout.rstrip()
    if re.match(r'i\d86$', arch):
        arch = 'i386'
    return arch


class CmdResult(object):
    """
    Command execution result.

    command:     String containing the command line itself
    exit_status: Integer exit code of the process
    stdout:      String containing stdout of the process
    stderr:      String containing stderr of the process
    duration:    Elapsed wall clock time running the process
    """


    def __init__(self, command=None, stdout="", stderr="",
                 exit_status=None, duration=0):
        self.command = command
        self.exit_status = exit_status
        self.stdout = stdout
        self.stderr = stderr
        self.duration = duration


    def __repr__(self):
        wrapper = textwrap.TextWrapper(width=78,
                                       initial_indent="\n    ",
                                       subsequent_indent="    ")

        stdout = self.stdout.rstrip()
        if stdout:
            stdout = "\nstdout:\n%s" % stdout

        stderr = self.stderr.rstrip()
        if stderr:
            stderr = "\nstderr:\n%s" % stderr

        return ("* Command: %s\n"
                "Exit status: %s\n"
                "Duration: %s\n"
                "%s"
                "%s"
                % (wrapper.fill(self.command), self.exit_status,
                   self.duration, stdout, stderr))


class run_randomly:
    def __init__(self, run_sequentially=False):
        # run_sequentially=True runs the tests in insertion order,
        # which is useful when debugging control files
        self.test_list = []
        self.run_sequentially = run_sequentially


    def add(self, *args, **dargs):
        test = (args, dargs)
        self.test_list.append(test)


    def run(self, fn):
        while self.test_list:
            test_index = random.randint(0, len(self.test_list) - 1)
            if self.run_sequentially:
                test_index = 0
            (args, dargs) = self.test_list.pop(test_index)
            fn(*args, **dargs)
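
# Usage sketch (job.run_test and the test names are illustrative): queue a
# few tests, then execute them in a random order:
#
#     tests = run_randomly()
#     tests.add('dbench')
#     tests.add('bonnie', iterations=2)
#     tests.run(job.run_test)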
712