########################
% Pipetool related tests
########################
4
+ Basic tests
6
= Test default test case
8
# Graph under test: a periodic source feeds drain d1, which fans out to a
# console sink directly and to a terminal sink through a formatting drain.
s = PeriodicSource("hello", 1, name="src")
d1 = Drain(name="d1")
tf = TransformDrain(lambda x: "Got %s" % x)
c = ConsoleSink(name="c")
t = TermSink(name="PipeToolsPeriodicTest", keepterm=False)
s > d1 > c
d1 > tf > t

# Run the engine for three seconds, then blank the source's message list
# before stopping it.
p = PipeEngine(s)
p.start()
time.sleep(3)
s.msg = []
p.stop()

# Best-effort removal of a leftover "test.png"; a missing file is not an error.
try:
    os.remove("test.png")
except OSError:
    pass
27
= Test add_pipe
29
# Start from an engine built around a single source.
s = AutoSource()
p = PipeEngine(s)

# Adding one more pipe must bring the number of active pipes to two.
p.add(Pipe())
assert len(p.active_pipes) == 2

# spawn_Pipe() must hand back a Pipe and register it with the engine.
x = p.spawn_Pipe()
assert isinstance(x, Pipe)
assert len(p.active_pipes) == 3
38
= Test exhausted source
40
# Queue one message, then flag the source as exhausted before the engine runs.
s = AutoSource()
s._gen_data("hello")
s.is_exhausted = True

c = ConsoleSink(name="c")
d1 = Drain(name="d1")
s > d1 > c

# No assertion here: the test passes if start() and wait_and_stop() return.
p = PipeEngine(s)
p.start()
p.wait_and_stop()
51
= Test add_pipe on running instance
53
# Start an empty engine first; the source is attached while it is running.
p = PipeEngine()
p.start()

s = CLIFeeder()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c

