base_job.py revision a087eaedccf3bf2a89808f22254d282bd7147f8c
1import os, copy, logging, errno, cPickle as pickle, fcntl
2
3from autotest_lib.client.common_lib import autotemp, error
4
5
6class job_directory(object):
7    """Represents a job.*dir directory."""
8
9
10    class JobDirectoryException(error.AutotestError):
11        """Generic job_directory exception superclass."""
12
13
14    class MissingDirectoryException(JobDirectoryException):
15        """Raised when a directory required by the job does not exist."""
16        def __init__(self, path):
17            Exception.__init__(self, 'Directory %s does not exist' % path)
18
19
20    class UncreatableDirectoryException(JobDirectoryException):
21        """Raised when a directory required by the job is missing and cannot
22        be created."""
23        def __init__(self, path, error):
24            msg = 'Creation of directory %s failed with exception %s'
25            msg %= (path, error)
26            Exception.__init__(self, msg)
27
28
29    class UnwritableDirectoryException(JobDirectoryException):
30        """Raised when a writable directory required by the job exists
31        but is not writable."""
32        def __init__(self, path):
33            msg = 'Directory %s exists but is not writable' % path
34            Exception.__init__(self, msg)
35
36
37    def __init__(self, path, is_writable=False):
38        """
39        Instantiate a job directory.
40
41        @param path The path of the directory. If None a temporary directory
42            will be created instead.
43        @param is_writable If True, expect the directory to be writable.
44
45        @raises MissingDirectoryException raised if is_writable=False and the
46            directory does not exist.
47        @raises UnwritableDirectoryException raised if is_writable=True and
48            the directory exists but is not writable.
49        @raises UncreatableDirectoryException raised if is_writable=True, the
50            directory does not exist and it cannot be created.
51        """
52        if path is None:
53            if is_writable:
54                self._tempdir = autotemp.tempdir(unique_id='autotest')
55                self.path = self._tempdir.name
56            else:
57                raise self.MissingDirectoryException(path)
58        else:
59            self._tempdir = None
60            self.path = path
61        self._ensure_valid(is_writable)
62
63
64    def _ensure_valid(self, is_writable):
65        """
66        Ensure that this is a valid directory.
67
68        Will check if a directory exists, can optionally also enforce that
69        it be writable. It can optionally create it if necessary. Creation
70        will still fail if the path is rooted in a non-writable directory, or
71        if a file already exists at the given location.
72
73        @param dir_path A path where a directory should be located
74        @param is_writable A boolean indicating that the directory should
75            not only exist, but also be writable.
76
77        @raises MissingDirectoryException raised if is_writable=False and the
78            directory does not exist.
79        @raises UnwritableDirectoryException raised if is_writable=True and
80            the directory is not wrtiable.
81        @raises UncreatableDirectoryException raised if is_writable=True, the
82            directory does not exist and it cannot be created
83        """
84        # ensure the directory exists
85        if is_writable:
86            try:
87                os.makedirs(self.path)
88            except OSError, e:
89                if e.errno != errno.EEXIST or not os.path.isdir(self.path):
90                    raise self.UncreatableDirectoryException(self.path, e)
91        elif not os.path.isdir(self.path):
92            raise self.MissingDirectoryException(self.path)
93
94        # if is_writable=True, also check that the directory is writable
95        if is_writable and not os.access(self.path, os.W_OK):
96            raise self.UnwritableDirectoryException(self.path)
97
98
99    @staticmethod
100    def property_factory(attribute):
101        """
102        Create a job.*dir -> job._*dir.path property accessor.
103
104        @param attribute A string with the name of the attribute this is
105            exposed as. '_'+attribute must then be attribute that holds
106            either None or a job_directory-like object.
107
108        @returns A read-only property object that exposes a job_directory path
109        """
110        @property
111        def dir_property(self):
112            underlying_attribute = getattr(self, '_' + attribute)
113            if underlying_attribute is None:
114                return None
115            else:
116                return underlying_attribute.path
117        return dir_property
118
119
# decorator for use with job_state methods
def with_backing_lock(method):
    """Wrap a job_state method so that it runs while holding the backing lock.

    If the instance does not already hold the backing file lock (i.e.
    self._backing_file_lock is None) the lock is acquired before calling
    method and released afterwards, even when method raises. If the lock is
    already held, method is called directly and the lock is left alone, so
    nested calls between decorated methods do not re-lock or unlock early.

    @param method The unbound method to wrap.

    @returns A wrapper function carrying method's __name__ and __doc__.
    """
    def locked_call(self, *args, **dargs):
        if self._backing_file_lock is not None:
            # an outer call owns the lock; it will also release it
            return method(self, *args, **dargs)
        self._lock_backing_file()
        try:
            return method(self, *args, **dargs)
        finally:
            self._unlock_backing_file()

    locked_call.__name__ = method.__name__
    locked_call.__doc__ = method.__doc__
    return locked_call
