1//
2//  ========================================================================
3//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4//  ------------------------------------------------------------------------
5//  All rights reserved. This program and the accompanying materials
6//  are made available under the terms of the Eclipse Public License v1.0
7//  and Apache License v2.0 which accompanies this distribution.
8//
9//      The Eclipse Public License is available at
10//      http://www.eclipse.org/legal/epl-v10.html
11//
12//      The Apache License v2.0 is available at
13//      http://www.opensource.org/licenses/apache2.0.php
14//
15//  You may elect to redistribute this code under either of these licenses.
16//  ========================================================================
17//
18
19package org.eclipse.jetty.util;
20
21import java.util.AbstractList;
22import java.util.Collection;
23import java.util.NoSuchElementException;
24import java.util.concurrent.BlockingQueue;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.atomic.AtomicInteger;
27import java.util.concurrent.locks.Condition;
28import java.util.concurrent.locks.ReentrantLock;
29
30
31/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 * <p>
 * This queue uses a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array: producers synchronise on a tail lock, consumers on a head
 * lock, and operations that need a consistent view of the whole array
 * (indexed access, growing, clearing) take the tail lock and then the
 * head lock.
 * <p>
 * This queue has only a partial implementation of
 * {@link java.util.concurrent.BlockingQueue}: the consuming methods
 * {@link #take()} and {@link #poll(long, TimeUnit)} block, but
 * {@link #put(Object)} never blocks (it throws
 * {@link IllegalStateException} when the queue is full) and
 * {@link #offer(Object, long, TimeUnit)} and the <code>drainTo</code>
 * methods are unsupported.
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to
 * {@link Integer#MAX_VALUE}.
 * <p>
 * Null elements are not accepted.
 *
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial capacity used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    /** Maximum number of elements (and maximum array length) allowed. */
    private final int _limit;
    /** Number of elements currently queued; updated by producers and consumers. */
    private final AtomicInteger _size=new AtomicInteger();
    /** Number of slots added by each grow; a value &lt;= 0 disables growing. */
    private final int _growCapacity;

    /** Length of {@link #_elements}; only changed while both locks are held. */
    private volatile int _capacity;
    private Object[] _elements;

    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    /** Index of the oldest element; guarded by the head lock. */
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;

    private final ReentrantLock _tailLock = new ReentrantLock();
    /** Index of the next free slot; guarded by the tail lock. */
    private int _tail;


    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue with
     * {@link #DEFAULT_CAPACITY}, {@link #DEFAULT_GROWTH} and no limit.
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        _growCapacity=-1;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();

        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the length of the array currently allocated
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements this queue may hold
     */
    public int getLimit()
    {
        return _limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element by delegating to {@link #offer(Object)}.
     * Note that, unlike the {@link java.util.Collection#add(Object)}
     * contract, a full queue is reported by returning <code>false</code>
     * rather than by throwing.
     * @param e the element to append, not null
     * @return true if the element was added, false if the queue is full
     */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue without removing it
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue without removing it, or null if empty
     */
    @Override
    @SuppressWarnings("unchecked")
    public E peek()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
                e = (E)_elements[_head];
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element if the limit has not been reached, growing
     * the array if it is full and growing is enabled.
     * @param e the element to append, not null
     * @return true if the element was added, false if the queue is full
     * @throws NullPointerException if e is null
     */
    @Override
    public boolean offer(E e)
    {
        if (e == null)
            throw new NullPointerException();

        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try
        {
            if (_size.get() >= _limit)
                return false;

            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            // only an empty -> non-empty transition needs to wake a consumer
            not_empty=0==_size.getAndIncrement();

        }
        finally
        {
            _tailLock.unlock();
        }

        if (not_empty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }


    /* ------------------------------------------------------------ */
    /**
     * @return the removed head of the queue, or null if empty
     */
    @Override
    public E poll()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
                e = removeHead();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E take() throws InterruptedException
    {
        E e = null;
        _headLock.lockInterruptibly();  // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            }
            catch (InterruptedException ie)
            {
                // pass any signal we may have consumed on to another waiter
                _notEmpty.signal();
                throw ie;
            }

            e = removeHead();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {

        E e = null;

        long nanos = unit.toNanos(time);

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException ie)
            {
                // pass any signal we may have consumed on to another waiter
                _notEmpty.signal();
                throw ie;
            }

            e = removeHead();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes and returns the head element, clearing its slot and
     * signalling another consumer if elements remain.
     * The caller must hold the head lock and have checked that the
     * queue is not empty.
     */
    @SuppressWarnings("unchecked")
    private E removeHead()
    {
        final int head=_head;
        E e = (E)_elements[head];
        _elements[head]=null;
        _head=(head+1)%_capacity;

        if (_size.decrementAndGet()>0)
            _notEmpty.signal();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the removed head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements and releases the references held by the array.
     */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                // drop the references so cleared elements can be collected
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * @param index position counted from the head of the queue
     * @return the element at that position
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position, closing the gap by
     * shifting the following elements one slot towards the head.
     * @param index position counted from the head of the queue
     * @return the removed element
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                final int capacity=_capacity;
                int i = _head+index;
                if (i>=capacity)
                    i-=capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];

                int tail=_tail;
                if (i<tail)
                {
                    // slots (i,tail) are contiguous: shift them down by one
                    System.arraycopy(_elements,i+1,_elements,i,tail-i-1);
                    tail--;
                }
                else
                {
                    // elements after i wrap around the end of the array:
                    // shift (i,capacity) down, then carry slot 0 over to
                    // the last slot and shift (0,tail) down
                    System.arraycopy(_elements,i+1,_elements,i,capacity-i-1);
                    if (tail>0)
                    {
                        _elements[capacity-1]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,tail-1);
                        tail--;
                    }
                    else
                        tail=capacity-1;
                }

                // the slot vacated at the tail must not retain a reference
                _elements[tail]=null;
                _tail=tail;
                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position.
     * @param index position counted from the head of the queue
     * @param e the new element, not null
     * @return the element previously at that position
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @Override
    public E set(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position, shifting the element
     * at that position and all following elements one slot towards the tail.
     * @param index position counted from the head of the queue
     * @param e the element to insert, not null
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is not in [0,size]
     * @throws IllegalStateException if the queue is full and cannot grow
     */
    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int size=_size.get();

                if (index<0 || index>size)
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (index==size)
                {
                    // plain append; a refused element must not be dropped silently
                    if (!offer(e))
                        throw new IllegalStateException("full");
                    return;
                }

                // index<size so the queue is not empty: head==tail means full
                if (_tail==_head && !grow())
                    throw new IllegalStateException("full");

                // read indexes after a possible grow
                final int capacity=_capacity;
                final int tail=_tail;
                int i = _head+index;
                if (i>=capacity)
                    i-=capacity;

                if (i<tail)
                {
                    // slots [i,tail) are contiguous: shift them up into the free tail slot
                    System.arraycopy(_elements,i,_elements,i+1,tail-i);
                }
                else
                {
                    // elements from i wrap around the end of the array:
                    // shift [0,tail) up, carry the last slot over to slot 0,
                    // then shift [i,capacity-1) up
                    if (tail>0)
                        System.arraycopy(_elements,0,_elements,1,tail);
                    _elements[0]=_elements[capacity-1];
                    System.arraycopy(_elements,i,_elements,i+1,capacity-1-i);
                }

                _elements[i]=e;
                _tail=(tail+1)%capacity;
                _size.incrementAndGet();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the array with a larger one (at most the limit), copying
     * the elements so that the head is at index 0.
     * @return true if the array was grown, false if growing is disabled
     * or the capacity has already reached the limit
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int capacity=_capacity;
                if (capacity>=_limit)
                    return false;

                // long arithmetic so that capacity+growth cannot overflow
                final int new_capacity=(int)Math.min((long)capacity+_growCapacity,(long)_limit);

                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[new_capacity];

                if (head<tail)
                {
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    // wrapped (or full): copy [head,capacity) then [0,tail)
                    new_tail=capacity+tail-head;
                    int cut=capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }

    }

    /* ------------------------------------------------------------ */
    /** Not supported.
     * @throws UnsupportedOperationException always
     */
    @Override
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /** Not supported.
     * @throws UnsupportedOperationException always
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /** Not supported.
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element without blocking.
     * @param o the element to append, not null
     * @throws IllegalStateException if the queue is full
     */
    @Override
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of free slots in the currently allocated array
     * (capacity minus size); the queue may still be able to grow beyond this
     */
    @Override
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }


    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
    }
}
705