1import itertools
2
3import Util
4from ShCommands import Command, Pipeline, Seq
5
class ShLexer:
    """
    ShLexer - Split a 'sh' style command line into tokens.

    Arguments are produced as plain strings. Operators are produced as
    tuples: (op,) for a bare operator, or (op, fd) when a redirection operator
    is immediately preceded by a file descriptor number, as in '2>'. """

    def __init__(self, data, win32Escapes = False):
        """
        data - The command line text to lex.
        win32Escapes - If true, a backslash outside of quotes is kept as an
        ordinary character instead of acting as an escape. """
        self.data = data
        self.pos = 0
        self.end = len(data)
        self.win32Escapes = win32Escapes

    def eat(self):
        """
        eat - Consume and return the next character. The caller must ensure
        the input is not exhausted. """
        c = self.data[self.pos]
        self.pos += 1
        return c

    def look(self):
        """
        look - Return the next character without consuming it. The caller
        must ensure the input is not exhausted. """
        return self.data[self.pos]

    def maybe_eat(self, c):
        """
        maybe_eat(c) - Consume the character c if it is the next character,
        returning True if a character was consumed. """
        # Check for end of input first so that an operator which is the very
        # last character (e.g. 'a |') does not index past the data.
        if self.pos < self.end and self.data[self.pos] == c:
            self.pos += 1
            return True
        return False

    def lex_arg_fast(self, c):
        """
        lex_arg_fast(c) - Try to lex an argument whose first character c has
        already been consumed, assuming it needs no quote, escape or operator
        handling. Returns the argument and advances past it, or returns None
        (leaving the position untouched) if the slow path is required. """
        # Get the leading whitespace free section.
        chunk = self.data[self.pos - 1:].split(None, 1)[0]

        # If it has special characters, the fast path failed.
        for special in "|&;<>'\"\\":
            if special in chunk:
                return None

        self.pos = self.pos - 1 + len(chunk)
        return chunk

    def lex_arg_slow(self, c):
        """
        lex_arg_slow(c) - Lex an argument whose first character c has already
        been consumed, handling quotes, escapes and operators. Returns the
        argument string, or an (op, fd) tuple when the text lexed so far is a
        number directly followed by a redirection operator. """
        # NOTE(review): a backslash as the very first character is kept
        # literally here; only backslashes seen by the loop below escape.
        if c in "'\"":
            arg = self.lex_arg_quoted(c)
        else:
            arg = c
        while self.pos != self.end:
            c = self.look()
            if c.isspace() or c in "|&;":
                # Whitespace and control operators always end the argument.
                break
            elif c in '><':
                # This is an annoying case; we treat '2>' as a single token so
                # we don't have to track whitespace tokens.

                # If the parse string isn't an integer, do the usual thing.
                if not arg.isdigit():
                    break

                # Otherwise, lex the operator and convert to a redirection
                # token.
                num = int(arg)
                tok = self.lex_one_token()
                assert isinstance(tok, tuple) and len(tok) == 1
                return (tok[0], num)
            elif c == '"':
                self.eat()
                arg += self.lex_arg_quoted('"')
            elif c == "'":
                self.eat()
                arg += self.lex_arg_quoted("'")
            elif not self.win32Escapes and c == '\\':
                # Outside of a string, '\\' escapes everything.
                self.eat()
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return arg
                arg += self.eat()
            else:
                arg += self.eat()
        return arg

    def lex_arg_quoted(self, delim):
        """
        lex_arg_quoted(delim) - Lex the body of a quoted string whose opening
        delim has already been consumed, consuming the closing delim as well.
        Returns the unquoted text; a warning is issued (and the text so far is
        returned) if the input ends before the closing delim. """
        text = ''
        while self.pos != self.end:
            c = self.eat()
            if c == delim:
                return text
            elif c == '\\' and delim == '"':
                # Inside a '"' quoted string, '\\' only escapes the quote
                # character and backslash, otherwise it is preserved.
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return text
                c = self.eat()
                if c == '"' or c == '\\':
                    text += c
                else:
                    text += '\\' + c
            else:
                text += c
        Util.warning("missing quote character in %r" % self.data)
        return text

    def lex_arg_checked(self, c):
        """
        lex_arg_checked(c) - Debugging variant of lex_arg which runs both the
        fast and the slow path and raises ValueError if the fast path produced
        a result that disagrees with the slow path. Returns the slow path
        result. """
        pos = self.pos
        res = self.lex_arg_fast(c)
        end = self.pos

        # Rewind so the slow path lexes the same text.
        self.pos = pos
        reference = self.lex_arg_slow(c)
        if res is not None:
            if res != reference:
                raise ValueError("Fast path failure: %r != %r" %
                                 (res, reference))
            if self.pos != end:
                raise ValueError("Fast path failure: %r != %r" %
                                 (self.pos, end))
        return reference

    def lex_arg(self, c):
        """
        lex_arg(c) - Lex an argument whose first character c has already been
        consumed, trying the fast path before falling back to the slow one. """
        res = self.lex_arg_fast(c)
        if res is None:
            res = self.lex_arg_slow(c)
        return res

    def lex_one_token(self):
        """
        lex_one_token - Lex a single 'sh' token. """

        c = self.eat()
        if c in ';!':
            return (c,)
        if c == '|':
            if self.maybe_eat('|'):
                return ('||',)
            return (c,)
        if c == '&':
            if self.maybe_eat('&'):
                return ('&&',)
            if self.maybe_eat('>'):
                return ('&>',)
            return (c,)
        if c == '>':
            if self.maybe_eat('&'):
                return ('>&',)
            if self.maybe_eat('>'):
                return ('>>',)
            return (c,)
        if c == '<':
            if self.maybe_eat('&'):
                return ('<&',)
            # '<<' is spelled with two '<' characters.
            if self.maybe_eat('<'):
                return ('<<',)
            return (c,)

        return self.lex_arg(c)

    def lex(self):
        """
        lex - Generate the tokens of the command line in order, skipping the
        whitespace between them. """
        while self.pos != self.end:
            if self.look().isspace():
                self.eat()
            else:
                yield self.lex_one_token()
