1#    Copyright 2014-2015 ARM Limited
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16
17import re
18import time
19import logging
20from copy import copy
21
22from devlib.utils.serial_port import write_characters, TIMEOUT
23from devlib.utils.types import boolean
24
25
26logger = logging.getLogger('UEFI')
27
28
29class UefiConfig(object):
30
31    def __init__(self, config_dict):
32        if isinstance(config_dict, UefiConfig):
33            self.__dict__ = copy(config_dict.__dict__)
34        else:
35            try:
36                self.image_name = config_dict['image_name']
37                self.image_args = config_dict['image_args']
38                self.fdt_support = boolean(config_dict['fdt_support'])
39            except KeyError as e:
40                raise ValueError('Missing mandatory parameter for UEFI entry config: "{}"'.format(e))
41            self.initrd = config_dict.get('initrd')
42            self.fdt_path = config_dict.get('fdt_path')
43            if self.fdt_path and not self.fdt_support:
44                raise ValueError('FDT path has been specfied for UEFI entry, when FDT support is "False"')
45
46
47class UefiMenu(object):
48    """
49    Allows navigating UEFI menu over serial (it relies on a pexpect connection).
50
51    """
52
53    option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
54    prompt_regex = re.compile(r'^(\S[^\r\n]+):\s*', re.M)
55    invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)
56
57    load_delay = 1  # seconds
58    default_timeout = 60  # seconds
59
60    def __init__(self, conn, prompt='The default boot selection will start in'):
61        """
62        :param conn: A serial connection as returned by ``pexect.spawn()``.
63        :param prompt: The starting prompt to wait for during ``open()``.
64
65        """
66        self.conn = conn
67        self.start_prompt = prompt
68        self.options = {}
69        self.prompt = None
70        self.attempting_invalid_retry = False
71
72    def wait(self, timeout=default_timeout):
73        """
74        "Open" the UEFI menu by sending an interrupt on STDIN after seeing the
75        starting prompt (configurable upon creation of the ``UefiMenu`` object.
76
77        """
78        self.conn.expect(self.start_prompt, timeout)
79        self.connect()
80
81    def connect(self, timeout=default_timeout):
82        self.nudge()
83        time.sleep(self.load_delay)
84        self.read_menu(timeout=timeout)
85
86    def create_entry(self, name, config):
87        """Create a new UEFI entry using the parameters. The menu is assumed
88        to be at the top level. Upon return, the menu will be at the top level."""
89        logger.debug('Creating UEFI entry {}'.format(name))
90        self.nudge()
91        self.select('Boot Manager')
92        self.select('Add Boot Device Entry')
93        self.select('NOR Flash')
94        self.enter(config.image_name)
95        self.enter('y' if config.fdt_support else 'n')
96        if config.initrd:
97            self.enter('y')
98            self.enter(config.initrd)
99        else:
100            self.enter('n')
101        self.enter(config.image_args)
102        self.enter(name)
103
104        if config.fdt_path:
105            self.select('Update FDT path')
106            self.enter(config.fdt_path)
107
108        self.select('Return to main menu')
109
110    def delete_entry(self, name):
111        """Delete the specified UEFI entry. The menu is assumed
112        to be at the top level. Upon return, the menu will be at the top level."""
113        logger.debug('Removing UEFI entry {}'.format(name))
114        self.nudge()
115        self.select('Boot Manager')
116        self.select('Remove Boot Device Entry')
117        self.select(name)
118        self.select('Return to main menu')
119
120    def select(self, option, timeout=default_timeout):
121        """
122        Select the specified option from the current menu.
123
124        :param option: Could be an ``int`` index of the option, or a string/regex to
125                       match option text against.
126        :param timeout: If a non-``int`` option is specified, the option list may need
127                        need to be parsed (if it hasn't been already), this may block
128                        and the timeout is used to cap that , resulting in a ``TIMEOUT``
129                        exception.
130        :param delay: A fixed delay to wait after sending the input to the serial connection.
131                      This should be set if input this action is known to result in a
132                      long-running operation.
133
134        """
135        if isinstance(option, basestring):
136            option = self.get_option_index(option, timeout)
137        self.enter(option)
138
139    def enter(self, value, delay=load_delay):
140        """Like ``select()`` except no resolution is performed -- the value is sent directly
141        to the serial connection."""
142        # Empty the buffer first, so that only response to the input about to
143        # be sent will be processed by subsequent commands.
144        value = str(value)
145        self._reset()
146        write_characters(self.conn, value)
147        # TODO: in case the value is long an complicated, things may get
148        # screwed up (e.g. there may be line breaks injected), additionally,
149        # special chars might cause regex to fail. To avoid these issues i'm
150        # only matching against the first 5 chars of the value. This is
151        # entirely arbitrary and I'll probably have to find a better way of
152        # doing this at some point.
153        self.conn.expect(value[:5], timeout=delay)
154        time.sleep(self.load_delay)
155
156    def read_menu(self, timeout=default_timeout):
157        """Parse serial output to get the menu options and the following prompt."""
158        attempting_timeout_retry = False
159        self.attempting_invalid_retry = False
160        while True:
161            index = self.conn.expect([self.option_regex, self.prompt_regex, self.invalid_regex, TIMEOUT],
162                                     timeout=timeout)
163            match = self.conn.match
164            if index == 0:  # matched menu option
165                self.options[match.group(1)] = match.group(2)
166            elif index == 1:  # matched prompt
167                self.prompt = match.group(1)
168                break
169            elif index == 2:  # matched invalid selection
170                # We've sent an invalid input (which includes an empty line) at
171                # the top-level menu. To get back the menu options, it seems we
172                # need to enter what the error reports as the max + 1, so...
173                if not self.attempting_invalid_retry:
174                    self.attempting_invalid_retry = True
175                    val = int(match.group(1))
176                    self.empty_buffer()
177                    self.enter(val)
178                    self.select('Return to main menu')
179                    self.attempting_invalid_retry = False
180                else:   # OK, that didn't work; panic!
181                    raise RuntimeError('Could not read menu entries stuck on "{}" prompt'.format(self.prompt))
182            elif index == 3:  # timed out
183                if not attempting_timeout_retry:
184                    attempting_timeout_retry = True
185                    self.nudge()
186                else:  # Didn't help. Run away!
187                    raise RuntimeError('Did not see a valid UEFI menu.')
188            else:
189                raise AssertionError('Unexpected response waiting for UEFI menu')  # should never get here
190
191    def list_options(self, timeout=default_timeout):
192        """Returns the menu index of the specified option text (uses regex matching). If the option
193        is not in the current menu, ``LookupError`` will be raised."""
194        if not self.prompt:
195            self.read_menu(timeout)
196        return self.options.items()
197
198    def get_option_index(self, text, timeout=default_timeout):
199        """Returns the menu index of the specified option text (uses regex matching). If the option
200        is not in the current menu, ``LookupError`` will be raised."""
201        if not self.prompt:
202            self.read_menu(timeout)
203        for k, v in self.options.iteritems():
204            if re.search(text, v):
205                return k
206        raise LookupError(text)
207
208    def has_option(self, text, timeout=default_timeout):
209        """Returns ``True`` if at least one of the options in the current menu has
210        matched (using regex) the specified text."""
211        try:
212            self.get_option_index(text, timeout)
213            return True
214        except LookupError:
215            return False
216
217    def nudge(self):
218        """Send a little nudge to ensure there is something to read. This is useful when you're not
219        sure if all out put from the serial has been read already."""
220        self.enter('')
221
222    def empty_buffer(self):
223        """Read everything from the serial and clear the internal pexpect buffer. This ensures
224        that the next ``expect()`` call will time out (unless further input will be sent to the
225        serial beforehand. This is used to create a "known" state and avoid unexpected matches."""
226        try:
227            while True:
228                time.sleep(0.1)
229                self.conn.read_nonblocking(size=1024, timeout=0.1)
230        except TIMEOUT:
231            pass
232        self.conn.buffer = ''
233
234    def _reset(self):
235        self.options = {}
236        self.prompt = None
237        self.empty_buffer()
238
239
240