1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.client;
20
21import java.io.IOException;
22import java.io.InterruptedIOException;
23
24import org.eclipse.jetty.http.AbstractGenerator;
25import org.eclipse.jetty.http.HttpStatus;
26import org.eclipse.jetty.io.Buffer;
27import org.eclipse.jetty.io.Buffers;
28import org.eclipse.jetty.io.Connection;
29import org.eclipse.jetty.io.EndPoint;
30import org.eclipse.jetty.util.log.Log;
31import org.eclipse.jetty.util.log.Logger;
32
33
34/* ------------------------------------------------------------ */
35/** Blocking HTTP Connection
36 */
37public class BlockingHttpConnection extends AbstractHttpConnection
38{
39    private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
40
41    private boolean _requestComplete;
42    private Buffer _requestContentChunk;
43    private boolean _expired=false;
44
45    BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
46    {
47        super(requestBuffers, responseBuffers, endPoint);
48    }
49
50    protected void reset() throws IOException
51    {
52        _requestComplete = false;
53        _expired = false;
54        super.reset();
55    }
56
57
58    @Override
59    protected void exchangeExpired(HttpExchange exchange)
60    {
61        synchronized (this)
62        {
63           super.exchangeExpired(exchange);
64           _expired = true;
65           this.notifyAll();
66        }
67    }
68
69
70
71    @Override
72    public void onIdleExpired(long idleForMs)
73    {
74        try
75        {
76            LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
77            _expired = true;
78            _endp.close();
79        }
80        catch(IOException e)
81        {
82            LOG.ignore(e);
83
84            try
85            {
86                _endp.close();
87            }
88            catch(IOException e2)
89            {
90                LOG.ignore(e2);
91            }
92        }
93
94        synchronized(this)
95        {
96            this.notifyAll();
97        }
98    }
99
100    @Override
101    public Connection handle() throws IOException
102    {
103        Connection connection = this;
104
105        try
106        {
107            boolean failed = false;
108
109
110            // While we are making progress and have not changed connection
111            while (_endp.isOpen() && connection==this)
112            {
113                LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
114
115                HttpExchange exchange;
116                synchronized (this)
117                {
118                    exchange=_exchange;
119                    while (exchange == null)
120                    {
121                        try
122                        {
123                            this.wait();
124                            exchange=_exchange;
125                            if (_expired)
126                            {
127                                failed = true;
128                                throw new InterruptedException();
129                            }
130
131                        }
132                        catch (InterruptedException e)
133                        {
134                            throw new InterruptedIOException();
135                        }
136                    }
137                }
138                LOG.debug("exchange {}",exchange);
139
140                try
141                {
142                    // Should we commit the request?
143                    if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
144                    {
145                        LOG.debug("commit");
146                        commitRequest();
147                    }
148
149                    // Generate output
150                    while (_generator.isCommitted() && !_generator.isComplete())
151                    {
152                        if (_generator.flushBuffer()>0)
153                        {
154                            LOG.debug("flushed");
155                        }
156
157                        // Is there more content to send or should we complete the generator
158                        if (_generator.isState(AbstractGenerator.STATE_CONTENT))
159                        {
160                            // Look for more content to send.
161                            if (_requestContentChunk==null)
162                                _requestContentChunk = exchange.getRequestContentChunk(null);
163
164                            if (_requestContentChunk==null)
165                            {
166                                LOG.debug("complete");
167                                _generator.complete();
168                            }
169                            else if (_generator.isEmpty())
170                            {
171                                LOG.debug("addChunk");
172                                Buffer chunk=_requestContentChunk;
173                                _requestContentChunk=exchange.getRequestContentChunk(null);
174                                _generator.addContent(chunk,_requestContentChunk==null);
175                                if (_requestContentChunk==null)
176                                    exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
177                            }
178                        }
179                    }
180
181                    // Signal request completion
182                    if (_generator.isComplete() && !_requestComplete)
183                    {
184                        LOG.debug("requestComplete");
185                        _requestComplete = true;
186                        exchange.getEventListener().onRequestComplete();
187                    }
188
189                    // Read any input that is available
190                    if (!_parser.isComplete() && _parser.parseAvailable())
191                    {
192                        LOG.debug("parsed");
193                    }
194
195                    // Flush output
196                    _endp.flush();
197                }
198                catch (Throwable e)
199                {
200                    LOG.debug("Failure on " + _exchange, e);
201
202                    failed = true;
203
204                    synchronized (this)
205                    {
206                        if (exchange != null)
207                        {
208                            // Cancelling the exchange causes an exception as we close the connection,
209                            // but we don't report it as it is normal cancelling operation
210                            if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
211                                    exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
212                                    !exchange.isDone())
213                            {
214                                if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
215                                    exchange.getEventListener().onException(e);
216                            }
217                        }
218                        else
219                        {
220                            if (e instanceof IOException)
221                                throw (IOException)e;
222                            if (e instanceof Error)
223                                throw (Error)e;
224                            if (e instanceof RuntimeException)
225                                throw (RuntimeException)e;
226                            throw new RuntimeException(e);
227                        }
228                    }
229                }
230                finally
231                {
232                    LOG.debug("{} {}",_generator, _parser);
233                    LOG.debug("{}",_endp);
234
235                    boolean complete = failed || _generator.isComplete() && _parser.isComplete();
236
237                    if (complete)
238                    {
239                        boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
240                        _generator.setPersistent(persistent);
241                        reset();
242                        if (persistent)
243                            _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
244
245                        synchronized (this)
246                        {
247                            exchange=_exchange;
248                            _exchange = null;
249
250                            // Cancel the exchange
251                            if (exchange!=null)
252                            {
253                                exchange.cancelTimeout(_destination.getHttpClient());
254
255                                // TODO should we check the exchange is done?
256                            }
257
258                            // handle switched protocols
259                            if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
260                            {
261                                Connection switched=exchange.onSwitchProtocol(_endp);
262                                if (switched!=null)
263                                    connection=switched;
264                                {
265                                    // switched protocol!
266                                    _pipeline = null;
267                                    if (_pipeline!=null)
268                                        _destination.send(_pipeline);
269                                    _pipeline = null;
270
271                                    connection=switched;
272                                }
273                            }
274
275                            // handle pipelined requests
276                            if (_pipeline!=null)
277                            {
278                                if (!persistent || connection!=this)
279                                    _destination.send(_pipeline);
280                                else
281                                    _exchange=_pipeline;
282                                _pipeline=null;
283                            }
284
285                            if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
286                                _destination.returnConnection(this, !persistent);
287                        }
288                    }
289                }
290            }
291        }
292        finally
293        {
294            _parser.returnBuffers();
295            _generator.returnBuffers();
296        }
297
298        return connection;
299    }
300
301    @Override
302    public boolean send(HttpExchange ex) throws IOException
303    {
304        boolean sent=super.send(ex);
305        if (sent)
306        {
307            synchronized (this)
308            {
309                notifyAll();
310            }
311        }
312        return sent;
313    }
314}
315