1import pipes
2import os
3import string
4import unittest
5import shutil
6from test.support import TESTFN, run_unittest, unlink, reap_children
7
# The pipes module only works on POSIX, so skip this whole test module
# everywhere else.
if not os.name == 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

# Second scratch file name, derived from TESTFN.
TESTFN2 = TESTFN + "2"

# "tr a-z A-Z" is not portable, so both character sets are spelled out
# in full instead of being given as ranges.
s_command = 'tr %s %s' % (
    string.ascii_lowercase,
    string.ascii_uppercase,
)
15
class SimplePipeTests(unittest.TestCase):
    """Tests for pipes.Template: running pipelines and validating steps."""

    def tearDown(self):
        # Remove both scratch files after every test, including the tests
        # that never create them.
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def _require_tr(self):
        # Skip the calling test when shutil.which() cannot find 'tr'.
        if shutil.which('tr') is None:
            self.skipTest('tr is not available')

    def testSimplePipe1(self):
        # write through a one-step pipeline
        self._require_tr()
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        # The handle has to be closed before TESTFN is read back; "with"
        # guarantees the close even if write() raises.
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy TESTFN to TESTFN2 through a step that uses $IN and $OUT
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read through a one-step pipeline whose step takes its input
        # from $IN
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        # Start from an empty destination so a passing assertion can only
        # come from the copy itself.
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        # repr() lists the steps as (command, kind) pairs
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(
            repr(t),
            "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        # debug() stores its flag in the "debugging" attribute
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe starting with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # a mode other than the 'r' and 'w' used above raises ValueError
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        # The clone must be a distinct object with its own steps list,
        # while comparing equal in content.
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
197
def test_main():
    # Run the suite, then reap leftover child processes.  The reaping
    # sits in "finally" so it still happens when the run raises.
    try:
        run_unittest(SimplePipeTests)
    finally:
        reap_children()


# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()
204