1/* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.squareup.okhttp.internal.spdy; 18 19import com.squareup.okhttp.internal.Util; 20import java.io.IOException; 21import java.io.InputStream; 22import java.io.InterruptedIOException; 23import java.io.OutputStream; 24import java.net.SocketTimeoutException; 25import java.util.ArrayList; 26import java.util.List; 27 28import static com.squareup.okhttp.internal.Util.checkOffsetAndCount; 29import static com.squareup.okhttp.internal.Util.pokeInt; 30import static java.nio.ByteOrder.BIG_ENDIAN; 31 32/** A logical bidirectional stream. */ 33public final class SpdyStream { 34 35 // Internal state is guarded by this. No long-running or potentially 36 // blocking operations are performed while the lock is held. 37 38 private static final int DATA_FRAME_HEADER_LENGTH = 8; 39 40 private static final String[] STATUS_CODE_NAMES = { 41 null, 42 "PROTOCOL_ERROR", 43 "INVALID_STREAM", 44 "REFUSED_STREAM", 45 "UNSUPPORTED_VERSION", 46 "CANCEL", 47 "INTERNAL_ERROR", 48 "FLOW_CONTROL_ERROR", 49 "STREAM_IN_USE", 50 "STREAM_ALREADY_CLOSED", 51 "INVALID_CREDENTIALS", 52 "FRAME_TOO_LARGE" 53 }; 54 55 public static final int RST_PROTOCOL_ERROR = 1; 56 public static final int RST_INVALID_STREAM = 2; 57 public static final int RST_REFUSED_STREAM = 3; 58 public static final int RST_UNSUPPORTED_VERSION = 4; 59 public static final int RST_CANCEL = 5; 60 public static final int RST_INTERNAL_ERROR = 6; 61 public static final int RST_FLOW_CONTROL_ERROR = 7; 62 public static final int RST_STREAM_IN_USE = 8; 63 public static final int RST_STREAM_ALREADY_CLOSED = 9; 64 public static final int RST_INVALID_CREDENTIALS = 10; 65 public static final int RST_FRAME_TOO_LARGE = 11; 66 67 /** 68 * The number of unacknowledged bytes at which the input stream will send 69 * the peer a {@code WINDOW_UPDATE} frame. Must be less than this client's 70 * window size, otherwise the remote peer will stop sending data on this 71 * stream. (Chrome 25 uses 5 MiB.) 72 */ 73 public static final int WINDOW_UPDATE_THRESHOLD = Settings.DEFAULT_INITIAL_WINDOW_SIZE / 2; 74 75 private final int id; 76 private final SpdyConnection connection; 77 private final int priority; 78 private final int slot; 79 private long readTimeoutMillis = 0; 80 private int writeWindowSize; 81 82 /** Headers sent by the stream initiator. Immutable and non null. */ 83 private final List<String> requestHeaders; 84 85 /** Headers sent in the stream reply. Null if reply is either not sent or not sent yet. */ 86 private List<String> responseHeaders; 87 88 private final SpdyDataInputStream in = new SpdyDataInputStream(); 89 private final SpdyDataOutputStream out = new SpdyDataOutputStream(); 90 91 /** 92 * The reason why this stream was abnormally closed. If there are multiple 93 * reasons to abnormally close this stream (such as both peers closing it 94 * near-simultaneously) then this is the first reason known to this peer. 95 */ 96 private int rstStatusCode = -1; 97 98 SpdyStream(int id, SpdyConnection connection, int flags, int priority, int slot, 99 List<String> requestHeaders, Settings settings) { 100 if (connection == null) throw new NullPointerException("connection == null"); 101 if (requestHeaders == null) throw new NullPointerException("requestHeaders == null"); 102 this.id = id; 103 this.connection = connection; 104 this.priority = priority; 105 this.slot = slot; 106 this.requestHeaders = requestHeaders; 107 108 if (isLocallyInitiated()) { 109 // I am the sender 110 in.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0; 111 out.finished = (flags & SpdyConnection.FLAG_FIN) != 0; 112 } else { 113 // I am the receiver 114 in.finished = (flags & SpdyConnection.FLAG_FIN) != 0; 115 out.finished = (flags & SpdyConnection.FLAG_UNIDIRECTIONAL) != 0; 116 } 117 118 setSettings(settings); 119 } 120 121 /** 122 * Returns true if this stream is open. A stream is open until either: 123 * <ul> 124 * <li>A {@code SYN_RESET} frame abnormally terminates the stream. 125 * <li>Both input and output streams have transmitted all data and 126 * headers. 127 * </ul> 128 * Note that the input stream may continue to yield data even after a stream 129 * reports itself as not open. This is because input data is buffered. 130 */ 131 public synchronized boolean isOpen() { 132 if (rstStatusCode != -1) { 133 return false; 134 } 135 if ((in.finished || in.closed) && (out.finished || out.closed) && responseHeaders != null) { 136 return false; 137 } 138 return true; 139 } 140 141 /** Returns true if this stream was created by this peer. */ 142 public boolean isLocallyInitiated() { 143 boolean streamIsClient = (id % 2 == 1); 144 return connection.client == streamIsClient; 145 } 146 147 public SpdyConnection getConnection() { 148 return connection; 149 } 150 151 public List<String> getRequestHeaders() { 152 return requestHeaders; 153 } 154 155 /** 156 * Returns the stream's response headers, blocking if necessary if they 157 * have not been received yet. 158 */ 159 public synchronized List<String> getResponseHeaders() throws IOException { 160 try { 161 while (responseHeaders == null && rstStatusCode == -1) { 162 wait(); 163 } 164 if (responseHeaders != null) { 165 return responseHeaders; 166 } 167 throw new IOException("stream was reset: " + rstStatusString()); 168 } catch (InterruptedException e) { 169 InterruptedIOException rethrow = new InterruptedIOException(); 170 rethrow.initCause(e); 171 throw rethrow; 172 } 173 } 174 175 /** 176 * Returns the reason why this stream was closed, or -1 if it closed 177 * normally or has not yet been closed. Valid reasons are {@link 178 * #RST_PROTOCOL_ERROR}, {@link #RST_INVALID_STREAM}, {@link 179 * #RST_REFUSED_STREAM}, {@link #RST_UNSUPPORTED_VERSION}, {@link 180 * #RST_CANCEL}, {@link #RST_INTERNAL_ERROR} and {@link 181 * #RST_FLOW_CONTROL_ERROR}. 182 */ 183 public synchronized int getRstStatusCode() { 184 return rstStatusCode; 185 } 186 187 /** 188 * Sends a reply to an incoming stream. 189 * 190 * @param out true to create an output stream that we can use to send data 191 * to the remote peer. Corresponds to {@code FLAG_FIN}. 192 */ 193 public void reply(List<String> responseHeaders, boolean out) throws IOException { 194 assert (!Thread.holdsLock(SpdyStream.this)); 195 int flags = 0; 196 synchronized (this) { 197 if (responseHeaders == null) { 198 throw new NullPointerException("responseHeaders == null"); 199 } 200 if (isLocallyInitiated()) { 201 throw new IllegalStateException("cannot reply to a locally initiated stream"); 202 } 203 if (this.responseHeaders != null) { 204 throw new IllegalStateException("reply already sent"); 205 } 206 this.responseHeaders = responseHeaders; 207 if (!out) { 208 this.out.finished = true; 209 flags |= SpdyConnection.FLAG_FIN; 210 } 211 } 212 connection.writeSynReply(id, flags, responseHeaders); 213 } 214 215 /** 216 * Sets the maximum time to wait on input stream reads before failing with a 217 * {@code SocketTimeoutException}, or {@code 0} to wait indefinitely. 218 */ 219 public void setReadTimeout(long readTimeoutMillis) { 220 this.readTimeoutMillis = readTimeoutMillis; 221 } 222 223 public long getReadTimeoutMillis() { 224 return readTimeoutMillis; 225 } 226 227 /** Returns an input stream that can be used to read data from the peer. */ 228 public InputStream getInputStream() { 229 return in; 230 } 231 232 /** 233 * Returns an output stream that can be used to write data to the peer. 234 * 235 * @throws IllegalStateException if this stream was initiated by the peer 236 * and a {@link #reply} has not yet been sent. 237 */ 238 public OutputStream getOutputStream() { 239 synchronized (this) { 240 if (responseHeaders == null && !isLocallyInitiated()) { 241 throw new IllegalStateException("reply before requesting the output stream"); 242 } 243 } 244 return out; 245 } 246 247 /** 248 * Abnormally terminate this stream. This blocks until the {@code RST_STREAM} 249 * frame has been transmitted. 250 */ 251 public void close(int rstStatusCode) throws IOException { 252 if (!closeInternal(rstStatusCode)) { 253 return; // Already closed. 254 } 255 connection.writeSynReset(id, rstStatusCode); 256 } 257 258 /** 259 * Abnormally terminate this stream. This enqueues a {@code RST_STREAM} 260 * frame and returns immediately. 261 */ 262 public void closeLater(int rstStatusCode) { 263 if (!closeInternal(rstStatusCode)) { 264 return; // Already closed. 265 } 266 connection.writeSynResetLater(id, rstStatusCode); 267 } 268 269 /** Returns true if this stream was closed. */ 270 private boolean closeInternal(int rstStatusCode) { 271 assert (!Thread.holdsLock(this)); 272 synchronized (this) { 273 if (this.rstStatusCode != -1) { 274 return false; 275 } 276 if (in.finished && out.finished) { 277 return false; 278 } 279 this.rstStatusCode = rstStatusCode; 280 notifyAll(); 281 } 282 connection.removeStream(id); 283 return true; 284 } 285 286 void receiveReply(List<String> strings) throws IOException { 287 assert (!Thread.holdsLock(SpdyStream.this)); 288 boolean streamInUseError = false; 289 boolean open = true; 290 synchronized (this) { 291 if (isLocallyInitiated() && responseHeaders == null) { 292 responseHeaders = strings; 293 open = isOpen(); 294 notifyAll(); 295 } else { 296 streamInUseError = true; 297 } 298 } 299 if (streamInUseError) { 300 closeLater(SpdyStream.RST_STREAM_IN_USE); 301 } else if (!open) { 302 connection.removeStream(id); 303 } 304 } 305 306 void receiveHeaders(List<String> headers) throws IOException { 307 assert (!Thread.holdsLock(SpdyStream.this)); 308 boolean protocolError = false; 309 synchronized (this) { 310 if (responseHeaders != null) { 311 List<String> newHeaders = new ArrayList<String>(); 312 newHeaders.addAll(responseHeaders); 313 newHeaders.addAll(headers); 314 this.responseHeaders = newHeaders; 315 } else { 316 protocolError = true; 317 } 318 } 319 if (protocolError) { 320 closeLater(SpdyStream.RST_PROTOCOL_ERROR); 321 } 322 } 323 324 void receiveData(InputStream in, int length) throws IOException { 325 assert (!Thread.holdsLock(SpdyStream.this)); 326 this.in.receive(in, length); 327 } 328 329 void receiveFin() { 330 assert (!Thread.holdsLock(SpdyStream.this)); 331 boolean open; 332 synchronized (this) { 333 this.in.finished = true; 334 open = isOpen(); 335 notifyAll(); 336 } 337 if (!open) { 338 connection.removeStream(id); 339 } 340 } 341 342 synchronized void receiveRstStream(int statusCode) { 343 if (rstStatusCode == -1) { 344 rstStatusCode = statusCode; 345 notifyAll(); 346 } 347 } 348 349 private void setSettings(Settings settings) { 350 assert (Thread.holdsLock(connection)); // Because 'settings' is guarded by 'connection'. 351 this.writeWindowSize = 352 settings != null ? settings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE) 353 : Settings.DEFAULT_INITIAL_WINDOW_SIZE; 354 } 355 356 void receiveSettings(Settings settings) { 357 assert (Thread.holdsLock(this)); 358 setSettings(settings); 359 notifyAll(); 360 } 361 362 synchronized void receiveWindowUpdate(int deltaWindowSize) { 363 out.unacknowledgedBytes -= deltaWindowSize; 364 notifyAll(); 365 } 366 367 private String rstStatusString() { 368 return rstStatusCode > 0 && rstStatusCode < STATUS_CODE_NAMES.length 369 ? STATUS_CODE_NAMES[rstStatusCode] : Integer.toString(rstStatusCode); 370 } 371 372 int getPriority() { 373 return priority; 374 } 375 376 int getSlot() { 377 return slot; 378 } 379 380 /** 381 * An input stream that reads the incoming data frames of a stream. Although 382 * this class uses synchronization to safely receive incoming data frames, 383 * it is not intended for use by multiple readers. 384 */ 385 private final class SpdyDataInputStream extends InputStream { 386 // Store incoming data bytes in a circular buffer. When the buffer is 387 // empty, pos == -1. Otherwise pos is the first byte to read and limit 388 // is the first byte to write. 389 // 390 // { - - - X X X X - - - } 391 // ^ ^ 392 // pos limit 393 // 394 // { X X X - - - - X X X } 395 // ^ ^ 396 // limit pos 397 398 private final byte[] buffer = new byte[Settings.DEFAULT_INITIAL_WINDOW_SIZE]; 399 400 /** the next byte to be read, or -1 if the buffer is empty. Never buffer.length */ 401 private int pos = -1; 402 403 /** the last byte to be read. Never buffer.length */ 404 private int limit; 405 406 /** True if the caller has closed this stream. */ 407 private boolean closed; 408 409 /** 410 * True if either side has cleanly shut down this stream. We will 411 * receive no more bytes beyond those already in the buffer. 412 */ 413 private boolean finished; 414 415 /** 416 * The total number of bytes consumed by the application (with {@link 417 * #read}), but not yet acknowledged by sending a {@code WINDOW_UPDATE} 418 * frame. 419 */ 420 private int unacknowledgedBytes = 0; 421 422 @Override public int available() throws IOException { 423 synchronized (SpdyStream.this) { 424 checkNotClosed(); 425 if (pos == -1) { 426 return 0; 427 } else if (limit > pos) { 428 return limit - pos; 429 } else { 430 return limit + (buffer.length - pos); 431 } 432 } 433 } 434 435 @Override public int read() throws IOException { 436 return Util.readSingleByte(this); 437 } 438 439 @Override public int read(byte[] b, int offset, int count) throws IOException { 440 synchronized (SpdyStream.this) { 441 checkOffsetAndCount(b.length, offset, count); 442 waitUntilReadable(); 443 checkNotClosed(); 444 445 if (pos == -1) { 446 return -1; 447 } 448 449 int copied = 0; 450 451 // drain from [pos..buffer.length) 452 if (limit <= pos) { 453 int bytesToCopy = Math.min(count, buffer.length - pos); 454 System.arraycopy(buffer, pos, b, offset, bytesToCopy); 455 pos += bytesToCopy; 456 copied += bytesToCopy; 457 if (pos == buffer.length) { 458 pos = 0; 459 } 460 } 461 462 // drain from [pos..limit) 463 if (copied < count) { 464 int bytesToCopy = Math.min(limit - pos, count - copied); 465 System.arraycopy(buffer, pos, b, offset + copied, bytesToCopy); 466 pos += bytesToCopy; 467 copied += bytesToCopy; 468 } 469 470 // Flow control: notify the peer that we're ready for more data! 471 unacknowledgedBytes += copied; 472 if (unacknowledgedBytes >= WINDOW_UPDATE_THRESHOLD) { 473 connection.writeWindowUpdateLater(id, unacknowledgedBytes); 474 unacknowledgedBytes = 0; 475 } 476 477 if (pos == limit) { 478 pos = -1; 479 limit = 0; 480 } 481 482 return copied; 483 } 484 } 485 486 /** 487 * Returns once the input stream is either readable or finished. Throws 488 * a {@link SocketTimeoutException} if the read timeout elapses before 489 * that happens. 490 */ 491 private void waitUntilReadable() throws IOException { 492 long start = 0; 493 long remaining = 0; 494 if (readTimeoutMillis != 0) { 495 start = (System.nanoTime() / 1000000); 496 remaining = readTimeoutMillis; 497 } 498 try { 499 while (pos == -1 && !finished && !closed && rstStatusCode == -1) { 500 if (readTimeoutMillis == 0) { 501 SpdyStream.this.wait(); 502 } else if (remaining > 0) { 503 SpdyStream.this.wait(remaining); 504 remaining = start + readTimeoutMillis - (System.nanoTime() / 1000000); 505 } else { 506 throw new SocketTimeoutException(); 507 } 508 } 509 } catch (InterruptedException e) { 510 throw new InterruptedIOException(); 511 } 512 } 513 514 void receive(InputStream in, int byteCount) throws IOException { 515 assert (!Thread.holdsLock(SpdyStream.this)); 516 517 if (byteCount == 0) { 518 return; 519 } 520 521 int pos; 522 int limit; 523 int firstNewByte; 524 boolean finished; 525 boolean flowControlError; 526 synchronized (SpdyStream.this) { 527 finished = this.finished; 528 pos = this.pos; 529 firstNewByte = this.limit; 530 limit = this.limit; 531 flowControlError = byteCount > buffer.length - available(); 532 } 533 534 // If the peer sends more data than we can handle, discard it and close the connection. 535 if (flowControlError) { 536 Util.skipByReading(in, byteCount); 537 closeLater(SpdyStream.RST_FLOW_CONTROL_ERROR); 538 return; 539 } 540 541 // Discard data received after the stream is finished. It's probably a benign race. 542 if (finished) { 543 Util.skipByReading(in, byteCount); 544 return; 545 } 546 547 // Fill the buffer without holding any locks. First fill [limit..buffer.length) if that 548 // won't overwrite unread data. Then fill [limit..pos). We can't hold a lock, otherwise 549 // writes will be blocked until reads complete. 550 if (pos < limit) { 551 int firstCopyCount = Math.min(byteCount, buffer.length - limit); 552 Util.readFully(in, buffer, limit, firstCopyCount); 553 limit += firstCopyCount; 554 byteCount -= firstCopyCount; 555 if (limit == buffer.length) { 556 limit = 0; 557 } 558 } 559 if (byteCount > 0) { 560 Util.readFully(in, buffer, limit, byteCount); 561 limit += byteCount; 562 } 563 564 synchronized (SpdyStream.this) { 565 // Update the new limit, and mark the position as readable if necessary. 566 this.limit = limit; 567 if (this.pos == -1) { 568 this.pos = firstNewByte; 569 SpdyStream.this.notifyAll(); 570 } 571 } 572 } 573 574 @Override public void close() throws IOException { 575 synchronized (SpdyStream.this) { 576 closed = true; 577 SpdyStream.this.notifyAll(); 578 } 579 cancelStreamIfNecessary(); 580 } 581 582 private void checkNotClosed() throws IOException { 583 if (closed) { 584 throw new IOException("stream closed"); 585 } 586 if (rstStatusCode != -1) { 587 throw new IOException("stream was reset: " + rstStatusString()); 588 } 589 } 590 } 591 592 private void cancelStreamIfNecessary() throws IOException { 593 assert (!Thread.holdsLock(SpdyStream.this)); 594 boolean open; 595 boolean cancel; 596 synchronized (this) { 597 cancel = !in.finished && in.closed && (out.finished || out.closed); 598 open = isOpen(); 599 } 600 if (cancel) { 601 // RST this stream to prevent additional data from being sent. This 602 // is safe because the input stream is closed (we won't use any 603 // further bytes) and the output stream is either finished or closed 604 // (so RSTing both streams doesn't cause harm). 605 SpdyStream.this.close(RST_CANCEL); 606 } else if (!open) { 607 connection.removeStream(id); 608 } 609 } 610 611 /** 612 * An output stream that writes outgoing data frames of a stream. This class 613 * is not thread safe. 614 */ 615 private final class SpdyDataOutputStream extends OutputStream { 616 private final byte[] buffer = new byte[8192]; 617 private int pos = DATA_FRAME_HEADER_LENGTH; 618 619 /** True if the caller has closed this stream. */ 620 private boolean closed; 621 622 /** 623 * True if either side has cleanly shut down this stream. We shall send 624 * no more bytes. 625 */ 626 private boolean finished; 627 628 /** 629 * The total number of bytes written out to the peer, but not yet 630 * acknowledged with an incoming {@code WINDOW_UPDATE} frame. Writes 631 * block if they cause this to exceed the {@code WINDOW_SIZE}. 632 */ 633 private int unacknowledgedBytes = 0; 634 635 @Override public void write(int b) throws IOException { 636 Util.writeSingleByte(this, b); 637 } 638 639 @Override public void write(byte[] bytes, int offset, int count) throws IOException { 640 assert (!Thread.holdsLock(SpdyStream.this)); 641 checkOffsetAndCount(bytes.length, offset, count); 642 checkNotClosed(); 643 644 while (count > 0) { 645 if (pos == buffer.length) { 646 writeFrame(false); 647 } 648 int bytesToCopy = Math.min(count, buffer.length - pos); 649 System.arraycopy(bytes, offset, buffer, pos, bytesToCopy); 650 pos += bytesToCopy; 651 offset += bytesToCopy; 652 count -= bytesToCopy; 653 } 654 } 655 656 @Override public void flush() throws IOException { 657 assert (!Thread.holdsLock(SpdyStream.this)); 658 checkNotClosed(); 659 if (pos > DATA_FRAME_HEADER_LENGTH) { 660 writeFrame(false); 661 connection.flush(); 662 } 663 } 664 665 @Override public void close() throws IOException { 666 assert (!Thread.holdsLock(SpdyStream.this)); 667 synchronized (SpdyStream.this) { 668 if (closed) { 669 return; 670 } 671 closed = true; 672 } 673 writeFrame(true); 674 connection.flush(); 675 cancelStreamIfNecessary(); 676 } 677 678 private void writeFrame(boolean last) throws IOException { 679 assert (!Thread.holdsLock(SpdyStream.this)); 680 681 int length = pos - DATA_FRAME_HEADER_LENGTH; 682 synchronized (SpdyStream.this) { 683 waitUntilWritable(length, last); 684 unacknowledgedBytes += length; 685 } 686 int flags = 0; 687 if (last) { 688 flags |= SpdyConnection.FLAG_FIN; 689 } 690 pokeInt(buffer, 0, id & 0x7fffffff, BIG_ENDIAN); 691 pokeInt(buffer, 4, (flags & 0xff) << 24 | length & 0xffffff, BIG_ENDIAN); 692 connection.writeFrame(buffer, 0, pos); 693 pos = DATA_FRAME_HEADER_LENGTH; 694 } 695 696 /** 697 * Returns once the peer is ready to receive {@code count} bytes. 698 * 699 * @throws IOException if the stream was finished or closed, or the 700 * thread was interrupted. 701 */ 702 private void waitUntilWritable(int count, boolean last) throws IOException { 703 try { 704 while (unacknowledgedBytes + count >= writeWindowSize) { 705 SpdyStream.this.wait(); // Wait until we receive a WINDOW_UPDATE. 706 707 // The stream may have been closed or reset while we were waiting! 708 if (!last && closed) { 709 throw new IOException("stream closed"); 710 } else if (finished) { 711 throw new IOException("stream finished"); 712 } else if (rstStatusCode != -1) { 713 throw new IOException("stream was reset: " + rstStatusString()); 714 } 715 } 716 } catch (InterruptedException e) { 717 throw new InterruptedIOException(); 718 } 719 } 720 721 private void checkNotClosed() throws IOException { 722 synchronized (SpdyStream.this) { 723 if (closed) { 724 throw new IOException("stream closed"); 725 } else if (finished) { 726 throw new IOException("stream finished"); 727 } else if (rstStatusCode != -1) { 728 throw new IOException("stream was reset: " + rstStatusString()); 729 } 730 } 731 } 732 } 733} 734