1/* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package com.squareup.okhttp.benchmarks; 17 18import com.squareup.okhttp.internal.SslContextBuilder; 19import com.squareup.okhttp.internal.Util; 20import io.netty.bootstrap.Bootstrap; 21import io.netty.buffer.ByteBuf; 22import io.netty.buffer.PooledByteBufAllocator; 23import io.netty.channel.Channel; 24import io.netty.channel.ChannelHandlerContext; 25import io.netty.channel.ChannelInitializer; 26import io.netty.channel.ChannelOption; 27import io.netty.channel.ChannelPipeline; 28import io.netty.channel.SimpleChannelInboundHandler; 29import io.netty.channel.nio.NioEventLoopGroup; 30import io.netty.channel.socket.SocketChannel; 31import io.netty.channel.socket.nio.NioSocketChannel; 32import io.netty.handler.codec.http.DefaultFullHttpRequest; 33import io.netty.handler.codec.http.HttpClientCodec; 34import io.netty.handler.codec.http.HttpContent; 35import io.netty.handler.codec.http.HttpContentDecompressor; 36import io.netty.handler.codec.http.HttpHeaders; 37import io.netty.handler.codec.http.HttpMethod; 38import io.netty.handler.codec.http.HttpObject; 39import io.netty.handler.codec.http.HttpRequest; 40import io.netty.handler.codec.http.HttpResponse; 41import io.netty.handler.codec.http.HttpVersion; 42import io.netty.handler.codec.http.LastHttpContent; 43import io.netty.handler.ssl.SslHandler; 44import java.net.URL; 45import java.util.ArrayDeque; 46import java.util.Deque; 47import java.util.concurrent.TimeUnit; 48import javax.net.ssl.SSLContext; 49import javax.net.ssl.SSLEngine; 50 51/** Netty isn't an HTTP client, but it's almost one. */ 52class NettyHttpClient implements HttpClient { 53 private static final boolean VERBOSE = false; 54 55 // Guarded by this. Real apps need more capable connection management. 56 private final Deque<HttpChannel> freeChannels = new ArrayDeque<HttpChannel>(); 57 private final Deque<URL> backlog = new ArrayDeque<URL>(); 58 59 private int totalChannels = 0; 60 private int concurrencyLevel; 61 private int targetBacklog; 62 private Bootstrap bootstrap; 63 64 @Override public void prepare(final Benchmark benchmark) { 65 this.concurrencyLevel = benchmark.concurrencyLevel; 66 this.targetBacklog = benchmark.targetBacklog; 67 68 ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { 69 @Override public void initChannel(SocketChannel channel) throws Exception { 70 ChannelPipeline pipeline = channel.pipeline(); 71 72 if (benchmark.tls) { 73 SSLContext sslContext = SslContextBuilder.localhost(); 74 SSLEngine engine = sslContext.createSSLEngine(); 75 engine.setUseClientMode(true); 76 pipeline.addLast("ssl", new SslHandler(engine)); 77 } 78 79 pipeline.addLast("codec", new HttpClientCodec()); 80 pipeline.addLast("inflater", new HttpContentDecompressor()); 81 pipeline.addLast("handler", new HttpChannel(channel)); 82 } 83 }; 84 85 bootstrap = new Bootstrap(); 86 bootstrap.group(new NioEventLoopGroup(concurrencyLevel)) 87 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) 88 .channel(NioSocketChannel.class) 89 .handler(channelInitializer); 90 } 91 92 @Override public void enqueue(URL url) throws Exception { 93 HttpChannel httpChannel = null; 94 synchronized (this) { 95 if (!freeChannels.isEmpty()) { 96 httpChannel = freeChannels.pop(); 97 } else if (totalChannels < concurrencyLevel) { 98 totalChannels++; // Create a new channel. (outside of the synchronized block). 99 } else { 100 backlog.add(url); // Enqueue this for later, to be picked up when another request completes. 101 return; 102 } 103 } 104 if (httpChannel == null) { 105 Channel channel = bootstrap.connect(url.getHost(), Util.getEffectivePort(url)) 106 .sync().channel(); 107 httpChannel = (HttpChannel) channel.pipeline().last(); 108 } 109 httpChannel.sendRequest(url); 110 } 111 112 @Override public synchronized boolean acceptingJobs() { 113 return backlog.size() < targetBacklog || hasFreeChannels(); 114 } 115 116 private boolean hasFreeChannels() { 117 int activeChannels = totalChannels - freeChannels.size(); 118 return activeChannels < concurrencyLevel; 119 } 120 121 private void release(HttpChannel httpChannel) { 122 URL url; 123 synchronized (this) { 124 url = backlog.pop(); 125 if (url == null) { 126 // There were no URLs in the backlog. Pool this channel for later. 127 freeChannels.push(httpChannel); 128 return; 129 } 130 } 131 132 // We removed a URL from the backlog. Schedule it right away. 133 httpChannel.sendRequest(url); 134 } 135 136 class HttpChannel extends SimpleChannelInboundHandler<HttpObject> { 137 private final SocketChannel channel; 138 byte[] buffer = new byte[1024]; 139 int total; 140 long start; 141 142 public HttpChannel(SocketChannel channel) { 143 this.channel = channel; 144 } 145 146 private void sendRequest(URL url) { 147 start = System.nanoTime(); 148 total = 0; 149 HttpRequest request = new DefaultFullHttpRequest( 150 HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath()); 151 request.headers().set(HttpHeaders.Names.HOST, url.getHost()); 152 request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); 153 channel.writeAndFlush(request); 154 } 155 156 @Override protected void channelRead0( 157 ChannelHandlerContext context, HttpObject message) throws Exception { 158 if (message instanceof HttpResponse) { 159 receive((HttpResponse) message); 160 } 161 if (message instanceof HttpContent) { 162 receive((HttpContent) message); 163 if (message instanceof LastHttpContent) { 164 release(this); 165 } 166 } 167 } 168 169 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { 170 super.channelInactive(ctx); 171 } 172 173 void receive(HttpResponse response) { 174 // Don't do anything with headers. 175 } 176 177 void receive(HttpContent content) { 178 // Consume the response body. 179 ByteBuf byteBuf = content.content(); 180 for (int toRead; (toRead = byteBuf.readableBytes()) > 0; ) { 181 byteBuf.readBytes(buffer, 0, Math.min(buffer.length, toRead)); 182 total += toRead; 183 } 184 185 if (VERBOSE && content instanceof LastHttpContent) { 186 long finish = System.nanoTime(); 187 System.out.println(String.format("Transferred % 8d bytes in %4d ms", 188 total, TimeUnit.NANOSECONDS.toMillis(finish - start))); 189 } 190 } 191 192 @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { 193 System.out.println("Failed: " + cause); 194 } 195 } 196} 197