1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.squareup.okhttp.internal.spdy;
18
19import com.squareup.okhttp.internal.NamedRunnable;
20import com.squareup.okhttp.internal.Util;
21import java.io.Closeable;
22import java.io.IOException;
23import java.io.InputStream;
24import java.io.OutputStream;
25import java.net.Socket;
26import java.util.HashMap;
27import java.util.Iterator;
28import java.util.List;
29import java.util.Map;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.SynchronousQueue;
32import java.util.concurrent.ThreadPoolExecutor;
33import java.util.concurrent.TimeUnit;
34
35/**
36 * A socket connection to a remote peer. A connection hosts streams which can
37 * send and receive data.
38 *
39 * <p>Many methods in this API are <strong>synchronous:</strong> the call is
40 * completed before the method returns. This is typical for Java but atypical
41 * for SPDY. This is motivated by exception transparency: an IOException that
42 * was triggered by a certain caller can be caught and handled by that caller.
43 */
44public final class SpdyConnection implements Closeable {
45
46  // Internal state of this connection is guarded by 'this'. No blocking
47  // operations may be performed while holding this lock!
48  //
49  // Socket writes are guarded by spdyWriter.
50  //
51  // Socket reads are unguarded but are only made by the reader thread.
52  //
53  // Certain operations (like SYN_STREAM) need to synchronize on both the
54  // spdyWriter (to do blocking I/O) and this (to create streams). Such
55  // operations must synchronize on 'this' last. This ensures that we never
56  // wait for a blocking operation while holding 'this'.
57
58  static final int FLAG_FIN = 0x1;
59  static final int FLAG_UNIDIRECTIONAL = 0x2;
60
61  static final int TYPE_DATA = 0x0;
62  static final int TYPE_SYN_STREAM = 0x1;
63  static final int TYPE_SYN_REPLY = 0x2;
64  static final int TYPE_RST_STREAM = 0x3;
65  static final int TYPE_SETTINGS = 0x4;
66  static final int TYPE_NOOP = 0x5;
67  static final int TYPE_PING = 0x6;
68  static final int TYPE_GOAWAY = 0x7;
69  static final int TYPE_HEADERS = 0x8;
70  static final int TYPE_WINDOW_UPDATE = 0x9;
71  static final int TYPE_CREDENTIAL = 0x10;
72  static final int VERSION = 3;
73
74  static final int GOAWAY_OK = 0;
75  static final int GOAWAY_PROTOCOL_ERROR = 1;
76  static final int GOAWAY_INTERNAL_ERROR = 2;
77
78  private static final ExecutorService executor = new ThreadPoolExecutor(0,
79      Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
80      Util.daemonThreadFactory("OkHttp SpdyConnection"));
81
82  /** True if this peer initiated the connection. */
83  final boolean client;
84
85  /**
86   * User code to run in response to an incoming stream. Callbacks must not be
87   * run on the callback executor.
88   */
89  private final IncomingStreamHandler handler;
90  private final SpdyReader spdyReader;
91  private final SpdyWriter spdyWriter;
92
93  private final Map<Integer, SpdyStream> streams = new HashMap<Integer, SpdyStream>();
94  private final String hostName;
95  private int lastGoodStreamId;
96  private int nextStreamId;
97  private boolean shutdown;
98  private long idleStartTimeNs = System.nanoTime();
99
100  /** Lazily-created map of in-flight pings awaiting a response. Guarded by this. */
101  private Map<Integer, Ping> pings;
102  private int nextPingId;
103
104  /** Lazily-created settings for this connection. */
105  Settings settings;
106
107  private SpdyConnection(Builder builder) {
108    client = builder.client;
109    handler = builder.handler;
110    spdyReader = new SpdyReader(builder.in);
111    spdyWriter = new SpdyWriter(builder.out);
112    nextStreamId = builder.client ? 1 : 2;
113    nextPingId = builder.client ? 1 : 2;
114
115    hostName = builder.hostName;
116
117    new Thread(new Reader(), "Spdy Reader " + hostName).start();
118  }
119
120  /**
121   * Returns the number of {@link SpdyStream#isOpen() open streams} on this
122   * connection.
123   */
124  public synchronized int openStreamCount() {
125    return streams.size();
126  }
127
128  private synchronized SpdyStream getStream(int id) {
129    return streams.get(id);
130  }
131
132  synchronized SpdyStream removeStream(int streamId) {
133    SpdyStream stream = streams.remove(streamId);
134    if (stream != null && streams.isEmpty()) {
135      setIdle(true);
136    }
137    return stream;
138  }
139
140  private synchronized void setIdle(boolean value) {
141    idleStartTimeNs = value ? System.nanoTime() : 0L;
142  }
143
144  /** Returns true if this connection is idle. */
145  public synchronized boolean isIdle() {
146    return idleStartTimeNs != 0L;
147  }
148
149  /** Returns the time in ns when this connection became idle or 0L if connection is not idle. */
150  public synchronized long getIdleStartTimeNs() {
151    return idleStartTimeNs;
152  }
153
154  /**
155   * Returns a new locally-initiated stream.
156   *
157   * @param out true to create an output stream that we can use to send data
158   * to the remote peer. Corresponds to {@code FLAG_FIN}.
159   * @param in true to create an input stream that the remote peer can use to
160   * send data to us. Corresponds to {@code FLAG_UNIDIRECTIONAL}.
161   */
162  public SpdyStream newStream(List<String> requestHeaders, boolean out, boolean in)
163      throws IOException {
164    int flags = (out ? 0 : FLAG_FIN) | (in ? 0 : FLAG_UNIDIRECTIONAL);
165    int associatedStreamId = 0;  // TODO: permit the caller to specify an associated stream?
166    int priority = 0; // TODO: permit the caller to specify a priority?
167    int slot = 0; // TODO: permit the caller to specify a slot?
168    SpdyStream stream;
169    int streamId;
170
171    synchronized (spdyWriter) {
172      synchronized (this) {
173        if (shutdown) {
174          throw new IOException("shutdown");
175        }
176        streamId = nextStreamId;
177        nextStreamId += 2;
178        stream = new SpdyStream(streamId, this, flags, priority, slot, requestHeaders, settings);
179        if (stream.isOpen()) {
180          streams.put(streamId, stream);
181          setIdle(false);
182        }
183      }
184
185      spdyWriter.synStream(flags, streamId, associatedStreamId, priority, slot, requestHeaders);
186    }
187
188    return stream;
189  }
190
191  void writeSynReply(int streamId, int flags, List<String> alternating) throws IOException {
192    spdyWriter.synReply(flags, streamId, alternating);
193  }
194
195  /** Writes a complete data frame. */
196  void writeFrame(byte[] bytes, int offset, int length) throws IOException {
197    synchronized (spdyWriter) {
198      spdyWriter.out.write(bytes, offset, length);
199    }
200  }
201
202  void writeSynResetLater(final int streamId, final int statusCode) {
203    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
204      @Override public void execute() {
205        try {
206          writeSynReset(streamId, statusCode);
207        } catch (IOException ignored) {
208        }
209      }
210    });
211  }
212
213  void writeSynReset(int streamId, int statusCode) throws IOException {
214    spdyWriter.rstStream(streamId, statusCode);
215  }
216
217  void writeWindowUpdateLater(final int streamId, final int deltaWindowSize) {
218    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s stream %d", hostName, streamId) {
219      @Override public void execute() {
220        try {
221          writeWindowUpdate(streamId, deltaWindowSize);
222        } catch (IOException ignored) {
223        }
224      }
225    });
226  }
227
228  void writeWindowUpdate(int streamId, int deltaWindowSize) throws IOException {
229    spdyWriter.windowUpdate(streamId, deltaWindowSize);
230  }
231
232  /**
233   * Sends a ping frame to the peer. Use the returned object to await the
234   * ping's response and observe its round trip time.
235   */
236  public Ping ping() throws IOException {
237    Ping ping = new Ping();
238    int pingId;
239    synchronized (this) {
240      if (shutdown) {
241        throw new IOException("shutdown");
242      }
243      pingId = nextPingId;
244      nextPingId += 2;
245      if (pings == null) pings = new HashMap<Integer, Ping>();
246      pings.put(pingId, ping);
247    }
248    writePing(pingId, ping);
249    return ping;
250  }
251
252  private void writePingLater(final int streamId, final Ping ping) {
253    executor.submit(new NamedRunnable("OkHttp SPDY Writer %s ping %d", hostName, streamId) {
254      @Override public void execute() {
255        try {
256          writePing(streamId, ping);
257        } catch (IOException ignored) {
258        }
259      }
260    });
261  }
262
263  private void writePing(int id, Ping ping) throws IOException {
264    synchronized (spdyWriter) {
265      // Observe the sent time immediately before performing I/O.
266      if (ping != null) ping.send();
267      spdyWriter.ping(0, id);
268    }
269  }
270
271  private synchronized Ping removePing(int id) {
272    return pings != null ? pings.remove(id) : null;
273  }
274
275  /** Sends a noop frame to the peer. */
276  public void noop() throws IOException {
277    spdyWriter.noop();
278  }
279
280  public void flush() throws IOException {
281    synchronized (spdyWriter) {
282      spdyWriter.out.flush();
283    }
284  }
285
286  /**
287   * Degrades this connection such that new streams can neither be created
288   * locally, nor accepted from the remote peer. Existing streams are not
289   * impacted. This is intended to permit an endpoint to gracefully stop
290   * accepting new requests without harming previously established streams.
291   *
292   * @param statusCode one of {@link #GOAWAY_OK}, {@link
293   * #GOAWAY_INTERNAL_ERROR} or {@link #GOAWAY_PROTOCOL_ERROR}.
294   */
295  public void shutdown(int statusCode) throws IOException {
296    synchronized (spdyWriter) {
297      int lastGoodStreamId;
298      synchronized (this) {
299        if (shutdown) {
300          return;
301        }
302        shutdown = true;
303        lastGoodStreamId = this.lastGoodStreamId;
304      }
305      spdyWriter.goAway(0, lastGoodStreamId, statusCode);
306    }
307  }
308
309  /**
310   * Closes this connection. This cancels all open streams and unanswered
311   * pings. It closes the underlying input and output streams and shuts down
312   * internal executor services.
313   */
314  @Override public void close() throws IOException {
315    close(GOAWAY_OK, SpdyStream.RST_CANCEL);
316  }
317
318  private void close(int shutdownStatusCode, int rstStatusCode) throws IOException {
319    assert (!Thread.holdsLock(this));
320    IOException thrown = null;
321    try {
322      shutdown(shutdownStatusCode);
323    } catch (IOException e) {
324      thrown = e;
325    }
326
327    SpdyStream[] streamsToClose = null;
328    Ping[] pingsToCancel = null;
329    synchronized (this) {
330      if (!streams.isEmpty()) {
331        streamsToClose = streams.values().toArray(new SpdyStream[streams.size()]);
332        streams.clear();
333        setIdle(false);
334      }
335      if (pings != null) {
336        pingsToCancel = pings.values().toArray(new Ping[pings.size()]);
337        pings = null;
338      }
339    }
340
341    if (streamsToClose != null) {
342      for (SpdyStream stream : streamsToClose) {
343        try {
344          stream.close(rstStatusCode);
345        } catch (IOException e) {
346          if (thrown != null) thrown = e;
347        }
348      }
349    }
350
351    if (pingsToCancel != null) {
352      for (Ping ping : pingsToCancel) {
353        ping.cancel();
354      }
355    }
356
357    try {
358      spdyReader.close();
359    } catch (IOException e) {
360      thrown = e;
361    }
362    try {
363      spdyWriter.close();
364    } catch (IOException e) {
365      if (thrown == null) thrown = e;
366    }
367
368    if (thrown != null) throw thrown;
369  }
370
371  public static class Builder {
372    private String hostName;
373    private InputStream in;
374    private OutputStream out;
375    private IncomingStreamHandler handler = IncomingStreamHandler.REFUSE_INCOMING_STREAMS;
376    public boolean client;
377
378    public Builder(boolean client, Socket socket) throws IOException {
379      this("", client, socket.getInputStream(), socket.getOutputStream());
380    }
381
382    public Builder(boolean client, InputStream in, OutputStream out) {
383      this("", client, in, out);
384    }
385
386    /**
387     * @param client true if this peer initiated the connection; false if
388     * this peer accepted the connection.
389     */
390    public Builder(String hostName, boolean client, Socket socket) throws IOException {
391      this(hostName, client, socket.getInputStream(), socket.getOutputStream());
392    }
393
394    /**
395     * @param client true if this peer initiated the connection; false if this
396     * peer accepted the connection.
397     */
398    public Builder(String hostName, boolean client, InputStream in, OutputStream out) {
399      this.hostName = hostName;
400      this.client = client;
401      this.in = in;
402      this.out = out;
403    }
404
405    public Builder handler(IncomingStreamHandler handler) {
406      this.handler = handler;
407      return this;
408    }
409
410    public SpdyConnection build() {
411      return new SpdyConnection(this);
412    }
413  }
414
415  private class Reader implements Runnable, SpdyReader.Handler {
416    @Override public void run() {
417      int shutdownStatusCode = GOAWAY_INTERNAL_ERROR;
418      int rstStatusCode = SpdyStream.RST_INTERNAL_ERROR;
419      try {
420        while (spdyReader.nextFrame(this)) {
421        }
422        shutdownStatusCode = GOAWAY_OK;
423        rstStatusCode = SpdyStream.RST_CANCEL;
424      } catch (IOException e) {
425        shutdownStatusCode = GOAWAY_PROTOCOL_ERROR;
426        rstStatusCode = SpdyStream.RST_PROTOCOL_ERROR;
427      } finally {
428        try {
429          close(shutdownStatusCode, rstStatusCode);
430        } catch (IOException ignored) {
431        }
432      }
433    }
434
435    @Override public void data(int flags, int streamId, InputStream in, int length)
436        throws IOException {
437      SpdyStream dataStream = getStream(streamId);
438      if (dataStream == null) {
439        writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
440        Util.skipByReading(in, length);
441        return;
442      }
443      dataStream.receiveData(in, length);
444      if ((flags & SpdyConnection.FLAG_FIN) != 0) {
445        dataStream.receiveFin();
446      }
447    }
448
449    @Override public void synStream(int flags, int streamId, int associatedStreamId, int priority,
450        int slot, List<String> nameValueBlock) {
451      final SpdyStream synStream;
452      final SpdyStream previous;
453      synchronized (SpdyConnection.this) {
454        synStream =
455            new SpdyStream(streamId, SpdyConnection.this, flags, priority, slot, nameValueBlock,
456                settings);
457        if (shutdown) {
458          return;
459        }
460        lastGoodStreamId = streamId;
461        previous = streams.put(streamId, synStream);
462      }
463      if (previous != null) {
464        previous.closeLater(SpdyStream.RST_PROTOCOL_ERROR);
465        removeStream(streamId);
466        return;
467      }
468
469      executor.submit(new NamedRunnable("OkHttp SPDY Callback %s stream %d", hostName, streamId) {
470        @Override public void execute() {
471          try {
472            handler.receive(synStream);
473          } catch (IOException e) {
474            throw new RuntimeException(e);
475          }
476        }
477      });
478    }
479
480    @Override public void synReply(int flags, int streamId, List<String> nameValueBlock)
481        throws IOException {
482      SpdyStream replyStream = getStream(streamId);
483      if (replyStream == null) {
484        writeSynResetLater(streamId, SpdyStream.RST_INVALID_STREAM);
485        return;
486      }
487      replyStream.receiveReply(nameValueBlock);
488      if ((flags & SpdyConnection.FLAG_FIN) != 0) {
489        replyStream.receiveFin();
490      }
491    }
492
493    @Override public void headers(int flags, int streamId, List<String> nameValueBlock)
494        throws IOException {
495      SpdyStream replyStream = getStream(streamId);
496      if (replyStream != null) {
497        replyStream.receiveHeaders(nameValueBlock);
498      }
499    }
500
501    @Override public void rstStream(int flags, int streamId, int statusCode) {
502      SpdyStream rstStream = removeStream(streamId);
503      if (rstStream != null) {
504        rstStream.receiveRstStream(statusCode);
505      }
506    }
507
508    @Override public void settings(int flags, Settings newSettings) {
509      SpdyStream[] streamsToNotify = null;
510      synchronized (SpdyConnection.this) {
511        if (settings == null || (flags & Settings.FLAG_CLEAR_PREVIOUSLY_PERSISTED_SETTINGS) != 0) {
512          settings = newSettings;
513        } else {
514          settings.merge(newSettings);
515        }
516        if (!streams.isEmpty()) {
517          streamsToNotify = streams.values().toArray(new SpdyStream[streams.size()]);
518        }
519      }
520      if (streamsToNotify != null) {
521        for (SpdyStream stream : streamsToNotify) {
522          // The synchronization here is ugly. We need to synchronize on 'this' to guard
523          // reads to 'settings'. We synchronize on 'stream' to guard the state change.
524          // And we need to acquire the 'stream' lock first, since that may block.
525          synchronized (stream) {
526            synchronized (SpdyConnection.this) {
527              stream.receiveSettings(settings);
528            }
529          }
530        }
531      }
532    }
533
534    @Override public void noop() {
535    }
536
537    @Override public void ping(int flags, int streamId) {
538      if (client != (streamId % 2 == 1)) {
539        // Respond to a client ping if this is a server and vice versa.
540        writePingLater(streamId, null);
541      } else {
542        Ping ping = removePing(streamId);
543        if (ping != null) {
544          ping.receive();
545        }
546      }
547    }
548
549    @Override public void goAway(int flags, int lastGoodStreamId, int statusCode) {
550      synchronized (SpdyConnection.this) {
551        shutdown = true;
552
553        // Fail all streams created after the last good stream ID.
554        for (Iterator<Map.Entry<Integer, SpdyStream>> i = streams.entrySet().iterator();
555            i.hasNext(); ) {
556          Map.Entry<Integer, SpdyStream> entry = i.next();
557          int streamId = entry.getKey();
558          if (streamId > lastGoodStreamId && entry.getValue().isLocallyInitiated()) {
559            entry.getValue().receiveRstStream(SpdyStream.RST_REFUSED_STREAM);
560            i.remove();
561          }
562        }
563      }
564    }
565
566    @Override public void windowUpdate(int flags, int streamId, int deltaWindowSize) {
567      SpdyStream stream = getStream(streamId);
568      if (stream != null) {
569        stream.receiveWindowUpdate(deltaWindowSize);
570      }
571    }
572  }
573}
574