1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package libcore.java.io;
18
19import java.io.IOException;
20import java.io.InputStream;
21import java.io.InterruptedIOException;
22import java.io.OutputStream;
23import java.io.PipedInputStream;
24import java.io.PipedOutputStream;
25import java.io.PipedReader;
26import java.io.PipedWriter;
27import java.net.InetSocketAddress;
28import java.net.Socket;
29import java.nio.ByteBuffer;
30import java.nio.channels.ClosedByInterruptException;
31import java.nio.channels.ClosedChannelException;
32import java.nio.channels.Pipe;
33import java.nio.channels.ReadableByteChannel;
34import java.nio.channels.ServerSocketChannel;
35import java.nio.channels.SocketChannel;
36import java.nio.channels.WritableByteChannel;
37import junit.framework.TestCase;
38
39/**
40 * Test that interrupting a thread blocked on I/O causes that thread to throw
41 * an InterruptedIOException.
42 */
43public final class InterruptedStreamTest extends TestCase {
44
45    private static final int BUFFER_SIZE = 1024 * 1024;
46
47    private Socket[] sockets;
48
49    @Override protected void setUp() throws Exception {
50        // Clear the interrupted bit to make sure an earlier test did
51        // not leave us in a bad state.
52        Thread.interrupted();
53        super.tearDown();
54    }
55
56    @Override protected void tearDown() throws Exception {
57        if (sockets != null) {
58            sockets[0].close();
59            sockets[1].close();
60        }
61        Thread.interrupted(); // clear interrupted bit
62        super.tearDown();
63    }
64
65    public void testInterruptPipedInputStream() throws Exception {
66        PipedOutputStream out = new PipedOutputStream();
67        PipedInputStream in = new PipedInputStream(out);
68        testInterruptInputStream(in);
69    }
70
71    public void testInterruptPipedOutputStream() throws Exception {
72        PipedOutputStream out = new PipedOutputStream();
73        new PipedInputStream(out);
74        testInterruptOutputStream(out);
75    }
76
77    public void testInterruptPipedReader() throws Exception {
78        PipedWriter writer = new PipedWriter();
79        PipedReader reader = new PipedReader(writer);
80        testInterruptReader(reader);
81    }
82
83    public void testInterruptPipedWriter() throws Exception {
84        final PipedWriter writer = new PipedWriter();
85        new PipedReader(writer);
86        testInterruptWriter(writer);
87    }
88
89    public void testInterruptReadablePipeChannel() throws Exception {
90        testInterruptReadableChannel(Pipe.open().source());
91    }
92
93    public void testInterruptWritablePipeChannel() throws Exception {
94        testInterruptWritableChannel(Pipe.open().sink());
95    }
96
97    public void testInterruptReadableSocketChannel() throws Exception {
98        sockets = newSocketChannelPair();
99        testInterruptReadableChannel(sockets[0].getChannel());
100    }
101
102    public void testInterruptWritableSocketChannel() throws Exception {
103        sockets = newSocketChannelPair();
104        testInterruptWritableChannel(sockets[0].getChannel());
105    }
106
107    /**
108     * Returns a pair of connected sockets backed by NIO socket channels.
109     */
110    private Socket[] newSocketChannelPair() throws IOException {
111        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
112        serverSocketChannel.socket().bind(new InetSocketAddress(0));
113        SocketChannel clientSocketChannel = SocketChannel.open();
114        clientSocketChannel.connect(serverSocketChannel.socket().getLocalSocketAddress());
115        SocketChannel server = serverSocketChannel.accept();
116        serverSocketChannel.close();
117        return new Socket[] { clientSocketChannel.socket(), server.socket() };
118    }
119
120    private void testInterruptInputStream(final InputStream in) throws Exception {
121        Thread thread = interruptMeLater();
122        try {
123            in.read();
124            fail();
125        } catch (InterruptedIOException expected) {
126        } finally {
127            confirmInterrupted(thread);
128        }
129    }
130
131    private void testInterruptReader(final PipedReader reader) throws Exception {
132        Thread thread = interruptMeLater();
133        try {
134            reader.read();
135            fail();
136        } catch (InterruptedIOException expected) {
137        } finally {
138            confirmInterrupted(thread);
139        }
140    }
141
142    private void testInterruptReadableChannel(final ReadableByteChannel channel) throws Exception {
143        Thread thread = interruptMeLater();
144        try {
145            channel.read(ByteBuffer.allocate(BUFFER_SIZE));
146            fail();
147        } catch (ClosedByInterruptException expected) {
148        } finally {
149            confirmInterrupted(thread);
150        }
151    }
152
153    private void testInterruptOutputStream(final OutputStream out) throws Exception {
154        Thread thread = interruptMeLater();
155        try {
156            // this will block when the receiving buffer fills up
157            while (true) {
158                out.write(new byte[BUFFER_SIZE]);
159            }
160        } catch (InterruptedIOException expected) {
161        } finally {
162            confirmInterrupted(thread);
163        }
164    }
165
166    private void testInterruptWriter(final PipedWriter writer) throws Exception {
167        Thread thread = interruptMeLater();
168        try {
169            // this will block when the receiving buffer fills up
170            while (true) {
171                writer.write(new char[BUFFER_SIZE]);
172            }
173        } catch (InterruptedIOException expected) {
174        } finally {
175            confirmInterrupted(thread);
176        }
177    }
178
179    private void testInterruptWritableChannel(final WritableByteChannel channel) throws Exception {
180        Thread thread = interruptMeLater();
181        try {
182            // this will block when the receiving buffer fills up
183            while (true) {
184                channel.write(ByteBuffer.allocate(BUFFER_SIZE));
185            }
186        } catch (ClosedByInterruptException expected) {
187        } catch (ClosedChannelException expected) {
188        } finally {
189            confirmInterrupted(thread);
190        }
191    }
192
193    private Thread interruptMeLater() throws Exception {
194        final Thread toInterrupt = Thread.currentThread();
195        Thread thread = new Thread(new Runnable () {
196            @Override public void run() {
197                try {
198                    Thread.sleep(1000);
199                } catch (InterruptedException ex) {
200                }
201                toInterrupt.interrupt();
202            }
203        });
204        thread.start();
205        return thread;
206    }
207
208    private static void confirmInterrupted(Thread thread) throws InterruptedException {
209        // validate and clear interrupted bit before join
210        assertTrue(Thread.interrupted());
211        thread.join();
212    }
213}
214