1#
2#    Copyright 2015 ARM Limited
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import re
17import time
18import logging
19
20from devlib.utils.serial_port import TIMEOUT
21
22
# Module-level logger registered under the fixed name 'U-Boot' (rather than
# ``__name__``). Nothing in the visible part of this file logs through it.
logger = logging.getLogger('U-Boot')
24
25
class UbootMenu(object):
    """
    Allows navigating Das U-Boot menu over serial (it relies on a pexpect connection).

    Call ``open()`` first: it waits for the autoboot banner, sends an empty
    line to stop the countdown and records the console prompt.  ``enter()``
    (and therefore ``getenv()``, ``setenv()`` and ``nudge()``) waits for that
    recorded prompt after every command.

    """

    # None of these patterns is referenced by a method of this class; they are
    # kept because they are publicly visible class attributes.
    option_regex = re.compile(r'^\[(\d+)\]\s+([^\r]+)\r\n', re.M)
    prompt_regex = re.compile(r'^([^\r\n]+):\s*', re.M)
    invalid_regex = re.compile(r'Invalid input \(max (\d+)\)', re.M)

    load_delay = 1  # seconds
    default_timeout = 60  # seconds

    def __init__(self, conn, start_prompt='Hit any key to stop autoboot'):
        """
        :param conn: A serial connection as returned by ``pexpect.spawn()``.
                     Its ``crlf`` attribute is overwritten with ``'\\n\\r'``.
        :param start_prompt: The starting prompt to wait for during ``open()``.

        """
        self.conn = conn
        self.conn.crlf = '\n\r'  # TODO: this has *got* to be a bug in U-Boot...
        self.start_prompt = start_prompt
        self.options = {}
        # Filled in by open() with the console prompt read from the device.
        self.prompt = None

    def open(self, timeout=default_timeout):
        """
        "Open" the U-Boot menu by sending an empty line after seeing the
        starting prompt (configurable upon creation of the ``UbootMenu``
        object), then record the console prompt in ``self.prompt``.

        :param timeout: Seconds to wait for the starting prompt.

        """
        self.conn.expect(self.start_prompt, timeout)
        self.conn.sendline('')
        time.sleep(self.load_delay)
        self.conn.readline()  # garbage
        # A second empty line makes the console print a fresh prompt, which is
        # read back (stripped of surrounding whitespace) and remembered.
        self.conn.sendline('')
        self.prompt = self.conn.readline().strip()

    def getenv(self):
        """
        Run ``printenv`` and parse its output.

        :returns: A dict mapping variable names to values.  Every output line
                  containing ``=`` is split on the first ``=`` and both halves
                  are stripped; lines without ``=`` are ignored.

        """
        output = self.enter('printenv')
        result = {}
        for line in output.split('\n'):
            variable, separator, value = line.partition('=')
            if separator:
                result[variable.strip()] = value.strip()
        return result

    def setenv(self, variable, value, force=False):
        """
        Run ``setenv`` for ``variable``.

        :param variable: Name of the environment variable.
        :param value: Value to assign; when ``None`` the command is sent with
                      the variable name only.
        :param force: When true, ``-f`` is added to the command.
        :returns: Whatever ``enter()`` returns for the command.

        """
        force_str = ' -f' if force else ''
        if value is not None:
            command = 'setenv{} {} {}'.format(force_str, variable, value)
        else:
            command = 'setenv{} {}'.format(force_str, variable)
        return self.enter(command)

    def boot(self):
        """Send the ``boot`` command without waiting for a prompt afterwards."""
        self.write_characters('boot')

    def nudge(self):
        """Send a little nudge to ensure there is something to read. This is useful when you're not
        sure if all output from the serial has been read already."""
        self.enter('')

    def enter(self, value, delay=load_delay):
        """
        Send ``value`` directly to the serial connection as one command line
        and wait for the prompt recorded by ``open()``.

        :param value: The command to send; it is converted with ``str()``.
        :param delay: Timeout, in seconds, passed to the connection's
                      ``expect()`` while waiting for the prompt.
        :returns: The connection's ``before`` attribute after the prompt has
                  been matched.

        """
        # Empty the buffer first, so that only response to the input about to
        # be sent will be processed by subsequent commands.
        value = str(value)
        self.empty_buffer()
        self.write_characters(value)
        # The prompt is literal text read from the device, but expect() treats
        # its argument as a regular expression; escape it so that characters
        # such as '(', '+' or '$' in the prompt match themselves.
        self.conn.expect(re.escape(self.prompt), timeout=delay)
        return self.conn.before

    def write_characters(self, line):
        """
        Send ``line`` one character at a time, pausing 50 ms after each, then
        finish with an empty ``sendline()``.  Trailing CR/LF characters of
        ``line`` are dropped first.

        """
        line = line.rstrip('\r\n')
        for c in line:
            self.conn.send(c)
            time.sleep(0.05)
        self.conn.sendline('')

    def empty_buffer(self):
        """
        Discard pending input: keep reading from the connection until a read
        raises ``TIMEOUT``, then reset the connection's ``buffer`` attribute.

        """
        try:
            while True:
                time.sleep(0.1)
                self.conn.read_nonblocking(size=1024, timeout=0.1)
        except TIMEOUT:
            pass
        # Presumably clears data pexpect has read but not yet consumed -- confirm.
        self.conn.buffer = ''
116
117