1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.client;
20
21import java.io.IOException;
22
23import org.eclipse.jetty.http.AbstractGenerator;
24import org.eclipse.jetty.http.HttpStatus;
25import org.eclipse.jetty.io.AsyncEndPoint;
26import org.eclipse.jetty.io.Buffer;
27import org.eclipse.jetty.io.Buffers;
28import org.eclipse.jetty.io.Connection;
29import org.eclipse.jetty.io.EndPoint;
30import org.eclipse.jetty.io.nio.AsyncConnection;
31import org.eclipse.jetty.util.log.Log;
32import org.eclipse.jetty.util.log.Logger;
33
34
35
36/* ------------------------------------------------------------ */
37/** Asynchronous Client HTTP Connection
38 */
39public class AsyncHttpConnection extends AbstractHttpConnection implements AsyncConnection
40{
41    private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class);
42
43    private boolean _requestComplete;
44    private Buffer _requestContentChunk;
45    private final AsyncEndPoint _asyncEndp;
46
47    AsyncHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
48    {
49        super(requestBuffers,responseBuffers,endp);
50        _asyncEndp=(AsyncEndPoint)endp;
51    }
52
53    protected void reset() throws IOException
54    {
55        _requestComplete = false;
56        super.reset();
57    }
58
59    public Connection handle() throws IOException
60    {
61        Connection connection = this;
62        boolean progress=true;
63
64        try
65        {
66            boolean failed = false;
67
68            // While we are making progress and have not changed connection
69            while (progress && connection==this)
70            {
71                LOG.debug("while open={} more={} progress={}",_endp.isOpen(),_parser.isMoreInBuffer(),progress);
72
73                progress=false;
74                HttpExchange exchange=_exchange;
75
76                LOG.debug("exchange {} on {}",exchange,this);
77
78                try
79                {
80                    // Should we commit the request?
81                    if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
82                    {
83                        LOG.debug("commit {}",exchange);
84                        progress=true;
85                        commitRequest();
86                    }
87
88                    // Generate output
89                    if (_generator.isCommitted() && !_generator.isComplete())
90                    {
91                        if (_generator.flushBuffer()>0)
92                        {
93                            LOG.debug("flushed");
94                            progress=true;
95                        }
96
97                        // Is there more content to send or should we complete the generator
98                        if (_generator.isState(AbstractGenerator.STATE_CONTENT))
99                        {
100                            // Look for more content to send.
101                            if (_requestContentChunk==null)
102                                _requestContentChunk = exchange.getRequestContentChunk(null);
103
104                            if (_requestContentChunk==null)
105                            {
106                                LOG.debug("complete {}",exchange);
107                                progress=true;
108                                _generator.complete();
109                                if (exchange.getStatus() < HttpExchange.STATUS_WAITING_FOR_RESPONSE)
110                                    exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
111                            }
112                            else if (_generator.isEmpty())
113                            {
114                                LOG.debug("addChunk");
115                                progress=true;
116                                Buffer chunk=_requestContentChunk;
117                                _requestContentChunk=exchange.getRequestContentChunk(null);
118                                _generator.addContent(chunk,_requestContentChunk==null);
119                                if (_requestContentChunk==null)
120                                    exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
121                            }
122                        }
123                    }
124
125                    // Signal request completion
126                    if (_generator.isComplete() && !_requestComplete)
127                    {
128                        LOG.debug("requestComplete {}",exchange);
129                        progress=true;
130                        _requestComplete = true;
131                        exchange.getEventListener().onRequestComplete();
132                    }
133
134                    // Read any input that is available
135                    if (!_parser.isComplete() && _parser.parseAvailable())
136                    {
137                        LOG.debug("parsed {}",exchange);
138                        progress=true;
139                    }
140
141                    // Flush output
142                    _endp.flush();
143
144                    // Has any IO been done by the endpoint itself since last loop
145                    if (_asyncEndp.hasProgressed())
146                    {
147                        LOG.debug("hasProgressed {}",exchange);
148                        progress=true;
149                    }
150                }
151                catch (Throwable e)
152                {
153                    LOG.debug("Failure on " + _exchange, e);
154
155                    failed = true;
156
157                    synchronized (this)
158                    {
159                        if (exchange != null)
160                        {
161                            // Cancelling the exchange causes an exception as we close the connection,
162                            // but we don't report it as it is normal cancelling operation
163                            if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
164                                    exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
165                                    !exchange.isDone())
166                            {
167                                if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
168                                    exchange.getEventListener().onException(e);
169                            }
170                        }
171                        else
172                        {
173                            if (e instanceof IOException)
174                                throw (IOException)e;
175                            if (e instanceof Error)
176                                throw (Error)e;
177                            if (e instanceof RuntimeException)
178                                throw (RuntimeException)e;
179                            throw new RuntimeException(e);
180                        }
181                    }
182                }
183                finally
184                {
185                    LOG.debug("finally {} on {} progress={} {}",exchange,this,progress,_endp);
186
187                    boolean complete = failed || _generator.isComplete() && _parser.isComplete();
188
189                    if (complete)
190                    {
191                        boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
192                        _generator.setPersistent(persistent);
193                        reset();
194                        if (persistent)
195                            _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
196
197                        synchronized (this)
198                        {
199                            exchange=_exchange;
200                            _exchange = null;
201
202                            // Cancel the exchange
203                            if (exchange!=null)
204                            {
205                                exchange.cancelTimeout(_destination.getHttpClient());
206
207                                // TODO should we check the exchange is done?
208                            }
209
210                            // handle switched protocols
211                            if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
212                            {
213                                Connection switched=exchange.onSwitchProtocol(_endp);
214                                if (switched!=null)
215                                {
216                                    // switched protocol!
217                                    if (_pipeline!=null)
218                                    {
219                                        _destination.send(_pipeline);
220                                    }
221                                    _pipeline = null;
222
223                                    connection=switched;
224                                }
225                            }
226
227                            // handle pipelined requests
228                            if (_pipeline!=null)
229                            {
230                                if (!persistent || connection!=this)
231                                    _destination.send(_pipeline);
232                                else
233                                    _exchange=_pipeline;
234                                _pipeline=null;
235                            }
236
237                            if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
238                                _destination.returnConnection(this, !persistent);
239                        }
240
241                    }
242                }
243            }
244        }
245        finally
246        {
247            _parser.returnBuffers();
248            _generator.returnBuffers();
249            LOG.debug("unhandle {} on {}",_exchange,_endp);
250        }
251
252        return connection;
253    }
254
255    public void onInputShutdown() throws IOException
256    {
257        if (_generator.isIdle())
258            _endp.shutdownOutput();
259    }
260
261    @Override
262    public boolean send(HttpExchange ex) throws IOException
263    {
264        boolean sent=super.send(ex);
265        if (sent)
266            _asyncEndp.asyncDispatch();
267        return sent;
268    }
269}
270