1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.squareup.okhttp.internal.spdy;
18
19import com.squareup.okhttp.internal.Util;
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.InterruptedIOException;
23import java.io.OutputStream;
24import java.net.SocketTimeoutException;
25import java.util.ArrayList;
26import java.util.List;
27
28import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
29import static com.squareup.okhttp.internal.Util.pokeInt;
30import static java.nio.ByteOrder.BIG_ENDIAN;
31
32/** A logical bidirectional stream. */
33public final class SpdyStream {
34
35  // Internal state is guarded by this. No long-running or potentially
36  // blocking operations are performed while the lock is held.
37
38  private static final int DATA_FRAME_HEADER_LENGTH = 8;
39
40  private static final String[] STATUS_CODE_NAMES = {
41      null,
42      "PROTOCOL_ERROR",
43      "INVALID_STREAM",
44      "REFUSED_STREAM",
45      "UNSUPPORTED_VERSION",
46      "CANCEL",
47      "INTERNAL_ERROR",
48      "FLOW_CONTROL_ERROR",
49      "STREAM_IN_USE",
50      "STREAM_ALREADY_CLOSED",
51      "INVALID_CREDENTIALS",
52      "FRAME_TOO_LARGE"
53  };
54
55  public static final int RST_PROTOCOL_ERROR = 1;
56  public static final int RST_INVALID_STREAM = 2;
57  public static final int RST_REFUSED_STREAM = 3;
58  public static final int RST_UNSUPPORTED_VERSION = 4;
59  public static final int RST_CANCEL = 5;
60  public static final int RST_INTERNAL_ERROR = 6;
61  public static final int RST_FLOW_CONTROL_ERROR = 7;
62  public static final int RST_STREAM_IN_USE = 8;
63  public static final int RST_STREAM_ALREADY_CLOSED = 9;
64  public static final int RST_INVALID_CREDENTIALS = 10;
65  public static final int RST_FRAME_TOO_LARGE = 11;
66
67  /**
68   * The number of unacknowledged bytes at which the input stream will send
69   * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's
70   * window size, otherwise the remote peer will stop sending data on this
71   * stream. (Chrome 25 uses 5 MiB.)
72   */
73  public static final int WINDOW_UPDATE_THRESHOLD = Settings.DEFAULT_INITIAL_WINDOW_SIZE / 2;
74
75  private final int id;
76  private final SpdyConnection connection;
77  private final int priority;
78  private final int slot;
79  private long readTimeoutMillis = 0;
80  private int writeWindowSize;
81
82  /** Headers sent by the stream initiator. Immutable and non null. */
83  private final List<String> requestHeaders;
84
85  /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */
86  private List<String> responseHeaders;
87
88  private final SpdyDataInputStream in = new SpdyDataInputStream();
89  private final SpdyDataOutputStream out = new SpdyDataOutputStream();
90
91  /**
92   * The reason why this stream was abnormally closed. If there are multiple
93   * reasons to abnormally close this stream (such as both peers closing it
94   * near-simultaneously) then this is the first reason known to this peer.
95   */
96  private int rstStatusCode = -1;
97
98  SpdyStream(int id, SpdyConnection connection, int flags, int priority, int slot,
99      List<String> requestHeaders, Settings settings) {
100    if (connection == null) throw new NullPointerException("connection == null");
101    if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
102    this.id = id;
103    this.connection = connection;
104    this.priority = priority;
105    this.slot = slot;
106    this.requestHeaders = requestHeaders;
107
108    if (isLocallyInitiated()) {
109      // I am the sender
110      in.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0;
111      out.finished = (flags & SpdyConnection.FLAG_FIN) != 0;
112    } else {
113      // I am the receiver
114      in.finished = (flags & SpdyConnection.FLAG_FIN) != 0;
115      out.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0;
116    }
117
118    setSettings(settings);
119  }
120
121  /**
122   * Returns true if this stream is open. A stream is open until either:
123   * <ul>
124   * <li>A {@code SYN_RESET} frame abnormally terminates the stream.
125   * <li>Both input and output streams have transmitted all data and
126   * headers.
127   * </ul>
128   * Note that the input stream may continue to yield data even after a stream
129   * reports itself as not open. This is because input data is buffered.
130   */
131  public synchronized boolean isOpen() {
132    if (rstStatusCode != -1) {
133      return false;
134    }
135    if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) {
136      return false;
137    }
138    return true;
139  }
140
141  /** Returns true if this stream was created by this peer. */
142  public boolean isLocallyInitiated() {
143    boolean streamIsClient = (id % 2 == 1);
144    return connection.client == streamIsClient;
145  }
146
147  public SpdyConnection getConnection() {
148    return connection;
149  }
150
151  public List<String> getRequestHeaders() {
152    return requestHeaders;
153  }
154
155  /**
156   * Returns the stream's response headers, blocking if necessary if they
157   * have not been received yet.
158   */
159  public synchronized List<String> getResponseHeaders() throws IOException {
160    try {
161      while (responseHeaders == null && rstStatusCode == -1) {
162        wait();
163      }
164      if (responseHeaders != null) {
165        return responseHeaders;
166      }
167      throw new IOException("stream was reset: " + rstStatusString());
168    } catch (InterruptedException e) {
169      InterruptedIOException rethrow = new InterruptedIOException();
170      rethrow.initCause(e);
171      throw rethrow;
172    }
173  }
174
175  /**
176   * Returns the reason why this stream was closed, or -1 if it closed
177   * normally or has not yet been closed. Valid reasons are {@link
178   * #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link
179   * #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link
180   * #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link
181   * #RST_FLOW_CONTROL_ERROR}.
182   */
183  public synchronized int getRstStatusCode() {
184    return rstStatusCode;
185  }
186
187  /**
188   * Sends a reply to an incoming stream.
189   *
190   * @param out true to create an output stream that we can use to send data
191   * to the remote peer. Corresponds to {@code FLAG_FIN}.
192   */
193  public void reply(List<String> responseHeaders, boolean out) throws IOException {
194    assert (!Thread.holdsLock(SpdyStream.this));
195    int flags = 0;
196    synchronized (this) {
197      if (responseHeaders == null) {
198        throw new NullPointerException("responseHeaders == null");
199      }
200      if (isLocallyInitiated()) {
201        throw new IllegalStateException("cannot reply to a locally initiated stream");
202      }
203      if (this.responseHeaders != null) {
204        throw new IllegalStateException("reply already sent");
205      }
206      this.responseHeaders = responseHeaders;
207      if (!out) {
208        this.out.finished = true;
209        flags |= SpdyConnection.FLAG_FIN;
210      }
211    }
212    connection.writeSynReply(id, flags, responseHeaders);
213  }
214
215  /**
216   * Sets the maximum time to wait on input stream reads before failing with a
217   * {@code SocketTimeoutException}, or {@code 0} to wait indefinitely.
218   */
219  public void setReadTimeout(long readTimeoutMillis) {
220    this.readTimeoutMillis = readTimeoutMillis;
221  }
222
223  public long getReadTimeoutMillis() {
224    return readTimeoutMillis;
225  }
226
227  /** Returns an input stream that can be used to read data from the peer. */
228  public InputStream getInputStream() {
229    return in;
230  }
231
232  /**
233   * Returns an output stream that can be used to write data to the peer.
234   *
235   * @throws IllegalStateException if this stream was initiated by the peer
236   * and a {@link #reply} has not yet been sent.
237   */
238  public OutputStream getOutputStream() {
239    synchronized (this) {
240      if (responseHeaders == null && !isLocallyInitiated()) {
241        throw new IllegalStateException("reply before requesting the output stream");
242      }
243    }
244    return out;
245  }
246
247  /**
248   * Abnormally terminate this stream. This blocks until the {@code RST_STREAM}
249   * frame has been transmitted.
250   */
251  public void close(int rstStatusCode) throws IOException {
252    if (!closeInternal(rstStatusCode)) {
253      return; // Already closed.
254    }
255    connection.writeSynReset(id, rstStatusCode);
256  }
257
258  /**
259   * Abnormally terminate this stream. This enqueues a {@code RST_STREAM}
260   * frame and returns immediately.
261   */
262  public void closeLater(int rstStatusCode) {
263    if (!closeInternal(rstStatusCode)) {
264      return; // Already closed.
265    }
266    connection.writeSynResetLater(id, rstStatusCode);
267  }
268
269  /** Returns true if this stream was closed. */
270  private boolean closeInternal(int rstStatusCode) {
271    assert (!Thread.holdsLock(this));
272    synchronized (this) {
273      if (this.rstStatusCode != -1) {
274        return false;
275      }
276      if (in.finished && out.finished) {
277        return false;
278      }
279      this.rstStatusCode = rstStatusCode;
280      notifyAll();
281    }
282    connection.removeStream(id);
283    return true;
284  }
285
286  void receiveReply(List<String> strings) throws IOException {
287    assert (!Thread.holdsLock(SpdyStream.this));
288    boolean streamInUseError = false;
289    boolean open = true;
290    synchronized (this) {
291      if (isLocallyInitiated() && responseHeaders == null) {
292        responseHeaders = strings;
293        open = isOpen();
294        notifyAll();
295      } else {
296        streamInUseError = true;
297      }
298    }
299    if (streamInUseError) {
300      closeLater(SpdyStream.RST_STREAM_IN_USE);
301    } else if (!open) {
302      connection.removeStream(id);
303    }
304  }
305
306  void receiveHeaders(List<String> headers) throws IOException {
307    assert (!Thread.holdsLock(SpdyStream.this));
308    boolean protocolError = false;
309    synchronized (this) {
310      if (responseHeaders != null) {
311        List<String> newHeaders = new ArrayList<String>();
312        newHeaders.addAll(responseHeaders);
313        newHeaders.addAll(headers);
314        this.responseHeaders = newHeaders;
315      } else {
316        protocolError = true;
317      }
318    }
319    if (protocolError) {
320      closeLater(SpdyStream.RST_PROTOCOL_ERROR);
321    }
322  }
323
324  void receiveData(InputStream in, int length) throws IOException {
325    assert (!Thread.holdsLock(SpdyStream.this));
326    this.in.receive(in, length);
327  }
328
329  void receiveFin() {
330    assert (!Thread.holdsLock(SpdyStream.this));
331    boolean open;
332    synchronized (this) {
333      this.in.finished = true;
334      open = isOpen();
335      notifyAll();
336    }
337    if (!open) {
338      connection.removeStream(id);
339    }
340  }
341
342  synchronized void receiveRstStream(int statusCode) {
343    if (rstStatusCode == -1) {
344      rstStatusCode = statusCode;
345      notifyAll();
346    }
347  }
348
349  private void setSettings(Settings settings) {
350    assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'.
351    this.writeWindowSize =
352        settings != null ? settings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE)
353            : Settings.DEFAULT_INITIAL_WINDOW_SIZE;
354  }
355
356  void receiveSettings(Settings settings) {
357    assert (Thread.holdsLock(this));
358    setSettings(settings);
359    notifyAll();
360  }
361
362  synchronized void receiveWindowUpdate(int deltaWindowSize) {
363    out.unacknowledgedBytes -= deltaWindowSize;
364    notifyAll();
365  }
366
367  private String rstStatusString() {
368    return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length
369        ? STATUS_CODE_NAMES[rstStatusCode] : Integer.toString(rstStatusCode);
370  }
371
372  int getPriority() {
373    return priority;
374  }
375
376  int getSlot() {
377    return slot;
378  }
379
380  /**
381   * An input stream that reads the incoming data frames of a stream. Although
382   * this class uses synchronization to safely receive incoming data frames,
383   * it is not intended for use by multiple readers.
384   */
385  private final class SpdyDataInputStream extends InputStream {
386    // Store incoming data bytes in a circular buffer. When the buffer is
387    // empty, pos == -1. Otherwise pos is the first byte to read and limit
388    // is the first byte to write.
389    //
390    // { - - - X X X X - - - }
391    //         ^       ^
392    //        pos    limit
393    //
394    // { X X X - - - - X X X }
395    //         ^       ^
396    //       limit    pos
397
398    private final byte[] buffer = new byte[Settings.DEFAULT_INITIAL_WINDOW_SIZE];
399
400    /** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */
401    private int pos = -1;
402
403    /** the last byte to be read. Never buffer.length */
404    private int limit;
405
406    /** True if the caller has closed this stream. */
407    private boolean closed;
408
409    /**
410     * True if either side has cleanly shut down this stream. We will
411     * receive no more bytes beyond those already in the buffer.
412     */
413    private boolean finished;
414
415    /**
416     * The total number of bytes consumed by the application (with {@link
417     * #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE}
418     * frame.
419     */
420    private int unacknowledgedBytes = 0;
421
422    @Override public int available() throws IOException {
423      synchronized (SpdyStream.this) {
424        checkNotClosed();
425        if (pos == -1) {
426          return 0;
427        } else if (limit > pos) {
428          return limit - pos;
429        } else {
430          return limit + (buffer.length - pos);
431        }
432      }
433    }
434
435    @Override public int read() throws IOException {
436      return Util.readSingleByte(this);
437    }
438
439    @Override public int read(byte[] b, int offset, int count) throws IOException {
440      synchronized (SpdyStream.this) {
441        checkOffsetAndCount(b.length, offset, count);
442        waitUntilReadable();
443        checkNotClosed();
444
445        if (pos == -1) {
446          return -1;
447        }
448
449        int copied = 0;
450
451        // drain from [pos..buffer.length)
452        if (limit <= pos) {
453          int bytesToCopy = Math.min(count, buffer.length - pos);
454          System.arraycopy(buffer, pos, b, offset, bytesToCopy);
455          pos += bytesToCopy;
456          copied += bytesToCopy;
457          if (pos == buffer.length) {
458            pos = 0;
459          }
460        }
461
462        // drain from [pos..limit)
463        if (copied < count) {
464          int bytesToCopy = Math.min(limit - pos, count - copied);
465          System.arraycopy(buffer, pos, b, offset + copied, bytesToCopy);
466          pos += bytesToCopy;
467          copied += bytesToCopy;
468        }
469
470        // Flow control: notify the peer that we're ready for more data!
471        unacknowledgedBytes += copied;
472        if (unacknowledgedBytes >= WINDOW_UPDATE_THRESHOLD) {
473          connection.writeWindowUpdateLater(id, unacknowledgedBytes);
474          unacknowledgedBytes = 0;
475        }
476
477        if (pos == limit) {
478          pos = -1;
479          limit = 0;
480        }
481
482        return copied;
483      }
484    }
485
486    /**
487     * Returns once the input stream is either readable or finished. Throws
488     * a {@link SocketTimeoutException} if the read timeout elapses before
489     * that happens.
490     */
491    private void waitUntilReadable() throws IOException {
492      long start = 0;
493      long remaining = 0;
494      if (readTimeoutMillis != 0) {
495        start = (System.nanoTime() / 1000000);
496        remaining = readTimeoutMillis;
497      }
498      try {
499        while (pos == -1 && !finished && !closed && rstStatusCode == -1) {
500          if (readTimeoutMillis == 0) {
501            SpdyStream.this.wait();
502          } else if (remaining > 0) {
503            SpdyStream.this.wait(remaining);
504            remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000);
505          } else {
506            throw new SocketTimeoutException();
507          }
508        }
509      } catch (InterruptedException e) {
510        throw new InterruptedIOException();
511      }
512    }
513
514    void receive(InputStream in, int byteCount) throws IOException {
515      assert (!Thread.holdsLock(SpdyStream.this));
516
517      if (byteCount == 0) {
518        return;
519      }
520
521      int pos;
522      int limit;
523      int firstNewByte;
524      boolean finished;
525      boolean flowControlError;
526      synchronized (SpdyStream.this) {
527        finished = this.finished;
528        pos = this.pos;
529        firstNewByte = this.limit;
530        limit = this.limit;
531        flowControlError = byteCount > buffer.length - available();
532      }
533
534      // If the peer sends more data than we can handle, discard it and close the connection.
535      if (flowControlError) {
536        Util.skipByReading(in, byteCount);
537        closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR);
538        return;
539      }
540
541      // Discard data received after the stream is finished. It's probably a benign race.
542      if (finished) {
543        Util.skipByReading(in, byteCount);
544        return;
545      }
546
547      // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that
548      // won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise
549      // writes will be blocked until reads complete.
550      if (pos < limit) {
551        int firstCopyCount = Math.min(byteCount, buffer.length - limit);
552        Util.readFully(in, buffer, limit, firstCopyCount);
553        limit += firstCopyCount;
554        byteCount -= firstCopyCount;
555        if (limit == buffer.length) {
556          limit = 0;
557        }
558      }
559      if (byteCount > 0) {
560        Util.readFully(in, buffer, limit, byteCount);
561        limit += byteCount;
562      }
563
564      synchronized (SpdyStream.this) {
565        // Update the new limit, and mark the position as readable if necessary.
566        this.limit = limit;
567        if (this.pos == -1) {
568          this.pos = firstNewByte;
569          SpdyStream.this.notifyAll();
570        }
571      }
572    }
573
574    @Override public void close() throws IOException {
575      synchronized (SpdyStream.this) {
576        closed = true;
577        SpdyStream.this.notifyAll();
578      }
579      cancelStreamIfNecessary();
580    }
581
582    private void checkNotClosed() throws IOException {
583      if (closed) {
584        throw new IOException("stream closed");
585      }
586      if (rstStatusCode != -1) {
587        throw new IOException("stream was reset: " + rstStatusString());
588      }
589    }
590  }
591
592  private void cancelStreamIfNecessary() throws IOException {
593    assert (!Thread.holdsLock(SpdyStream.this));
594    boolean open;
595    boolean cancel;
596    synchronized (this) {
597      cancel = !in.finished && in.closed && (out.finished || out.closed);
598      open = isOpen();
599    }
600    if (cancel) {
601      // RST this stream to prevent additional data from being sent. This
602      // is safe because the input stream is closed (we won't use any
603      // further bytes) and the output stream is either finished or closed
604      // (so RSTing both streams doesn't cause harm).
605      SpdyStream.this.close(RST_CANCEL);
606    } else if (!open) {
607      connection.removeStream(id);
608    }
609  }
610
611  /**
612   * An output stream that writes outgoing data frames of a stream. This class
613   * is not thread safe.
614   */
615  private final class SpdyDataOutputStream extends OutputStream {
616    private final byte[] buffer = new byte[8192];
617    private int pos = DATA_FRAME_HEADER_LENGTH;
618
619    /** True if the caller has closed this stream. */
620    private boolean closed;
621
622    /**
623     * True if either side has cleanly shut down this stream. We shall send
624     * no more bytes.
625     */
626    private boolean finished;
627
628    /**
629     * The total number of bytes written out to the peer, but not yet
630     * acknowledged with an incoming {@code WINDOW_UPDATE} frame. Writes
631     * block if they cause this to exceed the {@code WINDOW_SIZE}.
632     */
633    private int unacknowledgedBytes = 0;
634
635    @Override public void write(int b) throws IOException {
636      Util.writeSingleByte(this, b);
637    }
638
639    @Override public void write(byte[] bytes, int offset, int count) throws IOException {
640      assert (!Thread.holdsLock(SpdyStream.this));
641      checkOffsetAndCount(bytes.length, offset, count);
642      checkNotClosed();
643
644      while (count > 0) {
645        if (pos == buffer.length) {
646          writeFrame(false);
647        }
648        int bytesToCopy = Math.min(count, buffer.length - pos);
649        System.arraycopy(bytes, offset, buffer, pos, bytesToCopy);
650        pos += bytesToCopy;
651        offset += bytesToCopy;
652        count -= bytesToCopy;
653      }
654    }
655
656    @Override public void flush() throws IOException {
657      assert (!Thread.holdsLock(SpdyStream.this));
658      checkNotClosed();
659      if (pos > DATA_FRAME_HEADER_LENGTH) {
660        writeFrame(false);
661        connection.flush();
662      }
663    }
664
665    @Override public void close() throws IOException {
666      assert (!Thread.holdsLock(SpdyStream.this));
667      synchronized (SpdyStream.this) {
668        if (closed) {
669          return;
670        }
671        closed = true;
672      }
673      writeFrame(true);
674      connection.flush();
675      cancelStreamIfNecessary();
676    }
677
678    private void writeFrame(boolean last) throws IOException {
679      assert (!Thread.holdsLock(SpdyStream.this));
680
681      int length = pos - DATA_FRAME_HEADER_LENGTH;
682      synchronized (SpdyStream.this) {
683        waitUntilWritable(length, last);
684        unacknowledgedBytes += length;
685      }
686      int flags = 0;
687      if (last) {
688        flags |= SpdyConnection.FLAG_FIN;
689      }
690      pokeInt(buffer, 0, id & 0x7fffffff, BIG_ENDIAN);
691      pokeInt(buffer, 4, (flags & 0xff) << 24 | length & 0xffffff, BIG_ENDIAN);
692      connection.writeFrame(buffer, 0, pos);
693      pos = DATA_FRAME_HEADER_LENGTH;
694    }
695
696    /**
697     * Returns once the peer is ready to receive {@code count} bytes.
698     *
699     * @throws IOException if the stream was finished or closed, or the
700     * thread was interrupted.
701     */
702    private void waitUntilWritable(int count, boolean last) throws IOException {
703      try {
704        while (unacknowledgedBytes + count >= writeWindowSize) {
705          SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE.
706
707          // The stream may have been closed or reset while we were waiting!
708          if (!last && closed) {
709            throw new IOException("stream closed");
710          } else if (finished) {
711            throw new IOException("stream finished");
712          } else if (rstStatusCode != -1) {
713            throw new IOException("stream was reset: " + rstStatusString());
714          }
715        }
716      } catch (InterruptedException e) {
717        throw new InterruptedIOException();
718      }
719    }
720
721    private void checkNotClosed() throws IOException {
722      synchronized (SpdyStream.this) {
723        if (closed) {
724          throw new IOException("stream closed");
725        } else if (finished) {
726          throw new IOException("stream finished");
727        } else if (rstStatusCode != -1) {
728          throw new IOException("stream was reset: " + rstStatusString());
729        }
730      }
731    }
732  }
733}
734