165
166###
167
class ShParser:
    """
    ShParser - Parse a 'sh' style command line into Command, Pipeline and Seq
    objects, using the token stream produced by ShLexer. """

    def __init__(self, data, win32Escapes = False):
        """
        data - The command line text to parse.
        win32Escapes - Passed through to ShLexer. """
        self.data = data
        self.tokens = ShLexer(data, win32Escapes = win32Escapes).lex()

    def lex(self):
        """
        lex - Consume and return the next token, or None once the tokens are
        exhausted. """
        return next(self.tokens, None)

    def look(self):
        """
        look - Return the next token without consuming it, or None once the
        tokens are exhausted. """
        tok = self.lex()
        if tok is not None:
            # Push the token back in front of the remaining ones.
            self.tokens = itertools.chain([tok], self.tokens)
        return tok

    def parse_command(self):
        """
        parse_command - Parse a single command: its arguments together with
        any redirections, up to (but not including) the next terminator.

        Raises ValueError if there is no command to parse, if the command
        starts with an operator, or if a redirection has no valid target. """
        tok = self.lex()
        # Rejects both end of input and an empty string as the command name.
        if not tok:
            raise ValueError("empty command!")
        if isinstance(tok, tuple):
            raise ValueError("syntax error near unexpected token %r" % tok[0])

        args = [tok]
        redirects = []
        while 1:
            tok = self.look()

            # EOF?
            if tok is None:
                break

            # If this is an argument, just add it to the current command.
            if isinstance(tok, str):
                args.append(self.lex())
                continue

            # Otherwise see if it is a terminator.
            assert isinstance(tok, tuple)
            if tok[0] in ('|',';','&','||','&&'):
                break

            # Otherwise it must be a redirection.
            op = self.lex()
            arg = self.lex()
            # The target must be an actual argument; an operator token here
            # (as in 'a > | b') is a syntax error, not a file name.
            if not arg or isinstance(arg, tuple):
                raise ValueError("syntax error near token %r" % op[0])
            redirects.append((op, arg))

        return Command(args, redirects)

    def parse_pipeline(self):
        """
        parse_pipeline - Parse an optionally '!' negated sequence of commands
        separated by '|' into a Pipeline. """
        negate = False
        if self.look() == ('!',):
            self.lex()
            negate = True

        commands = [self.parse_command()]
        while self.look() == ('|',):
            self.lex()
            commands.append(self.parse_command())
        return Pipeline(commands, negate)

    def parse(self):
        """
        parse - Parse the whole command line. Returns a single Pipeline, or
        left-nested Seq objects when pipelines are joined by operators.

        Raises ValueError on a syntax error, including an operator that is not
        followed by another pipeline. """
        lhs = self.parse_pipeline()

        while self.look():
            operator = self.lex()
            assert isinstance(operator, tuple) and len(operator) == 1

            if not self.look():
                raise ValueError("missing argument to operator %r" %
                                 operator[0])

            # FIXME: Operator precedence!!
            lhs = Seq(lhs, operator[0], self.parse_pipeline())

        return lhs
