1import pipes
2import os
3import string
4import unittest
5from test.test_support import TESTFN, run_unittest, unlink, reap_children
6
# Skip the entire module unless running on a POSIX platform.
if not os.name == 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

# Name of a second scratch file, derived from the shared TESTFN.
TESTFN2 = '%s2' % TESTFN
11
# Shell command that upper-cases its input.  Both alphabets are spelled out
# in full because the range form "tr a-z A-Z" is not portable.
s_command = ' '.join(['tr', string.ascii_lowercase, string.ascii_uppercase])
14
class SimplePipeTests(unittest.TestCase):
    """Tests for pipes.Template pipelines and for pipes.quote.

    TESTFN and TESTFN2 are used as scratch files; tearDown removes both
    after every test.
    """

    def tearDown(self):
        # Several tests create only one (or neither) of the scratch files.
        # NOTE(review): unlink() comes from test_support and presumably
        # tolerates missing paths -- confirm against that helper.
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def testSimplePipe1(self):
        # write through a one-step stdin->stdout pipeline into TESTFN
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        # Use a with-block, like the other tests, so the pipe is closed
        # even if write() raises.
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy TESTFN to TESTFN2 through a file-in/file-out step
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read TESTFN back through a file-in/stdout step
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        # start from an existing, empty destination file
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testQuoting(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unsafe = '"`$\\!'

        # the empty string becomes an explicit pair of single quotes
        self.assertEqual(pipes.quote(''), "''")
        # strings made only of safe characters come back unchanged
        self.assertEqual(pipes.quote(safeunquoted), safeunquoted)
        # anything else gets wrapped in single quotes
        self.assertEqual(pipes.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(pipes.quote('test%sname' % u),
                             "'test%sname'" % u)
        # an embedded single quote is expected to come out as '"'"'
        for u in unsafe:
            self.assertEqual(pipes.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

    def testRepr(self):
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(repr(t),
                         "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        # debug() is expected to store its argument on .debugging
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe ending with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # a mode other than the 'r'/'w' used elsewhere must be rejected
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        # the clone must be a distinct object with an equal, but not
        # shared, list of steps
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
198
def test_main():
    """Run SimplePipeTests, then reap any child processes left behind."""
    try:
        run_unittest(SimplePipeTests)
    finally:
        # Reap even when run_unittest() raises, so a failing run does not
        # leave child processes behind.
        reap_children()

if __name__ == "__main__":
    test_main()
205