1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, random, re, string, traceback
6from autotest_lib.client.common_lib import error
7from autotest_lib.server import autotest
8from autotest_lib.server import hosts
9from autotest_lib.server import test
10
11class kernel_MemoryRamoop(test.test):
12    """
13    This test verify that /dev/pstore/console-ramoops is preserved correctly
14    after system reboot/kernel crash and also verify that there is no memory
15    corrupt in that log.
16
17    There is also platform_KernelErrorPaths test that test kernel crash. But
18    this test focuses on verify that kernel create console-ramoops file
19    correctly and its content is not corrupt. Contrary to the other test
20    that test at the bigger scope, i.e. the whole error reporting mechanism.
21    """
22    version = 1
23
24    _RAMOOP_PATH = '/dev/pstore/console-ramoops'
25    _KMSG_PATH = '/dev/kmsg'
26    _LKDTM_PATH = '/sys/kernel/debug/provoke-crash/DIRECT'
27
28    # ramopp have size of 128K so we will generate about 100K of random message
29    _MSG_LINE_COUNT = 1000
30    _MSG_LINE_LENGTH = 80
31    _MSG_MAGIC = 'ramoop_test'
32
33    def run_once(self, client_ip):
34        """
35        Run the test.
36        """
37        if not client_ip:
38            error.TestError("Must provide client's IP address to test")
39
40        self._client = hosts.create_host(client_ip)
41        self._client_at = autotest.Autotest(self._client)
42
43        self._run_test(self._do_reboot, '.*Restarting system.*$')
44
45        if self._client.check_for_lkdtm():
46            self._run_test(self._do_kernel_panic, '.*lkdtm:.*PANIC$')
47            self._run_test(self._do_kernel_bug, '.*lkdtm:.*BUG$')
48        else:
49            logging.warn('DUT did not have kernel dump test module')
50
51        self._run_test(self._do_reboot_with_suspend, '.*Restarting system.*$')
52
53    def _run_test(self, test_function, sig_pattern):
54        """
55        Run the test using by write random message to kernel log. Then
56        restart/crash the kernel and then verify integrity of console-ramoop
57
58        @param test_function: fuction to call to reboot / crash DUT
59        @param sig_pattern: regex of kernel log message generate when reboot
60                            or crash by test_function
61        """
62
63        msg = self._generate_random_msg()
64
65        for line in msg:
66            cmd = 'echo "%s" > %s' % (line, self._KMSG_PATH)
67            self._client.run(cmd)
68
69        test_function()
70
71        cmd = 'cat %s' % self._RAMOOP_PATH
72        ramoop = self._client.run(cmd).stdout
73
74        self._verify_random_msg(ramoop, msg, sig_pattern)
75
76    def _do_reboot(self):
77        """
78        Reboot host machine
79        """
80        logging.info('Server: reboot client')
81        try:
82            self._client.reboot()
83        except error.AutoservRebootError as err:
84            raise error.TestFail('%s.\nTest failed with error %s' % (
85                    traceback.format_exc(), str(err)))
86
87    def _do_reboot_with_suspend(self):
88        """
89        Reboot host machine after suspend once
90        """
91        self._client.suspend(suspend_time=15)
92
93        logging.info('Server: reboot client')
94        try:
95            self._client.reboot()
96        except error.AutoservRebootError as err:
97            raise error.TestFail('%s.\nTest failed with error %s' % (
98                    traceback.format_exc(), str(err)))
99
100    def _do_kernel_panic(self):
101        """
102        Cause kernel panic using kernel dump test module
103        """
104        logging.info('Server: make client kernel panic')
105
106        cmd = 'echo PANIC > %s' % self._LKDTM_PATH
107        boot_id = self._client.get_boot_id()
108        self._client.run(cmd, ignore_status=True)
109        self._client.wait_for_restart(old_boot_id=boot_id)
110
111    def _do_kernel_bug(self):
112        """
113        Cause kernel bug using kernel dump test module
114        """
115        logging.info('Server: make client kernel bug')
116
117        cmd = 'echo BUG > %s' % self._LKDTM_PATH
118        boot_id = self._client.get_boot_id()
119        self._client.run(cmd, ignore_status=True)
120        self._client.wait_for_restart(old_boot_id=boot_id)
121
122    def _generate_random_msg(self):
123        """
124        Generate random message to put in kernel log
125        The message format is [magic string]: [3 digit id] [random char/digit]
126        """
127        valid_char = string.letters + string.digits
128        ret = []
129        for i in range(self._MSG_LINE_COUNT):
130            line = '%s: %03d ' % (self._MSG_MAGIC, i)
131            for _ in range(self._MSG_LINE_LENGTH):
132                line += random.choice(valid_char)
133            ret += [line]
134        return ret
135
136    def _verify_random_msg(self, ramoop, src_msg, sig_pattern):
137        """
138        Verify random message generated by _generate_random_msg
139
140        There are 3 things to verify.
141        1. At least one random message exist. (earlier random message may be
142           cutoff because console-ramoops has limited size.
143        2. Integrity of random message.
144        3. Signature of reboot / kernel crash
145
146        @param ramoop: console-ramoops file in DUT
147        @param src_msg: message write to kernel log
148        @param sig_patterm: regex of kernel log to verify
149        """
150        #                   time stamp     magic   id      random
151        pattern = str("\\[ *(\\d+\\.\\d+)\\].*(%s: (\\d{3}) \\w{%d})" %
152            (self._MSG_MAGIC, self._MSG_LINE_LENGTH))
153        matcher = re.compile(pattern)
154
155        logging.info('%s', pattern)
156
157        state = 'find_rand_msg'
158
159        last_timestamp = 0
160        for line in ramoop.split('\n'):
161            if state == 'find_rand_msg':
162                if not matcher.match(line):
163                    continue
164                last_id = int(matcher.split(line)[3]) - 1
165                state = 'match_rand_pattern'
166                logging.info("%s: %s", state, line)
167
168            if state == 'match_rand_pattern':
169                if not matcher.match(line):
170                    continue
171                components = matcher.split(line)
172                timestamp = float(components[1])
173                msg = components[2]
174                id = int(components[3])
175
176                if timestamp < last_timestamp:
177                    logging.info("last_timestamp: %f, timestamp: %d",
178                                 last_timestamp, timestamp)
179                    raise error.TestFail('Found reverse time stamp.')
180                last_timestamp = timestamp
181
182                if id != last_id + 1:
183                    logging.info("last_id: %d, id: %d", last_id, id)
184                    raise error.TestFail('Found missing message.')
185                last_id = id
186
187                if msg != src_msg[id]:
188                    logging.info("cur_msg: '%s'", msg)
189                    logging.info("src_msg: '%s'", src_msg[id])
190                    raise error.TestFail('Found corrupt message.')
191
192                if id == self._MSG_LINE_COUNT - 1:
193                    state = 'find_signature'
194
195            if state == 'find_signature':
196                if re.match(sig_pattern, line):
197                    logging.info("%s: %s", state, line)
198                    break
199
200        # error case: successful run must break in find_sigature state
201        else:
202            raise error.TestFail(str('Verify failed at state %s' % state))
203