1/*
2 * Copyright (C) 2014 Square, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package com.squareup.okhttp.benchmarks;
17
18import com.squareup.okhttp.internal.SslContextBuilder;
19import com.squareup.okhttp.internal.Util;
20import io.netty.bootstrap.Bootstrap;
21import io.netty.buffer.ByteBuf;
22import io.netty.buffer.PooledByteBufAllocator;
23import io.netty.channel.Channel;
24import io.netty.channel.ChannelHandlerContext;
25import io.netty.channel.ChannelInitializer;
26import io.netty.channel.ChannelOption;
27import io.netty.channel.ChannelPipeline;
28import io.netty.channel.SimpleChannelInboundHandler;
29import io.netty.channel.nio.NioEventLoopGroup;
30import io.netty.channel.socket.SocketChannel;
31import io.netty.channel.socket.nio.NioSocketChannel;
32import io.netty.handler.codec.http.DefaultFullHttpRequest;
33import io.netty.handler.codec.http.HttpClientCodec;
34import io.netty.handler.codec.http.HttpContent;
35import io.netty.handler.codec.http.HttpContentDecompressor;
36import io.netty.handler.codec.http.HttpHeaders;
37import io.netty.handler.codec.http.HttpMethod;
38import io.netty.handler.codec.http.HttpObject;
39import io.netty.handler.codec.http.HttpRequest;
40import io.netty.handler.codec.http.HttpResponse;
41import io.netty.handler.codec.http.HttpVersion;
42import io.netty.handler.codec.http.LastHttpContent;
43import io.netty.handler.ssl.SslHandler;
44import java.net.URL;
45import java.util.ArrayDeque;
46import java.util.Deque;
47import java.util.concurrent.TimeUnit;
48import javax.net.ssl.SSLContext;
49import javax.net.ssl.SSLEngine;
50
51/** Netty isn't an HTTP client, but it's almost one. */
52class NettyHttpClient implements HttpClient {
53  private static final boolean VERBOSE = false;
54
55  // Guarded by this. Real apps need more capable connection management.
56  private final Deque<HttpChannel> freeChannels = new ArrayDeque<HttpChannel>();
57  private final Deque<URL> backlog = new ArrayDeque<URL>();
58
59  private int totalChannels = 0;
60  private int concurrencyLevel;
61  private int targetBacklog;
62  private Bootstrap bootstrap;
63
64  @Override public void prepare(final Benchmark benchmark) {
65    this.concurrencyLevel = benchmark.concurrencyLevel;
66    this.targetBacklog = benchmark.targetBacklog;
67
68    ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
69      @Override public void initChannel(SocketChannel channel) throws Exception {
70        ChannelPipeline pipeline = channel.pipeline();
71
72        if (benchmark.tls) {
73          SSLContext sslContext = SslContextBuilder.localhost();
74          SSLEngine engine = sslContext.createSSLEngine();
75          engine.setUseClientMode(true);
76          pipeline.addLast("ssl", new SslHandler(engine));
77        }
78
79        pipeline.addLast("codec", new HttpClientCodec());
80        pipeline.addLast("inflater", new HttpContentDecompressor());
81        pipeline.addLast("handler", new HttpChannel(channel));
82      }
83    };
84
85    bootstrap = new Bootstrap();
86    bootstrap.group(new NioEventLoopGroup(concurrencyLevel))
87        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
88        .channel(NioSocketChannel.class)
89        .handler(channelInitializer);
90  }
91
92  @Override public void enqueue(URL url) throws Exception {
93    HttpChannel httpChannel = null;
94    synchronized (this) {
95      if (!freeChannels.isEmpty()) {
96        httpChannel = freeChannels.pop();
97      } else if (totalChannels < concurrencyLevel) {
98        totalChannels++; // Create a new channel. (outside of the synchronized block).
99      } else {
100        backlog.add(url); // Enqueue this for later, to be picked up when another request completes.
101        return;
102      }
103    }
104    if (httpChannel == null) {
105      Channel channel = bootstrap.connect(url.getHost(), Util.getEffectivePort(url))
106          .sync().channel();
107      httpChannel = (HttpChannel) channel.pipeline().last();
108    }
109    httpChannel.sendRequest(url);
110  }
111
112  @Override public synchronized boolean acceptingJobs() {
113    return backlog.size() < targetBacklog || hasFreeChannels();
114  }
115
116  private boolean hasFreeChannels() {
117    int activeChannels = totalChannels - freeChannels.size();
118    return activeChannels < concurrencyLevel;
119  }
120
121  private void release(HttpChannel httpChannel) {
122    URL url;
123    synchronized (this) {
124      url = backlog.pop();
125      if (url == null) {
126        // There were no URLs in the backlog. Pool this channel for later.
127        freeChannels.push(httpChannel);
128        return;
129      }
130    }
131
132    // We removed a URL from the backlog. Schedule it right away.
133    httpChannel.sendRequest(url);
134  }
135
136  class HttpChannel extends SimpleChannelInboundHandler<HttpObject> {
137    private final SocketChannel channel;
138    byte[] buffer = new byte[1024];
139    int total;
140    long start;
141
142    public HttpChannel(SocketChannel channel) {
143      this.channel = channel;
144    }
145
146    private void sendRequest(URL url) {
147      start = System.nanoTime();
148      total = 0;
149      HttpRequest request = new DefaultFullHttpRequest(
150          HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath());
151      request.headers().set(HttpHeaders.Names.HOST, url.getHost());
152      request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
153      channel.writeAndFlush(request);
154    }
155
156    @Override protected void channelRead0(
157        ChannelHandlerContext context, HttpObject message) throws Exception {
158      if (message instanceof HttpResponse) {
159        receive((HttpResponse) message);
160      }
161      if (message instanceof HttpContent) {
162        receive((HttpContent) message);
163        if (message instanceof LastHttpContent) {
164          release(this);
165        }
166      }
167    }
168
169    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
170      super.channelInactive(ctx);
171    }
172
173    void receive(HttpResponse response) {
174      // Don't do anything with headers.
175    }
176
177    void receive(HttpContent content) {
178      // Consume the response body.
179      ByteBuf byteBuf = content.content();
180      for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) {
181        byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead));
182        total += toRead;
183      }
184
185      if (VERBOSE && content instanceof LastHttpContent) {
186        long finish = System.nanoTime();
187        System.out.println(String.format("Transferred % 8d bytes in %4d ms",
188            total, TimeUnit.NANOSECONDS.toMillis(finish - start)));
189      }
190    }
191
192    @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
193      System.out.println("Failed: " + cause);
194    }
195  }
196}
197