//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util;

import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 *
 * This queue uses a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array. This queue also has a partial implementation of
 * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and
 * {@link #poll(long, TimeUnit)} methods.
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow and provides a blocking put call.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to
 * {@link Integer#MAX_VALUE}.
/**
 * Queue (and list) backed by a growable circular array.
 * <p>
 * The implementation is a variant of the two lock queue algorithm: the head
 * lock guards removal from the front (and the {@code _notEmpty} condition),
 * the tail lock guards appends at the back, and the element count is kept in
 * an {@link AtomicInteger} shared by both sides. Operations that touch
 * arbitrary positions or reallocate the array take <em>both</em> locks,
 * always in the order tail lock, then head lock.
 * <p>
 * Only part of {@link java.util.concurrent.BlockingQueue} is implemented:
 * {@link #take()} and {@link #poll(long, TimeUnit)} block while the queue is
 * empty; {@link #put(Object)} does <em>not</em> block and throws
 * {@link IllegalStateException} when the queue is full;
 * {@link #offer(Object, long, TimeUnit)} is unsupported.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum number of elements, and the maximum array size
 * that growth will allocate), which defaults to {@link Integer#MAX_VALUE}.
 * {@code null} elements are rejected.
 *
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial capacity used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;

    private volatile int _capacity;
    private Object[] _elements;

    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;

    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;


    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue with the default
     * capacity and growth and no effective limit.
     */
    public BlockingArrayQueue()
    {
        this(DEFAULT_CAPACITY,DEFAULT_GROWTH,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        // a non-positive growth increment disables growing
        this(limit,-1,limit);
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        this(capacity,growBy,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity; a value &lt;= 0 disables growing.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException("capacity "+capacity+" > limit "+limit);

        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the size of the currently allocated array
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements this queue will hold
     */
    public int getLimit()
    {
        return _limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element by delegating to {@link #offer(Object)}.
     * Note that a full queue is reported by returning <code>false</code>,
     * not by throwing.
     * @param e the element to append, not null
     * @return true if the element was added, false if the queue is full
     */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue, without removing it
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the head of the queue without removing it, or null if empty
     */
    @Override
    @SuppressWarnings("unchecked")
    public E peek()
    {
        // cheap unlocked check first
        if (_size.get() == 0)
            return null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            return _size.get()>0 ? (E)_elements[_head] : null;
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element if the limit has not been reached, growing the
     * array when it is full and growth is enabled.
     * @param e the element to append, not null
     * @return true if the element was added, false if the queue is full
     * @throws NullPointerException if e is null
     */
    @Override
    public boolean offer(E e)
    {
        if (e == null)
            throw new NullPointerException();

        boolean wasEmpty=false;
        _tailLock.lock(); // size cannot grow... only shrink
        try
        {
            final int size=_size.get();
            if (size>=_limit)
                return false;

            // should we expand array?
            if (size==_capacity)
            {
                _headLock.lock(); // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            wasEmpty=_size.getAndIncrement()==0;
        }
        finally
        {
            _tailLock.unlock();
        }

        // wake a consumer only on the empty -> non-empty transition;
        // consumers chain the signal on while elements remain
        if (wasEmpty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }


    /* ------------------------------------------------------------ */
    /**
     * @return the removed head of the queue, or null if the queue is empty
     */
    @Override
    public E poll()
    {
        if (_size.get() == 0)
            return null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            return _size.get()>0 ? dequeue() : null;
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E take() throws InterruptedException
    {
        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            }
            catch (InterruptedException ie)
            {
                // pass on any signal we may have consumed to another waiter
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        long nanos = unit.toNanos(time);

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException ie)
            {
                // pass on any signal we may have consumed to another waiter
                _notEmpty.signal();
                throw ie;
            }

            return dequeue();
        }
        finally
        {
            _headLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the removed head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements. The slots are nulled so that the removed
     * elements are no longer referenced by the backing array.
     */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * @param index position counted from the head of the queue
     * @return the element at that position
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw outOfBounds(index);
                return (E)_elements[slot(index)];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position, closing the gap by
     * moving the following elements one slot towards the head.
     * @param index position counted from the head of the queue
     * @return the removed element
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @SuppressWarnings("unchecked")
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw outOfBounds(index);

                final int capacity=_capacity;
                final int tail=_tail;
                final int i=slot(index);
                final E old=(E)_elements[i];

                if (i<tail)
                {
                    // followers i+1..tail-1 are contiguous
                    System.arraycopy(_elements,i+1,_elements,i,tail-i-1);
                    _tail=tail-1;
                }
                else
                {
                    // followers run to the end of the array, then wrap to 0..tail-1
                    System.arraycopy(_elements,i+1,_elements,i,capacity-i-1);
                    if (tail>0)
                    {
                        _elements[capacity-1]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,tail-1);
                        _tail=tail-1;
                    }
                    else
                        _tail=capacity-1;
                }

                // the vacated last slot must not keep a stale reference
                _elements[_tail]=null;
                _size.decrementAndGet();
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position.
     * @param index position counted from the head of the queue
     * @param e the new element, not null
     * @return the element previously at that position
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is not in [0,size)
     */
    @SuppressWarnings("unchecked")
    @Override
    public E set(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw outOfBounds(index);

                final int i=slot(index);
                final E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position, moving the element
     * currently there (if any) and all following elements one slot
     * towards the tail.
     * @param index position counted from the head of the queue
     * @param e the element to insert, not null
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is not in [0,size]
     * @throws IllegalStateException if the limit is reached or the
     * array is full and cannot grow
     */
    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int size=_size.get();
                if (index<0 || index>size)
                    throw outOfBounds(index);

                if (size>=_limit)
                    throw new IllegalStateException("full");
                if (size==_capacity && !grow())
                    throw new IllegalStateException("full");

                // read head, tail and capacity only after a possible grow;
                // from here on the slot at tail is known to be free
                final int capacity=_capacity;
                final int tail=_tail;
                final int i=slot(index);

                if (i<=tail)
                {
                    // i..tail-1 is contiguous: shift it up into the free tail slot
                    System.arraycopy(_elements,i,_elements,i+1,tail-i);
                }
                else
                {
                    // elements wrap: shift the low run 0..tail-1 up, carry the last
                    // array slot round to slot 0, then shift the high run i..capacity-2
                    System.arraycopy(_elements,0,_elements,1,tail);
                    _elements[0]=_elements[capacity-1];
                    System.arraycopy(_elements,i,_elements,i+1,capacity-1-i);
                }

                _elements[i]=e;
                _tail=(tail+1)%capacity;

                // head lock is held, so a waiting consumer can be signalled directly
                if (_size.getAndIncrement()==0)
                    _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes and returns the head element and passes the not-empty
     * signal on if elements remain. The caller must hold the head lock
     * and must have checked that the queue is not empty.
     */
    @SuppressWarnings("unchecked")
    private E dequeue()
    {
        final int head=_head;
        final E e=(E)_elements[head];
        _elements[head]=null;
        _head=(head+1)%_capacity;

        if (_size.decrementAndGet()>0)
            _notEmpty.signal();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Maps a position counted from the head onto an array slot.
     * The caller must hold the head lock and pass an index in [0,capacity).
     */
    private int slot(int index)
    {
        int i=_head+index;
        if (i>=_capacity)
            i-=_capacity;
        return i;
    }

    /* ------------------------------------------------------------ */
    /** Builds the exception thrown for an index outside the queue. */
    private IndexOutOfBoundsException outOfBounds(int index)
    {
        return new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
    }

    /* ------------------------------------------------------------ */
    /**
     * Reallocates the array with room for more elements, never beyond
     * the limit, and re-bases the content so that the head is at slot 0.
     * @return true if the array was enlarged, false if growing is
     * disabled or the capacity has already reached the limit
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int capacity=_capacity;
                if (capacity>=_limit)
                    return false;

                // long arithmetic so that capacity+growth cannot overflow
                final int newCapacity=(int)Math.min((long)capacity+_growCapacity,(long)_limit);
                final Object[] elements=new Object[newCapacity];

                // with both locks held the size is stable; the content is the run
                // from head to the end of the array followed by the wrapped remainder
                final int size=_size.get();
                final int head=_head;
                final int firstRun=Math.min(size,capacity-head);
                System.arraycopy(_elements,head,elements,0,firstRun);
                System.arraycopy(_elements,0,elements,firstRun,size-firstRun);

                _elements=elements;
                _capacity=newCapacity;
                _head=0;
                _tail=size;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Moves all elements currently in the queue to the collection.
     * @param c the collection to add the elements to
     * @return the number of elements moved
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c)
    {
        return drainTo(c,Integer.MAX_VALUE);
    }

    /* ------------------------------------------------------------ */
    /**
     * Moves at most maxElements elements from the head of the queue to
     * the collection. Elements are removed one at a time, so the drain
     * is not atomic with respect to concurrent producers and consumers.
     * @param c the collection to add the elements to
     * @param maxElements the maximum number of elements to move
     * @return the number of elements moved
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        if (c==null)
            throw new NullPointerException();
        if (c==this)
            throw new IllegalArgumentException();

        // bound the work by the size seen on entry so that busy producers
        // cannot keep this loop running
        final int max=Math.min(maxElements,_size.get());
        int drained=0;
        while (drained<max)
        {
            // c.add is called without holding any queue lock
            final E e=poll();
            if (e==null)
                break;
            c.add(e);
            drained++;
        }
        return drained;
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element without blocking.
     * @param o the element to append, not null
     * @throws IllegalStateException if the queue is full
     */
    @Override
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of free slots in the currently allocated array
     */
    @Override
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }


    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
    }
}