# Collect the results inside try/finally so the engine is stopped even when a
# queue read times out; the assertions run once the engine is down.
try:
    p.add(s)
    s.send("hello")
    s.send("hi")
    received = [c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

assert received[0] == "hello"
assert received[1] == "hi"
72
= Test Operators
74
# An engine compares equal to itself.
s = AutoSource()
p = PipeEngine(s)
assert p == p

# ">>": a gains a high sink, b gains a high source.
a = AutoSource()
b = AutoSource()
a >> b
assert len(a.high_sources) == 0
assert len(a.high_sinks) == 1
assert len(b.high_sources) == 1
assert len(b.high_sinks) == 0
# Bare names: the interactive session displays each pipe's repr.
a
b

# "<<": the mirror image, b becomes a high source of a.
a = AutoSource()
b = AutoSource()
a << b
assert len(a.high_sources) == 1
assert len(a.high_sinks) == 0
assert len(b.high_sources) == 0
assert len(b.high_sinks) == 1
a
b

# "==" on pipes: each side ends up with one sink and one source.
a = AutoSource()
b = AutoSource()
a == b
assert len(a.sources) == 1
assert len(a.sinks) == 1
assert len(b.sources) == 1
assert len(b.sinks) == 1

# "//": each side ends up with one high sink and one high source.
a = AutoSource()
b = AutoSource()
a//b
assert len(a.high_sources) == 1
assert len(a.high_sinks) == 1
assert len(b.high_sources) == 1
assert len(b.high_sinks) == 1

# "^": a gains a trigger sink, b gains a trigger source.
a = AutoSource()
b = AutoSource()
a^b
assert len(a.trigger_sinks) == 1
assert len(b.trigger_sources) == 1
120
= Test doc
122
# Smoke test: both listing helpers must run without raising on a one-source
# engine; what they print is not inspected.
s = AutoSource()
p = PipeEngine(s)
p.list_pipes()
p.list_pipes_detailed()
127
= Test RawConsoleSink with CLIFeeder
129
p = PipeEngine()

# Pre-load one message and mark the feeder exhausted so wait_and_stop() can
# return.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

# Redirect the sink's output descriptor to a private pipe so the test can read
# back what was written.
r, w = os.pipe()
d1 = Drain(name="d1")
c = RawConsoleSink(name="c")
c._write_pipe = w
s > d1 > c

p.add(s)
p.start()

# Read the output and shut the engine down before asserting; both ends of the
# pipe are always closed so the descriptors do not leak into later tests.
try:
    data = os.read(r, 20)
    p.wait_and_stop()
finally:
    os.close(r)
    os.close(w)

assert data == b"hello\n"
148
= Test QueueSink with CLIFeeder
150
# One pre-loaded message on an already-exhausted feeder.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

c = QueueSink(name="c")
d1 = Drain(name="d1")
s > d1 > c

# Run the engine to completion, then read the message back through recv().
p = PipeEngine()
p.add(s)
p.start()
p.wait_and_stop()

assert c.recv() == "hello"
166
= Test UpDrain
168
# Set by TestSink.high_push(); None means nothing was received.
test_val = None

class TestSink(Sink):
    """Sink that stores the last message passed to high_push()."""
    def high_push(self, msg):
        global test_val
        test_val = msg

# One pre-loaded message on an already-exhausted feeder.
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True

# The feeder is attached with ">" and the sink with ">>", so the message must
# reach the sink through high_push().
d1 = UpDrain(name="d1")
c = TestSink(name="c")
s > d1
d1 >> c

p = PipeEngine()
p.add(s)
p.start()
p.wait_and_stop()

assert test_val == "hello"
192
= Test DownDrain
194
# Set by TestSink.push(); None means nothing was received.
test_val = None

class TestSink(Sink):
    """Sink that stores the last message passed to push()."""
    def push(self, msg):
        global test_val
        test_val = msg

# One pre-loaded message on an already-exhausted high feeder.
s = CLIHighFeeder()
s.send("hello")
s.is_exhausted = True

# The feeder is attached with ">>" and the sink with ">", so the message must
# reach the sink through push().
d1 = DownDrain(name="d1")
c = TestSink(name="c")
s >> d1
d1 > c

p = PipeEngine()
p.add(s)
p.start()
p.wait_and_stop()

assert test_val == "hello"
218
+ Advanced ScapyPipes pipetools tests
220
= Test SniffSource
~ netaccess
223
p = PipeEngine()

s = SniffSource()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c

p.add(s)
p.start()

# Block in sniff() until three packets have been seen, always stopping the
# engine afterwards, even if sniff() raises.
try:
    sniff(count=3)
finally:
    p.stop()

# Bounded wait: an empty queue fails the test instead of hanging it forever.
assert c.q.get(timeout=5)
236
= Test exhausted AutoSource and SniffSource
238
# Prefer the standard-library mock; fall back to the external backport on
# interpreters that do not ship unittest.mock.
try:
    from unittest import mock
except ImportError:
    import mock

from scapy.error import Scapy_Exception

def _fail():
    """Side effect for the mocks below: always raise Scapy_Exception."""
    raise Scapy_Exception()

# AutoSource whose _send() is mocked to fail: wake it up and call deliver().
a = AutoSource()
a._send = mock.MagicMock(side_effect=_fail)
a._wake_up()
# Any error is tolerated (this only exercises the code path), but
# KeyboardInterrupt/SystemExit are no longer swallowed.
try:
    a.deliver()
except Exception:
    pass

# SniffSource whose socket recv() is mocked to fail: same best-effort call.
s = SniffSource()
s.s = mock.MagicMock()
s.s.recv = mock.MagicMock(side_effect=_fail)
try:
    s.deliver()
except Exception:
    pass
260
= Test RdpcapSource and WrpcapSink
~ needs_root
263
req = Ether()/IP()/ICMP()
rpy = Ether()/IP('E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')

# Write both packets to t.pcap, replay that file through a drain into t2.pcap,
# and read t2.pcap back. The temporary files are removed in the finally
# clause so a failure does not leave them behind.
try:
    wrpcap("t.pcap", [req, rpy])
    p = PipeEngine()
    s = RdpcapSource("t.pcap")
    d1 = Drain(name="d1")
    c = WrpcapSink("t2.pcap", name="c")
    s > d1 > c
    p.add(s)
    p.start()
    p.wait_and_stop()
    results = rdpcap("t2.pcap")
finally:
    for pcap_path in ("t.pcap", "t2.pcap"):
        if os.path.exists(pcap_path):
            os.unlink(pcap_path)

# The copy must hold the same raw bytes as the packets written originally.
assert raw(results[0]) == raw(req)
assert raw(results[1]) == raw(rpy)
286
= Test InjectSink and Inject3Sink
~ needs_root
289
# Prefer the standard-library mock; fall back to the external backport on
# interpreters that do not ship unittest.mock.
try:
    from unittest import mock
except ImportError:
    import mock

a = IP(dst="192.168.0.1")/ICMP()
# Every message handed to FakeSocket.send() is appended here.
msgs = []

class FakeSocket(object):
    """Socket stand-in that records sent messages in the global msgs list."""
    def __init__(self, *arg, **karg):
        pass
    def close(self):
        pass
    def send(self, msg):
        # msgs is only mutated, never rebound, so no "global" is needed.
        msgs.append(msg)

@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
def _inject_sink(i3):
    """Feed packet `a` into an Inject3Sink (i3 true) or an InjectSink."""
    s = CLIFeeder()
    s.send(a)
    s.is_exhausted = True
    d1 = Drain(name="d1")
    c = Inject3Sink() if i3 else InjectSink()
    s > d1 > c
    p = PipeEngine(s)
    p.start()
    p.wait_and_stop()

_inject_sink(False)  # InjectSink
_inject_sink(True)  # Inject3Sink

# Each sink must have sent the packet exactly once through the fake socket.
assert msgs == [a, a]
321
= TriggerDrain and TriggeredValve with CLIFeeder
323
# d1 fires its trigger output ("^") on the message "trigger"; that output is
# wired to valve d2, which also sits between d1 and the sink.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s > d1 > d2 > c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("hello")
    s.send("trigger")
    s.send("hello2")
    s.send("trigger")
    s.send("hello3")
    received = [c.q.get(timeout=5), c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# Expected sequence: "hello2" is not among the first three messages.
assert received[0] == "hello"
assert received[1] == "trigger"
assert received[2] == "hello3"
346
= TriggerDrain and TriggeredValve with CLIHighFeeder
348
# Same scenario as the CLIFeeder variant, but every link uses ">>".
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s >> d1
d1 >> d2
d2 >> c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("hello")
    s.send("trigger")
    s.send("hello2")
    s.send("trigger")
    s.send("hello3")
    received = [c.q.get(timeout=5), c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# Expected sequence: "hello2" is not among the first three messages.
assert received[0] == "hello"
assert received[1] == "trigger"
assert received[2] == "hello3"
373
= TriggerDrain and TriggeredQueueingValve with CLIFeeder
375
# NOTE(review): the test title mentions TriggeredQueueingValve, but the code
# instantiates TriggeredValve. Confirm which one was intended.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()

s > d1 > d2 > c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("hello")
    s.send("trigger")
    s.send("hello2")
    s.send("trigger")
    s.send("hello3")
    received = [c.q.get(timeout=5), c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# Expected sequence: "hello2" is not among the first three messages.
assert received[0] == "hello"
assert received[1] == "trigger"
assert received[2] == "hello3"
398
= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel
400
# The feeder reaches the switch through ">" links, while the sink hangs off
# the switch's ">>" output; d1's trigger output ("^") drives the switch.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()

s > d1 > d2
d2 >> c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("hello")
    s.send("trigger")
    s.send("hello2")
    s.send("trigger")
    s.send("hello3")
    received = [c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# The first two messages seen on the ">>" side must be "trigger" and "hello2".
assert received[0] == "trigger"
assert received[1] == "hello2"
423
= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel
425
# The high feeder reaches the switch through ">>" links, while the sink hangs
# off the switch's ">" output; d1's trigger output ("^") drives the switch.
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()

s >> d1
d1 >> d2
d2 > c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("hello")
    s.send("trigger")
    s.send("hello2")
    s.send("trigger")
    s.send("hello3")
    received = [c.q.get(timeout=5), c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# Expected sequence on the ">" side: "hello2" is not among the first three.
assert received[0] == "hello"
assert received[1] == "trigger"
assert received[2] == "hello3"
450
= TriggerDrain and TriggeredMessage
452
# d2 is built with the message "hello"; d1's trigger output ("^") is wired to
# d2, and d2 also sits between d1 and the sink.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredMessage("hello")
c = QueueSink()

s > d1 > d2 > c
d1 ^ d2

p = PipeEngine(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send("trigger")
    r = [c.q.get(timeout=5), c.q.get(timeout=5)]
finally:
    p.stop()

# Both the forwarded "trigger" and the message "hello" must arrive; their
# relative order is not checked.
assert "hello" in r
assert "trigger" in r
471
= TriggerDrain and TriggeredQueueingValve on low channel
473
p = PipeEngine()

# The os.pipe() pair the original created here was never used or closed, so
# it has been dropped.
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s > d1 > d2 > c
d1 ^ d2

p.add(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out. The valve's backlog is sampled while the engine is
# still running, as the original did.
try:
    s.send("trigger")
    s.send("hello")
    s.send("trigger")
    received = [c.q.get(timeout=3), c.q.get(timeout=3), c.q.get(timeout=3)]
    pending = d2.q.qsize()
finally:
    p.stop()

# First out is "trigger"; the order of the next two is not fixed.
assert received[0] == "trigger"
assert received[1] in ['hello', 'trigger']
assert received[2] in ['hello', 'trigger']
assert pending == 0
497
= TriggerDrain and TriggeredQueueingValve on high channel
499
p = PipeEngine()

# The os.pipe() pair the original created here was never used or closed, so
# it has been dropped.
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s >> d1 >> d2 >> c
d1 ^ d2

p.add(s)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out. The valve's backlog is sampled while the engine is
# still running, as the original did.
try:
    s.send("trigger")
    s.send("hello")
    s.send("trigger")
    received = [c.q.get(timeout=3), c.q.get(timeout=3)]
    pending = d2.q.qsize()
finally:
    p.stop()

assert received[0] == "trigger"
assert received[1] == "hello"
assert pending == 0
522
= UDPDrain
524
p = PipeEngine()

# One drain fed from both sides: s is attached with ">" and s2 with ">>";
# both outputs of the drain end in the same queue sink.
s = CLIFeeder()
s2 = CLIHighFeeder()
d1 = UDPDrain()
c = QueueSink()

s > d1 > c
s2 >> d1 >> c

p.add(s)
p.add(s2)
p.start()

# Feed and collect inside try/finally so the engine is stopped even when a
# queue read times out.
try:
    s.send(IP(src="127.0.0.1")/UDP()/DNS())
    s2.send(DNS())
    res = [c.q.get(timeout=2), c.q.get(timeout=2)]
finally:
    p.stop()

# One result must be this bare byte string; the other must be a packet that
# contains a DNS layer and has UDP source port 1234. Arrival order is not
# fixed, so the byte string is removed before checking the packet.
dns_payload = b'\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00'
assert dns_payload in res
res.remove(dns_payload)
assert DNS in res[0] and res[0][UDP].sport == 1234
548
= FDSourceSink on a Bunch object
550
class Bunch:
    """Minimal attribute bag: keyword arguments become instance attributes."""
    def __init__(self, **kw):
        self.__dict__ = kw

# Fake file object: write() discards its input, read() always returns
# "hello", and fileno() returns None.
fd = Bunch(write=lambda x: None, read=lambda: "hello", fileno=lambda: None)

s = FDSourceSink(fd)
d = Drain()
c = QueueSink()
s > d > c

# No engine is started: push() and deliver() are called directly.
assert s.fileno() is None
s.push("data")
s.deliver()
assert c.q.get(timeout=1) == "hello"
565
= TCPConnectPipe networking test
~ networking needs_root
568
p = PipeEngine()

s = CLIFeeder()
d1 = TCPConnectPipe(addr="www.google.fr", port=80)
c = QueueSink()

s > d1 > c

p.add(s)
p.start()

# Send the request and wait for the first chunk of the reply; the engine is
# stopped even if the read times out.
try:
    s.send(b"GET http://www.google.fr/search?q=scapy&start=1&num=1\n")
    result = c.q.get(timeout=10)
finally:
    p.stop()

assert result.startswith(b"HTTP/1.0 200 OK")
585