1// 2// ======================================================================== 3// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. 4// ------------------------------------------------------------------------ 5// All rights reserved. This program and the accompanying materials 6// are made available under the terms of the Eclipse Public License v1.0 7// and Apache License v2.0 which accompanies this distribution. 8// 9// The Eclipse Public License is available at 10// http://www.eclipse.org/legal/epl-v10.html 11// 12// The Apache License v2.0 is available at 13// http://www.opensource.org/licenses/apache2.0.php 14// 15// You may elect to redistribute this code under either of these licenses. 16// ======================================================================== 17// 18 19package org.eclipse.jetty.client; 20 21import java.io.IOException; 22 23import org.eclipse.jetty.http.AbstractGenerator; 24import org.eclipse.jetty.http.HttpStatus; 25import org.eclipse.jetty.io.AsyncEndPoint; 26import org.eclipse.jetty.io.Buffer; 27import org.eclipse.jetty.io.Buffers; 28import org.eclipse.jetty.io.Connection; 29import org.eclipse.jetty.io.EndPoint; 30import org.eclipse.jetty.io.nio.AsyncConnection; 31import org.eclipse.jetty.util.log.Log; 32import org.eclipse.jetty.util.log.Logger; 33 34 35 36/* ------------------------------------------------------------ */ 37/** Asynchronous Client HTTP Connection 38 */ 39public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection 40{ 41 private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class); 42 43 private boolean _requestComplete; 44 private Buffer _requestContentChunk; 45 private final AsyncEndPoint _asyncEndp; 46 47 AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) 48 { 49 super(requestBuffers,responseBuffers,endp); 50 _asyncEndp=(AsyncEndPoint)endp; 51 } 52 53 protected void reset() throws IOException 54 { 55 _requestComplete = false; 56 super.reset(); 57 } 58 59 public Connection handle() throws IOException 60 { 61 Connection connection = this; 62 boolean progress=true; 63 64 try 65 { 66 boolean failed = false; 67 68 // While we are making progress and have not changed connection 69 while (progress && connection==this) 70 { 71 LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress); 72 73 progress=false; 74 HttpExchange exchange=_exchange; 75 76 LOG.debug("exchange {} on {}",exchange,this); 77 78 try 79 { 80 // Should we commit the request? 81 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT) 82 { 83 LOG.debug("commit {}",exchange); 84 progress=true; 85 commitRequest(); 86 } 87 88 // Generate output 89 if (_generator.isCommitted() && !_generator.isComplete()) 90 { 91 if (_generator.flushBuffer()>0) 92 { 93 LOG.debug("flushed"); 94 progress=true; 95 } 96 97 // Is there more content to send or should we complete the generator 98 if (_generator.isState(AbstractGenerator.STATE_CONTENT)) 99 { 100 // Look for more content to send. 101 if (_requestContentChunk==null) 102 _requestContentChunk = exchange.getRequestContentChunk(null); 103 104 if (_requestContentChunk==null) 105 { 106 LOG.debug("complete {}",exchange); 107 progress=true; 108 _generator.complete(); 109 if (exchange.getStatus() < HttpExchange.STATUS_WAITING_FOR_RESPONSE) 110 exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 111 } 112 else if (_generator.isEmpty()) 113 { 114 LOG.debug("addChunk"); 115 progress=true; 116 Buffer chunk=_requestContentChunk; 117 _requestContentChunk=exchange.getRequestContentChunk(null); 118 _generator.addContent(chunk,_requestContentChunk==null); 119 if (_requestContentChunk==null) 120 exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE); 121 } 122 } 123 } 124 125 // Signal request completion 126 if (_generator.isComplete() && !_requestComplete) 127 { 128 LOG.debug("requestComplete {}",exchange); 129 progress=true; 130 _requestComplete = true; 131 exchange.getEventListener().onRequestComplete(); 132 } 133 134 // Read any input that is available 135 if (!_parser.isComplete() && _parser.parseAvailable()) 136 { 137 LOG.debug("parsed {}",exchange); 138 progress=true; 139 } 140 141 // Flush output 142 _endp.flush(); 143 144 // Has any IO been done by the endpoint itself since last loop 145 if (_asyncEndp.hasProgressed()) 146 { 147 LOG.debug("hasProgressed {}",exchange); 148 progress=true; 149 } 150 } 151 catch (Throwable e) 152 { 153 LOG.debug("Failure on " + _exchange, e); 154 155 failed = true; 156 157 synchronized (this) 158 { 159 if (exchange != null) 160 { 161 // Cancelling the exchange causes an exception as we close the connection, 162 // but we don't report it as it is normal cancelling operation 163 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING && 164 exchange.getStatus() != HttpExchange.STATUS_CANCELLED && 165 !exchange.isDone()) 166 { 167 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED)) 168 exchange.getEventListener().onException(e); 169 } 170 } 171 else 172 { 173 if (e instanceof IOException) 174 throw (IOException)e; 175 if (e instanceof Error) 176 throw (Error)e; 177 if (e instanceof RuntimeException) 178 throw (RuntimeException)e; 179 throw new RuntimeException(e); 180 } 181 } 182 } 183 finally 184 { 185 LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp); 186 187 boolean complete = failed || _generator.isComplete() && _parser.isComplete(); 188 189 if (complete) 190 { 191 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent(); 192 _generator.setPersistent(persistent); 193 reset(); 194 if (persistent) 195 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout()); 196 197 synchronized (this) 198 { 199 exchange=_exchange; 200 _exchange = null; 201 202 // Cancel the exchange 203 if (exchange!=null) 204 { 205 exchange.cancelTimeout(_destination.getHttpClient()); 206 207 // TODO should we check the exchange is done? 208 } 209 210 // handle switched protocols 211 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101) 212 { 213 Connection switched=exchange.onSwitchProtocol(_endp); 214 if (switched!=null) 215 { 216 // switched protocol! 217 if (_pipeline!=null) 218 { 219 _destination.send(_pipeline); 220 } 221 _pipeline = null; 222 223 connection=switched; 224 } 225 } 226 227 // handle pipelined requests 228 if (_pipeline!=null) 229 { 230 if (!persistent || connection!=this) 231 _destination.send(_pipeline); 232 else 233 _exchange=_pipeline; 234 _pipeline=null; 235 } 236 237 if (_exchange==null && !isReserved()) // TODO how do we return switched connections? 238 _destination.returnConnection(this, !persistent); 239 } 240 241 } 242 } 243 } 244 } 245 finally 246 { 247 _parser.returnBuffers(); 248 _generator.returnBuffers(); 249 LOG.debug("unhandle {} on {}",_exchange,_endp); 250 } 251 252 return connection; 253 } 254 255 public void onInputShutdown() throws IOException 256 { 257 if (_generator.isIdle()) 258 _endp.shutdownOutput(); 259 } 260 261 @Override 262 public boolean send(HttpExchange ex) throws IOException 263 { 264 boolean sent=super.send(ex); 265 if (sent) 266 _asyncEndp.asyncDispatch(); 267 return sent; 268 } 269} 270