View Javadoc
1   /*
2     File: LinkedQueue.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    25aug1998  dl               added peek
13    10dec1998  dl               added isEmpty
14    10oct1999  dl               lock on node object to ensure visibility
15  */
16  
17  package org.dbunit.util.concurrent;
18  
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  
22  /**
23   * A linked list based channel implementation.
24   * The algorithm avoids contention between puts
25   * and takes when the queue is not empty. 
26   * Normally a put and a take can proceed simultaneously. 
27   * (Although it does not allow multiple concurrent puts or takes.)
28   * This class tends to perform more efficently than
29   * other Channel implementations in producer/consumer
30   * applications.
31   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
32   * 
33   * 
34   * @author Doug Lea
35   * @author Last changed by: $Author$
36   * @version $Revision$ $Date$
37   * @since ? (pre 2.1)
38   */
39  public class LinkedQueue implements Channel {
40  
41      /**
42       * Logger for this class
43       */
44      private static final Logger logger = LoggerFactory.getLogger(LinkedQueue.class);
45  
46  
47    /** 
48     * Dummy header node of list. The first actual node, if it exists, is always 
49     * at head_.next. After each take, the old first node becomes the head.
50     **/
51    protected LinkedNode head_;         
52  
53    /**
54     * Helper monitor for managing access to last node.
55     **/
56    protected final Object putLock_ = new Object(); 
57  
58    /** 
59     * The last node of list. Put() appends to list, so modifies last_
60     **/
61    protected LinkedNode last_;         
62  
63    /**
64     * The number of threads waiting for a take.
65     * Notifications are provided in put only if greater than zero.
66     * The bookkeeping is worth it here since in reasonably balanced
67     * usages, the notifications will hardly ever be necessary, so
68     * the call overhead to notify can be eliminated.
69     **/
70    protected int waitingForTake_ = 0;  
71  
72    public LinkedQueue() {
73      head_ = new LinkedNode(null); 
74      last_ = head_;
75    }
76  
77    /** Main mechanics for put/offer **/
78    protected void insert(Object x) {
79          logger.debug("insert(x=" + x + ") - start");
80   
81      synchronized(putLock_) {
82        LinkedNode p = new LinkedNode(x);
83        synchronized(last_) {
84          last_.next = p;
85          last_ = p;
86        }
87        if (waitingForTake_ > 0)
88          putLock_.notify();
89      }
90    }
91  
92    /** Main mechanics for take/poll **/
93    protected synchronized Object extract() {
94          logger.debug("extract() - start");
95  
96      synchronized(head_) {
97        Object x = null;
98        LinkedNode first = head_.next;
99        if (first != null) {
100         x = first.value;
101         first.value = null;
102         head_ = first; 
103       }
104       return x;
105     }
106   }
107 
108 
109   public void put(Object x) throws InterruptedException {
110         logger.debug("put(x=" + x + ") - start");
111 
112     if (x == null) throw new IllegalArgumentException();
113     if (Thread.interrupted()) throw new InterruptedException();
114     insert(x); 
115   }
116 
117   public boolean offer(Object x, long msecs) throws InterruptedException {
118         logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
119  
120     if (x == null) throw new IllegalArgumentException();
121     if (Thread.interrupted()) throw new InterruptedException();
122     insert(x); 
123     return true;
124   }
125 
126   public Object take() throws InterruptedException {
127         logger.debug("take() - start");
128 
129     if (Thread.interrupted()) throw new InterruptedException();
130     // try to extract. If fail, then enter wait-based retry loop
131     Object x = extract();
132     if (x != null)
133       return x;
134     else { 
135       synchronized(putLock_) {
136         try {
137           ++waitingForTake_;
138           for (;;) {
139             x = extract();
140             if (x != null) {
141               --waitingForTake_;
142               return x;
143             }
144             else {
145               putLock_.wait(); 
146             }
147           }
148         }
149         catch(InterruptedException ex) {
150           --waitingForTake_; 
151           putLock_.notify();
152           throw ex; 
153         }
154       }
155     }
156   }
157 
158   public Object peek() {
159         logger.debug("peek() - start");
160 
161     synchronized(head_) {
162       LinkedNode first = head_.next;
163       if (first != null) 
164         return first.value;
165       else 
166         return null;
167     }
168   }    
169 
170 
171   public boolean isEmpty() {
172         logger.debug("isEmpty() - start");
173 
174     synchronized(head_) {
175       return head_.next == null;
176     }
177   }    
178 
179   public Object poll(long msecs) throws InterruptedException {
180         logger.debug("poll(msecs=" + msecs + ") - start");
181 
182     if (Thread.interrupted()) throw new InterruptedException();
183     Object x = extract();
184     if (x != null) 
185       return x;
186     else {
187       synchronized(putLock_) {
188         try {
189           long waitTime = msecs;
190           long start = (msecs <= 0)? 0 : System.currentTimeMillis();
191           ++waitingForTake_;
192           for (;;) {
193             x = extract();
194             if (x != null || waitTime <= 0) {
195               --waitingForTake_;
196               return x;
197             }
198             else {
199               putLock_.wait(waitTime); 
200               waitTime = msecs - (System.currentTimeMillis() - start);
201             }
202           }
203         }
204         catch(InterruptedException ex) {
205           --waitingForTake_; 
206           putLock_.notify();
207           throw ex; 
208         }
209       }
210     }
211   }
212 }
213 
214