View Javadoc
1   /*
2     File: SynchronousChannel.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    17Jul1998  dl               Disabled direct semaphore permit check
13    31Jul1998  dl               Replaced main algorithm with one with
14                                better scaling and fairness properties.
15    25aug1998  dl               added peek
16    24Nov2001  dl               Replaced main algorithm with faster one.
17  */
18  
19  package org.dbunit.util.concurrent;
20  
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  
24  /**
25   * A rendezvous channel, similar to those used in CSP and Ada. Each
26   * put must wait for a take, and vice versa. Synchronous channels
27   * are well suited for handoff designs, in which an object running in
28   * one thread must synch up with an object running in another thread
29   * in order to hand it some information, event, or task. 
30   * <p> If you only need threads to synch up without
31   * exchanging information, consider using a Barrier. If you need
32   * bidirectional exchanges, consider using a Rendezvous.  <p>
33   *
34   * <p>Read the
35   * <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">introduction to this package</a> 
36   * for more details.
37   * 
38   * @author Doug Lea
39   * @author Last changed by: $Author$
40   * @version $Revision$ $Date$
41   * @since ? (pre 2.1)
42   * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/Rendezvous.html">Rendezvous</a>
43   * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/CyclicBarrier.html">CyclicBarrier</a>
44   */
45  public class SynchronousChannel implements BoundedChannel {
46  
47      /**
48       * Logger for this class
49       */
50      private static final Logger logger = LoggerFactory.getLogger(SynchronousChannel.class);
51  
52    /*
53      This implementation divides actions into two cases for puts:
54  
55      * An arriving putter that does not already have a waiting taker 
56        creates a node holding item, and then waits for a taker to take it.
57      * An arriving putter that does already have a waiting taker fills
58        the slot node created by the taker, and notifies it to continue.
59  
60     And symmetrically, two for takes:
61  
62      * An arriving taker that does not already have a waiting putter
63        creates an empty slot node, and then waits for a putter to fill it.
64      * An arriving taker that does already have a waiting putter takes
65        item from the node created by the putter, and notifies it to continue.
66  
67     This requires keeping two simple queues: waitingPuts and waitingTakes.
68     
69     When a put or take waiting for the actions of its counterpart
70     aborts due to interruption or timeout, it marks the node
71     it created as "CANCELLED", which causes its counterpart to retry
72     the entire put or take sequence.
73    */
74  
75    /** 
76     * Special marker used in queue nodes to indicate that
77     * the thread waiting for a change in the node has timed out
78     * or been interrupted.
79     **/
80    protected static final Object CANCELLED = new Object();
81    
82    /**
83     * Simple FIFO queue class to hold waiting puts/takes.
84     **/
85    protected static class Queue {
86  
87          /**
88           * Logger for this class
89           */
90          private static final Logger logger = LoggerFactory.getLogger(Queue.class);
91  
92      protected LinkedNode head;
93      protected LinkedNode last;
94  
95      protected void enq(LinkedNode p) {
96              logger.debug("enq(p={}) - start", p);
97   
98        if (last == null) 
99          last = head = p;
100       else 
101         last = last.next = p;
102     }
103 
104     protected LinkedNode deq() {
105             logger.debug("deq() - start");
106 
107       LinkedNode p = head;
108       if (p != null && (head = p.next) == null) 
109         last = null;
110       return p;
111     }
112   }
113 
114   protected final Queue waitingPuts = new Queue();
115   protected final Queue waitingTakes = new Queue();
116 
117   /**
118    * @return zero --
119    * Synchronous channels have no internal capacity.
120    **/
121   public int capacity() {
122         logger.debug("capacity() - start");
123  return 0; }
124 
125   /**
126    * @return null --
127    * Synchronous channels do not hold contents unless actively taken
128    **/
129   public Object peek() {
130         logger.debug("peek() - start");
131   return null;  }
132 
133 
134   public void put(Object x) throws InterruptedException {
135         logger.debug("put(x={}) - start", x);
136 
137     if (x == null) throw new IllegalArgumentException();
138 
139     // This code is conceptually straightforward, but messy
140     // because we need to intertwine handling of put-arrives first
141     // vs take-arrives first cases.
142 
143     // Outer loop is to handle retry due to canceled waiting taker
144     for (;;) { 
145 
146       // Get out now if we are interrupted
147       if (Thread.interrupted()) throw new InterruptedException();
148 
149       // Exactly one of item or slot will be not-null at end of
150       // synchronized block, depending on whether a put or a take
151       // arrived first. 
152       LinkedNode slot;
153       LinkedNode item = null;
154 
155       synchronized(this) {
156         // Try to match up with a waiting taker; fill and signal it below
157         slot = waitingTakes.deq();
158 
159         // If no takers yet, create a node and wait below
160         if (slot == null) 
161           waitingPuts.enq(item = new LinkedNode(x));
162       }
163 
164       if (slot != null) { // There is a waiting taker.
165         // Fill in the slot created by the taker and signal taker to
166         // continue.
167         synchronized(slot) {
168           if (slot.value != CANCELLED) {
169             slot.value = x;
170             slot.notify();
171             return;
172           }
173           // else the taker has canceled, so retry outer loop
174         }
175       }
176 
177       else { 
178         // Wait for a taker to arrive and take the item.
179         synchronized(item) {
180           try {
181             while (item.value != null)
182               item.wait();
183             return;
184           }
185           catch (InterruptedException ie) {
186             // If item was taken, return normally but set interrupt status
187             if (item.value == null) {
188               Thread.currentThread().interrupt();
189               return;
190             }
191             else {
192               item.value = CANCELLED;
193               throw ie;
194             }
195           }
196         }
197       }
198     }
199   }
200 
201   public Object take() throws InterruptedException {
202         logger.debug("take() - start");
203 
204     // Entirely symmetric to put()
205 
206     for (;;) {
207       if (Thread.interrupted()) throw new InterruptedException();
208 
209       LinkedNode item;
210       LinkedNode slot = null;
211 
212       synchronized(this) {
213         item = waitingPuts.deq();
214         if (item == null) 
215           waitingTakes.enq(slot = new LinkedNode());
216       }
217 
218       if (item != null) {
219         synchronized(item) {
220           Object x = item.value;
221           if (x != CANCELLED) {
222             item.value = null;
223             item.next = null;
224             item.notify();
225             return x;
226           }
227         }
228       }
229 
230       else {
231         synchronized(slot) {
232           try {
233             for (;;) {
234               Object x = slot.value;
235               if (x != null) {
236                 slot.value = null;
237                 slot.next = null;
238                 return x;
239               }
240               else
241                 slot.wait();
242             }
243           }
244           catch(InterruptedException ie) {
245             Object x = slot.value;
246             if (x != null) {
247               slot.value = null;
248               slot.next = null;
249               Thread.currentThread().interrupt();
250               return x;
251             }
252             else {
253               slot.value = CANCELLED;
254               throw ie;
255             }
256           }
257         }
258       }
259     }
260   }
261 
262   /*
263     Offer and poll are just like put and take, except even messier.
264    */
265 
266 
267   public boolean offer(Object x, long msecs) throws InterruptedException {
268 	  if(logger.isDebugEnabled())
269         logger.debug("offer(x={}, msecs={}) - start", x, String.valueOf(msecs));
270 
271     if (x == null) throw new IllegalArgumentException();
272     long waitTime = msecs;
273     long startTime = 0; // lazily initialize below if needed
274     
275     for (;;) {
276       if (Thread.interrupted()) throw new InterruptedException();
277 
278       LinkedNode slot;
279       LinkedNode item = null;
280 
281       synchronized(this) {
282         slot = waitingTakes.deq();
283         if (slot == null) {
284           if (waitTime <= 0) 
285             return false;
286           else 
287             waitingPuts.enq(item = new LinkedNode(x));
288         }
289       }
290 
291       if (slot != null) {
292         synchronized(slot) {
293           if (slot.value != CANCELLED) {
294             slot.value = x;
295             slot.notify();
296             return true;
297           }
298         }
299       }
300 
301       long now = System.currentTimeMillis();
302       if (startTime == 0) 
303         startTime = now;
304       else 
305         waitTime = msecs - (now - startTime);
306 
307       if (item != null) {
308         synchronized(item) {
309           try {
310             for (;;) {
311               if (item.value == null) 
312                 return true;
313               if (waitTime <= 0) {
314                 item.value = CANCELLED;
315                 return false;
316               }
317               item.wait(waitTime);
318               waitTime = msecs - (System.currentTimeMillis() - startTime);
319             }
320           }
321           catch (InterruptedException ie) {
322             if (item.value == null) {
323               Thread.currentThread().interrupt();
324               return true;
325             }
326             else {
327               item.value = CANCELLED;
328               throw ie;
329             }
330           }
331         }
332       }
333     }
334   }
335 
336   public Object poll(long msecs) throws InterruptedException {
337 	  if(logger.isDebugEnabled())
338         logger.debug("poll(msecs={}) - start", String.valueOf(msecs));
339 
340     long waitTime = msecs;
341     long startTime = 0;
342 
343     for (;;) {
344       if (Thread.interrupted()) throw new InterruptedException();
345 
346       LinkedNode item;
347       LinkedNode slot = null;
348 
349       synchronized(this) {
350         item = waitingPuts.deq();
351         if (item == null) {
352           if (waitTime <= 0) 
353             return null;
354           else 
355             waitingTakes.enq(slot = new LinkedNode());
356         }
357       }
358 
359       if (item != null) {
360         synchronized(item) {
361           Object x = item.value;
362           if (x != CANCELLED) {
363             item.value = null;
364             item.next = null;
365             item.notify();
366             return x;
367           }
368         }
369       }
370 
371       long now = System.currentTimeMillis();
372       if (startTime == 0) 
373         startTime = now;
374       else 
375         waitTime = msecs - (now - startTime);
376 
377       if (slot != null) {
378         synchronized(slot) {
379           try {
380             for (;;) {
381               Object x = slot.value;
382               if (x != null) {
383                 slot.value = null;
384                 slot.next = null;
385                 return x;
386               }
387               if (waitTime <= 0) {
388                 slot.value = CANCELLED;
389                 return null;
390               }
391               slot.wait(waitTime);
392               waitTime = msecs - (System.currentTimeMillis() - startTime);
393             }
394           }
395           catch(InterruptedException ie) {
396             Object x = slot.value;
397             if (x != null) {
398               slot.value = null;
399               slot.next = null;
400               Thread.currentThread().interrupt();
401               return x;
402             }
403             else {
404               slot.value = CANCELLED;
405               throw ie;
406             }
407           }
408         }
409       }
410     }
411   }
412 
413 }