SpdyConnection.java revision faf49723fb689c626f69876e718c58018eff8ee7
1/* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.squareup.okhttp.internal.spdy; 18 19import com.squareup.okhttp.internal.NamedRunnable; 20import com.squareup.okhttp.internal.Util; 21import java.io.Closeable; 22import java.io.IOException; 23import java.io.InputStream; 24import java.io.OutputStream; 25import java.net.Socket; 26import java.util.HashMap; 27import java.util.Iterator; 28import java.util.List; 29import java.util.Map; 30import java.util.concurrent.ExecutorService; 31import java.util.concurrent.SynchronousQueue; 32import java.util.concurrent.ThreadPoolExecutor; 33import java.util.concurrent.TimeUnit; 34 35/** 36 * A socket connection to a remote peer. A connection hosts streams which can 37 * send and receive data. 38 * 39 * <p>Many methods in this API are <strong>synchronous:</strong> the call is 40 * completed before the method returns. This is typical for Java but atypical 41 * for SPDY. This is motivated by exception transparency: an IOException that 42 * was triggered by a certain caller can be caught and handled by that caller. 43 */ 44public final class SpdyConnection implements Closeable { 45 46 // Internal state of this connection is guarded by 'this'. No blocking 47 // operations may be performed while holding this lock! 48 // 49 // Socket writes are guarded by spdyWriter. 50 // 51 // Socket reads are unguarded but are only made by the reader thread. 52 // 53 // Certain operations (like SYN_STREAM) need to synchronize on both the 54 // spdyWriter (to do blocking I/O) and this (to create streams). Such 55 // operations must synchronize on 'this' last. This ensures that we never 56 // wait for a blocking operation while holding 'this'. 57 58 static final int FLAG_FIN = 0x1; 59 static final int FLAG_UNIDIRECTIONAL = 0x2; 60 61 static final int TYPE_DATA = 0x0; 62 static final int TYPE_SYN_STREAM = 0x1; 63 static final int TYPE_SYN_REPLY = 0x2; 64 static final int TYPE_RST_STREAM = 0x3; 65 static final int TYPE_SETTINGS = 0x4; 66 static final int TYPE_NOOP = 0x5; 67 static final int TYPE_PING = 0x6; 68 static final int TYPE_GOAWAY = 0x7; 69 static final int TYPE_HEADERS = 0x8; 70 static final int TYPE_WINDOW_UPDATE = 0x9; 71 static final int TYPE_CREDENTIAL = 0x10; 72 static final int VERSION = 3; 73 74 static final int GOAWAY_OK = 0; 75 static final int GOAWAY_PROTOCOL_ERROR = 1; 76 static final int GOAWAY_INTERNAL_ERROR = 2; 77 78 private static final ExecutorService executor = new ThreadPoolExecutor(0, 79 Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 80 Util.daemonThreadFactory("OkHttp SpdyConnection")); 81 82 /** True if this peer initiated the connection. */ 83 final boolean client; 84 85 /** 86 * User code to run in response to an incoming stream. Callbacks must not be 87 * run on the callback executor. 88 */ 89 private final IncomingStreamHandler handler; 90 private final SpdyReader spdyReader; 91 private final SpdyWriter spdyWriter; 92 93 private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>(); 94 private final String hostName; 95 private int lastGoodStreamId; 96 private int nextStreamId; 97 private boolean shutdown; 98 private long idleStartTimeNs = System.nanoTime(); 99 100 /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */ 101 private Map<Integer, Ping> pings; 102 private int nextPingId; 103 104 /** Lazily-created settings for this connection. */ 105 Settings settings; 106 107 private SpdyConnection(Builder builder) { 108 client = builder.client; 109 handler = builder.handler; 110 spdyReader = new SpdyReader(builder.in); 111 spdyWriter = new SpdyWriter(builder.out); 112 nextStreamId = builder.client ? 1 : 2; 113 nextPingId = builder.client ? 1 : 2; 114 115 hostName = builder.hostName; 116 117 new Thread(new Reader(), "Spdy Reader " + hostName).start(); 118 } 119 120 /** 121 * Returns the number of {@link SpdyStream#isOpen() open streams} on this 122 * connection. 123 */ 124 public synchronized int openStreamCount() { 125 return streams.size(); 126 } 127 128 private synchronized SpdyStream getStream(int id) { 129 return streams.get(id); 130 } 131 132 synchronized SpdyStream removeStream(int streamId) { 133 SpdyStream stream = streams.remove(streamId); 134 if (stream != null && streams.isEmpty()) { 135 setIdle(true); 136 } 137 return stream; 138 } 139 140 private synchronized void setIdle(boolean value) { 141 idleStartTimeNs = value ? System.nanoTime() : 0L; 142 } 143 144 /** Returns true if this connection is idle. */ 145 public synchronized boolean isIdle() { 146 return idleStartTimeNs != 0L; 147 } 148 149 /** Returns the time in ns when this connection became idle or 0L if connection is not idle. */ 150 public synchronized long getIdleStartTimeNs() { 151 return idleStartTimeNs; 152 } 153 154 /** 155 * Returns a new locally-initiated stream. 156 * 157 * @param out true to create an output stream that we can use to send data 158 * to the remote peer. Corresponds to {@code FLAG_FIN}. 159 * @param in true to create an input stream that the remote peer can use to 160 * send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}. 161 */ 162 public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in) 163 throws IOException { 164 int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL); 165 int associatedStreamId = 0; // TODO: permit the caller to specify an associated stream? 166 int priority = 0; // TODO: permit the caller to specify a priority? 167 int slot = 0; // TODO: permit the caller to specify a slot? 168 SpdyStream stream; 169 int streamId; 170 171 synchronized (spdyWriter) { 172 synchronized (this) { 173 if (shutdown) { 174 throw new IOException("shutdown"); 175 } 176 streamId = nextStreamId; 177 nextStreamId += 2; 178 stream = new SpdyStream(streamId, this, flags, priority, slot, requestHeaders, settings); 179 if (stream.isOpen()) { 180 streams.put(streamId, stream); 181 setIdle(false); 182 } 183 } 184 185 spdyWriter.synStream(flags, streamId, associatedStreamId, priority, slot, requestHeaders); 186 } 187 188 return stream; 189 } 190 191 void writeSynReply(int streamId, int flags, List<String> alternating) throws IOException { 192 spdyWriter.synReply(flags, streamId, alternating); 193 } 194 195 /** Writes a complete data frame. */ 196 void writeFrame(byte[] bytes, int offset, int length) throws IOException { 197 synchronized (spdyWriter) { 198 spdyWriter.out.write(bytes, offset, length); 199 } 200 } 201 202 void writeSynResetLater(final int streamId, final int statusCode) { 203 executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) { 204 @Override public void execute() { 205 try { 206 writeSynReset(streamId, statusCode); 207 } catch (IOException ignored) { 208 } 209 } 210 }); 211 } 212 213 void writeSynReset(int streamId, int statusCode) throws IOException { 214 spdyWriter.rstStream(streamId, statusCode); 215 } 216 217 void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) { 218 executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) { 219 @Override public void execute() { 220 try { 221 writeWindowUpdate(streamId, deltaWindowSize); 222 } catch (IOException ignored) { 223 } 224 } 225 }); 226 } 227 228 void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException { 229 spdyWriter.windowUpdate(streamId, deltaWindowSize); 230 } 231 232 /** 233 * Sends a ping frame to the peer. Use the returned object to await the 234 * ping's response and observe its round trip time. 235 */ 236 public Ping ping() throws IOException { 237 Ping ping = new Ping(); 238 int pingId; 239 synchronized (this) { 240 if (shutdown) { 241 throw new IOException("shutdown"); 242 } 243 pingId = nextPingId; 244 nextPingId += 2; 245 if (pings == null) pings = new HashMap<Integer, Ping>(); 246 pings.put(pingId, ping); 247 } 248 writePing(pingId, ping); 249 return ping; 250 } 251 252 private void writePingLater(final int streamId, final Ping ping) { 253 executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) { 254 @Override public void execute() { 255 try { 256 writePing(streamId, ping); 257 } catch (IOException ignored) { 258 } 259 } 260 }); 261 } 262 263 private void writePing(int id, Ping ping) throws IOException { 264 synchronized (spdyWriter) { 265 // Observe the sent time immediately before performing I/O. 266 if (ping != null) ping.send(); 267 spdyWriter.ping(0, id); 268 } 269 } 270 271 private synchronized Ping removePing(int id) { 272 return pings != null ? pings.remove(id) : null; 273 } 274 275 /** Sends a noop frame to the peer. */ 276 public void noop() throws IOException { 277 spdyWriter.noop(); 278 } 279 280 public void flush() throws IOException { 281 synchronized (spdyWriter) { 282 spdyWriter.out.flush(); 283 } 284 } 285 286 /** 287 * Degrades this connection such that new streams can neither be created 288 * locally, nor accepted from the remote peer. Existing streams are not 289 * impacted. This is intended to permit an endpoint to gracefully stop 290 * accepting new requests without harming previously established streams. 291 * 292 * @param statusCode one of {@link #GOAWAY_OK}, {@link 293 * #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}. 294 */ 295 public void shutdown(int statusCode) throws IOException { 296 synchronized (spdyWriter) { 297 int lastGoodStreamId; 298 synchronized (this) { 299 if (shutdown) { 300 return; 301 } 302 shutdown = true; 303 lastGoodStreamId = this.lastGoodStreamId; 304 } 305 spdyWriter.goAway(0, lastGoodStreamId, statusCode); 306 } 307 } 308 309 /** 310 * Closes this connection. This cancels all open streams and unanswered 311 * pings. It closes the underlying input and output streams and shuts down 312 * internal executor services. 313 */ 314 @Override public void close() throws IOException { 315 close(GOAWAY_OK, SpdyStream.RST_CANCEL); 316 } 317 318 private void close(int shutdownStatusCode, int rstStatusCode) throws IOException { 319 assert (!Thread.holdsLock(this)); 320 IOException thrown = null; 321 try { 322 shutdown(shutdownStatusCode); 323 } catch (IOException e) { 324 thrown = e; 325 } 326 327 SpdyStream[] streamsToClose = null; 328 Ping[] pingsToCancel = null; 329 synchronized (this) { 330 if (!streams.isEmpty()) { 331 streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]); 332 streams.clear(); 333 setIdle(false); 334 } 335 if (pings != null) { 336 pingsToCancel = pings.values().toArray(new Ping[pings.size()]); 337 pings = null; 338 } 339 } 340 341 if (streamsToClose != null) { 342 for (SpdyStream stream : streamsToClose) { 343 try { 344 stream.close(rstStatusCode); 345 } catch (IOException e) { 346 if (thrown != null) thrown = e; 347 } 348 } 349 } 350 351 if (pingsToCancel != null) { 352 for (Ping ping : pingsToCancel) { 353 ping.cancel(); 354 } 355 } 356 357 try { 358 spdyReader.close(); 359 } catch (IOException e) { 360 thrown = e; 361 } 362 try { 363 spdyWriter.close(); 364 } catch (IOException e) { 365 if (thrown == null) thrown = e; 366 } 367 368 if (thrown != null) throw thrown; 369 } 370 371 public static class Builder { 372 private String hostName; 373 private InputStream in; 374 private OutputStream out; 375 private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS; 376 public boolean client; 377 378 public Builder(boolean client, Socket socket) throws IOException { 379 this("", client, socket.getInputStream(), socket.getOutputStream()); 380 } 381 382 public Builder(boolean client, InputStream in, OutputStream out) { 383 this("", client, in, out); 384 } 385 386 /** 387 * @param client true if this peer initiated the connection; false if 388 * this peer accepted the connection. 389 */ 390 public Builder(String hostName, boolean client, Socket socket) throws IOException { 391 this(hostName, client, socket.getInputStream(), socket.getOutputStream()); 392 } 393 394 /** 395 * @param client true if this peer initiated the connection; false if this 396 * peer accepted the connection. 397 */ 398 public Builder(String hostName, boolean client, InputStream in, OutputStream out) { 399 this.hostName = hostName; 400 this.client = client; 401 this.in = in; 402 this.out = out; 403 } 404 405 public Builder handler(IncomingStreamHandler handler) { 406 this.handler = handler; 407 return this; 408 } 409 410 public SpdyConnection build() { 411 return new SpdyConnection(this); 412 } 413 } 414 415 private class Reader implements Runnable, SpdyReader.Handler { 416 @Override public void run() { 417 int shutdownStatusCode = GOAWAY_INTERNAL_ERROR; 418 int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR; 419 try { 420 while (spdyReader.nextFrame(this)) { 421 } 422 shutdownStatusCode = GOAWAY_OK; 423 rstStatusCode = SpdyStream.RST_CANCEL; 424 } catch (IOException e) { 425 shutdownStatusCode = GOAWAY_PROTOCOL_ERROR; 426 rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR; 427 } finally { 428 try { 429 close(shutdownStatusCode, rstStatusCode); 430 } catch (IOException ignored) { 431 } 432 } 433 } 434 435 @Override public void data(int flags, int streamId, InputStream in, int length) 436 throws IOException { 437 SpdyStream dataStream = getStream(streamId); 438 if (dataStream == null) { 439 writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); 440 Util.skipByReading(in, length); 441 return; 442 } 443 dataStream.receiveData(in, length); 444 if ((flags & SpdyConnection.FLAG_FIN) != 0) { 445 dataStream.receiveFin(); 446 } 447 } 448 449 @Override 450 public void synStream(int flags, int streamId, int associatedStreamId, int priority, int slot, 451 List<String> nameValueBlock) { 452 final SpdyStream synStream; 453 final SpdyStream previous; 454 synchronized (SpdyConnection.this) { 455 synStream = 456 new SpdyStream(streamId, SpdyConnection.this, flags, priority, slot, nameValueBlock, 457 settings); 458 if (shutdown) { 459 return; 460 } 461 lastGoodStreamId = streamId; 462 previous = streams.put(streamId, synStream); 463 } 464 if (previous != null) { 465 previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR); 466 removeStream(streamId); 467 return; 468 } 469 470 executor.submit(new NamedRunnable("OkHttp SPDY Callback %s stream %d", hostName, streamId) { 471 @Override public void execute() { 472 try { 473 handler.receive(synStream); 474 } catch (IOException e) { 475 throw new RuntimeException(e); 476 } 477 } 478 }); 479 } 480 481 @Override public void synReply(int flags, int streamId, List<String> nameValueBlock) 482 throws IOException { 483 SpdyStream replyStream = getStream(streamId); 484 if (replyStream == null) { 485 writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM); 486 return; 487 } 488 replyStream.receiveReply(nameValueBlock); 489 if ((flags & SpdyConnection.FLAG_FIN) != 0) { 490 replyStream.receiveFin(); 491 } 492 } 493 494 @Override public void headers(int flags, int streamId, List<String> nameValueBlock) 495 throws IOException { 496 SpdyStream replyStream = getStream(streamId); 497 if (replyStream != null) { 498 replyStream.receiveHeaders(nameValueBlock); 499 } 500 } 501 502 @Override public void rstStream(int flags, int streamId, int statusCode) { 503 SpdyStream rstStream = removeStream(streamId); 504 if (rstStream != null) { 505 rstStream.receiveRstStream(statusCode); 506 } 507 } 508 509 @Override public void settings(int flags, Settings newSettings) { 510 SpdyStream[] streamsToNotify = null; 511 synchronized (SpdyConnection.this) { 512 if (settings == null || (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) { 513 settings = newSettings; 514 } else { 515 settings.merge(newSettings); 516 } 517 if (!streams.isEmpty()) { 518 streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]); 519 } 520 } 521 if (streamsToNotify != null) { 522 for (SpdyStream stream : streamsToNotify) { 523 // The synchronization here is ugly. We need to synchronize on 'this' to guard 524 // reads to 'settings'. We synchronize on 'stream' to guard the state change. 525 // And we need to acquire the 'stream' lock first, since that may block. 526 synchronized (stream) { 527 synchronized (this) { 528 stream.receiveSettings(settings); 529 } 530 } 531 } 532 } 533 } 534 535 @Override public void noop() { 536 } 537 538 @Override public void ping(int flags, int streamId) { 539 if (client != (streamId % 2 == 1)) { 540 // Respond to a client ping if this is a server and vice versa. 541 writePingLater(streamId, null); 542 } else { 543 Ping ping = removePing(streamId); 544 if (ping != null) { 545 ping.receive(); 546 } 547 } 548 } 549 550 @Override public void goAway(int flags, int lastGoodStreamId, int statusCode) { 551 synchronized (SpdyConnection.this) { 552 shutdown = true; 553 554 // Fail all streams created after the last good stream ID. 555 for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator(); 556 i.hasNext(); ) { 557 Map.Entry<Integer, SpdyStream> entry = i.next(); 558 int streamId = entry.getKey(); 559 if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) { 560 entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM); 561 i.remove(); 562 } 563 } 564 } 565 } 566 567 @Override public void windowUpdate(int flags, int streamId, int deltaWindowSize) { 568 SpdyStream stream = getStream(streamId); 569 if (stream != null) { 570 stream.receiveWindowUpdate(deltaWindowSize); 571 } 572 } 573 } 574} 575