1#! /usr/bin/env python
2
3## This file is part of Scapy
4## See http://www.secdev.org/projects/scapy for more informations
5## Copyright (C) Philippe Biondi <phil@secdev.org>
6## This program is published under a GPLv2 license
7
8from __future__ import print_function
9import os
10import subprocess
11import itertools
12import collections
13import time
14import scapy.modules.six as six
15from threading import Lock, Thread
16import scapy.utils
17
18from scapy.automaton import Message, select_objects, SelectableObject
19from scapy.consts import WINDOWS
20from scapy.error import log_interactive, warning
21from scapy.config import conf
22from scapy.utils import get_temp_file, do_graph
23
24import scapy.arch
25
26class PipeEngine(SelectableObject):
27    pipes = {}
28    @classmethod
29    def list_pipes(cls):
30        for pn,pc in sorted(cls.pipes.items()):
31            doc = pc.__doc__ or ""
32            if doc:
33                doc = doc.splitlines()[0]
34            print("%20s: %s" % (pn, doc))
35    @classmethod
36    def list_pipes_detailed(cls):
37        for pn,pc in sorted(cls.pipes.items()):
38            if pc.__doc__:
39                print("###### %s\n %s" % (pn ,pc.__doc__))
40            else:
41                print("###### %s" % pn)
42
43    def __init__(self, *pipes):
44        self.active_pipes = set()
45        self.active_sources = set()
46        self.active_drains = set()
47        self.active_sinks = set()
48        self._add_pipes(*pipes)
49        self.thread_lock = Lock()
50        self.command_lock = Lock()
51        self.__fd_queue = collections.deque()
52        self.__fdr,self.__fdw = os.pipe()
53        self.thread = None
54    def __getattr__(self, attr):
55        if attr.startswith("spawn_"):
56            dname = attr[6:]
57            if dname in self.pipes:
58                def f(*args, **kargs):
59                    k = self.pipes[dname]
60                    p = k(*args, **kargs)
61                    self.add(p)
62                    return p
63                return f
64        raise AttributeError(attr)
65
66    def check_recv(self):
67        """As select.select is not available, we check if there
68        is some data to read by using a list that stores pointers."""
69        return len(self.__fd_queue) > 0
70
71    def fileno(self):
72        return self.__fdr
73
74    def _read_cmd(self):
75        os.read(self.__fdr,1)
76        return self.__fd_queue.popleft()
77
78    def _write_cmd(self, _cmd):
79        self.__fd_queue.append(_cmd)
80        os.write(self.__fdw, b"X")
81        self.call_release()
82
83    def add_one_pipe(self, pipe):
84        self.active_pipes.add(pipe)
85        if isinstance(pipe, Source):
86            self.active_sources.add(pipe)
87        if isinstance(pipe, Drain):
88            self.active_drains.add(pipe)
89        if isinstance(pipe, Sink):
90            self.active_sinks.add(pipe)
91
92    def get_pipe_list(self, pipe):
93        def flatten(p, l):
94            l.add(p)
95            for q in p.sources|p.sinks|p.high_sources|p.high_sinks:
96                if q not in l:
97                    flatten(q, l)
98        pl = set()
99        flatten(pipe, pl)
100        return pl
101
102    def _add_pipes(self, *pipes):
103        pl = set()
104        for p in pipes:
105            pl |= self.get_pipe_list(p)
106        pl -= self.active_pipes
107        for q in pl:
108            self.add_one_pipe(q)
109        return pl
110
111
112    def run(self):
113        log_interactive.info("Pipe engine thread started.")
114        try:
115            for p in self.active_pipes:
116                p.start()
117            sources = self.active_sources
118            sources.add(self)
119            exhausted = set([])
120            RUN=True
121            STOP_IF_EXHAUSTED = False
122            while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1):
123                fds = select_objects(sources, 2)
124                for fd in fds:
125                    if fd is self:
126                        cmd = self._read_cmd()
127                        if cmd == "X":
128                            RUN=False
129                            break
130                        elif cmd == "B":
131                            STOP_IF_EXHAUSTED = True
132                        elif cmd == "A":
133                            sources = self.active_sources-exhausted
134                            sources.add(self)
135                        else:
136                            warning("Unknown internal pipe engine command: %r. Ignoring." % cmd)
137                    elif fd in sources:
138                        try:
139                            fd.deliver()
140                        except Exception as e:
141                            log_interactive.exception("piping from %s failed: %s" % (fd.name, e))
142                        else:
143                            if fd.exhausted():
144                                exhausted.add(fd)
145                                sources.remove(fd)
146        except KeyboardInterrupt:
147            pass
148        finally:
149            try:
150                for p in self.active_pipes:
151                    p.stop()
152            finally:
153                self.thread_lock.release()
154                log_interactive.info("Pipe engine thread stopped.")
155
156    def start(self):
157        if self.thread_lock.acquire(0):
158            _t = Thread(target=self.run)
159            _t.setDaemon(True)
160            _t.start()
161            self.thread = _t
162        else:
163            warning("Pipe engine already running")
164    def wait_and_stop(self):
165        self.stop(_cmd="B")
166    def stop(self, _cmd="X"):
167        try:
168            with self.command_lock:
169                if self.thread is not None:
170                    self._write_cmd(_cmd)
171                    self.thread.join()
172                    try:
173                        self.thread_lock.release()
174                    except:
175                        pass
176                else:
177                    warning("Pipe engine thread not running")
178        except KeyboardInterrupt:
179            print("Interrupted by user.")
180
181    def add(self, *pipes):
182        pipes = self._add_pipes(*pipes)
183        with self.command_lock:
184            if self.thread is not None:
185                for p in pipes:
186                    p.start()
187                self._write_cmd("A")
188
189    def graph(self,**kargs):
190        g=['digraph "pipe" {',"\tnode [shape=rectangle];",]
191        for p in self.active_pipes:
192            g.append('\t"%i" [label="%s"];' % (id(p), p.name))
193        g.append("")
194        g.append("\tedge [color=blue, arrowhead=vee];")
195        for p in self.active_pipes:
196            for q in p.sinks:
197                g.append('\t"%i" -> "%i";' % (id(p), id(q)))
198        g.append("")
199        g.append("\tedge [color=purple, arrowhead=veevee];")
200        for p in self.active_pipes:
201            for q in p.high_sinks:
202                g.append('\t"%i" -> "%i";' % (id(p), id(q)))
203        g.append("")
204        g.append("\tedge [color=red, arrowhead=diamond];")
205        for p in self.active_pipes:
206            for q in p.trigger_sinks:
207                g.append('\t"%i" -> "%i";' % (id(p), id(q)))
208        g.append('}')
209        graph = "\n".join(g)
210        do_graph(graph, **kargs)
211
212
213class _ConnectorLogic(object):
214    def __init__(self):
215        self.sources = set()
216        self.sinks = set()
217        self.high_sources = set()
218        self.high_sinks = set()
219        self.trigger_sources = set()
220        self.trigger_sinks = set()
221
222    def __lt__(self, other):
223        other.sinks.add(self)
224        self.sources.add(other)
225        return other
226    def __gt__(self, other):
227        self.sinks.add(other)
228        other.sources.add(self)
229        return other
230    def __eq__(self, other):
231        self > other
232        other > self
233        return other
234
235    def __lshift__(self, other):
236        self.high_sources.add(other)
237        other.high_sinks.add(self)
238        return other
239    def __rshift__(self, other):
240        self.high_sinks.add(other)
241        other.high_sources.add(self)
242        return other
243    def __floordiv__(self, other):
244        self >> other
245        other >> self
246        return other
247
248    def __xor__(self, other):
249        self.trigger_sinks.add(other)
250        other.trigger_sources.add(self)
251        return other
252
253    def __hash__(self):
254        return object.__hash__(self)
255
256class _PipeMeta(type):
257    def __new__(cls, name, bases, dct):
258        c = type.__new__(cls, name, bases, dct)
259        PipeEngine.pipes[name] = c
260        return c
261
262class Pipe(six.with_metaclass(_PipeMeta, _ConnectorLogic)):
263    def __init__(self, name=None):
264        _ConnectorLogic.__init__(self)
265        if name is None:
266            name = "%s" % (self.__class__.__name__)
267        self.name = name
268    def _send(self, msg):
269        for s in self.sinks:
270            s.push(msg)
271    def _high_send(self, msg):
272        for s in self.high_sinks:
273            s.high_push(msg)
274    def _trigger(self, msg=None):
275        for s in self.trigger_sinks:
276            s.on_trigger(msg)
277
278    def __repr__(self):
279        ct = conf.color_theme
280        s = "%s%s" % (ct.punct("<"), ct.layer_name(self.name))
281        if self.sources or self.sinks:
282            s+= " %s" % ct.punct("[")
283            if self.sources:
284                s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.sources),
285                              ct.field_value(">"))
286            s += ct.layer_name("#")
287            if self.sinks:
288                s+="%s%s" % (ct.field_value(">"),
289                             ct.punct(",").join(ct.field_name(s.name) for s in self.sinks))
290            s += ct.punct("]")
291
292        if self.high_sources or self.high_sinks:
293            s+= " %s" % ct.punct("[")
294            if self.high_sources:
295                s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.high_sources),
296                              ct.field_value(">>"))
297            s += ct.layer_name("#")
298            if self.high_sinks:
299                s+="%s%s" % (ct.field_value(">>"),
300                             ct.punct(",").join(ct.field_name(s.name) for s in self.high_sinks))
301            s += ct.punct("]")
302
303        if self.trigger_sources or self.trigger_sinks:
304            s+= " %s" % ct.punct("[")
305            if self.trigger_sources:
306                s+="%s%s" %  (ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sources),
307                              ct.field_value("^"))
308            s += ct.layer_name("#")
309            if self.trigger_sinks:
310                s+="%s%s" % (ct.field_value("^"),
311                             ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sinks))
312            s += ct.punct("]")
313
314
315        s += ct.punct(">")
316        return s
317
318class Source(Pipe, SelectableObject):
319    def __init__(self, name=None):
320        Pipe.__init__(self, name=name)
321        self.is_exhausted = False
322    def _read_message(self):
323        return Message()
324    def deliver(self):
325        msg = self._read_message
326        self._send(msg)
327    def fileno(self):
328        return None
329    def check_recv(self):
330        return False
331    def exhausted(self):
332        return self.is_exhausted
333    def start(self):
334        pass
335    def stop(self):
336        pass
337
338class Drain(Pipe):
339    """Repeat messages from low/high entries to (resp.) low/high exits
340     +-------+
341  >>-|-------|->>
342     |       |
343   >-|-------|->
344     +-------+
345"""
346    def push(self, msg):
347        self._send(msg)
348    def high_push(self, msg):
349        self._high_send(msg)
350    def start(self):
351        pass
352    def stop(self):
353        pass
354
355class Sink(Pipe):
356    def push(self, msg):
357        pass
358    def high_push(self, msg):
359        pass
360    def start(self):
361        pass
362    def stop(self):
363        pass
364
365
366class AutoSource(Source, SelectableObject):
367    def __init__(self, name=None):
368        Source.__init__(self, name=name)
369        self.__fdr,self.__fdw = os.pipe()
370        self._queue = collections.deque()
371    def fileno(self):
372        return self.__fdr
373    def check_recv(self):
374        return len(self._queue) > 0
375    def _gen_data(self, msg):
376        self._queue.append((msg,False))
377        self._wake_up()
378    def _gen_high_data(self, msg):
379        self._queue.append((msg,True))
380        self._wake_up()
381    def _wake_up(self):
382        os.write(self.__fdw, b"X")
383        self.call_release()
384    def deliver(self):
385        os.read(self.__fdr,1)
386        try:
387            msg,high = self._queue.popleft()
388        except IndexError: #empty queue. Exhausted source
389            pass
390        else:
391            if high:
392                self._high_send(msg)
393            else:
394                self._send(msg)
395
396class ThreadGenSource(AutoSource):
397    def __init__(self, name=None):
398        AutoSource.__init__(self, name=name)
399        self.RUN = False
400    def generate(self):
401        pass
402    def start(self):
403        self.RUN = True
404        Thread(target=self.generate).start()
405    def stop(self):
406        self.RUN = False
407
408
409
410class ConsoleSink(Sink):
411    """Print messages on low and high entries
412     +-------+
413  >>-|--.    |->>
414     | print |
415   >-|--'    |->
416     +-------+
417"""
418    def push(self, msg):
419        print(">%r" % msg)
420    def high_push(self, msg):
421        print(">>%r" % msg)
422
423class RawConsoleSink(Sink):
424    """Print messages on low and high entries
425     +-------+
426  >>-|--.    |->>
427     | write |
428   >-|--'    |->
429     +-------+
430"""
431    def __init__(self, name=None, newlines=True):
432        Sink.__init__(self, name=name)
433        self.newlines = newlines
434        self._write_pipe = 1
435    def push(self, msg):
436        if self.newlines:
437            msg += "\n"
438        os.write(self._write_pipe, msg.encode("utf8"))
439    def high_push(self, msg):
440        if self.newlines:
441            msg += "\n"
442        os.write(self._write_pipe, msg.encode("utf8"))
443
444class CLIFeeder(AutoSource):
445    """Send messages from python command line
446     +--------+
447  >>-|        |->>
448     | send() |
449   >-|   `----|->
450     +--------+
451"""
452    def send(self, msg):
453        self._gen_data(msg)
454    def close(self):
455        self.is_exhausted = True
456
457class CLIHighFeeder(CLIFeeder):
458    """Send messages from python command line to high output
459     +--------+
460  >>-|   .----|->>
461     | send() |
462   >-|        |->
463     +--------+
464"""
465    def send(self, msg):
466        self._gen_high_data(msg)
467
468
469class PeriodicSource(ThreadGenSource):
470    """Generage messages periodically on low exit
471     +-------+
472  >>-|       |->>
473     | msg,T |
474   >-|  `----|->
475     +-------+
476"""
477    def __init__(self, msg, period, period2=0, name=None):
478        ThreadGenSource.__init__(self,name=name)
479        if not isinstance(msg, (list, set, tuple)):
480            msg=[msg]
481        self.msg = msg
482        self.period = period
483        self.period2 = period2
484    def generate(self):
485        while self.RUN:
486            empty_gen = True
487            for m in self.msg:
488                empty_gen = False
489                self._gen_data(m)
490                time.sleep(self.period)
491            if empty_gen:
492                self.is_exhausted = True
493                self._wake_up()
494            time.sleep(self.period2)
495
496class TermSink(Sink):
497    """Print messages on low and high entries on a separate terminal
498     +-------+
499  >>-|--.    |->>
500     | print |
501   >-|--'    |->
502     +-------+
503"""
504    def __init__(self, name=None, keepterm=True, newlines=True, openearly=True):
505        Sink.__init__(self, name=name)
506        self.keepterm = keepterm
507        self.newlines = newlines
508        self.openearly = openearly
509        self.opened = False
510        if self.openearly:
511            self.start()
512    def _start_windows(self):
513        if not self.opened:
514            self.opened = True
515            self.__f = get_temp_file()
516            open(self.__f, "a").close()
517            self.name = "Scapy" if self.name is None else self.name
518            # Start a powershell in a new window and print the PID
519            cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\"))
520            proc = subprocess.Popen([conf.prog.powershell, cmd], stdout=subprocess.PIPE)
521            output, _ = proc.communicate()
522            # This is the process PID
523            self.pid = int(output)
524            print("PID: %d" % self.pid)
525    def _start_unix(self):
526        if not self.opened:
527            self.opened = True
528            rdesc, self.wdesc = os.pipe()
529            cmd = ["xterm"]
530            if self.name is not None:
531                cmd.extend(["-title",self.name])
532            if self.keepterm:
533                cmd.append("-hold")
534            cmd.extend(["-e", "cat <&%d" % rdesc])
535            self.proc = subprocess.Popen(cmd, close_fds=False)
536            os.close(rdesc)
537    def start(self):
538        if WINDOWS:
539            return self._start_windows()
540        else:
541            return self._start_unix()
542    def _stop_windows(self):
543        if not self.keepterm:
544            self.opened = False
545            # Recipe to kill process with PID
546            # http://code.activestate.com/recipes/347462-terminating-a-subprocess-on-windows/
547            import ctypes
548            PROCESS_TERMINATE = 1
549            handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, self.pid)
550            ctypes.windll.kernel32.TerminateProcess(handle, -1)
551            ctypes.windll.kernel32.CloseHandle(handle)
552    def _stop_unix(self):
553        if not self.keepterm:
554            self.opened = False
555            self.proc.kill()
556            self.proc.wait()
557    def stop(self):
558        if WINDOWS:
559            return self._stop_windows()
560        else:
561            return self._stop_unix()
562    def _print(self, s):
563        if self.newlines:
564            s+="\n"
565        if WINDOWS:
566            wdesc = open(self.__f, "a")
567            wdesc.write(s)
568            wdesc.close()
569        else:
570            os.write(self.wdesc, s.encode())
571    def push(self, msg):
572        self._print(str(msg))
573    def high_push(self, msg):
574        self._print(str(msg))
575
576
577class QueueSink(Sink):
578    """Collect messages from high and low entries and queue them. Messages are unqueued with the .recv() method.
579     +-------+
580  >>-|--.    |->>
581     | queue |
582   >-|--'    |->
583     +-------+
584"""
585    def __init__(self, name=None):
586        Sink.__init__(self, name=name)
587        self.q = six.moves.queue.Queue()
588    def push(self, msg):
589        self.q.put(msg)
590    def high_push(self, msg):
591        self.q.put(msg)
592    def recv(self):
593        while True:
594            try:
595                return self.q.get(True, timeout=0.1)
596            except six.moves.queue.Empty:
597                pass
598
599
600class TransformDrain(Drain):
601    """Apply a function to messages on low and high entry
602     +-------+
603  >>-|--[f]--|->>
604     |       |
605   >-|--[f]--|->
606     +-------+
607"""
608    def __init__(self, f, name=None):
609        Drain.__init__(self, name=name)
610        self.f = f
611    def push(self, msg):
612        self._send(self.f(msg))
613    def high_push(self, msg):
614        self._high_send(self.f(msg))
615
616class UpDrain(Drain):
617    """Repeat messages from low entry to high exit
618     +-------+
619  >>-|    ,--|->>
620     |   /   |
621   >-|--'    |->
622     +-------+
623"""
624    def push(self, msg):
625        self._high_send(msg)
626    def high_push(self, msg):
627        pass
628
629class DownDrain(Drain):
630    """Repeat messages from high entry to low exit
631     +-------+
632  >>-|--.    |->>
633     |   \   |
634   >-|    `--|->
635     +-------+
636"""
637    def push(self, msg):
638        pass
639    def high_push(self, msg):
640        self._send(msg)
641