BoundedLinkedQueue.java

  1. /*
  2.   File: BoundedLinkedQueue.java

  3.   Originally written by Doug Lea and released into the public domain.
  4.   This may be used for any purposes whatsoever without acknowledgment.
  5.   Thanks for the assistance and support of Sun Microsystems Labs,
  6.   and everyone contributing, testing, and using this code.

  7.   History:
  8.   Date       Who                What
  9.   11Jun1998  dl               Create public version
  10.   17Jul1998  dl               Simplified by eliminating wait counts
  11.   25aug1998  dl               added peek
  12.   10oct1999  dl               lock on node object to ensure visibility
  13.   27jan2000  dl               setCapacity forces immediate permit reconcile
  14. */

  15. package org.dbunit.util.concurrent;

  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;

  18. /**
  19.  * A bounded variant of
  20.  * LinkedQueue
  21.  * class. This class may be
  22.  * preferable to
  23.  * BoundedBuffer
  24.  * because it allows a bit more
  25.  * concurrency among puts and takes,  because it does not
  26.  * pre-allocate fixed storage for elements, and allows
  27.  * capacity to be dynamically reset.
  28.  * On the other hand, since it allocates a node object
  29.  * on each put, it can be slow on systems with slow
  30.  * allocation and GC.
  31.  * Also, it may be
  32.  * preferable to
  33.  * LinkedQueue
  34.  * when you need to limit
  35.  * the capacity to prevent resource exhaustion. This protection
  36.  * normally does not hurt much performance-wise: When the
  37.  * queue is not empty or full, most puts and
  38.  * takes are still usually able to execute concurrently.
  39.  * @see LinkedQueue
  40.  * @see BoundedBuffer
  41.  * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
  42.  *
  43.  * @author Doug Lea
  44.  * @author Last changed by: $Author$
  45.  * @version $Revision$ $Date$
  46.  * @since ? (pre 2.1)
  47.  */
  48. public class BoundedLinkedQueue implements BoundedChannel {

  49.     /**
  50.      * Logger for this class
  51.      */
  52.     private static final Logger logger = LoggerFactory.getLogger(BoundedLinkedQueue.class);

  53.   /*
  54.    * It might be a bit nicer if this were declared as
  55.    * a subclass of LinkedQueue, or a sibling class of
  56.    * a common abstract class. It shares much of the
  57.    * basic design and bookkeeping fields. But too
  58.    * many details differ to make this worth doing.
  59.    */



  60.   /**
  61.    * Dummy header node of list. The first actual node, if it exists, is always
  62.    * at head_.next. After each take, the old first node becomes the head.
  63.    **/
  64.   protected LinkedNode head_;

  65.   /**
  66.    * The last node of list. Put() appends to list, so modifies last_
  67.    **/
  68.   protected LinkedNode last_;


  69.   /**
  70.    * Helper monitor. Ensures that only one put at a time executes.
  71.    **/

  72.   protected final Object putGuard_ = new Object();

  73.   /**
  74.    * Helper monitor. Protects and provides wait queue for takes
  75.    **/

  76.   protected final Object takeGuard_ = new Object();


  77.   /** Number of elements allowed **/
  78.   protected int capacity_;

  79.  
  80.   /**
  81.    * One side of a split permit count.
  82.    * The counts represent permits to do a put. (The queue is full when zero).
  83.    * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
  84.    * (The length is never separately recorded, so this cannot be
  85.    * checked explicitly.)
  86.    * To minimize contention between puts and takes, the
  87.    * put side uses up all of its permits before transfering them from
  88.    * the take side. The take side just increments the count upon each take.
  89.    * Thus, most puts and take can run independently of each other unless
  90.    * the queue is empty or full.
  91.    * Initial value is queue capacity.
  92.    **/

  93.   protected int putSidePutPermits_;

  94.   /** Number of takes since last reconcile **/
  95.   protected int takeSidePutPermits_ = 0;


  96.   /**
  97.    * Create a queue with the given capacity
  98.    * @exception IllegalArgumentException if capacity less or equal to zero
  99.    **/
  100.   public BoundedLinkedQueue(int capacity) {
  101.     if (capacity <= 0) throw new IllegalArgumentException();
  102.     capacity_ = capacity;
  103.     putSidePutPermits_ = capacity;
  104.     head_ =  new LinkedNode(null);
  105.     last_ = head_;
  106.   }

  107.   /**
  108.    * Create a queue with the current default capacity
  109.    **/

  110.   public BoundedLinkedQueue() {
  111.     this(DefaultChannelCapacity.get());
  112.   }

  113.   /**
  114.    * Move put permits from take side to put side;
  115.    * return the number of put side permits that are available.
  116.    * Call only under synch on puGuard_ AND this.
  117.    **/
  118.   protected final int reconcilePutPermits() {
  119.         logger.debug("reconcilePutPermits() - start");

  120.     putSidePutPermits_ += takeSidePutPermits_;
  121.     takeSidePutPermits_ = 0;
  122.     return putSidePutPermits_;
  123.   }


  124.   /** Return the current capacity of this queue **/
  125.   public synchronized int capacity() {
  126.         logger.debug("capacity() - start");
  127.  return capacity_; }


  128.   /**
  129.    * Return the number of elements in the queue.
  130.    * This is only a snapshot value, that may be in the midst
  131.    * of changing. The returned value will be unreliable in the presence of
  132.    * active puts and takes, and should only be used as a heuristic
  133.    * estimate, for example for resource monitoring purposes.
  134.    **/
  135.   public synchronized int size() {
  136.         logger.debug("size() - start");

  137.     /*
  138.       This should ideally synch on putGuard_, but
  139.       doing so would cause it to block waiting for an in-progress
  140.       put, which might be stuck. So we instead use whatever
  141.       value of putSidePutPermits_ that we happen to read.
  142.     */
  143.     return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
  144.   }


  145.   /**
  146.    * Reset the capacity of this queue.
  147.    * If the new capacity is less than the old capacity,
  148.    * existing elements are NOT removed, but
  149.    * incoming puts will not proceed until the number of elements
  150.    * is less than the new capacity.
  151.    * @exception IllegalArgumentException if capacity less or equal to zero
  152.    **/

  153.   public void setCapacity(int newCapacity) {
  154.         logger.debug("setCapacity(newCapacity=" + newCapacity + ") - start");

  155.     if (newCapacity <= 0) throw new IllegalArgumentException();
  156.     synchronized (putGuard_) {
  157.       synchronized(this) {
  158.         takeSidePutPermits_ += (newCapacity - capacity_);
  159.         capacity_ = newCapacity;
  160.        
  161.         // Force immediate reconcilation.
  162.         reconcilePutPermits();
  163.         notifyAll();
  164.       }
  165.     }
  166.   }


  167.   /** Main mechanics for take/poll **/
  168.   protected synchronized Object extract() {
  169.         logger.debug("extract() - start");

  170.     synchronized(head_) {
  171.       Object x = null;
  172.       LinkedNode first = head_.next;
  173.       if (first != null) {
  174.         x = first.value;
  175.         first.value = null;
  176.         head_ = first;
  177.         ++takeSidePutPermits_;
  178.         notify();
  179.       }
  180.       return x;
  181.     }
  182.   }

  183.   public Object peek() {
  184.         logger.debug("peek() - start");

  185.     synchronized(head_) {
  186.       LinkedNode first = head_.next;
  187.       if (first != null)
  188.         return first.value;
  189.       else
  190.         return null;
  191.     }
  192.   }

  193.   public Object take() throws InterruptedException {
  194.         logger.debug("take() - start");

  195.     if (Thread.interrupted()) throw new InterruptedException();
  196.     Object x = extract();
  197.     if (x != null)
  198.       return x;
  199.     else {
  200.       synchronized(takeGuard_) {
  201.         try {
  202.           for (;;) {
  203.             x = extract();
  204.             if (x != null) {
  205.               return x;
  206.             }
  207.             else {
  208.               takeGuard_.wait();
  209.             }
  210.           }
  211.         }
  212.         catch(InterruptedException ex) {
  213.           takeGuard_.notify();
  214.           throw ex;
  215.         }
  216.       }
  217.     }
  218.   }

  219.   public Object poll(long msecs) throws InterruptedException {
  220.         logger.debug("poll(msecs=" + msecs + ") - start");

  221.     if (Thread.interrupted()) throw new InterruptedException();
  222.     Object x = extract();
  223.     if (x != null)
  224.       return x;
  225.     else {
  226.       synchronized(takeGuard_) {
  227.         try {
  228.           long waitTime = msecs;
  229.           long start = (msecs <= 0)? 0: System.currentTimeMillis();
  230.           for (;;) {
  231.             x = extract();
  232.             if (x != null || waitTime <= 0) {
  233.               return x;
  234.             }
  235.             else {
  236.               takeGuard_.wait(waitTime);
  237.               waitTime = msecs - (System.currentTimeMillis() - start);
  238.             }
  239.           }
  240.         }
  241.         catch(InterruptedException ex) {
  242.           takeGuard_.notify();
  243.           throw ex;
  244.         }
  245.       }
  246.     }
  247.   }

  248.   /** Notify a waiting take if needed **/
  249.   protected final void allowTake() {
  250.         logger.debug("allowTake() - start");

  251.     synchronized(takeGuard_) {
  252.       takeGuard_.notify();
  253.     }
  254.   }


  255.   /**
  256.    * Create and insert a node.
  257.    * Call only under synch on putGuard_
  258.    **/
  259.   protected void insert(Object x) {
  260.         logger.debug("insert(x=" + x + ") - start");
  261.  
  262.     --putSidePutPermits_;
  263.     LinkedNode p = new LinkedNode(x);
  264.     synchronized(last_) {
  265.       last_.next = p;
  266.       last_ = p;
  267.     }
  268.   }


  269.   /*
  270.      put and offer(ms) differ only in policy before insert/allowTake
  271.   */

  272.   public void put(Object x) throws InterruptedException {
  273.         logger.debug("put(x=" + x + ") - start");

  274.     if (x == null) throw new IllegalArgumentException();
  275.     if (Thread.interrupted()) throw new InterruptedException();

  276.     synchronized(putGuard_) {

  277.       if (putSidePutPermits_ <= 0) { // wait for permit.
  278.         synchronized(this) {
  279.           if (reconcilePutPermits() <= 0) {
  280.             try {
  281.               for(;;) {
  282.                 wait();
  283.                 if (reconcilePutPermits() > 0) {
  284.                   break;
  285.                 }
  286.               }
  287.             }
  288.             catch (InterruptedException ex) {
  289.               notify();
  290.               throw ex;
  291.             }
  292.           }
  293.         }
  294.       }
  295.       insert(x);
  296.     }
  297.     // call outside of lock to loosen put/take coupling
  298.     allowTake();
  299.   }

  300.   public boolean offer(Object x, long msecs) throws InterruptedException {
  301.         logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");

  302.     if (x == null) throw new IllegalArgumentException();
  303.     if (Thread.interrupted()) throw new InterruptedException();

  304.     synchronized(putGuard_) {

  305.       if (putSidePutPermits_ <= 0) {
  306.         synchronized(this) {
  307.           if (reconcilePutPermits() <= 0) {
  308.             if (msecs <= 0)
  309.               return false;
  310.             else {
  311.               try {
  312.                 long waitTime = msecs;
  313.                 long start = System.currentTimeMillis();
  314.                
  315.                 for(;;) {
  316.                   wait(waitTime);
  317.                   if (reconcilePutPermits() > 0) {
  318.                     break;
  319.                   }
  320.                   else {
  321.                     waitTime = msecs - (System.currentTimeMillis() - start);
  322.                     if (waitTime <= 0) {
  323.                       return false;
  324.                     }
  325.                   }
  326.                 }
  327.               }
  328.               catch (InterruptedException ex) {
  329.                 notify();
  330.                 throw ex;
  331.               }
  332.             }
  333.           }
  334.         }
  335.       }

  336.       insert(x);
  337.     }

  338.     allowTake();
  339.     return true;
  340.   }

  341.   public boolean isEmpty() {
  342.         logger.debug("isEmpty() - start");

  343.     synchronized(head_) {
  344.       return head_.next == null;
  345.     }
  346.   }    
  347.    
  348. }