1/*
2 *  Licensed to the Apache Software Foundation (ASF) under one or more
3 *  contributor license agreements.  See the NOTICE file distributed with
4 *  this work for additional information regarding copyright ownership.
5 *  The ASF licenses this file to You under the Apache License, Version 2.0
6 *  (the "License"); you may not use this file except in compliance with
7 *  the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 */
17package org.apache.harmony.tests.java.io;
18
19import java.io.IOException;
20import java.io.PipedInputStream;
21import java.io.PipedOutputStream;
22import java.util.concurrent.CountDownLatch;
23
24public class PipedInputStreamTest extends junit.framework.TestCase {
25
26    static class PWriter implements Runnable {
27        PipedOutputStream pos;
28
29        public byte bytes[];
30
31        public void run() {
32            try {
33                pos.write(bytes);
34                synchronized (this) {
35                    notify();
36                }
37            } catch (IOException e) {
38                e.printStackTrace(System.out);
39                System.out.println("Could not write bytes");
40            }
41        }
42
43        public PWriter(PipedOutputStream pout, int nbytes) {
44            pos = pout;
45            bytes = new byte[nbytes];
46            for (int i = 0; i < bytes.length; i++) {
47                bytes[i] = (byte) (System.currentTimeMillis() % 9);
48            }
49        }
50    }
51
52    Thread t;
53
54    PWriter pw;
55
56    PipedInputStream pis;
57
58    PipedOutputStream pos;
59
60    /**
61     * java.io.PipedInputStream#PipedInputStream()
62     */
63    public void test_Constructor() {
64        // Test for method java.io.PipedInputStream()
65        // Used in tests
66    }
67
68    /**
69     * java.io.PipedInputStream#PipedInputStream(java.io.PipedOutputStream)
70     */
71    public void test_ConstructorLjava_io_PipedOutputStream() throws Exception {
72        // Test for method java.io.PipedInputStream(java.io.PipedOutputStream)
73        pis = new PipedInputStream(new PipedOutputStream());
74        pis.available();
75    }
76
77
78    public void test_readException() throws IOException {
79        pis = new PipedInputStream();
80        pos = new PipedOutputStream();
81
82        try {
83            pis.connect(pos);
84            t = new Thread(pw = new PWriter(pos, 1000));
85            t.start();
86            while (true) {
87                pis.read();
88                t.interrupt();
89            }
90        } catch (IOException expected) {
91        } finally {
92            try {
93                pis.close();
94                pos.close();
95            } catch (IOException ee) {
96            }
97        }
98    }
99
100    /**
101     * java.io.PipedInputStream#available()
102     */
103    public void test_available() throws Exception {
104        pis = new PipedInputStream();
105        pos = new PipedOutputStream();
106
107        pis.connect(pos);
108        t = new Thread(pw = new PWriter(pos, 1000));
109        t.start();
110
111        synchronized (pw) {
112            pw.wait(10000);
113        }
114        assertTrue("Available returned incorrect number of bytes: "
115                + pis.available(), pis.available() == 1000);
116
117        PipedInputStream pin = new PipedInputStream();
118        PipedOutputStream pout = new PipedOutputStream(pin);
119        // We know the PipedInputStream buffer size is 1024.
120        // Writing another byte would cause the write to wait
121        // for a read before returning
122        for (int i = 0; i < 1024; i++) {
123            pout.write(i);
124        }
125        assertEquals("Incorrect available count", 1024, pin.available());
126    }
127
128    /**
129     * java.io.PipedInputStream#close()
130     */
131    public void test_close() throws IOException {
132        // Test for method void java.io.PipedInputStream.close()
133        pis = new PipedInputStream();
134        pos = new PipedOutputStream();
135        pis.connect(pos);
136        pis.close();
137        try {
138            pos.write((byte) 127);
139            fail("Failed to throw expected exception");
140        } catch (IOException e) {
141            // The spec for PipedInput saya an exception should be thrown if
142            // a write is attempted to a closed input. The PipedOuput spec
143            // indicates that an exception should be thrown only when the
144            // piped input thread is terminated without closing
145            return;
146        }
147    }
148
149    /**
150     * java.io.PipedInputStream#connect(java.io.PipedOutputStream)
151     */
152    public void test_connectLjava_io_PipedOutputStream() throws Exception {
153        pis = new PipedInputStream();
154        pos = new PipedOutputStream();
155        assertEquals("Non-conected pipe returned non-zero available bytes", 0,
156                pis.available());
157
158        pis.connect(pos);
159        t = new Thread(pw = new PWriter(pos, 1000));
160        t.start();
161
162        synchronized (pw) {
163            pw.wait(10000);
164        }
165        assertEquals("Available returned incorrect number of bytes", 1000, pis
166                .available());
167    }
168
169    /**
170     * java.io.PipedInputStream#read()
171     */
172    public void test_read() throws Exception {
173        pis = new PipedInputStream();
174        pos = new PipedOutputStream();
175
176        pis.connect(pos);
177        t = new Thread(pw = new PWriter(pos, 1000));
178        t.start();
179
180        synchronized (pw) {
181            pw.wait(10000);
182        }
183        assertEquals("Available returned incorrect number of bytes", 1000, pis
184                .available());
185        assertEquals("read returned incorrect byte", pw.bytes[0], (byte) pis
186                .read());
187    }
188
189    /**
190     * java.io.PipedInputStream#read(byte[], int, int)
191     */
192    public void test_read$BII() throws Exception {
193        pis = new PipedInputStream();
194        pos = new PipedOutputStream();
195
196        pis.connect(pos);
197        t = new Thread(pw = new PWriter(pos, 1000));
198        t.start();
199
200        byte[] buf = new byte[400];
201        synchronized (pw) {
202            pw.wait(10000);
203        }
204        assertTrue("Available returned incorrect number of bytes: "
205                + pis.available(), pis.available() == 1000);
206        pis.read(buf, 0, 400);
207        for (int i = 0; i < 400; i++) {
208            assertEquals("read returned incorrect byte[]", pw.bytes[i], buf[i]);
209        }
210    }
211
212    /**
213     * java.io.PipedInputStream#read(byte[], int, int)
214     * Regression for HARMONY-387
215     */
216    public void test_read$BII_2() throws IOException {
217        PipedInputStream obj = new PipedInputStream();
218        try {
219            obj.read(new byte[0], 0, -1);
220            fail();
221        } catch (IndexOutOfBoundsException expected) {
222        }
223    }
224
225    /**
226     * java.io.PipedInputStream#read(byte[], int, int)
227     */
228    public void test_read$BII_3() throws IOException {
229        PipedInputStream obj = new PipedInputStream();
230        try {
231            obj.read(new byte[0], -1, 0);
232            fail();
233        } catch (IndexOutOfBoundsException expected) {
234        }
235    }
236
237    /**
238     * java.io.PipedInputStream#read(byte[], int, int)
239     */
240    public void test_read$BII_4() throws IOException {
241        PipedInputStream obj = new PipedInputStream();
242        try {
243            obj.read(new byte[0], -1, -1);
244            fail();
245        } catch (IndexOutOfBoundsException expected) {
246        }
247    }
248
249    /**
250     * java.io.PipedInputStream#receive(int)
251     */
252    public void test_write_failsAfterReaderDead() throws Exception {
253        pis = new PipedInputStream();
254        pos = new PipedOutputStream();
255
256        // test if writer recognizes dead reader
257        pis.connect(pos);
258
259        class WriteRunnable implements Runnable {
260
261            final CountDownLatch readerAlive = new CountDownLatch(1);
262
263            public void run() {
264                try {
265                    pos.write(1);
266
267                    try {
268                        readerAlive.await();
269                    } catch (InterruptedException ie) {
270                        fail();
271                        return;
272                    }
273
274                    try {
275                        // should throw exception since reader thread
276                        // is now dead
277                        pos.write(1);
278                        fail();
279                    } catch (IOException expected) {
280                    }
281                } catch (IOException e) {
282                }
283            }
284        }
285
286        class ReadRunnable implements Runnable {
287            public void run() {
288                try {
289                    pis.read();
290                } catch (IOException e) {
291                    fail();
292                }
293            }
294        }
295
296        WriteRunnable writeRunnable = new WriteRunnable();
297        Thread writeThread = new Thread(writeRunnable);
298
299        ReadRunnable readRunnable = new ReadRunnable();
300        Thread readThread = new Thread(readRunnable);
301        writeThread.start();
302        readThread.start();
303        readThread.join();
304
305        writeRunnable.readerAlive.countDown();
306        writeThread.join();
307    }
308
309    static final class PipedInputStreamWithPublicReceive extends PipedInputStream {
310        @Override
311        public void receive(int oneByte) throws IOException {
312            super.receive(oneByte);
313        }
314    }
315
316
317    public void test_receive_failsIfWriterClosed() throws Exception {
318        // attempt to write to stream after writer closed
319        PipedInputStreamWithPublicReceive pis = new PipedInputStreamWithPublicReceive();
320
321        pos = new PipedOutputStream();
322        pos.connect(pis);
323        pos.close();
324        try {
325            pis.receive(1);
326            fail();
327        } catch (IOException expected) {
328        }
329    }
330
331    static class Worker extends Thread {
332        PipedOutputStream out;
333
334        Worker(PipedOutputStream pos) {
335            this.out = pos;
336        }
337
338        public void run() {
339            try {
340                out.write(20);
341                out.close();
342                Thread.sleep(5000);
343            } catch (Exception e) {
344            }
345        }
346    }
347
348    public void test_read_after_write_close() throws Exception {
349        PipedInputStream in = new PipedInputStream();
350        PipedOutputStream out = new PipedOutputStream();
351        in.connect(out);
352        Thread worker = new Worker(out);
353        worker.start();
354        Thread.sleep(2000);
355        assertEquals("Should read 20.", 20, in.read());
356        worker.join();
357        assertEquals("Write end is closed, should return -1", -1, in.read());
358        byte[] buf = new byte[1];
359        assertEquals("Write end is closed, should return -1", -1, in.read(buf, 0, 1));
360        assertEquals("Buf len 0 should return first", 0, in.read(buf, 0, 0));
361        in.close();
362        out.close();
363    }
364
365    /**
366     * Tears down the fixture, for example, close a network connection. This
367     * method is called after a test is executed.
368     */
369    protected void tearDown() throws Exception {
370        try {
371            if (t != null) {
372                t.interrupt();
373            }
374        } catch (Exception ignore) {
375        }
376        super.tearDown();
377    }
378
379
380    /**
381     * java.io.PipedInputStream#PipedInputStream(java.io.PipedOutputStream,
382     *int)
383     * @since 1.6
384     */
385    public void test_Constructor_LPipedOutputStream_I() throws Exception {
386        // Test for method java.io.PipedInputStream(java.io.PipedOutputStream,
387        // int)
388        MockPipedInputStream mpis = new MockPipedInputStream(
389                new PipedOutputStream(), 100);
390        int bufferLength = mpis.bufferLength();
391        assertEquals(100, bufferLength);
392
393        try {
394            pis = new PipedInputStream(null, -1);
395            fail("Should throw IllegalArgumentException"); //$NON-NLS-1$
396        } catch (IllegalArgumentException e) {
397            // expected
398        }
399
400        try {
401            pis = new PipedInputStream(null, 0);
402            fail("Should throw IllegalArgumentException"); //$NON-NLS-1$
403        } catch (IllegalArgumentException e) {
404            // expected
405        }
406    }
407
408    /**
409     * java.io.PipedInputStream#PipedInputStream(int)
410     * @since 1.6
411     */
412    public void test_Constructor_I() throws Exception {
413        // Test for method java.io.PipedInputStream(int)
414        MockPipedInputStream mpis = new MockPipedInputStream(100);
415        int bufferLength = mpis.bufferLength();
416        assertEquals(100, bufferLength);
417
418        try {
419            pis = new PipedInputStream(-1);
420            fail("Should throw IllegalArgumentException"); //$NON-NLS-1$
421        } catch (IllegalArgumentException e) {
422            // expected
423        }
424
425        try {
426            pis = new PipedInputStream(0);
427            fail("Should throw IllegalArgumentException"); //$NON-NLS-1$
428        } catch (IllegalArgumentException e) {
429            // expected
430        }
431    }
432
433    static class MockPipedInputStream extends PipedInputStream {
434
435        public MockPipedInputStream(java.io.PipedOutputStream src,
436                int bufferSize) throws IOException {
437            super(src, bufferSize);
438        }
439
440        public MockPipedInputStream(int bufferSize) {
441            super(bufferSize);
442        }
443
444        public int bufferLength() {
445            return super.buffer.length;
446        }
447    }
448}
449