246
247###
248
249import unittest
250
class TestShLexer(unittest.TestCase):
    """Unit tests for the token stream produced by ShLexer."""

    def lex(self, str, *args, **kwargs):
        """Lex `str` and return all tokens as a list; any extra arguments
        are forwarded to the ShLexer constructor."""
        lexer = ShLexer(str, *args, **kwargs)
        return list(lexer.lex())

    def test_basic(self):
        # Operators split arguments even without surrounding whitespace.
        expected = ['a', ('|',), 'b', ('>',), 'c', ('&',), 'd', ('<',), 'e']
        self.assertEqual(self.lex('a|b>c&d<e'), expected)

    def test_redirection_tokens(self):
        # A number only becomes part of the redirection token when it is a
        # whole argument directly followed by the operator.
        cases = [
            ('a2>c', ['a2', ('>',), 'c']),
            ('a 2>c', ['a', ('>',2), 'c']),
        ]
        for text, expected in cases:
            self.assertEqual(self.lex(text), expected)

    def test_quoting(self):
        # Each entry pairs an input line with the tokens it must lex to when
        # using the default (non-win32) escape rules.
        cases = [
            (""" 'a' """, ['a']),
            (""" "hello\\"world" """, ['hello"world']),
            (""" "hello\\'world" """, ["hello\\'world"]),
            (""" "hello\\\\world" """, ["hello\\world"]),
            (""" he"llo wo"rld """, ["hello world"]),
            (""" a\\ b a\\\\b """, ["a b", "a\\b"]),
            (""" "" "" """, ["", ""]),
        ]
        for text, expected in cases:
            self.assertEqual(self.lex(text), expected)

        # With win32 escapes a backslash outside quotes is an ordinary
        # character, so the following space still separates the arguments.
        tokens = self.lex(""" a\\ b """, win32Escapes = True)
        self.assertEqual(tokens, ['a\\', 'b'])
283
class TestShParse(unittest.TestCase):
    """Unit tests for the command trees built by ShParser."""

    def parse(self, str):
        """Parse `str` with a default ShParser and return the result."""
        parser = ShParser(str)
        return parser.parse()

    def test_basic(self):
        # Each entry pairs an input line with the argument list expected for
        # its single, redirect-free command.
        cases = [
            ('echo hello', ['echo', 'hello']),
            ('echo ""', ['echo', '']),
            ("""echo -DFOO='a'""", ['echo', '-DFOO=a']),
            ('echo -DFOO="a"', ['echo', '-DFOO=a']),
        ]
        for text, argv in cases:
            actual = self.parse(text)
            self.assertEqual(actual, Pipeline([Command(argv, [])], False))

    def test_redirection(self):
        # Each entry is (input line, expected arguments, expected redirects).
        cases = [
            ('echo hello > c', ['echo', 'hello'], [(('>',), 'c')]),
            ('echo hello > c >> d', ['echo', 'hello'],
             [(('>',), 'c'), (('>>',), 'd')]),
            ('a 2>&1', ['a'], [(('>&',2), '1')]),
        ]
        for text, argv, redirects in cases:
            actual = self.parse(text)
            self.assertEqual(actual,
                             Pipeline([Command(argv, redirects)], False))

    def test_pipeline(self):
        # Each entry is (input line, expected command names, negated?).
        cases = [
            ('a | b', ['a', 'b'], False),
            ('a | b | c', ['a', 'b', 'c'], False),
            ('! a', ['a'], True),
        ]
        for text, names, negate in cases:
            actual = self.parse(text)
            commands = [Command([name], []) for name in names]
            self.assertEqual(actual, Pipeline(commands, negate))

    def test_list(self):
        def simple(name):
            # A non-negated pipeline holding one argument-less command.
            return Pipeline([Command([name], [])], False)

        for op in (';', '&', '&&', '||'):
            actual = self.parse('a %s b' % op)
            self.assertEqual(actual, Seq(simple('a'), op, simple('b')))

        # Operators are grouped left to right.
        actual = self.parse('a && b || c')
        self.assertEqual(actual,
                         Seq(Seq(simple('a'), '&&', simple('b')),
                             '||',
                             simple('c')))
351
# Run the unit tests above when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
354