1######################## 2% Pipetool related tests 3######################## 4 5+ Basic tests 6 7= Test default test case 8 9s = PeriodicSource("hello", 1, name="src") 10d1 = Drain(name="d1") 11c = ConsoleSink(name="c") 12tf = TransformDrain(lambda x: "Got %s" % x) 13t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) 14s > d1 > c 15d1 > tf > t 16 17p = PipeEngine(s) 18p.start() 19time.sleep(3) 20s.msg = [] 21p.stop() 22 23try: 24 os.remove("test.png") 25except OSError: 26 pass 27 28= Test add_pipe 29 30s = AutoSource() 31p = PipeEngine(s) 32p.add(Pipe()) 33assert len(p.active_pipes) == 2 34 35x = p.spawn_Pipe() 36assert len(p.active_pipes) == 3 37assert isinstance(x, Pipe) 38 39= Test exhausted source 40 41s = AutoSource() 42s._gen_data("hello") 43s.is_exhausted = True 44d1 = Drain(name="d1") 45c = ConsoleSink(name="c") 46s > d1 > c 47 48p = PipeEngine(s) 49p.start() 50p.wait_and_stop() 51 52= Test add_pipe on running instance 53 54p = PipeEngine() 55p.start() 56 57s = CLIFeeder() 58 59d1 = Drain(name="d1") 60c = QueueSink(name="c") 61s > d1 > c 62 63p.add(s) 64 65s.send("hello") 66s.send("hi") 67 68assert c.q.get(timeout=5) == "hello" 69assert c.q.get(timeout=5) == "hi" 70 71p.stop() 72 73= Test Operators 74 75s = AutoSource() 76p = PipeEngine(s) 77assert p == p 78 79a = AutoSource() 80b = AutoSource() 81a >> b 82assert len(a.high_sinks) == 1 83assert len(a.high_sources) == 0 84assert len(b.high_sinks) == 0 85assert len(b.high_sources) == 1 86a 87b 88 89a = AutoSource() 90b = AutoSource() 91a << b 92assert len(a.high_sinks) == 0 93assert len(a.high_sources) == 1 94assert len(b.high_sinks) == 1 95assert len(b.high_sources) == 0 96a 97b 98 99a = AutoSource() 100b = AutoSource() 101a == b 102assert len(a.sinks) == 1 103assert len(a.sources) == 1 104assert len(b.sinks) == 1 105assert len(b.sources) == 1 106 107a = AutoSource() 108b = AutoSource() 109a//b 110assert len(a.high_sinks) == 1 111assert len(a.high_sources) == 1 112assert len(b.high_sinks) == 1 113assert len(b.high_sources) == 1 114 115a = AutoSource() 116b = AutoSource() 117a^b 118assert len(b.trigger_sources) == 1 119assert len(a.trigger_sinks) == 1 120 121= Test doc 122 123s = AutoSource() 124p = PipeEngine(s) 125p.list_pipes() 126p.list_pipes_detailed() 127 128= Test RawConsoleSink with CLIFeeder 129 130p = PipeEngine() 131 132s = CLIFeeder() 133s.send("hello") 134s.is_exhausted = True 135 136r, w = os.pipe() 137 138d1 = Drain(name="d1") 139c = RawConsoleSink(name="c") 140c._write_pipe = w 141s > d1 > c 142 143p.add(s) 144p.start() 145 146assert os.read(r, 20) == b"hello\n" 147p.wait_and_stop() 148 149= Test QueueSink with CLIFeeder 150 151p = PipeEngine() 152 153s = CLIFeeder() 154s.send("hello") 155s.is_exhausted = True 156 157d1 = Drain(name="d1") 158c = QueueSink(name="c") 159s > d1 > c 160 161p.add(s) 162p.start() 163 164p.wait_and_stop() 165assert c.recv() == "hello" 166 167= Test UpDrain 168 169test_val = None 170 171class TestSink(Sink): 172 def high_push(self, msg): 173 global test_val 174 test_val = msg 175 176p = PipeEngine() 177 178s = CLIFeeder() 179s.send("hello") 180s.is_exhausted = True 181 182d1 = UpDrain(name="d1") 183c = TestSink(name="c") 184s > d1 185d1 >> c 186 187p.add(s) 188p.start() 189 190p.wait_and_stop() 191assert test_val == "hello" 192 193= Test DownDrain 194 195test_val = None 196 197class TestSink(Sink): 198 def push(self, msg): 199 global test_val 200 test_val = msg 201 202p = PipeEngine() 203 204s = CLIHighFeeder() 205s.send("hello") 206s.is_exhausted = True 207 208d1 = DownDrain(name="d1") 209c = TestSink(name="c") 210s >> d1 211d1 > c 212 213p.add(s) 214p.start() 215 216p.wait_and_stop() 217assert test_val == "hello" 218 219+ Advanced ScapyPipes pipetools tests 220 221= Test SniffSource 222~ netaccess 223 224p = PipeEngine() 225 226s = SniffSource() 227d1 = Drain(name="d1") 228c = QueueSink(name="c") 229s > d1 > c 230 231p.add(s) 232p.start() 233sniff(count=3) 234p.stop() 235assert c.q.get() 236 237= Test exhausted AutoSource and SniffSource 238 239import mock 240from scapy.error import Scapy_Exception 241 242def _fail(): 243 raise Scapy_Exception() 244 245a = AutoSource() 246a._send = mock.MagicMock(side_effect=_fail) 247a._wake_up() 248try: 249 a.deliver() 250except: 251 pass 252 253s = SniffSource() 254s.s = mock.MagicMock() 255s.s.recv = mock.MagicMock(side_effect=_fail) 256try: 257 s.deliver() 258except: 259 pass 260 261= Test RdpcapSource and WrpcapSink 262~ needs_root 263 264req = Ether()/IP()/ICMP() 265rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') 266 267wrpcap("t.pcap", [req, rpy]) 268 269p = PipeEngine() 270 271s = RdpcapSource("t.pcap") 272d1 = Drain(name="d1") 273c = WrpcapSink("t2.pcap", name="c") 274s > d1 > c 275p.add(s) 276p.start() 277p.wait_and_stop() 278 279results = rdpcap("t2.pcap") 280 281assert raw(results[0]) == raw(req) 282assert raw(results[1]) == raw(rpy) 283 284os.unlink("t.pcap") 285os.unlink("t2.pcap") 286 287= Test InjectSink and Inject3Sink 288~ needs_root 289 290import mock 291 292a = IP(dst="192.168.0.1")/ICMP() 293msgs = [] 294 295class FakeSocket(object): 296 def __init__(self, *arg, **karg): 297 pass 298 def close(self): 299 pass 300 def send(self, msg): 301 global msgs 302 msgs.append(msg) 303 304@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) 305@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) 306def _inject_sink(i3): 307 s = CLIFeeder() 308 s.send(a) 309 s.is_exhausted = True 310 d1 = Drain(name="d1") 311 c = Inject3Sink() if i3 else InjectSink() 312 s > d1 > c 313 p = PipeEngine(s) 314 p.start() 315 p.wait_and_stop() 316 317_inject_sink(False) # InjectSink 318_inject_sink(True) # Inject3Sink 319 320assert msgs == [a,a] 321 322= TriggerDrain and TriggeredValve with CLIFeeder 323 324s = CLIFeeder() 325d1 = TriggerDrain(lambda x:x=="trigger") 326d2 = TriggeredValve() 327c = QueueSink() 328 329s > d1 > d2 > c 330d1 ^ d2 331 332p = PipeEngine(s) 333p.start() 334 335s.send("hello") 336s.send("trigger") 337s.send("hello2") 338s.send("trigger") 339s.send("hello3") 340 341assert c.q.get(timeout=5) == "hello" 342assert c.q.get(timeout=5) == "trigger" 343assert c.q.get(timeout=5) == "hello3" 344 345p.stop() 346 347= TriggerDrain and TriggeredValve with CLIHighFeeder 348 349s = CLIHighFeeder() 350d1 = TriggerDrain(lambda x:x=="trigger") 351d2 = TriggeredValve() 352c = QueueSink() 353 354s >> d1 355d1 >> d2 356d2 >> c 357d1 ^ d2 358 359p = PipeEngine(s) 360p.start() 361 362s.send("hello") 363s.send("trigger") 364s.send("hello2") 365s.send("trigger") 366s.send("hello3") 367 368assert c.q.get(timeout=5) == "hello" 369assert c.q.get(timeout=5) == "trigger" 370assert c.q.get(timeout=5) == "hello3" 371 372p.stop() 373 374= TriggerDrain and TriggeredQueueingValve with CLIFeeder 375 376s = CLIFeeder() 377d1 = TriggerDrain(lambda x:x=="trigger") 378d2 = TriggeredValve() 379c = QueueSink() 380 381s > d1 > d2 > c 382d1 ^ d2 383 384p = PipeEngine(s) 385p.start() 386 387s.send("hello") 388s.send("trigger") 389s.send("hello2") 390s.send("trigger") 391s.send("hello3") 392 393assert c.q.get(timeout=5) == "hello" 394assert c.q.get(timeout=5) == "trigger" 395assert c.q.get(timeout=5) == "hello3" 396 397p.stop() 398 399= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel 400 401s = CLIFeeder() 402d1 = TriggerDrain(lambda x:x=="trigger") 403d2 = TriggeredSwitch() 404c = QueueSink() 405 406s > d1 > d2 407d2 >> c 408d1 ^ d2 409 410p = PipeEngine(s) 411p.start() 412 413s.send("hello") 414s.send("trigger") 415s.send("hello2") 416s.send("trigger") 417s.send("hello3") 418 419assert c.q.get(timeout=5) == "trigger" 420assert c.q.get(timeout=5) == "hello2" 421 422p.stop() 423 424= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel 425 426s = CLIHighFeeder() 427d1 = TriggerDrain(lambda x:x=="trigger") 428d2 = TriggeredSwitch() 429c = QueueSink() 430 431s >> d1 432d1 >> d2 433d2 > c 434d1 ^ d2 435 436p = PipeEngine(s) 437p.start() 438 439s.send("hello") 440s.send("trigger") 441s.send("hello2") 442s.send("trigger") 443s.send("hello3") 444 445assert c.q.get(timeout=5) == "hello" 446assert c.q.get(timeout=5) == "trigger" 447assert c.q.get(timeout=5) == "hello3" 448 449p.stop() 450 451= TriggerDrain and TriggeredMessage 452 453s = CLIFeeder() 454d1 = TriggerDrain(lambda x:x=="trigger") 455d2 = TriggeredMessage("hello") 456c = QueueSink() 457 458s > d1 > d2 > c 459d1 ^ d2 460 461p = PipeEngine(s) 462p.start() 463 464s.send("trigger") 465 466r = [c.q.get(timeout=5), c.q.get(timeout=5)] 467assert "hello" in r 468assert "trigger" in r 469 470p.stop() 471 472= TriggerDrain and TriggeredQueueingValve on low channel 473 474p = PipeEngine() 475 476s = CLIFeeder() 477r, w = os.pipe() 478 479d1 = TriggerDrain(lambda x:x=="trigger") 480d2 = TriggeredQueueingValve() 481c = QueueSink(name="c") 482s > d1 > d2 > c 483d1 ^ d2 484 485p.add(s) 486p.start() 487 488s.send("trigger") 489s.send("hello") 490s.send("trigger") 491assert c.q.get(timeout=3) == "trigger" 492assert c.q.get(timeout=3) in ['hello', 'trigger'] 493assert c.q.get(timeout=3) in ['hello', 'trigger'] 494assert d2.q.qsize() == 0 495 496p.stop() 497 498= TriggerDrain and TriggeredQueueingValve on high channel 499 500p = PipeEngine() 501 502s = CLIHighFeeder() 503r, w = os.pipe() 504 505d1 = TriggerDrain(lambda x:x=="trigger") 506d2 = TriggeredQueueingValve() 507c = QueueSink(name="c") 508s >> d1 >> d2 >> c 509d1 ^ d2 510 511p.add(s) 512p.start() 513 514s.send("trigger") 515s.send("hello") 516s.send("trigger") 517assert c.q.get(timeout=3) == "trigger" 518assert c.q.get(timeout=3) == "hello" 519assert d2.q.qsize() == 0 520 521p.stop() 522 523= UDPDrain 524 525p = PipeEngine() 526 527s = CLIFeeder() 528s2 = CLIHighFeeder() 529d1 = UDPDrain() 530c = QueueSink() 531 532s > d1 > c 533s2 >> d1 >> c 534 535p.add(s) 536p.add(s2) 537p.start() 538 539s.send(IP(src="127.0.0.1")/UDP()/DNS()) 540s2.send(DNS()) 541 542res = [c.q.get(timeout=2), c.q.get(timeout=2)] 543assert b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' in res 544res.remove(b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00') 545assert DNS in res[0] and res[0][UDP].sport == 1234 546 547p.stop() 548 549= FDSourceSink on a Bunch object 550 551class Bunch: 552 __init__ = lambda self, **kw: setattr(self, '__dict__', kw) 553 554fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None) 555 556s = FDSourceSink(fd) 557d = Drain() 558c = QueueSink() 559s > d > c 560 561assert s.fileno() == None 562s.push("data") 563s.deliver() 564assert c.q.get(timeout=1) == "hello" 565 566= TCPConnectPipe networking test 567~ networking needs_root 568 569p = PipeEngine() 570 571s = CLIFeeder() 572d1 = TCPConnectPipe(addr="www.google.fr", port=80) 573c = QueueSink() 574 575s > d1 > c 576 577p.add(s) 578p.start() 579 580s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n") 581result = c.q.get(timeout=10) 582p.stop() 583 584assert result.startswith(b"HTTP/1.0 200 OK") 585