1#! /usr/bin/env python 2 3## This file is part of Scapy 4## See http://www.secdev.org/projects/scapy for more informations 5## Copyright (C) Philippe Biondi <phil@secdev.org> 6## This program is published under a GPLv2 license 7 8from __future__ import print_function 9import os 10import subprocess 11import itertools 12import collections 13import time 14import scapy.modules.six as six 15from threading import Lock, Thread 16import scapy.utils 17 18from scapy.automaton import Message, select_objects, SelectableObject 19from scapy.consts import WINDOWS 20from scapy.error import log_interactive, warning 21from scapy.config import conf 22from scapy.utils import get_temp_file, do_graph 23 24import scapy.arch 25 26class PipeEngine(SelectableObject): 27 pipes = {} 28 @classmethod 29 def list_pipes(cls): 30 for pn,pc in sorted(cls.pipes.items()): 31 doc = pc.__doc__ or "" 32 if doc: 33 doc = doc.splitlines()[0] 34 print("%20s: %s" % (pn, doc)) 35 @classmethod 36 def list_pipes_detailed(cls): 37 for pn,pc in sorted(cls.pipes.items()): 38 if pc.__doc__: 39 print("###### %s\n %s" % (pn ,pc.__doc__)) 40 else: 41 print("###### %s" % pn) 42 43 def __init__(self, *pipes): 44 self.active_pipes = set() 45 self.active_sources = set() 46 self.active_drains = set() 47 self.active_sinks = set() 48 self._add_pipes(*pipes) 49 self.thread_lock = Lock() 50 self.command_lock = Lock() 51 self.__fd_queue = collections.deque() 52 self.__fdr,self.__fdw = os.pipe() 53 self.thread = None 54 def __getattr__(self, attr): 55 if attr.startswith("spawn_"): 56 dname = attr[6:] 57 if dname in self.pipes: 58 def f(*args, **kargs): 59 k = self.pipes[dname] 60 p = k(*args, **kargs) 61 self.add(p) 62 return p 63 return f 64 raise AttributeError(attr) 65 66 def check_recv(self): 67 """As select.select is not available, we check if there 68 is some data to read by using a list that stores pointers.""" 69 return len(self.__fd_queue) > 0 70 71 def fileno(self): 72 return self.__fdr 73 74 def _read_cmd(self): 75 os.read(self.__fdr,1) 76 return self.__fd_queue.popleft() 77 78 def _write_cmd(self, _cmd): 79 self.__fd_queue.append(_cmd) 80 os.write(self.__fdw, b"X") 81 self.call_release() 82 83 def add_one_pipe(self, pipe): 84 self.active_pipes.add(pipe) 85 if isinstance(pipe, Source): 86 self.active_sources.add(pipe) 87 if isinstance(pipe, Drain): 88 self.active_drains.add(pipe) 89 if isinstance(pipe, Sink): 90 self.active_sinks.add(pipe) 91 92 def get_pipe_list(self, pipe): 93 def flatten(p, l): 94 l.add(p) 95 for q in p.sources|p.sinks|p.high_sources|p.high_sinks: 96 if q not in l: 97 flatten(q, l) 98 pl = set() 99 flatten(pipe, pl) 100 return pl 101 102 def _add_pipes(self, *pipes): 103 pl = set() 104 for p in pipes: 105 pl |= self.get_pipe_list(p) 106 pl -= self.active_pipes 107 for q in pl: 108 self.add_one_pipe(q) 109 return pl 110 111 112 def run(self): 113 log_interactive.info("Pipe engine thread started.") 114 try: 115 for p in self.active_pipes: 116 p.start() 117 sources = self.active_sources 118 sources.add(self) 119 exhausted = set([]) 120 RUN=True 121 STOP_IF_EXHAUSTED = False 122 while RUN and (not STOP_IF_EXHAUSTED or len(sources) > 1): 123 fds = select_objects(sources, 2) 124 for fd in fds: 125 if fd is self: 126 cmd = self._read_cmd() 127 if cmd == "X": 128 RUN=False 129 break 130 elif cmd == "B": 131 STOP_IF_EXHAUSTED = True 132 elif cmd == "A": 133 sources = self.active_sources-exhausted 134 sources.add(self) 135 else: 136 warning("Unknown internal pipe engine command: %r. Ignoring." % cmd) 137 elif fd in sources: 138 try: 139 fd.deliver() 140 except Exception as e: 141 log_interactive.exception("piping from %s failed: %s" % (fd.name, e)) 142 else: 143 if fd.exhausted(): 144 exhausted.add(fd) 145 sources.remove(fd) 146 except KeyboardInterrupt: 147 pass 148 finally: 149 try: 150 for p in self.active_pipes: 151 p.stop() 152 finally: 153 self.thread_lock.release() 154 log_interactive.info("Pipe engine thread stopped.") 155 156 def start(self): 157 if self.thread_lock.acquire(0): 158 _t = Thread(target=self.run) 159 _t.setDaemon(True) 160 _t.start() 161 self.thread = _t 162 else: 163 warning("Pipe engine already running") 164 def wait_and_stop(self): 165 self.stop(_cmd="B") 166 def stop(self, _cmd="X"): 167 try: 168 with self.command_lock: 169 if self.thread is not None: 170 self._write_cmd(_cmd) 171 self.thread.join() 172 try: 173 self.thread_lock.release() 174 except: 175 pass 176 else: 177 warning("Pipe engine thread not running") 178 except KeyboardInterrupt: 179 print("Interrupted by user.") 180 181 def add(self, *pipes): 182 pipes = self._add_pipes(*pipes) 183 with self.command_lock: 184 if self.thread is not None: 185 for p in pipes: 186 p.start() 187 self._write_cmd("A") 188 189 def graph(self,**kargs): 190 g=['digraph "pipe" {',"\tnode [shape=rectangle];",] 191 for p in self.active_pipes: 192 g.append('\t"%i" [label="%s"];' % (id(p), p.name)) 193 g.append("") 194 g.append("\tedge [color=blue, arrowhead=vee];") 195 for p in self.active_pipes: 196 for q in p.sinks: 197 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 198 g.append("") 199 g.append("\tedge [color=purple, arrowhead=veevee];") 200 for p in self.active_pipes: 201 for q in p.high_sinks: 202 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 203 g.append("") 204 g.append("\tedge [color=red, arrowhead=diamond];") 205 for p in self.active_pipes: 206 for q in p.trigger_sinks: 207 g.append('\t"%i" -> "%i";' % (id(p), id(q))) 208 g.append('}') 209 graph = "\n".join(g) 210 do_graph(graph, **kargs) 211 212 213class _ConnectorLogic(object): 214 def __init__(self): 215 self.sources = set() 216 self.sinks = set() 217 self.high_sources = set() 218 self.high_sinks = set() 219 self.trigger_sources = set() 220 self.trigger_sinks = set() 221 222 def __lt__(self, other): 223 other.sinks.add(self) 224 self.sources.add(other) 225 return other 226 def __gt__(self, other): 227 self.sinks.add(other) 228 other.sources.add(self) 229 return other 230 def __eq__(self, other): 231 self > other 232 other > self 233 return other 234 235 def __lshift__(self, other): 236 self.high_sources.add(other) 237 other.high_sinks.add(self) 238 return other 239 def __rshift__(self, other): 240 self.high_sinks.add(other) 241 other.high_sources.add(self) 242 return other 243 def __floordiv__(self, other): 244 self >> other 245 other >> self 246 return other 247 248 def __xor__(self, other): 249 self.trigger_sinks.add(other) 250 other.trigger_sources.add(self) 251 return other 252 253 def __hash__(self): 254 return object.__hash__(self) 255 256class _PipeMeta(type): 257 def __new__(cls, name, bases, dct): 258 c = type.__new__(cls, name, bases, dct) 259 PipeEngine.pipes[name] = c 260 return c 261 262class Pipe(six.with_metaclass(_PipeMeta, _ConnectorLogic)): 263 def __init__(self, name=None): 264 _ConnectorLogic.__init__(self) 265 if name is None: 266 name = "%s" % (self.__class__.__name__) 267 self.name = name 268 def _send(self, msg): 269 for s in self.sinks: 270 s.push(msg) 271 def _high_send(self, msg): 272 for s in self.high_sinks: 273 s.high_push(msg) 274 def _trigger(self, msg=None): 275 for s in self.trigger_sinks: 276 s.on_trigger(msg) 277 278 def __repr__(self): 279 ct = conf.color_theme 280 s = "%s%s" % (ct.punct("<"), ct.layer_name(self.name)) 281 if self.sources or self.sinks: 282 s+= " %s" % ct.punct("[") 283 if self.sources: 284 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.sources), 285 ct.field_value(">")) 286 s += ct.layer_name("#") 287 if self.sinks: 288 s+="%s%s" % (ct.field_value(">"), 289 ct.punct(",").join(ct.field_name(s.name) for s in self.sinks)) 290 s += ct.punct("]") 291 292 if self.high_sources or self.high_sinks: 293 s+= " %s" % ct.punct("[") 294 if self.high_sources: 295 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.high_sources), 296 ct.field_value(">>")) 297 s += ct.layer_name("#") 298 if self.high_sinks: 299 s+="%s%s" % (ct.field_value(">>"), 300 ct.punct(",").join(ct.field_name(s.name) for s in self.high_sinks)) 301 s += ct.punct("]") 302 303 if self.trigger_sources or self.trigger_sinks: 304 s+= " %s" % ct.punct("[") 305 if self.trigger_sources: 306 s+="%s%s" % (ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sources), 307 ct.field_value("^")) 308 s += ct.layer_name("#") 309 if self.trigger_sinks: 310 s+="%s%s" % (ct.field_value("^"), 311 ct.punct(",").join(ct.field_name(s.name) for s in self.trigger_sinks)) 312 s += ct.punct("]") 313 314 315 s += ct.punct(">") 316 return s 317 318class Source(Pipe, SelectableObject): 319 def __init__(self, name=None): 320 Pipe.__init__(self, name=name) 321 self.is_exhausted = False 322 def _read_message(self): 323 return Message() 324 def deliver(self): 325 msg = self._read_message 326 self._send(msg) 327 def fileno(self): 328 return None 329 def check_recv(self): 330 return False 331 def exhausted(self): 332 return self.is_exhausted 333 def start(self): 334 pass 335 def stop(self): 336 pass 337 338class Drain(Pipe): 339 """Repeat messages from low/high entries to (resp.) low/high exits 340 +-------+ 341 >>-|-------|->> 342 | | 343 >-|-------|-> 344 +-------+ 345""" 346 def push(self, msg): 347 self._send(msg) 348 def high_push(self, msg): 349 self._high_send(msg) 350 def start(self): 351 pass 352 def stop(self): 353 pass 354 355class Sink(Pipe): 356 def push(self, msg): 357 pass 358 def high_push(self, msg): 359 pass 360 def start(self): 361 pass 362 def stop(self): 363 pass 364 365 366class AutoSource(Source, SelectableObject): 367 def __init__(self, name=None): 368 Source.__init__(self, name=name) 369 self.__fdr,self.__fdw = os.pipe() 370 self._queue = collections.deque() 371 def fileno(self): 372 return self.__fdr 373 def check_recv(self): 374 return len(self._queue) > 0 375 def _gen_data(self, msg): 376 self._queue.append((msg,False)) 377 self._wake_up() 378 def _gen_high_data(self, msg): 379 self._queue.append((msg,True)) 380 self._wake_up() 381 def _wake_up(self): 382 os.write(self.__fdw, b"X") 383 self.call_release() 384 def deliver(self): 385 os.read(self.__fdr,1) 386 try: 387 msg,high = self._queue.popleft() 388 except IndexError: #empty queue. Exhausted source 389 pass 390 else: 391 if high: 392 self._high_send(msg) 393 else: 394 self._send(msg) 395 396class ThreadGenSource(AutoSource): 397 def __init__(self, name=None): 398 AutoSource.__init__(self, name=name) 399 self.RUN = False 400 def generate(self): 401 pass 402 def start(self): 403 self.RUN = True 404 Thread(target=self.generate).start() 405 def stop(self): 406 self.RUN = False 407 408 409 410class ConsoleSink(Sink): 411 """Print messages on low and high entries 412 +-------+ 413 >>-|--. |->> 414 | print | 415 >-|--' |-> 416 +-------+ 417""" 418 def push(self, msg): 419 print(">%r" % msg) 420 def high_push(self, msg): 421 print(">>%r" % msg) 422 423class RawConsoleSink(Sink): 424 """Print messages on low and high entries 425 +-------+ 426 >>-|--. |->> 427 | write | 428 >-|--' |-> 429 +-------+ 430""" 431 def __init__(self, name=None, newlines=True): 432 Sink.__init__(self, name=name) 433 self.newlines = newlines 434 self._write_pipe = 1 435 def push(self, msg): 436 if self.newlines: 437 msg += "\n" 438 os.write(self._write_pipe, msg.encode("utf8")) 439 def high_push(self, msg): 440 if self.newlines: 441 msg += "\n" 442 os.write(self._write_pipe, msg.encode("utf8")) 443 444class CLIFeeder(AutoSource): 445 """Send messages from python command line 446 +--------+ 447 >>-| |->> 448 | send() | 449 >-| `----|-> 450 +--------+ 451""" 452 def send(self, msg): 453 self._gen_data(msg) 454 def close(self): 455 self.is_exhausted = True 456 457class CLIHighFeeder(CLIFeeder): 458 """Send messages from python command line to high output 459 +--------+ 460 >>-| .----|->> 461 | send() | 462 >-| |-> 463 +--------+ 464""" 465 def send(self, msg): 466 self._gen_high_data(msg) 467 468 469class PeriodicSource(ThreadGenSource): 470 """Generage messages periodically on low exit 471 +-------+ 472 >>-| |->> 473 | msg,T | 474 >-| `----|-> 475 +-------+ 476""" 477 def __init__(self, msg, period, period2=0, name=None): 478 ThreadGenSource.__init__(self,name=name) 479 if not isinstance(msg, (list, set, tuple)): 480 msg=[msg] 481 self.msg = msg 482 self.period = period 483 self.period2 = period2 484 def generate(self): 485 while self.RUN: 486 empty_gen = True 487 for m in self.msg: 488 empty_gen = False 489 self._gen_data(m) 490 time.sleep(self.period) 491 if empty_gen: 492 self.is_exhausted = True 493 self._wake_up() 494 time.sleep(self.period2) 495 496class TermSink(Sink): 497 """Print messages on low and high entries on a separate terminal 498 +-------+ 499 >>-|--. |->> 500 | print | 501 >-|--' |-> 502 +-------+ 503""" 504 def __init__(self, name=None, keepterm=True, newlines=True, openearly=True): 505 Sink.__init__(self, name=name) 506 self.keepterm = keepterm 507 self.newlines = newlines 508 self.openearly = openearly 509 self.opened = False 510 if self.openearly: 511 self.start() 512 def _start_windows(self): 513 if not self.opened: 514 self.opened = True 515 self.__f = get_temp_file() 516 open(self.__f, "a").close() 517 self.name = "Scapy" if self.name is None else self.name 518 # Start a powershell in a new window and print the PID 519 cmd = "$app = Start-Process PowerShell -ArgumentList '-command &{$host.ui.RawUI.WindowTitle=\\\"%s\\\";Get-Content \\\"%s\\\" -wait}' -passthru; echo $app.Id" % (self.name, self.__f.replace("\\", "\\\\")) 520 proc = subprocess.Popen([conf.prog.powershell, cmd], stdout=subprocess.PIPE) 521 output, _ = proc.communicate() 522 # This is the process PID 523 self.pid = int(output) 524 print("PID: %d" % self.pid) 525 def _start_unix(self): 526 if not self.opened: 527 self.opened = True 528 rdesc, self.wdesc = os.pipe() 529 cmd = ["xterm"] 530 if self.name is not None: 531 cmd.extend(["-title",self.name]) 532 if self.keepterm: 533 cmd.append("-hold") 534 cmd.extend(["-e", "cat <&%d" % rdesc]) 535 self.proc = subprocess.Popen(cmd, close_fds=False) 536 os.close(rdesc) 537 def start(self): 538 if WINDOWS: 539 return self._start_windows() 540 else: 541 return self._start_unix() 542 def _stop_windows(self): 543 if not self.keepterm: 544 self.opened = False 545 # Recipe to kill process with PID 546 # http://code.activestate.com/recipes/347462-terminating-a-subprocess-on-windows/ 547 import ctypes 548 PROCESS_TERMINATE = 1 549 handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, self.pid) 550 ctypes.windll.kernel32.TerminateProcess(handle, -1) 551 ctypes.windll.kernel32.CloseHandle(handle) 552 def _stop_unix(self): 553 if not self.keepterm: 554 self.opened = False 555 self.proc.kill() 556 self.proc.wait() 557 def stop(self): 558 if WINDOWS: 559 return self._stop_windows() 560 else: 561 return self._stop_unix() 562 def _print(self, s): 563 if self.newlines: 564 s+="\n" 565 if WINDOWS: 566 wdesc = open(self.__f, "a") 567 wdesc.write(s) 568 wdesc.close() 569 else: 570 os.write(self.wdesc, s.encode()) 571 def push(self, msg): 572 self._print(str(msg)) 573 def high_push(self, msg): 574 self._print(str(msg)) 575 576 577class QueueSink(Sink): 578 """Collect messages from high and low entries and queue them. Messages are unqueued with the .recv() method. 579 +-------+ 580 >>-|--. |->> 581 | queue | 582 >-|--' |-> 583 +-------+ 584""" 585 def __init__(self, name=None): 586 Sink.__init__(self, name=name) 587 self.q = six.moves.queue.Queue() 588 def push(self, msg): 589 self.q.put(msg) 590 def high_push(self, msg): 591 self.q.put(msg) 592 def recv(self): 593 while True: 594 try: 595 return self.q.get(True, timeout=0.1) 596 except six.moves.queue.Empty: 597 pass 598 599 600class TransformDrain(Drain): 601 """Apply a function to messages on low and high entry 602 +-------+ 603 >>-|--[f]--|->> 604 | | 605 >-|--[f]--|-> 606 +-------+ 607""" 608 def __init__(self, f, name=None): 609 Drain.__init__(self, name=name) 610 self.f = f 611 def push(self, msg): 612 self._send(self.f(msg)) 613 def high_push(self, msg): 614 self._high_send(self.f(msg)) 615 616class UpDrain(Drain): 617 """Repeat messages from low entry to high exit 618 +-------+ 619 >>-| ,--|->> 620 | / | 621 >-|--' |-> 622 +-------+ 623""" 624 def push(self, msg): 625 self._high_send(msg) 626 def high_push(self, msg): 627 pass 628 629class DownDrain(Drain): 630 """Repeat messages from high entry to low exit 631 +-------+ 632 >>-|--. |->> 633 | \ | 634 >-| `--|-> 635 +-------+ 636""" 637 def push(self, msg): 638 pass 639 def high_push(self, msg): 640 self._send(msg) 641