140
141
# decorator for use with job_state methods
def with_backing_file(method):
    """Wrap a job_state method in a lock-read-*-write-unlock cycle.

    The returned wrapper holds the backing file lock (via with_backing_lock)
    while it refreshes the in-memory state from the backing file, runs
    method, and then flushes the state back out. The flush happens even if
    method raises. Any operation that reads or writes state should use this
    decorator so the backing file stays consistent with memory.

    @param method The unbound method to wrap.

    @returns A wrapper function carrying method's __name__ and __doc__.
    """
    def read_call_write(self, *args, **dargs):
        self._read_from_backing_file()
        try:
            return method(self, *args, **dargs)
        finally:
            self._write_to_backing_file()

    synchronized = with_backing_lock(read_call_write)
    synchronized.__name__ = method.__name__
    synchronized.__doc__ = method.__doc__
    return synchronized
162
163
164
class job_state(object):
    """A class for managing explicit job and user state, optionally persistent.

    The class allows you to save state by name (like a dictionary), with
    every name living inside a namespace. Any state stored in this class
    should be picklable and deep copyable. While this is not enforced it is
    recommended that only valid python identifiers be used as names.
    Additionally, the namespace 'global_properties' is used by default for
    storing the values associated with properties constructed using the
    property_factory method.

    When a backing file is set (see set_backing_file), the get, set, has,
    discard and discard_namespace methods re-read the state from that file
    while holding an exclusive flock, and write it back out afterwards.
    """

    # sentinel letting get() distinguish "no default given" from default=None
    NO_DEFAULT = object()
    PICKLE_PROTOCOL = 2  # highest protocol available in python 2.4


    def __init__(self):
        """Initialize the job state."""
        # namespace -> {name -> value}
        self._state = {}
        # path of the file the state is synchronized with, or None
        self._backing_file = None
        # False until the current backing file has been read once; the first
        # read merges the file into memory instead of replacing memory
        self._backing_file_initialized = False
        # open file object holding the flock while locked, otherwise None
        self._backing_file_lock = None


    def _lock_backing_file(self):
        """Acquire an exclusive lock on the backing file.

        Does nothing when no backing file is set. The file is opened in append
        mode so it is created if missing without being truncated.

        The handle is only recorded in self._backing_file_lock once the lock
        has actually been acquired. If flock fails the handle is closed, so a
        failed attempt can never leave this object believing it holds the
        lock (which would make every later decorated call run unlocked).
        """
        if self._backing_file:
            lock_file = open(self._backing_file, 'a')
            locked = False
            try:
                fcntl.flock(lock_file, fcntl.LOCK_EX)
                locked = True
            finally:
                if not locked:
                    lock_file.close()
            self._backing_file_lock = lock_file


    def _unlock_backing_file(self):
        """Release a lock on the backing file, if one is held.

        The lock handle is always forgotten and closed, even if the explicit
        unlock call raises.
        """
        lock_file = self._backing_file_lock
        if lock_file is not None:
            self._backing_file_lock = None
            try:
                fcntl.flock(lock_file, fcntl.LOCK_UN)
            finally:
                lock_file.close()


    def read_from_file(self, file_path, merge=True):
        """Read in any state from the file at file_path.

        When merge=True, any state specified only in-memory will be preserved.
        Any state specified on-disk will be set in-memory, even if an in-memory
        setting already exists. Afterwards the backing file (if any) is
        rewritten with the resulting state.

        @param file_path The path where the state should be read from. It must
            exist but it can be empty.
        @param merge If true, merge the on-disk state with the in-memory
            state. If false, replace the in-memory state with the on-disk
            state.

        @warning This method is intentionally concurrency-unsafe. It makes no
            attempt to control concurrent access to the file at file_path.
        @warning The file is unpickled, so it must come from a trusted source.
        """

        # we can assume that the file exists
        if os.path.getsize(file_path) == 0:
            on_disk_state = {}
        else:
            # binary mode: the state is written with a binary pickle protocol
            infile = open(file_path, 'rb')
            try:
                on_disk_state = pickle.load(infile)
            finally:
                infile.close()

        if merge:
            # merge the on-disk state with the in-memory state
            for namespace, namespace_dict in on_disk_state.iteritems():
                in_memory_namespace = self._state.setdefault(namespace, {})
                for name, value in namespace_dict.iteritems():
                    if name in in_memory_namespace:
                        if in_memory_namespace[name] != value:
                            logging.info('Persistent value of %s.%s from %s '
                                         'overridding existing in-memory '
                                         'value', namespace, name, file_path)
                            in_memory_namespace[name] = value
                        else:
                            logging.debug('Value of %s.%s is unchanged, '
                                          'skipping import', namespace, name)
                    else:
                        logging.debug('Importing %s.%s from state file %s',
                                      namespace, name, file_path)
                        in_memory_namespace[name] = value
        else:
            # just replace the in-memory state with the on-disk state
            self._state = on_disk_state
            logging.debug('Replacing in-memory state with on-disk state '
                          'from %s', file_path)

        # lock the backing file before we refresh it; when the caller already
        # holds the lock (e.g. via _read_from_backing_file) it is reused
        with_backing_lock(self.__class__._write_to_backing_file)(self)


    def write_to_file(self, file_path):
        """Write out the current state to the given path.

        @param file_path The path where the state should be written out to.
            Must be writable.

        @warning This method is intentionally concurrency-unsafe. It makes no
            attempt to control concurrent access to the file at file_path.
        """
        # binary mode to match the binary pickle protocol in use
        outfile = open(file_path, 'wb')
        try:
            pickle.dump(self._state, outfile, self.PICKLE_PROTOCOL)
        finally:
            outfile.close()
        logging.debug('Persistent state flushed to %s', file_path)


    def _read_from_backing_file(self):
        """Refresh the current state from the backing file.

        If the backing file has never been read before (indicated by checking
        self._backing_file_initialized) it will merge the file with the
        in-memory state, rather than overwriting it.
        """
        if self._backing_file:
            merge_backing_file = not self._backing_file_initialized
            self.read_from_file(self._backing_file, merge=merge_backing_file)
            self._backing_file_initialized = True


    def _write_to_backing_file(self):
        """Flush the current state to the backing file, if one is set."""
        if self._backing_file:
            self.write_to_file(self._backing_file)


    @with_backing_file
    def _synchronize_backing_file(self):
        """Synchronizes the contents of the in-memory and on-disk state."""
        # state is implicitly synchronized in _with_backing_file methods
        pass


    def set_backing_file(self, file_path):
        """Change the path used as the backing file for the persistent state.

        When a new backing file is specified if a file already exists then
        its contents will be added into the current state, with conflicts
        between the file and memory being resolved in favor of the file
        contents. The file will then be kept in sync with the (combined)
        in-memory state. The syncing can be disabled by setting this to None.

        @param file_path A path on the filesystem that can be read from and
            written to, or None to turn off the backing store.
        """
        # flush pending state to the old backing file before switching
        self._synchronize_backing_file()
        self._backing_file = file_path
        self._backing_file_initialized = False
        # merge the new backing file into memory and write the result back
        self._synchronize_backing_file()


    @with_backing_file
    def get(self, namespace, name, default=NO_DEFAULT):
        """Returns the value associated with a particular name.

        @param namespace The namespace that the property should be stored in.
        @param name The name the value was saved with.
        @param default A default value to return if no state is currently
            associated with var.

        @returns A deep copy of the value associated with name. Note that this
            explicitly returns a deep copy to avoid problems with mutable
            values; mutations are not persisted or shared.
        @raises KeyError raised when no state is associated with var and a
            default value is not provided.
        """
        if self.has(namespace, name):
            return copy.deepcopy(self._state[namespace][name])
        elif default is self.NO_DEFAULT:
            raise KeyError('No key %s in namespace %s' % (name, namespace))
        else:
            return default


    @with_backing_file
    def set(self, namespace, name, value):
        """Saves the value given with the provided name.

        A deep copy of value is stored, so later mutations of value by the
        caller do not affect the saved state.

        @param namespace The namespace that the property should be stored in.
        @param name The name the value should be saved with.
        @param value The value to save.
        """
        namespace_dict = self._state.setdefault(namespace, {})
        namespace_dict[name] = copy.deepcopy(value)
        logging.debug('Persistent state %s.%s now set to %r', namespace,
                      name, value)


    @with_backing_file
    def has(self, namespace, name):
        """Return a boolean indicating if namespace.name is defined.

        @param namespace The namespace to check for a definition.
        @param name The name to check for a definition.

        @returns True if the given name is defined in the given namespace and
            False otherwise.
        """
        return namespace in self._state and name in self._state[namespace]


    @with_backing_file
    def discard(self, namespace, name):
        """If namespace.name is a defined value, deletes it.

        A namespace left empty by the deletion is removed as well.

        @param namespace The namespace that the property is stored in.
        @param name The name the value is saved with.
        """
        if self.has(namespace, name):
            del self._state[namespace][name]
            if len(self._state[namespace]) == 0:
                del self._state[namespace]
            logging.debug('Persistent state %s.%s deleted', namespace, name)
        else:
            logging.debug(
                'Persistent state %s.%s not defined so nothing is discarded',
                namespace, name)


    @with_backing_file
    def discard_namespace(self, namespace):
        """Delete all defined namespace.* names.

        @param namespace The namespace to be cleared.
        """
        if namespace in self._state:
            del self._state[namespace]
        logging.debug('Persistent state %s.* deleted', namespace)


    @staticmethod
    def property_factory(state_attribute, property_attribute, default,
                         namespace='global_properties'):
        """
        Create a property object for an attribute using self.get and self.set.

        @param state_attribute A string with the name of the attribute on
            job that contains the job_state instance.
        @param property_attribute A string with the name of the attribute
            this property is exposed as.
        @param default A default value that should be used for this property
            if it is not set.
        @param namespace The namespace to store the attribute value in.

        @returns A read-write property object that performs self.get calls
            to read the value and self.set calls to set it.
        """
        def getter(job):
            state = getattr(job, state_attribute)
            return state.get(namespace, property_attribute, default)
        def setter(job, value):
            state = getattr(job, state_attribute)
            state.set(namespace, property_attribute, value)
        return property(getter, setter)
