1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.io.nio;
20
21import java.io.IOException;
22import java.io.InterruptedIOException;
23import java.nio.channels.ClosedChannelException;
24import java.nio.channels.SelectableChannel;
25import java.nio.channels.SelectionKey;
26import java.nio.channels.SocketChannel;
27import java.util.Locale;
28
29import org.eclipse.jetty.io.AsyncEndPoint;
30import org.eclipse.jetty.io.Buffer;
31import org.eclipse.jetty.io.ConnectedEndPoint;
32import org.eclipse.jetty.io.Connection;
33import org.eclipse.jetty.io.EofException;
34import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
35import org.eclipse.jetty.util.log.Log;
36import org.eclipse.jetty.util.log.Logger;
37import org.eclipse.jetty.util.thread.Timeout.Task;
38
39/* ------------------------------------------------------------ */
40/**
41 * An Endpoint that can be scheduled by {@link SelectorManager}.
42 */
43public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
44{
45    public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
46
47    private final boolean WORK_AROUND_JVM_BUG_6346658 = System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("win");
48    private final SelectorManager.SelectSet _selectSet;
49    private final SelectorManager _manager;
50    private  SelectionKey _key;
51    private final Runnable _handler = new Runnable()
52        {
53            public void run() { handle(); }
54        };
55
56    /** The desired value for {@link SelectionKey#interestOps()} */
57    private int _interestOps;
58
59    /**
60     * The connection instance is the handler for any IO activity on the endpoint.
61     * There is a different type of connection for HTTP, AJP, WebSocket and
62     * ProxyConnect.   The connection may change for an SCEP as it is upgraded
63     * from HTTP to proxy connect or websocket.
64     */
65    private volatile AsyncConnection _connection;
66
67    private static final int STATE_NEEDS_DISPATCH=-1;
68    private static final int STATE_UNDISPATCHED=0;
69    private static final int STATE_DISPATCHED=1;
70    private static final int STATE_ASYNC=2;
71    private int _state;
72
73    private boolean _onIdle;
74
75    /** true if the last write operation succeed and wrote all offered bytes */
76    private volatile boolean _writable = true;
77
78
79    /** True if a thread has is blocked in {@link #blockReadable(long)} */
80    private boolean _readBlocked;
81
82    /** True if a thread has is blocked in {@link #blockWritable(long)} */
83    private boolean _writeBlocked;
84
85    /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
86    private boolean _open;
87
88    private volatile long _idleTimestamp;
89    private volatile boolean _checkIdle;
90
91    private boolean _interruptable;
92
93    private boolean _ishut;
94
95    /* ------------------------------------------------------------ */
96    public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
97        throws IOException
98    {
99        super(channel, maxIdleTime);
100
101        _manager = selectSet.getManager();
102        _selectSet = selectSet;
103        _state=STATE_UNDISPATCHED;
104        _onIdle=false;
105        _open=true;
106        _key = key;
107
108        setCheckForIdle(true);
109    }
110
111    /* ------------------------------------------------------------ */
112    public SelectionKey getSelectionKey()
113    {
114        synchronized (this)
115        {
116            return _key;
117        }
118    }
119
120    /* ------------------------------------------------------------ */
121    public SelectorManager getSelectManager()
122    {
123        return _manager;
124    }
125
126    /* ------------------------------------------------------------ */
127    public Connection getConnection()
128    {
129        return _connection;
130    }
131
132    /* ------------------------------------------------------------ */
133    public void setConnection(Connection connection)
134    {
135        Connection old=_connection;
136        _connection=(AsyncConnection)connection;
137        if (old!=null && old!=_connection)
138            _manager.endPointUpgraded(this,old);
139    }
140
141    /* ------------------------------------------------------------ */
142    public long getIdleTimestamp()
143    {
144        return _idleTimestamp;
145    }
146
147    /* ------------------------------------------------------------ */
148    /** Called by selectSet to schedule handling
149     *
150     */
151    public void schedule()
152    {
153        synchronized (this)
154        {
155            // If there is no key, then do nothing
156            if (_key == null || !_key.isValid())
157            {
158                _readBlocked=false;
159                _writeBlocked=false;
160                this.notifyAll();
161                return;
162            }
163
164            // If there are threads dispatched reading and writing
165            if (_readBlocked || _writeBlocked)
166            {
167                // assert _dispatched;
168                if (_readBlocked && _key.isReadable())
169                    _readBlocked=false;
170                if (_writeBlocked && _key.isWritable())
171                    _writeBlocked=false;
172
173                // wake them up is as good as a dispatched.
174                this.notifyAll();
175
176                // we are not interested in further selecting
177                _key.interestOps(0);
178                if (_state<STATE_DISPATCHED)
179                    updateKey();
180                return;
181            }
182
183            // Remove writeable op
184            if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
185            {
186                // Remove writeable op
187                _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
188                _key.interestOps(_interestOps);
189                _writable = true; // Once writable is in ops, only removed with dispatch.
190            }
191
192            // If dispatched, then deregister interest
193            if (_state>=STATE_DISPATCHED)
194                _key.interestOps(0);
195            else
196            {
197                // other wise do the dispatch
198                dispatch();
199                if (_state>=STATE_DISPATCHED && !_selectSet.getManager().isDeferringInterestedOps0())
200                {
201                    _key.interestOps(0);
202                }
203            }
204        }
205    }
206
207    /* ------------------------------------------------------------ */
208    public void asyncDispatch()
209    {
210        synchronized(this)
211        {
212            switch(_state)
213            {
214                case STATE_NEEDS_DISPATCH:
215                case STATE_UNDISPATCHED:
216                    dispatch();
217                    break;
218
219                case STATE_DISPATCHED:
220                case STATE_ASYNC:
221                    _state=STATE_ASYNC;
222                    break;
223            }
224        }
225    }
226
227    /* ------------------------------------------------------------ */
228    public void dispatch()
229    {
230        synchronized(this)
231        {
232            if (_state<=STATE_UNDISPATCHED)
233            {
234                if (_onIdle)
235                    _state = STATE_NEEDS_DISPATCH;
236                else
237                {
238                    _state = STATE_DISPATCHED;
239                    boolean dispatched = _manager.dispatch(_handler);
240                    if(!dispatched)
241                    {
242                        _state = STATE_NEEDS_DISPATCH;
243                        LOG.warn("Dispatched Failed! "+this+" to "+_manager);
244                        updateKey();
245                    }
246                }
247            }
248        }
249    }
250
251    /* ------------------------------------------------------------ */
252    /**
253     * Called when a dispatched thread is no longer handling the endpoint.
254     * The selection key operations are updated.
255     * @return If false is returned, the endpoint has been redispatched and
256     * thread must keep handling the endpoint.
257     */
258    protected boolean undispatch()
259    {
260        synchronized (this)
261        {
262            switch(_state)
263            {
264                case STATE_ASYNC:
265                    _state=STATE_DISPATCHED;
266                    return false;
267
268                default:
269                    _state=STATE_UNDISPATCHED;
270                    updateKey();
271                    return true;
272            }
273        }
274    }
275
276    /* ------------------------------------------------------------ */
277    public void cancelTimeout(Task task)
278    {
279        getSelectSet().cancelTimeout(task);
280    }
281
282    /* ------------------------------------------------------------ */
283    public void scheduleTimeout(Task task, long timeoutMs)
284    {
285        getSelectSet().scheduleTimeout(task,timeoutMs);
286    }
287
288    /* ------------------------------------------------------------ */
289    public void setCheckForIdle(boolean check)
290    {
291        if (check)
292        {
293            _idleTimestamp=System.currentTimeMillis();
294            _checkIdle=true;
295        }
296        else
297            _checkIdle=false;
298    }
299
300    /* ------------------------------------------------------------ */
301    public boolean isCheckForIdle()
302    {
303        return _checkIdle;
304    }
305
306    /* ------------------------------------------------------------ */
307    protected void notIdle()
308    {
309        _idleTimestamp=System.currentTimeMillis();
310    }
311
312    /* ------------------------------------------------------------ */
313    public void checkIdleTimestamp(long now)
314    {
315        if (isCheckForIdle() && _maxIdleTime>0)
316        {
317            final long idleForMs=now-_idleTimestamp;
318
319            if (idleForMs>_maxIdleTime)
320            {
321                // Don't idle out again until onIdleExpired task completes.
322                setCheckForIdle(false);
323                _manager.dispatch(new Runnable()
324                {
325                    public void run()
326                    {
327                        try
328                        {
329                            onIdleExpired(idleForMs);
330                        }
331                        finally
332                        {
333                            setCheckForIdle(true);
334                        }
335                    }
336                });
337            }
338        }
339    }
340
341    /* ------------------------------------------------------------ */
342    public void onIdleExpired(long idleForMs)
343    {
344        try
345        {
346            synchronized (this)
347            {
348                _onIdle=true;
349            }
350
351            _connection.onIdleExpired(idleForMs);
352        }
353        finally
354        {
355            synchronized (this)
356            {
357                _onIdle=false;
358                if (_state==STATE_NEEDS_DISPATCH)
359                    dispatch();
360            }
361        }
362    }
363
364    /* ------------------------------------------------------------ */
365    @Override
366    public int fill(Buffer buffer) throws IOException
367    {
368        int fill=super.fill(buffer);
369        if (fill>0)
370            notIdle();
371        return fill;
372    }
373
374    /* ------------------------------------------------------------ */
375    @Override
376    public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
377    {
378        int l = super.flush(header, buffer, trailer);
379
380        // If there was something to write and it wasn't written, then we are not writable.
381        if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
382        {
383            synchronized (this)
384            {
385                _writable=false;
386                if (_state<STATE_DISPATCHED)
387                    updateKey();
388            }
389        }
390        else if (l>0)
391        {
392            _writable=true;
393            notIdle();
394        }
395        return l;
396    }
397
398    /* ------------------------------------------------------------ */
399    /*
400     */
401    @Override
402    public int flush(Buffer buffer) throws IOException
403    {
404        int l = super.flush(buffer);
405
406        // If there was something to write and it wasn't written, then we are not writable.
407        if (l==0 && buffer!=null && buffer.hasContent())
408        {
409            synchronized (this)
410            {
411                _writable=false;
412                if (_state<STATE_DISPATCHED)
413                    updateKey();
414            }
415        }
416        else if (l>0)
417        {
418            _writable=true;
419            notIdle();
420        }
421
422        return l;
423    }
424
425    /* ------------------------------------------------------------ */
426    /*
427     * Allows thread to block waiting for further events.
428     */
429    @Override
430    public boolean blockReadable(long timeoutMs) throws IOException
431    {
432        synchronized (this)
433        {
434            if (isInputShutdown())
435                throw new EofException();
436
437            long now=_selectSet.getNow();
438            long end=now+timeoutMs;
439            boolean check=isCheckForIdle();
440            setCheckForIdle(true);
441            try
442            {
443                _readBlocked=true;
444                while (!isInputShutdown() && _readBlocked)
445                {
446                    try
447                    {
448                        updateKey();
449                        this.wait(timeoutMs>0?(end-now):10000);
450                    }
451                    catch (final InterruptedException e)
452                    {
453                        LOG.warn(e);
454                        if (_interruptable)
455                            throw new InterruptedIOException(){{this.initCause(e);}};
456                    }
457                    finally
458                    {
459                        now=_selectSet.getNow();
460                    }
461
462                    if (_readBlocked && timeoutMs>0 && now>=end)
463                        return false;
464                }
465            }
466            finally
467            {
468                _readBlocked=false;
469                setCheckForIdle(check);
470            }
471        }
472        return true;
473    }
474
475    /* ------------------------------------------------------------ */
476    /*
477     * Allows thread to block waiting for further events.
478     */
479    @Override
480    public boolean blockWritable(long timeoutMs) throws IOException
481    {
482        synchronized (this)
483        {
484            if (isOutputShutdown())
485                throw new EofException();
486
487            long now=_selectSet.getNow();
488            long end=now+timeoutMs;
489            boolean check=isCheckForIdle();
490            setCheckForIdle(true);
491            try
492            {
493                _writeBlocked=true;
494                while (_writeBlocked && !isOutputShutdown())
495                {
496                    try
497                    {
498                        updateKey();
499                        this.wait(timeoutMs>0?(end-now):10000);
500                    }
501                    catch (final InterruptedException e)
502                    {
503                        LOG.warn(e);
504                        if (_interruptable)
505                            throw new InterruptedIOException(){{this.initCause(e);}};
506                    }
507                    finally
508                    {
509                        now=_selectSet.getNow();
510                    }
511                    if (_writeBlocked && timeoutMs>0 && now>=end)
512                        return false;
513                }
514            }
515            finally
516            {
517                _writeBlocked=false;
518                setCheckForIdle(check);
519            }
520        }
521        return true;
522    }
523
524    /* ------------------------------------------------------------ */
525    /** Set the interruptable mode of the endpoint.
526     * If set to false (default), then interrupts are assumed to be spurious
527     * and blocking operations continue unless the endpoint has been closed.
528     * If true, then interrupts of blocking operations result in InterruptedIOExceptions
529     * being thrown.
530     * @param interupable
531     */
532    public void setInterruptable(boolean interupable)
533    {
534        synchronized (this)
535        {
536            _interruptable=interupable;
537        }
538    }
539
540    /* ------------------------------------------------------------ */
541    public boolean isInterruptable()
542    {
543        return _interruptable;
544    }
545
546    /* ------------------------------------------------------------ */
547    /**
548     * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
549     */
550    public void scheduleWrite()
551    {
552        if (_writable)
553            LOG.debug("Required scheduleWrite {}",this);
554
555        _writable=false;
556        updateKey();
557    }
558
559    /* ------------------------------------------------------------ */
560    public boolean isWritable()
561    {
562        return _writable;
563    }
564
565    /* ------------------------------------------------------------ */
566    public boolean hasProgressed()
567    {
568        return false;
569    }
570
571    /* ------------------------------------------------------------ */
572    /**
573     * Updates selection key. Adds operations types to the selection key as needed. No operations
574     * are removed as this is only done during dispatch. This method records the new key and
575     * schedules a call to doUpdateKey to do the keyChange
576     */
577    private void updateKey()
578    {
579        final boolean changed;
580        synchronized (this)
581        {
582            int current_ops=-1;
583            if (getChannel().isOpen())
584            {
585                boolean read_interest = _readBlocked || (_state<STATE_DISPATCHED && !_connection.isSuspended());
586                boolean write_interest= _writeBlocked || (_state<STATE_DISPATCHED && !_writable);
587
588                _interestOps =
589                    ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
590                |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
591                try
592                {
593                    current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
594                }
595                catch(Exception e)
596                {
597                    _key=null;
598                    LOG.ignore(e);
599                }
600            }
601            changed=_interestOps!=current_ops;
602        }
603
604        if(changed)
605        {
606            _selectSet.addChange(this);
607            _selectSet.wakeup();
608        }
609    }
610
611
612    /* ------------------------------------------------------------ */
613    /**
614     * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
615     */
616    void doUpdateKey()
617    {
618        synchronized (this)
619        {
620            if (getChannel().isOpen())
621            {
622                if (_interestOps>0)
623                {
624                    if (_key==null || !_key.isValid())
625                    {
626                        SelectableChannel sc = (SelectableChannel)getChannel();
627                        if (sc.isRegistered())
628                        {
629                            updateKey();
630                        }
631                        else
632                        {
633                            try
634                            {
635                                _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
636                            }
637                            catch (Exception e)
638                            {
639                                LOG.ignore(e);
640                                if (_key!=null && _key.isValid())
641                                {
642                                    _key.cancel();
643                                }
644
645                                if (_open)
646                                {
647                                    _selectSet.destroyEndPoint(this);
648                                }
649                                _open=false;
650                                _key = null;
651                            }
652                        }
653                    }
654                    else
655                    {
656                        _key.interestOps(_interestOps);
657                    }
658                }
659                else
660                {
661                    if (_key!=null && _key.isValid())
662                        _key.interestOps(0);
663                    else
664                        _key=null;
665                }
666            }
667            else
668            {
669                if (_key!=null && _key.isValid())
670                    _key.cancel();
671
672                if (_open)
673                {
674                    _open=false;
675                    _selectSet.destroyEndPoint(this);
676                }
677                _key = null;
678            }
679        }
680    }
681
682    /* ------------------------------------------------------------ */
683    /*
684     */
685    protected void handle()
686    {
687        boolean dispatched=true;
688        try
689        {
690            while(dispatched)
691            {
692                try
693                {
694                    while(true)
695                    {
696                        final AsyncConnection next = (AsyncConnection)_connection.handle();
697                        if (next!=_connection)
698                        {
699                            LOG.debug("{} replaced {}",next,_connection);
700                            Connection old=_connection;
701                            _connection=next;
702                            _manager.endPointUpgraded(this,old);
703                            continue;
704                        }
705                        break;
706                    }
707                }
708                catch (ClosedChannelException e)
709                {
710                    LOG.ignore(e);
711                }
712                catch (EofException e)
713                {
714                    LOG.debug("EOF", e);
715                    try{close();}
716                    catch(IOException e2){LOG.ignore(e2);}
717                }
718                catch (IOException e)
719                {
720                    LOG.warn(e.toString());
721                    try{close();}
722                    catch(IOException e2){LOG.ignore(e2);}
723                }
724                catch (Throwable e)
725                {
726                    LOG.warn("handle failed", e);
727                    try{close();}
728                    catch(IOException e2){LOG.ignore(e2);}
729                }
730                finally
731                {
732                    if (!_ishut && isInputShutdown() && isOpen())
733                    {
734                        _ishut=true;
735                        try
736                        {
737                            _connection.onInputShutdown();
738                        }
739                        catch(Throwable x)
740                        {
741                            LOG.warn("onInputShutdown failed", x);
742                            try{close();}
743                            catch(IOException e2){LOG.ignore(e2);}
744                        }
745                        finally
746                        {
747                            updateKey();
748                        }
749                    }
750                    dispatched=!undispatch();
751                }
752            }
753        }
754        finally
755        {
756            if (dispatched)
757            {
758                dispatched=!undispatch();
759                while (dispatched)
760                {
761                    LOG.warn("SCEP.run() finally DISPATCHED");
762                    dispatched=!undispatch();
763                }
764            }
765        }
766    }
767
768    /* ------------------------------------------------------------ */
769    /*
770     * @see org.eclipse.io.nio.ChannelEndPoint#close()
771     */
772    @Override
773    public void close() throws IOException
774    {
775        // On unix systems there is a JVM issue that if you cancel before closing, it can
776        // cause the selector to block waiting for a channel to close and that channel can
777        // block waiting for the remote end.  But on windows, if you don't cancel before a
778        // close, then the selector can block anyway!
779        // https://bugs.eclipse.org/bugs/show_bug.cgi?id=357318
780        if (WORK_AROUND_JVM_BUG_6346658)
781        {
782            try
783            {
784                SelectionKey key = _key;
785                if (key!=null)
786                    key.cancel();
787            }
788            catch (Throwable e)
789            {
790                LOG.ignore(e);
791            }
792        }
793
794        try
795        {
796            super.close();
797        }
798        catch (IOException e)
799        {
800            LOG.ignore(e);
801        }
802        finally
803        {
804            updateKey();
805        }
806    }
807
808    /* ------------------------------------------------------------ */
809    @Override
810    public String toString()
811    {
812        // Do NOT use synchronized (this)
813        // because it's very easy to deadlock when debugging is enabled.
814        // We do a best effort to print the right toString() and that's it.
815        SelectionKey key = _key;
816        String keyString = "";
817        if (key != null)
818        {
819            if (key.isValid())
820            {
821                if (key.isReadable())
822                    keyString += "r";
823                if (key.isWritable())
824                    keyString += "w";
825            }
826            else
827            {
828                keyString += "!";
829            }
830        }
831        else
832        {
833            keyString += "-";
834        }
835        return String.format("SCEP@%x{l(%s)<->r(%s),s=%d,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s}-{%s}",
836                hashCode(),
837                _socket.getRemoteSocketAddress(),
838                _socket.getLocalSocketAddress(),
839                _state,
840                isOpen(),
841                isInputShutdown(),
842                isOutputShutdown(),
843                _readBlocked,
844                _writeBlocked,
845                _writable,
846                _interestOps,
847                keyString,
848                _connection);
849    }
850
851    /* ------------------------------------------------------------ */
852    public SelectSet getSelectSet()
853    {
854        return _selectSet;
855    }
856
857    /* ------------------------------------------------------------ */
858    /**
859     * Don't set the SoTimeout
860     * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
861     */
862    @Override
863    public void setMaxIdleTime(int timeMs) throws IOException
864    {
865        _maxIdleTime=timeMs;
866    }
867
868}
869