View Javadoc
1   /*
2     File: BoundedLinkedQueue.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    17Jul1998  dl               Simplified by eliminating wait counts
13    25aug1998  dl               added peek
14    10oct1999  dl               lock on node object to ensure visibility
15    27jan2000  dl               setCapacity forces immediate permit reconcile
16  */
17  
18  package org.dbunit.util.concurrent;
19  
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  /**
24   * A bounded variant of 
25   * LinkedQueue 
26   * class. This class may be
27   * preferable to 
28   * BoundedBuffer 
29   * because it allows a bit more
30   * concurency among puts and takes,  because it does not
31   * pre-allocate fixed storage for elements, and allows 
32   * capacity to be dynamically reset.
33   * On the other hand, since it allocates a node object
34   * on each put, it can be slow on systems with slow
35   * allocation and GC.
36   * Also, it may be
37   * preferable to 
38   * LinkedQueue 
39   * when you need to limit
40   * the capacity to prevent resource exhaustion. This protection
41   * normally does not hurt much performance-wise: When the
42   * queue is not empty or full, most puts and
43   * takes are still usually able to execute concurrently.
44   * @see LinkedQueue 
45   * @see BoundedBuffer 
46   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
47   * 
48   * @author Doug Lea
49   * @author Last changed by: $Author$
50   * @version $Revision$ $Date$
51   * @since ? (pre 2.1)
52   */
53  public class BoundedLinkedQueue implements BoundedChannel {
54  
55      /**
56       * Logger for this class
57       */
58      private static final Logger logger = LoggerFactory.getLogger(BoundedLinkedQueue.class);
59  
60    /*
61     * It might be a bit nicer if this were declared as
62     * a subclass of LinkedQueue, or a sibling class of
63     * a common abstract class. It shares much of the
64     * basic design and bookkeeping fields. But too 
65     * many details differ to make this worth doing.
66     */
67  
68  
69  
70    /** 
71     * Dummy header node of list. The first actual node, if it exists, is always 
72     * at head_.next. After each take, the old first node becomes the head.
73     **/
74    protected LinkedNode head_;
75  
76    /** 
77     * The last node of list. Put() appends to list, so modifies last_
78     **/
79    protected LinkedNode last_;
80  
81  
82    /**
83     * Helper monitor. Ensures that only one put at a time executes.
84     **/
85  
86    protected final Object putGuard_ = new Object();
87  
88    /**
89     * Helper monitor. Protects and provides wait queue for takes
90     **/
91  
92    protected final Object takeGuard_ = new Object();
93  
94  
95    /** Number of elements allowed **/
96    protected int capacity_;
97  
98    
99    /**
100    * One side of a split permit count. 
101    * The counts represent permits to do a put. (The queue is full when zero).
102    * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
103    * (The length is never separately recorded, so this cannot be
104    * checked explicitly.)
105    * To minimize contention between puts and takes, the
106    * put side uses up all of its permits before transfering them from
107    * the take side. The take side just increments the count upon each take.
108    * Thus, most puts and take can run independently of each other unless
109    * the queue is empty or full. 
110    * Initial value is queue capacity.
111    **/
112 
113   protected int putSidePutPermits_; 
114 
115   /** Number of takes since last reconcile **/
116   protected int takeSidePutPermits_ = 0;
117 
118 
119   /**
120    * Create a queue with the given capacity
121    * @exception IllegalArgumentException if capacity less or equal to zero
122    **/
123   public BoundedLinkedQueue(int capacity) {
124     if (capacity <= 0) throw new IllegalArgumentException();
125     capacity_ = capacity;
126     putSidePutPermits_ = capacity;
127     head_ =  new LinkedNode(null); 
128     last_ = head_;
129   }
130 
131   /**
132    * Create a queue with the current default capacity
133    **/
134 
135   public BoundedLinkedQueue() { 
136     this(DefaultChannelCapacity.get()); 
137   }
138 
139   /**
140    * Move put permits from take side to put side; 
141    * return the number of put side permits that are available.
142    * Call only under synch on puGuard_ AND this.
143    **/
144   protected final int reconcilePutPermits() {
145         logger.debug("reconcilePutPermits() - start");
146 
147     putSidePutPermits_ += takeSidePutPermits_;
148     takeSidePutPermits_ = 0;
149     return putSidePutPermits_;
150   }
151 
152 
153   /** Return the current capacity of this queue **/
154   public synchronized int capacity() {
155         logger.debug("capacity() - start");
156  return capacity_; }
157 
158 
159   /** 
160    * Return the number of elements in the queue.
161    * This is only a snapshot value, that may be in the midst 
162    * of changing. The returned value will be unreliable in the presence of
163    * active puts and takes, and should only be used as a heuristic
164    * estimate, for example for resource monitoring purposes.
165    **/
166   public synchronized int size() {
167         logger.debug("size() - start");
168 
169     /*
170       This should ideally synch on putGuard_, but
171       doing so would cause it to block waiting for an in-progress
172       put, which might be stuck. So we instead use whatever
173       value of putSidePutPermits_ that we happen to read.
174     */
175     return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
176   }
177 
178 
179   /**
180    * Reset the capacity of this queue.
181    * If the new capacity is less than the old capacity,
182    * existing elements are NOT removed, but
183    * incoming puts will not proceed until the number of elements
184    * is less than the new capacity.
185    * @exception IllegalArgumentException if capacity less or equal to zero
186    **/
187 
188   public void setCapacity(int newCapacity) {
189         logger.debug("setCapacity(newCapacity=" + newCapacity + ") - start");
190 
191     if (newCapacity <= 0) throw new IllegalArgumentException();
192     synchronized (putGuard_) {
193       synchronized(this) {
194         takeSidePutPermits_ += (newCapacity - capacity_);
195         capacity_ = newCapacity;
196         
197         // Force immediate reconcilation.
198         reconcilePutPermits();
199         notifyAll();
200       }
201     }
202   }
203 
204 
205   /** Main mechanics for take/poll **/
206   protected synchronized Object extract() {
207         logger.debug("extract() - start");
208 
209     synchronized(head_) {
210       Object x = null;
211       LinkedNode first = head_.next;
212       if (first != null) {
213         x = first.value;
214         first.value = null;
215         head_ = first; 
216         ++takeSidePutPermits_;
217         notify();
218       }
219       return x;
220     }
221   }
222 
223   public Object peek() {
224         logger.debug("peek() - start");
225 
226     synchronized(head_) {
227       LinkedNode first = head_.next;
228       if (first != null) 
229         return first.value;
230       else
231         return null;
232     }
233   }
234 
235   public Object take() throws InterruptedException {
236         logger.debug("take() - start");
237 
238     if (Thread.interrupted()) throw new InterruptedException();
239     Object x = extract();
240     if (x != null) 
241       return x;
242     else {
243       synchronized(takeGuard_) {
244         try {
245           for (;;) {
246             x = extract();
247             if (x != null) {
248               return x;
249             }
250             else {
251               takeGuard_.wait(); 
252             }
253           }
254         }
255         catch(InterruptedException ex) {
256           takeGuard_.notify();
257           throw ex; 
258         }
259       }
260     }
261   }
262 
263   public Object poll(long msecs) throws InterruptedException {
264         logger.debug("poll(msecs=" + msecs + ") - start");
265 
266     if (Thread.interrupted()) throw new InterruptedException();
267     Object x = extract();
268     if (x != null) 
269       return x;
270     else {
271       synchronized(takeGuard_) {
272         try {
273           long waitTime = msecs;
274           long start = (msecs <= 0)? 0: System.currentTimeMillis();
275           for (;;) {
276             x = extract();
277             if (x != null || waitTime <= 0) {
278               return x;
279             }
280             else {
281               takeGuard_.wait(waitTime); 
282               waitTime = msecs - (System.currentTimeMillis() - start);
283             }
284           }
285         }
286         catch(InterruptedException ex) {
287           takeGuard_.notify();
288           throw ex; 
289         }
290       }
291     }
292   }
293 
294   /** Notify a waiting take if needed **/
295   protected final void allowTake() {
296         logger.debug("allowTake() - start");
297 
298     synchronized(takeGuard_) {
299       takeGuard_.notify();
300     }
301   }
302 
303 
304   /**
305    * Create and insert a node.
306    * Call only under synch on putGuard_
307    **/
308   protected void insert(Object x) {
309         logger.debug("insert(x=" + x + ") - start");
310  
311     --putSidePutPermits_;
312     LinkedNode p = new LinkedNode(x);
313     synchronized(last_) {
314       last_.next = p;
315       last_ = p;
316     }
317   }
318 
319 
320   /* 
321      put and offer(ms) differ only in policy before insert/allowTake
322   */
323 
324   public void put(Object x) throws InterruptedException {
325         logger.debug("put(x=" + x + ") - start");
326 
327     if (x == null) throw new IllegalArgumentException();
328     if (Thread.interrupted()) throw new InterruptedException();
329 
330     synchronized(putGuard_) {
331 
332       if (putSidePutPermits_ <= 0) { // wait for permit. 
333         synchronized(this) {
334           if (reconcilePutPermits() <= 0) {
335             try {
336               for(;;) {
337                 wait();
338                 if (reconcilePutPermits() > 0) {
339                   break;
340                 }
341               }
342             }
343             catch (InterruptedException ex) {
344               notify(); 
345               throw ex; 
346             }
347           }
348         }
349       }
350       insert(x);
351     }
352     // call outside of lock to loosen put/take coupling
353     allowTake();
354   }
355 
356   public boolean offer(Object x, long msecs) throws InterruptedException {
357         logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
358 
359     if (x == null) throw new IllegalArgumentException();
360     if (Thread.interrupted()) throw new InterruptedException();
361 
362     synchronized(putGuard_) {
363 
364       if (putSidePutPermits_ <= 0) {
365         synchronized(this) {
366           if (reconcilePutPermits() <= 0) {
367             if (msecs <= 0)
368               return false;
369             else {
370               try {
371                 long waitTime = msecs;
372                 long start = System.currentTimeMillis();
373                 
374                 for(;;) {
375                   wait(waitTime);
376                   if (reconcilePutPermits() > 0) {
377                     break;
378                   }
379                   else {
380                     waitTime = msecs - (System.currentTimeMillis() - start);
381                     if (waitTime <= 0) {
382                       return false;
383                     }
384                   }
385                 }
386               }
387               catch (InterruptedException ex) {
388                 notify(); 
389                 throw ex; 
390               }
391             }
392           }
393         }
394       }
395 
396       insert(x);
397     }
398 
399     allowTake();
400     return true;
401   }
402 
403   public boolean isEmpty() {
404         logger.debug("isEmpty() - start");
405 
406     synchronized(head_) {
407       return head_.next == null;
408     }
409   }    
410     
411 }