MockSpdyPeer.java revision 3c938a3f6b61ce5e2dba0d039b03fe73b89fd26c
1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.squareup.okhttp.internal.spdy;
18
19import com.squareup.okhttp.internal.Util;
20import java.io.Closeable;
21import java.io.IOException;
22import java.io.InputStream;
23import java.io.OutputStream;
24import java.net.ServerSocket;
25import java.net.Socket;
26import java.util.ArrayList;
27import java.util.Iterator;
28import java.util.List;
29import java.util.concurrent.BlockingQueue;
30import java.util.concurrent.ExecutorService;
31import java.util.concurrent.Executors;
32import java.util.concurrent.LinkedBlockingQueue;
33import okio.BufferedSource;
34import okio.ByteString;
35import okio.OkBuffer;
36import okio.Okio;
37
38/** Replays prerecorded outgoing frames and records incoming frames. */
39public final class MockSpdyPeer implements Closeable {
40  private int frameCount = 0;
41  private boolean client = false;
42  private Variant variant = new Spdy3();
43  private final OkBuffer bytesOut = new OkBuffer();
44  private FrameWriter frameWriter = variant.newWriter(bytesOut, client);
45  private final List<OutFrame> outFrames = new ArrayList<OutFrame>();
46  private final BlockingQueue<InFrame> inFrames = new LinkedBlockingQueue<InFrame>();
47  private int port;
48  private final ExecutorService executor = Executors.newCachedThreadPool(
49      Util.threadFactory("MockSpdyPeer", false));
50  private ServerSocket serverSocket;
51  private Socket socket;
52
53  public void setVariantAndClient(Variant variant, boolean client) {
54    if (this.variant.getProtocol() == variant.getProtocol() && this.client == client) {
55      return;
56    }
57    this.client = client;
58    this.variant = variant;
59    this.frameWriter = variant.newWriter(bytesOut, client);
60  }
61
62  public void acceptFrame() {
63    frameCount++;
64  }
65
66  /** Count of frames sent or received. */
67  public int frameCount() {
68    return frameCount;
69  }
70
71  public FrameWriter sendFrame() {
72    outFrames.add(new OutFrame(frameCount++, bytesOut.size(), Integer.MAX_VALUE));
73    return frameWriter;
74  }
75
76  /**
77   * Sends a manually-constructed frame. This is useful to test frames that
78   * won't be generated naturally.
79   */
80  public void sendFrame(byte[] frame) throws IOException {
81    outFrames.add(new OutFrame(frameCount++, bytesOut.size(), Integer.MAX_VALUE));
82    bytesOut.write(frame);
83  }
84
85  /**
86   * Sends a frame, truncated to {@code truncateToLength} bytes. This is only
87   * useful for testing error handling as the truncated frame will be
88   * malformed.
89   */
90  public FrameWriter sendTruncatedFrame(int truncateToLength) {
91    outFrames.add(new OutFrame(frameCount++, bytesOut.size(), truncateToLength));
92    return frameWriter;
93  }
94
95  public InFrame takeFrame() throws InterruptedException {
96    return inFrames.take();
97  }
98
99  public void play() throws IOException {
100    if (serverSocket != null) throw new IllegalStateException();
101    serverSocket = new ServerSocket(0);
102    serverSocket.setReuseAddress(true);
103    port = serverSocket.getLocalPort();
104    executor.execute(new Runnable() {
105      @Override public void run() {
106        try {
107          readAndWriteFrames();
108        } catch (IOException e) {
109          Util.closeQuietly(MockSpdyPeer.this);
110          throw new RuntimeException(e);
111        }
112      }
113    });
114  }
115
116  private void readAndWriteFrames() throws IOException {
117    if (socket != null) throw new IllegalStateException();
118    socket = serverSocket.accept();
119    OutputStream out = socket.getOutputStream();
120    InputStream in = socket.getInputStream();
121    FrameReader reader = variant.newReader(Okio.buffer(Okio.source(in)), client);
122
123    Iterator<OutFrame> outFramesIterator = outFrames.iterator();
124    byte[] outBytes = bytesOut.readByteString(bytesOut.size()).toByteArray();
125    OutFrame nextOutFrame = null;
126
127    for (int i = 0; i < frameCount; i++) {
128      if (nextOutFrame == null && outFramesIterator.hasNext()) {
129        nextOutFrame = outFramesIterator.next();
130      }
131
132      if (nextOutFrame != null && nextOutFrame.sequence == i) {
133        long start = nextOutFrame.start;
134        int truncateToLength = nextOutFrame.truncateToLength;
135        long end;
136        if (outFramesIterator.hasNext()) {
137          nextOutFrame = outFramesIterator.next();
138          end = nextOutFrame.start;
139        } else {
140          end = outBytes.length;
141        }
142
143        // write a frame
144        int length = (int) Math.min(end - start, truncateToLength);
145        out.write(outBytes, (int) start, length);
146      } else {
147        // read a frame
148        InFrame inFrame = new InFrame(i, reader);
149        reader.nextFrame(inFrame);
150        inFrames.add(inFrame);
151      }
152    }
153    Util.closeQuietly(socket);
154  }
155
156  public Socket openSocket() throws IOException {
157    return new Socket("localhost", port);
158  }
159
160  @Override public synchronized void close() throws IOException {
161    executor.shutdown();
162    Socket socket = this.socket;
163    if (socket != null) {
164      Util.closeQuietly(socket);
165      this.socket = null;
166    }
167    ServerSocket serverSocket = this.serverSocket;
168    if (serverSocket != null) {
169      Util.closeQuietly(serverSocket);
170      this.serverSocket = null;
171    }
172  }
173
174  private static class OutFrame {
175    private final int sequence;
176    private final long start;
177    private final int truncateToLength;
178
179    private OutFrame(int sequence, long start, int truncateToLength) {
180      this.sequence = sequence;
181      this.start = start;
182      this.truncateToLength = truncateToLength;
183    }
184  }
185
186  public static class InFrame implements FrameReader.Handler {
187    public final int sequence;
188    public final FrameReader reader;
189    public int type = -1;
190    public boolean clearPrevious;
191    public boolean outFinished;
192    public boolean inFinished;
193    public int streamId;
194    public int associatedStreamId;
195    public int priority;
196    public ErrorCode errorCode;
197    public long windowSizeIncrement;
198    public List<Header> headerBlock;
199    public byte[] data;
200    public Settings settings;
201    public HeadersMode headersMode;
202    public boolean ack;
203    public int payload1;
204    public int payload2;
205
206    public InFrame(int sequence, FrameReader reader) {
207      this.sequence = sequence;
208      this.reader = reader;
209    }
210
211    @Override public void settings(boolean clearPrevious, Settings settings) {
212      if (this.type != -1) throw new IllegalStateException();
213      this.type = Spdy3.TYPE_SETTINGS;
214      this.clearPrevious = clearPrevious;
215      this.settings = settings;
216    }
217
218    @Override public void ackSettings() {
219      if (this.type != -1) throw new IllegalStateException();
220      this.type = Spdy3.TYPE_SETTINGS;
221      this.ack = true;
222    }
223
224    @Override public void headers(boolean outFinished, boolean inFinished, int streamId,
225        int associatedStreamId, int priority, List<Header> headerBlock,
226        HeadersMode headersMode) {
227      if (this.type != -1) throw new IllegalStateException();
228      this.type = Spdy3.TYPE_HEADERS;
229      this.outFinished = outFinished;
230      this.inFinished = inFinished;
231      this.streamId = streamId;
232      this.associatedStreamId = associatedStreamId;
233      this.priority = priority;
234      this.headerBlock = headerBlock;
235      this.headersMode = headersMode;
236    }
237
238    @Override public void data(boolean inFinished, int streamId, BufferedSource source, int length)
239        throws IOException {
240      if (this.type != -1) throw new IllegalStateException();
241      this.type = Spdy3.TYPE_DATA;
242      this.inFinished = inFinished;
243      this.streamId = streamId;
244      this.data = source.readByteString(length).toByteArray();
245    }
246
247    @Override public void rstStream(int streamId, ErrorCode errorCode) {
248      if (this.type != -1) throw new IllegalStateException();
249      this.type = Spdy3.TYPE_RST_STREAM;
250      this.streamId = streamId;
251      this.errorCode = errorCode;
252    }
253
254    @Override public void ping(boolean ack, int payload1, int payload2) {
255      if (this.type != -1) throw new IllegalStateException();
256      this.type = Spdy3.TYPE_PING;
257      this.ack = ack;
258      this.payload1 = payload1;
259      this.payload2 = payload2;
260    }
261
262    @Override public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
263      if (this.type != -1) throw new IllegalStateException();
264      this.type = Spdy3.TYPE_GOAWAY;
265      this.streamId = lastGoodStreamId;
266      this.errorCode = errorCode;
267      this.data = debugData.toByteArray();
268    }
269
270    @Override public void windowUpdate(int streamId, long windowSizeIncrement) {
271      if (this.type != -1) throw new IllegalStateException();
272      this.type = Spdy3.TYPE_WINDOW_UPDATE;
273      this.streamId = streamId;
274      this.windowSizeIncrement = windowSizeIncrement;
275    }
276
277    @Override public void priority(int streamId, int priority) {
278      throw new UnsupportedOperationException();
279    }
280
281    @Override
282    public void pushPromise(int streamId, int associatedStreamId, List<Header> headerBlock) {
283      this.type = Http20Draft09.TYPE_PUSH_PROMISE;
284      this.streamId = streamId;
285      this.associatedStreamId = associatedStreamId;
286      this.headerBlock = headerBlock;
287    }
288  }
289}
290