418
419
420class base_job(object):
421    """An abstract base class for the various autotest job classes.
422
423    Properties:
424        autodir
425            The top level autotest directory.
426        clientdir
427            The autotest client directory.
428        serverdir
429            The autotest server directory. [OPTIONAL]
430        resultdir
431            The directory where results should be written out. [WRITABLE]
432
433        pkgdir
434            The job packages directory. [WRITABLE]
435        tmpdir
436            The job temporary directory. [WRITABLE]
437        testdir
438            The job test directory. [WRITABLE]
439        site_testdir
440            The job site test directory. [WRITABLE]
441
442        bindir
443            The client bin/ directory.
444        configdir
445            The client config/ directory.
446        profdir
447            The client profilers/ directory.
448        toolsdir
449            The client tools/ directory.
450
451        conmuxdir
452            The conmux directory. [OPTIONAL]
453
454        control
455            A path to the control file to be executed. [OPTIONAL]
456        hosts
457            A set of all live Host objects currently in use by the job.
458            Code running in the context of a local client can safely assume
459            that this set contains only a single entry.
460        machines
461            A list of the machine names associated with the job.
462        user
463            The user executing the job.
464        tag
465            A tag identifying the job. Often used by the scheduler to give
466            a name of the form NUMBER-USERNAME/HOSTNAME.
467
468        last_boot_tag
469            The label of the kernel from the last reboot. [OPTIONAL,PERSISTENT]
470        automatic_test_tag
471            A string which, if set, will be automatically added to the test
472            name when running tests.
473
474        default_profile_only
475            A boolean indicating the default value of profile_only used
476            by test.execute. [PERSISTENT]
477        drop_caches
478            A boolean indicating if caches should be dropped before each
479            test is executed.
480        drop_caches_between_iterations
481            A boolean indicating if caches should be dropped before each
482            test iteration is executed.
483        run_test_cleanup
484            A boolean indicating if test.cleanup should be run by default
485            after a test completes, if the run_cleanup argument is not
486            specified. [PERSISTENT]
487
488        num_tests_run
489            The number of tests run during the job. [OPTIONAL]
490        num_tests_failed
491            The number of tests failed during the job. [OPTIONAL]
492
493        bootloader
494            An instance of the boottool class. May not be available on job
495            instances where access to the bootloader is not available
496            (e.g. on the server running a server job). [OPTIONAL]
497        harness
498            An instance of the client test harness. Only available in contexts
499            where client test execution happens. [OPTIONAL]
500        logging
501            An instance of the logging manager associated with the job.
502        profilers
503            An instance of the profiler manager associated with the job.
504        sysinfo
505            An instance of the sysinfo object. Only available in contexts
506            where it's possible to collect sysinfo.
507        warning_manager
508            A class for managing which types of WARN messages should be
509            logged and which should be supressed. [OPTIONAL]
510        warning_loggers
511            A set of readable streams that will be monitored for WARN messages
512            to be logged. [OPTIONAL]
513
514    Abstract methods:
515        _find_base_directories [CLASSMETHOD]
516            Returns the location of autodir, clientdir and serverdir
517
518        _find_resultdir
519            Returns the location of resultdir. Gets a copy of any parameters
520            passed into base_job.__init__. Can return None to indicate that
521            no resultdir is to be used.
522    """
523
524   # capture the dependency on several helper classes with factories
525    _job_directory = job_directory
526    _job_state = job_state
527
528
529    # all the job directory attributes
530    autodir = _job_directory.property_factory('autodir')
531    clientdir = _job_directory.property_factory('clientdir')
532    serverdir = _job_directory.property_factory('serverdir')
533    resultdir = _job_directory.property_factory('resultdir')
534    pkgdir = _job_directory.property_factory('pkgdir')
535    tmpdir = _job_directory.property_factory('tmpdir')
536    testdir = _job_directory.property_factory('testdir')
537    site_testdir = _job_directory.property_factory('site_testdir')
538    bindir = _job_directory.property_factory('bindir')
539    configdir = _job_directory.property_factory('configdir')
540    profdir = _job_directory.property_factory('profdir')
541    toolsdir = _job_directory.property_factory('toolsdir')
542    conmuxdir = _job_directory.property_factory('conmuxdir')
543
544
545    # all the generic persistent properties
546    tag = _job_state.property_factory('_state', 'tag', '')
547    default_profile_only = _job_state.property_factory(
548        '_state', 'default_profile_only', False)
549    run_test_cleanup = _job_state.property_factory(
550        '_state', 'run_test_cleanup', True)
551    last_boot_tag = _job_state.property_factory(
552        '_state', 'last_boot_tag', None)
553    automatic_test_tag = _job_state.property_factory(
554        '_state', 'automatic_test_tag', None)
555
556    # the use_sequence_number property
557    _sequence_number = _job_state.property_factory(
558        '_state', '_sequence_number', None)
559    def _get_use_sequence_number(self):
560        return bool(self._sequence_number)
561    def _set_use_sequence_number(self, value):
562        if value:
563            self._sequence_number = 1
564        else:
565            self._sequence_number = None
566    use_sequence_number = property(_get_use_sequence_number,
567                                   _set_use_sequence_number)
568
569
570    def __init__(self, *args, **dargs):
571        # initialize the base directories, all others are relative to these
572        autodir, clientdir, serverdir = self._find_base_directories()
573        self._autodir = self._job_directory(autodir)
574        self._clientdir = self._job_directory(clientdir)
575        if serverdir:
576            self._serverdir = self._job_directory(serverdir)
577        else:
578            self._serverdir = None
579
580        # initialize all the other directories relative to the base ones
581        self._initialize_dir_properties()
582        self._resultdir = self._job_directory(
583            self._find_resultdir(*args, **dargs), True)
584        self._execution_contexts = []
585
586        # initialize all the job state
587        self._state = self._job_state()
588
589
590    @classmethod
591    def _find_base_directories(cls):
592        raise NotImplementedError()
593
594
595    def _initialize_dir_properties(self):
596        """
597        Initializes all the secondary self.*dir properties. Requires autodir,
598        clientdir and serverdir to already be initialized.
599        """
600        # create some stubs for use as shortcuts
601        def readonly_dir(*args):
602            return self._job_directory(os.path.join(*args))
603        def readwrite_dir(*args):
604            return self._job_directory(os.path.join(*args), True)
605
606        # various client-specific directories
607        self._bindir = readonly_dir(self.clientdir, 'bin')
608        self._configdir = readonly_dir(self.clientdir, 'config')
609        self._profdir = readonly_dir(self.clientdir, 'profilers')
610        self._pkgdir = readwrite_dir(self.clientdir, 'packages')
611        self._toolsdir = readonly_dir(self.clientdir, 'tools')
612
613        # directories which are in serverdir on a server, clientdir on a client
614        if self.serverdir:
615            root = self.serverdir
616        else:
617            root = self.clientdir
618        self._tmpdir = readwrite_dir(root, 'tmp')
619        self._testdir = readwrite_dir(root, 'tests')
620        self._site_testdir = readwrite_dir(root, 'site_tests')
621
622        # various server-specific directories
623        if self.serverdir:
624            self._conmuxdir = readonly_dir(self.autodir, 'conmux')
625        else:
626            self._conmuxdir = None
627
628
629    def _find_resultdir(self, *args, **dargs):
630        raise NotImplementedError()
631
632
633    def push_execution_context(self, resultdir):
634        """
635        Save off the current context of the job and change to the given one.
636
637        In practice method just changes the resultdir, but it may become more
638        extensive in the future. The expected use case is for when a child
639        job needs to be executed in some sort of nested context (for example
640        the way parallel_simple does). The original context can be restored
641        with a pop_execution_context call.
642
643        @param resultdir The new resultdir, relative to the current one.
644        """
645        new_dir = self._job_directory(
646            os.path.join(self.resultdir, resultdir), True)
647        self._execution_contexts.append(self._resultdir)
648        self._resultdir = new_dir
649
650
651    def pop_execution_context(self):
652        """
653        Reverse the effects of the previous push_execution_context call.
654
655        @raises IndexError raised when the stack of contexts is empty.
656        """
657        if not self._execution_contexts:
658            raise IndexError('No old execution context to restore')
659        self._resultdir = self._execution_contexts.pop()
660
661
662    def get_state(self, name, default=_job_state.NO_DEFAULT):
663        """Returns the value associated with a particular name.
664
665        @param name The name the value was saved with.
666        @param default A default value to return if no state is currently
667            associated with var.
668
669        @returns A deep copy of the value associated with name. Note that this
670            explicitly returns a deep copy to avoid problems with mutable
671            values; mutations are not persisted or shared.
672        @raises KeyError raised when no state is associated with var and a
673            default value is not provided.
674        """
675        try:
676            return self._state.get('public', name, default=default)
677        except KeyError:
678            raise KeyError(name)
679
680
681    def set_state(self, name, value):
682        """Saves the value given with the provided name.
683
684        @param name The name the value should be saved with.
685        @param value The value to save.
686        """
687        self._state.set('public', name, value)
688
689
690    def _build_tagged_test_name(self, testname, dargs):
691        """Builds the fully tagged testname and subdirectory for job.run_test.
692
693        @param testname The base name of the test
694        @param dargs The ** arguments passed to run_test. And arguments
695            consumed by this method will be removed from the dictionary.
696
697        @returns A 3-tuple of the full name of the test, the subdirectory it
698            should be stored in, and the full tag of the subdir.
699        """
700        tag_parts = []
701
702        # build up the parts of the tag used for the test name
703        base_tag = dargs.pop('tag', None)
704        if base_tag:
705            tag_parts.append(str(base_tag))
706        if self.use_sequence_number:
707            tag_parts.append('_%02d_' % self._sequence_number)
708            self._sequence_number += 1
709        if self.automatic_test_tag:
710            tag_parts.append(self.automatic_test_tag)
711        full_testname = '.'.join([testname] + tag_parts)
712
713        # build up the subdir and tag as well
714        subdir_tag = dargs.pop('subdir_tag', None)
715        if subdir_tag:
716            tag_parts.append(subdir_tag)
717        subdir = '.'.join([testname] + tag_parts)
718        tag = '.'.join(tag_parts)
719
720        return full_testname, subdir, tag
721
722
723    def _make_test_outputdir(self, subdir):
724        """Creates an output directory for a test to run it.
725
726        @param subdir The subdirectory of the test. Generally computed by
727            _build_tagged_test_name.
728
729        @returns A job_directory instance corresponding to the outputdir of
730            the test.
731        @raises A TestError if the output directory is invalid.
732        """
733        # explicitly check that this subdirectory is new
734        path = os.path.join(self.resultdir, subdir)
735        if os.path.exists(path):
736            msg = ('%s already exists; multiple tests cannot run with the '
737                   'same subdirectory' % subdir)
738            raise error.TestError(msg)
739
740        # create the outputdir and raise a TestError if it isn't valid
741        try:
742            outputdir = self._job_directory(path, True)
743            return outputdir
744        except self._job_directory.JobDirectoryException, e:
745            logging.exception('%s directory creation failed with %s',
746                              subdir, e)
747            raise error.TestError('%s directory creation failed' % subdir)
748