1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package com.squareup.okhttp.internal.framed;
17
18import com.squareup.okhttp.Protocol;
19import com.squareup.okhttp.internal.NamedRunnable;
20import com.squareup.okhttp.internal.Util;
21import java.io.Closeable;
22import java.io.IOException;
23import java.io.InterruptedIOException;
24import java.net.InetSocketAddress;
25import java.net.Socket;
26import java.util.HashMap;
27import java.util.LinkedHashSet;
28import java.util.List;
29import java.util.Map;
30import java.util.Set;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.LinkedBlockingQueue;
33import java.util.concurrent.SynchronousQueue;
34import java.util.concurrent.ThreadPoolExecutor;
35import java.util.concurrent.TimeUnit;
36import java.util.logging.Level;
37import okio.Buffer;
38import okio.BufferedSink;
39import okio.BufferedSource;
40import okio.ByteString;
41import okio.Okio;
42
43import static com.squareup.okhttp.internal.Internal.logger;
44import static com.squareup.okhttp.internal.framed.Settings.DEFAULT_INITIAL_WINDOW_SIZE;
45
46/**
47 * A socket connection to a remote peer. A connection hosts streams which can
48 * send and receive data.
49 *
50 * <p>Many methods in this API are <strong>synchronous:</strong> the call is
51 * completed before the method returns. This is typical for Java but atypical
52 * for SPDY. This is motivated by exception transparency: an IOException that
53 * was triggered by a certain caller can be caught and handled by that caller.
54 */
55public final class FramedConnection implements Closeable {
56
57  // Internal state of this connection is guarded by 'this'. No blocking
58  // operations may be performed while holding this lock!
59  //
60  // Socket writes are guarded by frameWriter.
61  //
62  // Socket reads are unguarded but are only made by the reader thread.
63  //
64  // Certain operations (like SYN_STREAM) need to synchronize on both the
65  // frameWriter (to do blocking I/O) and this (to create streams). Such
66  // operations must synchronize on 'this' last. This ensures that we never
67  // wait for a blocking operation while holding 'this'.
68
69  private static final ExecutorService executor = new ThreadPoolExecutor(0,
70      Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
71      Util.threadFactory("OkHttp FramedConnection", true));
72
73  /** The protocol variant, like {@link com.squareup.okhttp.internal.framed.Spdy3}. */
74  final Protocol protocol;
75
76  /** True if this peer initiated the connection. */
77  final boolean client;
78
79  /**
80   * User code to run in response to incoming streams or settings. Calls to this are always invoked
81   * on {@link #executor}.
82   */
83  private final Listener listener;
84  private final Map<Integer, FramedStream> streams = new HashMap<>();
85  private final String hostName;
86  private int lastGoodStreamId;
87  private int nextStreamId;
88  private boolean shutdown;
89  private long idleStartTimeNs = System.nanoTime();
90
91  /** Ensures push promise callbacks events are sent in order per stream. */
92  private final ExecutorService pushExecutor;
93
94  /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
95  private Map<Integer, Ping> pings;
96  /** User code to run in response to push promise events. */
97  private final PushObserver pushObserver;
98  private int nextPingId;
99
100  /**
101   * The total number of bytes consumed by the application, but not yet
102   * acknowledged by sending a {@code WINDOW_UPDATE} frame on this connection.
103   */
104  // Visible for testing
105  long unacknowledgedBytesRead = 0;
106
107  /**
108   * Count of bytes that can be written on the connection before receiving a
109   * window update.
110   */
111  // Visible for testing
112  long bytesLeftInWriteWindow;
113
114  /** Settings we communicate to the peer. */
115  Settings okHttpSettings = new Settings();
116
117  private static final int OKHTTP_CLIENT_WINDOW_SIZE = 16 * 1024 * 1024;
118
119  /** Settings we receive from the peer. */
120  // TODO: MWS will need to guard on this setting before attempting to push.
121  final Settings peerSettings = new Settings();
122
123  private boolean receivedInitialPeerSettings = false;
124  final Variant variant;
125  final Socket socket;
126  final FrameWriter frameWriter;
127
128  // Visible for testing
129  final Reader readerRunnable;
130
131  private FramedConnection(Builder builder) throws IOException {
132    protocol = builder.protocol;
133    pushObserver = builder.pushObserver;
134    client = builder.client;
135    listener = builder.listener;
136    // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
137    nextStreamId = builder.client ? 1 : 2;
138    if (builder.client && protocol == Protocol.HTTP_2) {
139      nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
140    }
141
142    nextPingId = builder.client ? 1 : 2;
143
144    // Flow control was designed more for servers, or proxies than edge clients.
145    // If we are a client, set the flow control window to 16MiB.  This avoids
146    // thrashing window updates every 64KiB, yet small enough to avoid blowing
147    // up the heap.
148    if (builder.client) {
149      okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, OKHTTP_CLIENT_WINDOW_SIZE);
150    }
151
152    hostName = builder.hostName;
153
154    if (protocol == Protocol.HTTP_2) {
155      variant = new Http2();
156      // Like newSingleThreadExecutor, except lazy creates the thread.
157      pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
158          new LinkedBlockingQueue<Runnable>(),
159          Util.threadFactory(String.format("OkHttp %s Push Observer", hostName), true));
160      // 1 less than SPDY http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-6.9.2
161      peerSettings.set(Settings.INITIAL_WINDOW_SIZE, 0, 65535);
162      peerSettings.set(Settings.MAX_FRAME_SIZE, 0, Http2.INITIAL_MAX_FRAME_SIZE);
163    } else if (protocol == Protocol.SPDY_3) {
164      variant = new Spdy3();
165      pushExecutor = null;
166    } else {
167      throw new AssertionError(protocol);
168    }
169    bytesLeftInWriteWindow = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
170    socket = builder.socket;
171    frameWriter = variant.newWriter(builder.sink, client);
172
173    readerRunnable = new Reader(variant.newReader(builder.source, client));
174    new Thread(readerRunnable).start(); // Not a daemon thread.
175  }
176
177  /** The protocol as selected using ALPN. */
178  public Protocol getProtocol() {
179    return protocol;
180  }
181
182  /**
183   * Returns the number of {@link FramedStream#isOpen() open streams} on this
184   * connection.
185   */
186  public synchronized int openStreamCount() {
187    return streams.size();
188  }
189
190  synchronized FramedStream getStream(int id) {
191    return streams.get(id);
192  }
193
194  synchronized FramedStream removeStream(int streamId) {
195    FramedStream stream = streams.remove(streamId);
196    if (stream != null && streams.isEmpty()) {
197      setIdle(true);
198    }
199    notifyAll(); // The removed stream may be blocked on a connection-wide window update.
200    return stream;
201  }
202
203  private synchronized void setIdle(boolean value) {
204    idleStartTimeNs = value ? System.nanoTime() : Long.MAX_VALUE;
205  }
206
207  /** Returns true if this connection is idle. */
208  public synchronized boolean isIdle() {
209    return idleStartTimeNs != Long.MAX_VALUE;
210  }
211
212  public synchronized int maxConcurrentStreams() {
213    return peerSettings.getMaxConcurrentStreams(Integer.MAX_VALUE);
214  }
215
216  /**
217   * Returns the time in ns when this connection became idle or Long.MAX_VALUE
218   * if connection is not idle.
219   */
220  public synchronized long getIdleStartTimeNs() {
221    return idleStartTimeNs;
222  }
223
224  /**
225   * Returns a new server-initiated stream.
226   *
227   * @param associatedStreamId the stream that triggered the sender to create
228   *     this stream.
229   * @param out true to create an output stream that we can use to send data
230   *     to the remote peer. Corresponds to {@code FLAG_FIN}.
231   */
232  public FramedStream pushStream(int associatedStreamId, List<Header> requestHeaders, boolean out)
233      throws IOException {
234    if (client) throw new IllegalStateException("Client cannot push requests.");
235    if (protocol != Protocol.HTTP_2) throw new IllegalStateException("protocol != HTTP_2");
236    return newStream(associatedStreamId, requestHeaders, out, false);
237  }
238
239  /**
240   * Returns a new locally-initiated stream.
241   *
242   * @param out true to create an output stream that we can use to send data to the remote peer.
243   *     Corresponds to {@code FLAG_FIN}.
244   * @param in true to create an input stream that the remote peer can use to send data to us.
245   *     Corresponds to {@code FLAG_UNIDIRECTIONAL}.
246   */
247  public FramedStream newStream(List<Header> requestHeaders, boolean out, boolean in)
248      throws IOException {
249    return newStream(0, requestHeaders, out, in);
250  }
251
252  private FramedStream newStream(int associatedStreamId, List<Header> requestHeaders, boolean out,
253      boolean in) throws IOException {
254    boolean outFinished = !out;
255    boolean inFinished = !in;
256    FramedStream stream;
257    int streamId;
258
259    synchronized (frameWriter) {
260      synchronized (this) {
261        if (shutdown) {
262          throw new IOException("shutdown");
263        }
264        streamId = nextStreamId;
265        nextStreamId += 2;
266        stream = new FramedStream(streamId, this, outFinished, inFinished, requestHeaders);
267        if (stream.isOpen()) {
268          streams.put(streamId, stream);
269          setIdle(false);
270        }
271      }
272      if (associatedStreamId == 0) {
273        frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId,
274            requestHeaders);
275      } else if (client) {
276        throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
277      } else { // HTTP/2 has a PUSH_PROMISE frame.
278        frameWriter.pushPromise(associatedStreamId, streamId, requestHeaders);
279      }
280    }
281
282    if (!out) {
283      frameWriter.flush();
284    }
285
286    return stream;
287  }
288
289  void writeSynReply(int streamId, boolean outFinished, List<Header> alternating)
290      throws IOException {
291    frameWriter.synReply(outFinished, streamId, alternating);
292  }
293
294  /**
295   * Callers of this method are not thread safe, and sometimes on application threads. Most often,
296   * this method will be called to send a buffer worth of data to the peer.
297   *
298   * <p>Writes are subject to the write window of the stream and the connection. Until there is a
299   * window sufficient to send {@code byteCount}, the caller will block. For example, a user of
300   * {@code HttpURLConnection} who flushes more bytes to the output stream than the connection's
301   * write window will block.
302   *
303   * <p>Zero {@code byteCount} writes are not subject to flow control and will not block. The only
304   * use case for zero {@code byteCount} is closing a flushed output stream.
305   */
306  public void writeData(int streamId, boolean outFinished, Buffer buffer, long byteCount)
307      throws IOException {
308    if (byteCount == 0) { // Empty data frames are not flow-controlled.
309      frameWriter.data(outFinished, streamId, buffer, 0);
310      return;
311    }
312
313    while (byteCount > 0) {
314      int toWrite;
315      synchronized (FramedConnection.this) {
316        try {
317          while (bytesLeftInWriteWindow <= 0) {
318            // Before blocking, confirm that the stream we're writing is still open. It's possible
319            // that the stream has since been closed (such as if this write timed out.)
320            if (!streams.containsKey(streamId)) {
321              throw new IOException("stream closed");
322            }
323            FramedConnection.this.wait(); // Wait until we receive a WINDOW_UPDATE.
324          }
325        } catch (InterruptedException e) {
326          throw new InterruptedIOException();
327        }
328
329        toWrite = (int) Math.min(byteCount, bytesLeftInWriteWindow);
330        toWrite = Math.min(toWrite, frameWriter.maxDataLength());
331        bytesLeftInWriteWindow -= toWrite;
332      }
333
334      byteCount -= toWrite;
335      frameWriter.data(outFinished && byteCount == 0, streamId, buffer, toWrite);
336    }
337  }
338
339  /**
340   * {@code delta} will be negative if a settings frame initial window is
341   * smaller than the last.
342   */
343  void addBytesToWriteWindow(long delta) {
344    bytesLeftInWriteWindow += delta;
345    if (delta > 0) FramedConnection.this.notifyAll();
346  }
347
348  void writeSynResetLater(final int streamId, final ErrorCode errorCode) {
349    executor.submit(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
350      @Override public void execute() {
351        try {
352          writeSynReset(streamId, errorCode);
353        } catch (IOException ignored) {
354        }
355      }
356    });
357  }
358
359  void writeSynReset(int streamId, ErrorCode statusCode) throws IOException {
360    frameWriter.rstStream(streamId, statusCode);
361  }
362
363  void writeWindowUpdateLater(final int streamId, final long unacknowledgedBytesRead) {
364    executor.execute(new NamedRunnable("OkHttp Window Update %s stream %d", hostName, streamId) {
365      @Override public void execute() {
366        try {
367          frameWriter.windowUpdate(streamId, unacknowledgedBytesRead);
368        } catch (IOException ignored) {
369        }
370      }
371    });
372  }
373
374  /**
375   * Sends a ping frame to the peer. Use the returned object to await the
376   * ping's response and observe its round trip time.
377   */
378  public Ping ping() throws IOException {
379    Ping ping = new Ping();
380    int pingId;
381    synchronized (this) {
382      if (shutdown) {
383        throw new IOException("shutdown");
384      }
385      pingId = nextPingId;
386      nextPingId += 2;
387      if (pings == null) pings = new HashMap<>();
388      pings.put(pingId, ping);
389    }
390    writePing(false, pingId, 0x4f4b6f6b /* ASCII "OKok" */, ping);
391    return ping;
392  }
393
394  private void writePingLater(
395      final boolean reply, final int payload1, final int payload2, final Ping ping) {
396    executor.execute(new NamedRunnable("OkHttp %s ping %08x%08x",
397        hostName, payload1, payload2) {
398      @Override public void execute() {
399        try {
400          writePing(reply, payload1, payload2, ping);
401        } catch (IOException ignored) {
402        }
403      }
404    });
405  }
406
407  private void writePing(boolean reply, int payload1, int payload2, Ping ping) throws IOException {
408    synchronized (frameWriter) {
409      // Observe the sent time immediately before performing I/O.
410      if (ping != null) ping.send();
411      frameWriter.ping(reply, payload1, payload2);
412    }
413  }
414
415  private synchronized Ping removePing(int id) {
416    return pings != null ? pings.remove(id) : null;
417  }
418
419  public void flush() throws IOException {
420    frameWriter.flush();
421  }
422
423  /**
424   * Degrades this connection such that new streams can neither be created
425   * locally, nor accepted from the remote peer. Existing streams are not
426   * impacted. This is intended to permit an endpoint to gracefully stop
427   * accepting new requests without harming previously established streams.
428   */
429  public void shutdown(ErrorCode statusCode) throws IOException {
430    synchronized (frameWriter) {
431      int lastGoodStreamId;
432      synchronized (this) {
433        if (shutdown) {
434          return;
435        }
436        shutdown = true;
437        lastGoodStreamId = this.lastGoodStreamId;
438      }
439      // TODO: propagate exception message into debugData
440      frameWriter.goAway(lastGoodStreamId, statusCode, Util.EMPTY_BYTE_ARRAY);
441    }
442  }
443
444  /**
445   * Closes this connection. This cancels all open streams and unanswered
446   * pings. It closes the underlying input and output streams and shuts down
447   * internal executor services.
448   */
449  @Override public void close() throws IOException {
450    close(ErrorCode.NO_ERROR, ErrorCode.CANCEL);
451  }
452
453  private void close(ErrorCode connectionCode, ErrorCode streamCode) throws IOException {
454    assert (!Thread.holdsLock(this));
455    IOException thrown = null;
456    try {
457      shutdown(connectionCode);
458    } catch (IOException e) {
459      thrown = e;
460    }
461
462    FramedStream[] streamsToClose = null;
463    Ping[] pingsToCancel = null;
464    synchronized (this) {
465      if (!streams.isEmpty()) {
466        streamsToClose = streams.values().toArray(new FramedStream[streams.size()]);
467        streams.clear();
468        setIdle(false);
469      }
470      if (pings != null) {
471        pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
472        pings = null;
473      }
474    }
475
476    if (streamsToClose != null) {
477      for (FramedStream stream : streamsToClose) {
478        try {
479          stream.close(streamCode);
480        } catch (IOException e) {
481          if (thrown != null) thrown = e;
482        }
483      }
484    }
485
486    if (pingsToCancel != null) {
487      for (Ping ping : pingsToCancel) {
488        ping.cancel();
489      }
490    }
491
492    // Close the writer to release its resources (such as deflaters).
493    try {
494      frameWriter.close();
495    } catch (IOException e) {
496      if (thrown == null) thrown = e;
497    }
498
499    // Close the socket to break out the reader thread, which will clean up after itself.
500    try {
501      socket.close();
502    } catch (IOException e) {
503      thrown = e;
504    }
505
506    if (thrown != null) throw thrown;
507  }
508
509  /**
510   * Sends a connection header if the current variant requires it. This should
511   * be called after {@link Builder#build} for all new connections.
512   */
513  public void sendConnectionPreface() throws IOException {
514    frameWriter.connectionPreface();
515    frameWriter.settings(okHttpSettings);
516    int windowSize = okHttpSettings.getInitialWindowSize(Settings.DEFAULT_INITIAL_WINDOW_SIZE);
517    if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
518      frameWriter.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
519    }
520  }
521
522  /** Merges {@code settings} into this peer's settings and sends them to the remote peer. */
523  public void setSettings(Settings settings) throws IOException {
524    synchronized (frameWriter) {
525      synchronized (this) {
526        if (shutdown) {
527          throw new IOException("shutdown");
528        }
529        okHttpSettings.merge(settings);
530        frameWriter.settings(settings);
531      }
532    }
533  }
534
535  public static class Builder {
536    private Socket socket;
537    private String hostName;
538    private BufferedSource source;
539    private BufferedSink sink;
540    private Listener listener = Listener.REFUSE_INCOMING_STREAMS;
541    private Protocol protocol = Protocol.SPDY_3;
542    private PushObserver pushObserver = PushObserver.CANCEL;
543    private boolean client;
544
545    /**
546     * @param client true if this peer initiated the connection; false if this
547     *     peer accepted the connection.
548     */
549    public Builder(boolean client) throws IOException {
550      this.client = client;
551    }
552
553    public Builder socket(Socket socket) throws IOException {
554      return socket(socket, ((InetSocketAddress) socket.getRemoteSocketAddress()).getHostName(),
555          Okio.buffer(Okio.source(socket)), Okio.buffer(Okio.sink(socket)));
556    }
557
558    public Builder socket(
559        Socket socket, String hostName, BufferedSource source, BufferedSink sink) {
560      this.socket = socket;
561      this.hostName = hostName;
562      this.source = source;
563      this.sink = sink;
564      return this;
565    }
566
567    public Builder listener(Listener listener) {
568      this.listener = listener;
569      return this;
570    }
571
572    public Builder protocol(Protocol protocol) {
573      this.protocol = protocol;
574      return this;
575    }
576
577    public Builder pushObserver(PushObserver pushObserver) {
578      this.pushObserver = pushObserver;
579      return this;
580    }
581
582    public FramedConnection build() throws IOException {
583      return new FramedConnection(this);
584    }
585  }
586
587  /**
588   * Methods in this class must not lock FrameWriter.  If a method needs to
589   * write a frame, create an async task to do so.
590   */
591  class Reader extends NamedRunnable implements FrameReader.Handler {
592    final FrameReader frameReader;
593
594    private Reader(FrameReader frameReader) {
595      super("OkHttp %s", hostName);
596      this.frameReader = frameReader;
597    }
598
599    @Override protected void execute() {
600      ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
601      ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
602      try {
603        if (!client) {
604          frameReader.readConnectionPreface();
605        }
606        while (frameReader.nextFrame(this)) {
607        }
608        connectionErrorCode = ErrorCode.NO_ERROR;
609        streamErrorCode = ErrorCode.CANCEL;
610      } catch (IOException e) {
611        connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
612        streamErrorCode = ErrorCode.PROTOCOL_ERROR;
613      } finally {
614        try {
615          close(connectionErrorCode, streamErrorCode);
616        } catch (IOException ignored) {
617        }
618        Util.closeQuietly(frameReader);
619      }
620    }
621
622    @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
623        throws IOException {
624      if (pushedStream(streamId)) {
625        pushDataLater(streamId, source, length, inFinished);
626        return;
627      }
628      FramedStream dataStream = getStream(streamId);
629      if (dataStream == null) {
630        writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
631        source.skip(length);
632        return;
633      }
634      dataStream.receiveData(source, length);
635      if (inFinished) {
636        dataStream.receiveFin();
637      }
638    }
639
640    @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
641        int associatedStreamId, List<Header> headerBlock, HeadersMode headersMode) {
642      if (pushedStream(streamId)) {
643        pushHeadersLater(streamId, headerBlock, inFinished);
644        return;
645      }
646      FramedStream stream;
647      synchronized (FramedConnection.this) {
648        // If we're shutdown, don't bother with this stream.
649        if (shutdown) return;
650
651        stream = getStream(streamId);
652
653        if (stream == null) {
654          // The headers claim to be for an existing stream, but we don't have one.
655          if (headersMode.failIfStreamAbsent()) {
656            writeSynResetLater(streamId, ErrorCode.INVALID_STREAM);
657            return;
658          }
659
660          // If the stream ID is less than the last created ID, assume it's already closed.
661          if (streamId <= lastGoodStreamId) return;
662
663          // If the stream ID is in the client's namespace, assume it's already closed.
664          if (streamId % 2 == nextStreamId % 2) return;
665
666          // Create a stream.
667          final FramedStream
668              newStream = new FramedStream(streamId, FramedConnection.this, outFinished,
669              inFinished, headerBlock);
670          lastGoodStreamId = streamId;
671          streams.put(streamId, newStream);
672          executor.execute(new NamedRunnable("OkHttp %s stream %d", hostName, streamId) {
673            @Override public void execute() {
674              try {
675                listener.onStream(newStream);
676              } catch (IOException e) {
677                logger.log(Level.INFO, "FramedConnection.Listener failure for " + hostName, e);
678                try {
679                  newStream.close(ErrorCode.PROTOCOL_ERROR);
680                } catch (IOException ignored) {
681                }
682              }
683            }
684          });
685          return;
686        }
687      }
688
689      // The headers claim to be for a new stream, but we already have one.
690      if (headersMode.failIfStreamPresent()) {
691        stream.closeLater(ErrorCode.PROTOCOL_ERROR);
692        removeStream(streamId);
693        return;
694      }
695
696      // Update an existing stream.
697      stream.receiveHeaders(headerBlock, headersMode);
698      if (inFinished) stream.receiveFin();
699    }
700
701    @Override public void rstStream(int streamId, ErrorCode errorCode) {
702      if (pushedStream(streamId)) {
703        pushResetLater(streamId, errorCode);
704        return;
705      }
706      FramedStream rstStream = removeStream(streamId);
707      if (rstStream != null) {
708        rstStream.receiveRstStream(errorCode);
709      }
710    }
711
712    @Override public void settings(boolean clearPrevious, Settings newSettings) {
713      long delta = 0;
714      FramedStream[] streamsToNotify = null;
715      synchronized (FramedConnection.this) {
716        int priorWriteWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
717        if (clearPrevious) peerSettings.clear();
718        peerSettings.merge(newSettings);
719        if (getProtocol() == Protocol.HTTP_2) {
720          ackSettingsLater(newSettings);
721        }
722        int peerInitialWindowSize = peerSettings.getInitialWindowSize(DEFAULT_INITIAL_WINDOW_SIZE);
723        if (peerInitialWindowSize != -1 && peerInitialWindowSize != priorWriteWindowSize) {
724          delta = peerInitialWindowSize - priorWriteWindowSize;
725          if (!receivedInitialPeerSettings) {
726            addBytesToWriteWindow(delta);
727            receivedInitialPeerSettings = true;
728          }
729          if (!streams.isEmpty()) {
730            streamsToNotify = streams.values().toArray(new FramedStream[streams.size()]);
731          }
732        }
733        executor.execute(new NamedRunnable("OkHttp %s settings", hostName) {
734          @Override public void execute() {
735            listener.onSettings(FramedConnection.this);
736          }
737        });
738      }
739      if (streamsToNotify != null && delta != 0) {
740        for (FramedStream stream : streamsToNotify) {
741          synchronized (stream) {
742            stream.addBytesToWriteWindow(delta);
743          }
744        }
745      }
746    }
747
748    private void ackSettingsLater(final Settings peerSettings) {
749      executor.execute(new NamedRunnable("OkHttp %s ACK Settings", hostName) {
750        @Override public void execute() {
751          try {
752            frameWriter.ackSettings(peerSettings);
753          } catch (IOException ignored) {
754          }
755        }
756      });
757    }
758
759    @Override public void ackSettings() {
760      // TODO: If we don't get this callback after sending settings to the peer, SETTINGS_TIMEOUT.
761    }
762
763    @Override public void ping(boolean reply, int payload1, int payload2) {
764      if (reply) {
765        Ping ping = removePing(payload1);
766        if (ping != null) {
767          ping.receive();
768        }
769      } else {
770        // Send a reply to a client ping if this is a server and vice versa.
771        writePingLater(true, payload1, payload2, null);
772      }
773    }
774
775    @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
776      if (debugData.size() > 0) { // TODO: log the debugData
777      }
778
779      // Copy the streams first. We don't want to hold a lock when we call receiveRstStream().
780      FramedStream[] streamsCopy;
781      synchronized (FramedConnection.this) {
782        streamsCopy = streams.values().toArray(new FramedStream[streams.size()]);
783        shutdown = true;
784      }
785
786      // Fail all streams created after the last good stream ID.
787      for (FramedStream framedStream : streamsCopy) {
788        if (framedStream.getId() > lastGoodStreamId && framedStream.isLocallyInitiated()) {
789          framedStream.receiveRstStream(ErrorCode.REFUSED_STREAM);
790          removeStream(framedStream.getId());
791        }
792      }
793    }
794
795    @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
796      if (streamId == 0) {
797        synchronized (FramedConnection.this) {
798          bytesLeftInWriteWindow += windowSizeIncrement;
799          FramedConnection.this.notifyAll();
800        }
801      } else {
802        FramedStream stream = getStream(streamId);
803        if (stream != null) {
804          synchronized (stream) {
805            stream.addBytesToWriteWindow(windowSizeIncrement);
806          }
807        }
808      }
809    }
810
811    @Override public void priority(int streamId, int streamDependency, int weight,
812        boolean exclusive) {
813      // TODO: honor priority.
814    }
815
816    @Override
817    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders) {
818      pushRequestLater(promisedStreamId, requestHeaders);
819    }
820
821    @Override public void alternateService(int streamId, String origin, ByteString protocol,
822        String host, int port, long maxAge) {
823      // TODO: register alternate service.
824    }
825  }
826
827  /** Even, positive numbered streams are pushed streams in HTTP/2. */
828  private boolean pushedStream(int streamId) {
829    return protocol == Protocol.HTTP_2 && streamId != 0 && (streamId & 1) == 0;
830  }
831
832  // Guarded by this.
833  private final Set<Integer> currentPushRequests = new LinkedHashSet<>();
834
835  private void pushRequestLater(final int streamId, final List<Header> requestHeaders) {
836    synchronized (this) {
837      if (currentPushRequests.contains(streamId)) {
838        writeSynResetLater(streamId, ErrorCode.PROTOCOL_ERROR);
839        return;
840      }
841      currentPushRequests.add(streamId);
842    }
843    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Request[%s]", hostName, streamId) {
844      @Override public void execute() {
845        boolean cancel = pushObserver.onRequest(streamId, requestHeaders);
846        try {
847          if (cancel) {
848            frameWriter.rstStream(streamId, ErrorCode.CANCEL);
849            synchronized (FramedConnection.this) {
850              currentPushRequests.remove(streamId);
851            }
852          }
853        } catch (IOException ignored) {
854        }
855      }
856    });
857  }
858
859  private void pushHeadersLater(final int streamId, final List<Header> requestHeaders,
860      final boolean inFinished) {
861    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Headers[%s]", hostName, streamId) {
862      @Override public void execute() {
863        boolean cancel = pushObserver.onHeaders(streamId, requestHeaders, inFinished);
864        try {
865          if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
866          if (cancel || inFinished) {
867            synchronized (FramedConnection.this) {
868              currentPushRequests.remove(streamId);
869            }
870          }
871        } catch (IOException ignored) {
872        }
873      }
874    });
875  }
876
877  /**
878   * Eagerly reads {@code byteCount} bytes from the source before launching a background task to
879   * process the data.  This avoids corrupting the stream.
880   */
881  private void pushDataLater(final int streamId, final BufferedSource source, final int byteCount,
882      final boolean inFinished) throws IOException {
883    final Buffer buffer = new Buffer();
884    source.require(byteCount); // Eagerly read the frame before firing client thread.
885    source.read(buffer, byteCount);
886    if (buffer.size() != byteCount) throw new IOException(buffer.size() + " != " + byteCount);
887    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Data[%s]", hostName, streamId) {
888      @Override public void execute() {
889        try {
890          boolean cancel = pushObserver.onData(streamId, buffer, byteCount, inFinished);
891          if (cancel) frameWriter.rstStream(streamId, ErrorCode.CANCEL);
892          if (cancel || inFinished) {
893            synchronized (FramedConnection.this) {
894              currentPushRequests.remove(streamId);
895            }
896          }
897        } catch (IOException ignored) {
898        }
899      }
900    });
901  }
902
903  private void pushResetLater(final int streamId, final ErrorCode errorCode) {
904    pushExecutor.execute(new NamedRunnable("OkHttp %s Push Reset[%s]", hostName, streamId) {
905      @Override public void execute() {
906        pushObserver.onReset(streamId, errorCode);
907        synchronized (FramedConnection.this) {
908          currentPushRequests.remove(streamId);
909        }
910      }
911    });
912  }
913
914  /** Listener of streams and settings initiated by the peer. */
915  public abstract static class Listener {
916    public static final Listener REFUSE_INCOMING_STREAMS = new Listener() {
917      @Override public void onStream(FramedStream stream) throws IOException {
918        stream.close(ErrorCode.REFUSED_STREAM);
919      }
920    };
921
922    /**
923     * Handle a new stream from this connection's peer. Implementations should
924     * respond by either {@linkplain FramedStream#reply replying to the stream}
925     * or {@linkplain FramedStream#close closing it}. This response does not
926     * need to be synchronous.
927     */
928    public abstract void onStream(FramedStream stream) throws IOException;
929
930    /**
931     * Notification that the connection's peer's settings may have changed.
932     * Implementations should take appropriate action to handle the updated
933     * settings.
934     *
935     * <p>It is the implementation's responsibility to handle concurrent calls
936     * to this method. A remote peer that sends multiple settings frames will
937     * trigger multiple calls to this method, and those calls are not
938     * necessarily serialized.
939     */
940    public void onSettings(FramedConnection connection) {
941    }
942  }
943}
944