1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.squareup.okhttp.internal.http;
18
19import com.squareup.okhttp.Headers;
20import com.squareup.okhttp.Request;
21import com.squareup.okhttp.Response;
22import com.squareup.okhttp.ResponseBody;
23import com.squareup.okhttp.internal.Internal;
24import com.squareup.okhttp.internal.Util;
25import com.squareup.okhttp.internal.io.RealConnection;
26import java.io.EOFException;
27import java.io.IOException;
28import java.net.ProtocolException;
29import okio.Buffer;
30import okio.BufferedSink;
31import okio.BufferedSource;
32import okio.ForwardingTimeout;
33import okio.Okio;
34import okio.Sink;
35import okio.Source;
36import okio.Timeout;
37
38import static com.squareup.okhttp.internal.Util.checkOffsetAndCount;
39import static com.squareup.okhttp.internal.http.StatusLine.HTTP_CONTINUE;
40import static java.util.concurrent.TimeUnit.MILLISECONDS;
41
42/**
43 * A socket connection that can be used to send HTTP/1.1 messages. This class
44 * strictly enforces the following lifecycle:
45 * <ol>
46 *   <li>{@link #writeRequest Send request headers}.
47 *   <li>Open a sink to write the request body. Either {@link
48 *       #newFixedLengthSink fixed-length} or {@link #newChunkedSink chunked}.
49 *   <li>Write to and then close that sink.
50 *   <li>{@link #readResponse Read response headers}.
51 *   <li>Open a source to read the response body. Either {@link
52 *       #newFixedLengthSource fixed-length}, {@link #newChunkedSource chunked}
53 *       or {@link #newUnknownLengthSource unknown length}.
54 *   <li>Read from and close that source.
55 * </ol>
56 * <p>Exchanges that do not have a request body may skip creating and closing
57 * the request body. Exchanges that do not have a response body can call {@link
58 * #newFixedLengthSource(long) newFixedLengthSource(0)} and may skip reading and
59 * closing that source.
60 */
61public final class Http1xStream implements HttpStream {
62  private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
63  private static final int STATE_OPEN_REQUEST_BODY = 1;
64  private static final int STATE_WRITING_REQUEST_BODY = 2;
65  private static final int STATE_READ_RESPONSE_HEADERS = 3;
66  private static final int STATE_OPEN_RESPONSE_BODY = 4;
67  private static final int STATE_READING_RESPONSE_BODY = 5;
68  private static final int STATE_CLOSED = 6;
69
70  /** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
71  private final StreamAllocation streamAllocation;
72  private final BufferedSource source;
73  private final BufferedSink sink;
74  private HttpEngine httpEngine;
75  private int state = STATE_IDLE;
76
77  public Http1xStream(StreamAllocation streamAllocation, BufferedSource source, BufferedSink sink) {
78    this.streamAllocation = streamAllocation;
79    this.source = source;
80    this.sink = sink;
81  }
82
83  @Override public void setHttpEngine(HttpEngine httpEngine) {
84    this.httpEngine = httpEngine;
85  }
86
87  @Override public Sink createRequestBody(Request request, long contentLength) throws IOException {
88    if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
89      // Stream a request body of unknown length.
90      return newChunkedSink();
91    }
92
93    if (contentLength != -1) {
94      // Stream a request body of a known length.
95      return newFixedLengthSink(contentLength);
96    }
97
98    throw new IllegalStateException(
99        "Cannot stream a request body without chunked encoding or a known content length!");
100  }
101
102  @Override public void cancel() {
103    RealConnection connection = streamAllocation.connection();
104    if (connection != null) connection.cancel();
105  }
106
107  /**
108   * Prepares the HTTP headers and sends them to the server.
109   *
110   * <p>For streaming requests with a body, headers must be prepared
111   * <strong>before</strong> the output stream has been written to. Otherwise
112   * the body would need to be buffered!
113   *
114   * <p>For non-streaming requests with a body, headers must be prepared
115   * <strong>after</strong> the output stream has been written to and closed.
116   * This ensures that the {@code Content-Length} header field receives the
117   * proper value.
118   */
119  @Override public void writeRequestHeaders(Request request) throws IOException {
120    httpEngine.writingRequestHeaders();
121    String requestLine = RequestLine.get(
122        request, httpEngine.getConnection().getRoute().getProxy().type());
123    writeRequest(request.headers(), requestLine);
124  }
125
126  @Override public Response.Builder readResponseHeaders() throws IOException {
127    return readResponse();
128  }
129
130  @Override public ResponseBody openResponseBody(Response response) throws IOException {
131    Source source = getTransferStream(response);
132    return new RealResponseBody(response.headers(), Okio.buffer(source));
133  }
134
135  private Source getTransferStream(Response response) throws IOException {
136    if (!HttpEngine.hasBody(response)) {
137      return newFixedLengthSource(0);
138    }
139
140    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
141      return newChunkedSource(httpEngine);
142    }
143
144    long contentLength = OkHeaders.contentLength(response);
145    if (contentLength != -1) {
146      return newFixedLengthSource(contentLength);
147    }
148
149    // Wrap the input stream from the connection (rather than just returning
150    // "socketIn" directly here), so that we can control its use after the
151    // reference escapes.
152    return newUnknownLengthSource();
153  }
154
155  /** Returns true if this connection is closed. */
156  public boolean isClosed() {
157    return state == STATE_CLOSED;
158  }
159
160  @Override public void finishRequest() throws IOException {
161    sink.flush();
162  }
163
164  /** Returns bytes of a request header for sending on an HTTP transport. */
165  public void writeRequest(Headers headers, String requestLine) throws IOException {
166    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
167    sink.writeUtf8(requestLine).writeUtf8("\r\n");
168    for (int i = 0, size = headers.size(); i < size; i ++) {
169      sink.writeUtf8(headers.name(i))
170          .writeUtf8(": ")
171          .writeUtf8(headers.value(i))
172          .writeUtf8("\r\n");
173    }
174    sink.writeUtf8("\r\n");
175    state = STATE_OPEN_REQUEST_BODY;
176  }
177
178  /** Parses bytes of a response header from an HTTP transport. */
179  public Response.Builder readResponse() throws IOException {
180    if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
181      throw new IllegalStateException("state: " + state);
182    }
183
184    try {
185      while (true) {
186        StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
187
188        Response.Builder responseBuilder = new Response.Builder()
189            .protocol(statusLine.protocol)
190            .code(statusLine.code)
191            .message(statusLine.message)
192            .headers(readHeaders());
193
194        if (statusLine.code != HTTP_CONTINUE) {
195          state = STATE_OPEN_RESPONSE_BODY;
196          return responseBuilder;
197        }
198      }
199    } catch (EOFException e) {
200      // Provide more context if the server ends the stream before sending a response.
201      IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
202      exception.initCause(e);
203      throw exception;
204    }
205  }
206
207  /** Reads headers or trailers. */
208  public Headers readHeaders() throws IOException {
209    Headers.Builder headers = new Headers.Builder();
210    // parse the result headers until the first blank line
211    for (String line; (line = source.readUtf8LineStrict()).length() != 0; ) {
212      Internal.instance.addLenient(headers, line);
213    }
214    return headers.build();
215  }
216
217  public Sink newChunkedSink() {
218    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
219    state = STATE_WRITING_REQUEST_BODY;
220    return new ChunkedSink();
221  }
222
223  public Sink newFixedLengthSink(long contentLength) {
224    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
225    state = STATE_WRITING_REQUEST_BODY;
226    return new FixedLengthSink(contentLength);
227  }
228
229  @Override public void writeRequestBody(RetryableSink requestBody) throws IOException {
230    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
231    state = STATE_READ_RESPONSE_HEADERS;
232    requestBody.writeToSocket(sink);
233  }
234
235  public Source newFixedLengthSource(long length) throws IOException {
236    if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
237    state = STATE_READING_RESPONSE_BODY;
238    return new FixedLengthSource(length);
239  }
240
241  public Source newChunkedSource(HttpEngine httpEngine) throws IOException {
242    if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
243    state = STATE_READING_RESPONSE_BODY;
244    return new ChunkedSource(httpEngine);
245  }
246
247  public Source newUnknownLengthSource() throws IOException {
248    if (state != STATE_OPEN_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
249    if (streamAllocation == null) throw new IllegalStateException("streamAllocation == null");
250    state = STATE_READING_RESPONSE_BODY;
251    streamAllocation.noNewStreams();
252    return new UnknownLengthSource();
253  }
254
255  /**
256   * Sets the delegate of {@code timeout} to {@link Timeout#NONE} and resets its underlying timeout
257   * to the default configuration. Use this to avoid unexpected sharing of timeouts between pooled
258   * connections.
259   */
260  private void detachTimeout(ForwardingTimeout timeout) {
261    Timeout oldDelegate = timeout.delegate();
262    timeout.setDelegate(Timeout.NONE);
263    oldDelegate.clearDeadline();
264    oldDelegate.clearTimeout();
265  }
266
267  /** An HTTP body with a fixed length known in advance. */
268  private final class FixedLengthSink implements Sink {
269    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
270    private boolean closed;
271    private long bytesRemaining;
272
273    private FixedLengthSink(long bytesRemaining) {
274      this.bytesRemaining = bytesRemaining;
275    }
276
277    @Override public Timeout timeout() {
278      return timeout;
279    }
280
281    @Override public void write(Buffer source, long byteCount) throws IOException {
282      if (closed) throw new IllegalStateException("closed");
283      checkOffsetAndCount(source.size(), 0, byteCount);
284      if (byteCount > bytesRemaining) {
285        throw new ProtocolException("expected " + bytesRemaining
286            + " bytes but received " + byteCount);
287      }
288      sink.write(source, byteCount);
289      bytesRemaining -= byteCount;
290    }
291
292    @Override public void flush() throws IOException {
293      if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
294      sink.flush();
295    }
296
297    @Override public void close() throws IOException {
298      if (closed) return;
299      closed = true;
300      if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
301      detachTimeout(timeout);
302      state = STATE_READ_RESPONSE_HEADERS;
303    }
304  }
305
306  /**
307   * An HTTP body with alternating chunk sizes and chunk bodies. It is the
308   * caller's responsibility to buffer chunks; typically by using a buffered
309   * sink with this sink.
310   */
311  private final class ChunkedSink implements Sink {
312    private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
313    private boolean closed;
314
315    @Override public Timeout timeout() {
316      return timeout;
317    }
318
319    @Override public void write(Buffer source, long byteCount) throws IOException {
320      if (closed) throw new IllegalStateException("closed");
321      if (byteCount == 0) return;
322
323      sink.writeHexadecimalUnsignedLong(byteCount);
324      sink.writeUtf8("\r\n");
325      sink.write(source, byteCount);
326      sink.writeUtf8("\r\n");
327    }
328
329    @Override public synchronized void flush() throws IOException {
330      if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
331      sink.flush();
332    }
333
334    @Override public synchronized void close() throws IOException {
335      if (closed) return;
336      closed = true;
337      sink.writeUtf8("0\r\n\r\n");
338      detachTimeout(timeout);
339      state = STATE_READ_RESPONSE_HEADERS;
340    }
341  }
342
343  private abstract class AbstractSource implements Source {
344    protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout());
345    protected boolean closed;
346
347    @Override public Timeout timeout() {
348      return timeout;
349    }
350
351    /**
352     * Closes the cache entry and makes the socket available for reuse. This
353     * should be invoked when the end of the body has been reached.
354     */
355    protected final void endOfInput() throws IOException {
356      if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
357
358      detachTimeout(timeout);
359
360      state = STATE_CLOSED;
361      if (streamAllocation != null) {
362        streamAllocation.streamFinished(Http1xStream.this);
363      }
364    }
365
366    protected final void unexpectedEndOfInput() {
367      if (state == STATE_CLOSED) return;
368
369      state = STATE_CLOSED;
370      if (streamAllocation != null) {
371        streamAllocation.noNewStreams();
372        streamAllocation.streamFinished(Http1xStream.this);
373      }
374    }
375  }
376
377  /** An HTTP body with a fixed length specified in advance. */
378  private class FixedLengthSource extends AbstractSource {
379    private long bytesRemaining;
380
381    public FixedLengthSource(long length) throws IOException {
382      bytesRemaining = length;
383      if (bytesRemaining == 0) {
384        endOfInput();
385      }
386    }
387
388    @Override public long read(Buffer sink, long byteCount) throws IOException {
389      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
390      if (closed) throw new IllegalStateException("closed");
391      if (bytesRemaining == 0) return -1;
392
393      long read = source.read(sink, Math.min(bytesRemaining, byteCount));
394      if (read == -1) {
395        unexpectedEndOfInput(); // The server didn't supply the promised content length.
396        throw new ProtocolException("unexpected end of stream");
397      }
398
399      bytesRemaining -= read;
400      if (bytesRemaining == 0) {
401        endOfInput();
402      }
403      return read;
404    }
405
406    @Override public void close() throws IOException {
407      if (closed) return;
408
409      if (bytesRemaining != 0
410          && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
411        unexpectedEndOfInput();
412      }
413
414      closed = true;
415    }
416  }
417
418  /** An HTTP body with alternating chunk sizes and chunk bodies. */
419  private class ChunkedSource extends AbstractSource {
420    private static final long NO_CHUNK_YET = -1L;
421    private long bytesRemainingInChunk = NO_CHUNK_YET;
422    private boolean hasMoreChunks = true;
423    private final HttpEngine httpEngine;
424
425    ChunkedSource(HttpEngine httpEngine) throws IOException {
426      this.httpEngine = httpEngine;
427    }
428
429    @Override public long read(Buffer sink, long byteCount) throws IOException {
430      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
431      if (closed) throw new IllegalStateException("closed");
432      if (!hasMoreChunks) return -1;
433
434      if (bytesRemainingInChunk == 0 || bytesRemainingInChunk == NO_CHUNK_YET) {
435        readChunkSize();
436        if (!hasMoreChunks) return -1;
437      }
438
439      long read = source.read(sink, Math.min(byteCount, bytesRemainingInChunk));
440      if (read == -1) {
441        unexpectedEndOfInput(); // The server didn't supply the promised chunk length.
442        throw new ProtocolException("unexpected end of stream");
443      }
444      bytesRemainingInChunk -= read;
445      return read;
446    }
447
448    private void readChunkSize() throws IOException {
449      // Read the suffix of the previous chunk.
450      if (bytesRemainingInChunk != NO_CHUNK_YET) {
451        source.readUtf8LineStrict();
452      }
453      try {
454        bytesRemainingInChunk = source.readHexadecimalUnsignedLong();
455        String extensions = source.readUtf8LineStrict().trim();
456        if (bytesRemainingInChunk < 0 || (!extensions.isEmpty() && !extensions.startsWith(";"))) {
457          throw new ProtocolException("expected chunk size and optional extensions but was \""
458              + bytesRemainingInChunk + extensions + "\"");
459        }
460      } catch (NumberFormatException e) {
461        throw new ProtocolException(e.getMessage());
462      }
463      if (bytesRemainingInChunk == 0L) {
464        hasMoreChunks = false;
465        httpEngine.receiveHeaders(readHeaders());
466        endOfInput();
467      }
468    }
469
470    @Override public void close() throws IOException {
471      if (closed) return;
472      if (hasMoreChunks && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
473        unexpectedEndOfInput();
474      }
475      closed = true;
476    }
477  }
478
479  /** An HTTP message body terminated by the end of the underlying stream. */
480  private class UnknownLengthSource extends AbstractSource {
481    private boolean inputExhausted;
482
483    @Override public long read(Buffer sink, long byteCount)
484        throws IOException {
485      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
486      if (closed) throw new IllegalStateException("closed");
487      if (inputExhausted) return -1;
488
489      long read = source.read(sink, byteCount);
490      if (read == -1) {
491        inputExhausted = true;
492        endOfInput();
493        return -1;
494      }
495      return read;
496    }
497
498    @Override public void close() throws IOException {
499      if (closed) return;
500      if (!inputExhausted) {
501        unexpectedEndOfInput();
502      }
503      closed = true;
504    }
505  }
506}
507