1/*
2 * Conditions Of Use
3 *
4 * This software was developed by employees of the National Institute of
5 * Standards and Technology (NIST), an agency of the Federal Government.
6 * Pursuant to title 15 United States Code Section 105, works of NIST
7 * employees are not subject to copyright protection in the United States
8 * and are considered to be in the public domain.  As a result, a formal
9 * license is not needed to use the software.
10 *
11 * This software is provided by NIST as a service and is expressly
12 * provided "AS IS."  NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
13 * OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
14 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
15 * AND DATA ACCURACY.  NIST does not warrant or make any representations
16 * regarding the use of the software or the results thereof, including but
17 * not limited to the correctness, accuracy, reliability or usefulness of
18 * the software.
19 *
20 * Permission to use this software is contingent upon your acceptance
21 * of the terms of this agreement
22 *
23 * .
24 *
25 */
26package gov.nist.javax.sip.parser;
27
28import gov.nist.core.InternalErrorHandler;
29import gov.nist.javax.sip.stack.SIPStackTimerTask;
30
31import java.io.*;
32import java.util.*;
33
34/**
35 * Input class for the pipelined parser. Buffer all bytes read from the socket
36 * and make them available to the message parser.
37 *
38 * @author M. Ranganathan (Contains a bug fix contributed by Rob Daugherty (
39 *         Lucent Technologies) )
40 *
41 */
42
43public class Pipeline extends InputStream {
44    private LinkedList buffList;
45
46    private Buffer currentBuffer;
47
48    private boolean isClosed;
49
50    private Timer timer;
51
52    private InputStream pipe;
53
54    private int readTimeout;
55
56    private TimerTask myTimerTask;
57
58    class MyTimer extends SIPStackTimerTask {
59        Pipeline pipeline;
60
61        private boolean isCancelled;
62
63        protected MyTimer(Pipeline pipeline) {
64            this.pipeline = pipeline;
65        }
66
67        protected void runTask() {
68            if (this.isCancelled)
69                return;
70
71            try {
72                pipeline.close();
73            } catch (IOException ex) {
74                InternalErrorHandler.handleException(ex);
75            }
76        }
77
78        public boolean cancel() {
79            boolean retval = super.cancel();
80            this.isCancelled = true;
81            return retval;
82        }
83
84    }
85
86    class Buffer {
87        byte[] bytes;
88
89        int length;
90
91        int ptr;
92
93        public Buffer(byte[] bytes, int length) {
94            ptr = 0;
95            this.length = length;
96            this.bytes = bytes;
97        }
98
99        public int getNextByte() {
100            int retval = bytes[ptr++] & 0xFF;
101            return retval;
102        }
103
104    }
105
106    public void startTimer() {
107        if (this.readTimeout == -1)
108            return;
109        // TODO make this a tunable number. For now 4 seconds
110        // between reads seems reasonable upper limit.
111        this.myTimerTask = new MyTimer(this);
112        this.timer.schedule(this.myTimerTask, this.readTimeout);
113    }
114
115    public void stopTimer() {
116        if (this.readTimeout == -1)
117            return;
118        if (this.myTimerTask != null)
119            this.myTimerTask.cancel();
120    }
121
122    public Pipeline(InputStream pipe, int readTimeout, Timer timer) {
123        // pipe is the Socket stream
124        // this is recorded here to implement a timeout.
125        this.timer = timer;
126        this.pipe = pipe;
127        buffList = new LinkedList();
128        this.readTimeout = readTimeout;
129    }
130
131    public void write(byte[] bytes, int start, int length) throws IOException {
132        if (this.isClosed)
133            throw new IOException("Closed!!");
134        Buffer buff = new Buffer(bytes, length);
135        buff.ptr = start;
136        synchronized (this.buffList) {
137            buffList.add(buff);
138            buffList.notifyAll();
139        }
140    }
141
142    public void write(byte[] bytes) throws IOException {
143        if (this.isClosed)
144            throw new IOException("Closed!!");
145        Buffer buff = new Buffer(bytes, bytes.length);
146        synchronized (this.buffList) {
147            buffList.add(buff);
148            buffList.notifyAll();
149        }
150    }
151
152    public void close() throws IOException {
153        this.isClosed = true;
154        synchronized (this.buffList) {
155            this.buffList.notifyAll();
156        }
157
158        // JvB: added
159        this.pipe.close();
160    }
161
162    public int read() throws IOException {
163        // if (this.isClosed) return -1;
164        synchronized (this.buffList) {
165            if (currentBuffer != null
166                    && currentBuffer.ptr < currentBuffer.length) {
167                int retval = currentBuffer.getNextByte();
168                if (currentBuffer.ptr == currentBuffer.length)
169                    this.currentBuffer = null;
170                return retval;
171            }
172            // Bug fix contributed by Rob Daugherty.
173            if (this.isClosed && this.buffList.isEmpty())
174                return -1;
175            try {
176                // wait till something is posted.
177                while (this.buffList.isEmpty()) {
178                    this.buffList.wait();
179                    if (this.isClosed)
180                        return -1;
181                }
182                currentBuffer = (Buffer) this.buffList.removeFirst();
183                int retval = currentBuffer.getNextByte();
184                if (currentBuffer.ptr == currentBuffer.length)
185                    this.currentBuffer = null;
186                return retval;
187            } catch (InterruptedException ex) {
188                throw new IOException(ex.getMessage());
189            } catch (NoSuchElementException ex) {
190                ex.printStackTrace();
191                throw new IOException(ex.getMessage());
192            }
193        }
194    }
195
196}
197