View Javadoc
1   /*
2     File: SemaphoreControlledChannel.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    16Jun1998  dl               Create public version
12     5Aug1998  dl               replaced int counters with longs
13    08dec2001  dl               reflective constructor now uses longs too.
14  */
15  
16  package org.dbunit.util.concurrent;
17  
18  import org.slf4j.Logger;
19  import org.slf4j.LoggerFactory;
20  
21  import java.lang.reflect.Constructor;
22  import java.lang.reflect.InvocationTargetException;
23  
24  /**
25   * Abstract class for channels that use Semaphores to
26   * control puts and takes.
27   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
28   * 
29   * @author Doug Lea
30   * @author Last changed by: $Author$
31   * @version $Revision$ $Date$
32   * @since ? (pre 2.1)
33   */
34  public abstract class SemaphoreControlledChannel implements BoundedChannel {
35  
36      /**
37       * Logger for this class
38       */
39      private static final Logger logger = LoggerFactory.getLogger(SemaphoreControlledChannel.class);
40  
41    protected final Semaphore putGuard_;
42    protected final Semaphore takeGuard_;
43    protected int capacity_;
44  
45    /**
46     * Create a channel with the given capacity and default
47     * semaphore implementation
48     * @exception IllegalArgumentException if capacity less or equal to zero
49     **/
50  
51    public SemaphoreControlledChannel(int capacity) 
52     throws IllegalArgumentException {
53      if (capacity <= 0) throw new IllegalArgumentException();
54      capacity_ = capacity;
55      putGuard_ = new Semaphore(capacity);
56      takeGuard_ = new Semaphore(0);
57    }
58  
59  
60    /**
61     * Create a channel with the given capacity and 
62     * semaphore implementations instantiated from the supplied class
63     * @exception IllegalArgumentException if capacity less or equal to zero.
64     * @exception NoSuchMethodException If class does not have constructor 
65     * that intializes permits
66     * @exception SecurityException if constructor information 
67     * not accessible
68     * @exception InstantiationException if semaphore class is abstract
69     * @exception IllegalAccessException if constructor cannot be called
70     * @exception InvocationTargetException if semaphore constructor throws an
71     * exception
72     **/
73    public SemaphoreControlledChannel(int capacity, Class semaphoreClass) 
74     throws IllegalArgumentException, 
75            NoSuchMethodException, 
76            SecurityException, 
77            InstantiationException, 
78            IllegalAccessException, 
79            InvocationTargetException {
80      if (capacity <= 0) throw new IllegalArgumentException();
81      capacity_ = capacity;
82      Class[] longarg = { Long.TYPE };
83      Constructor ctor = semaphoreClass.getDeclaredConstructor(longarg);
84      Long[] cap = { new Long(capacity) };
85      putGuard_ = (Semaphore)(ctor.newInstance(cap));
86      Long[] zero = { new Long(0) };
87      takeGuard_ = (Semaphore)(ctor.newInstance(zero));
88    }
89  
90  
91  
92    public int  capacity() {
93          logger.debug("capacity() - start");
94   return capacity_; }
95  
96    /** 
97     * Return the number of elements in the buffer.
98     * This is only a snapshot value, that may change
99     * immediately after returning.
100    **/
101 
102   public int size() {
103         logger.debug("size() - start");
104  return (int)(takeGuard_.permits());  }
105 
106   /**
107    * Internal mechanics of put.
108    **/
109   protected abstract void insert(Object x);
110 
111   /**
112    * Internal mechanics of take.
113    **/
114   protected abstract Object extract();
115 
116   public void put(Object x) throws InterruptedException {
117         logger.debug("put(x=" + x + ") - start");
118 
119     if (x == null) throw new IllegalArgumentException();
120     if (Thread.interrupted()) throw new InterruptedException();
121     putGuard_.acquire();
122     try {
123       insert(x);
124       takeGuard_.release();
125     }
126     catch (ClassCastException ex) {
127       putGuard_.release();
128       throw ex;
129     }
130   }
131 
132   public boolean offer(Object x, long msecs) throws InterruptedException {
133         logger.debug("offer(x=" + x + ", msecs=" + msecs + ") - start");
134 
135     if (x == null) throw new IllegalArgumentException();
136     if (Thread.interrupted()) throw new InterruptedException();
137     if (!putGuard_.attempt(msecs)) 
138       return false;
139     else {
140       try {
141         insert(x);
142         takeGuard_.release();
143         return true;
144       }
145       catch (ClassCastException ex) {
146         putGuard_.release();
147         throw ex;
148       }
149     }
150   }
151 
152   public Object take() throws InterruptedException {
153         logger.debug("take() - start");
154 
155     if (Thread.interrupted()) throw new InterruptedException();
156     takeGuard_.acquire();
157     try {
158       Object x = extract();
159       putGuard_.release();
160       return x;
161     }
162     catch (ClassCastException ex) {
163       takeGuard_.release();
164       throw ex;
165     }
166   }
167 
168   public Object poll(long msecs) throws InterruptedException {
169         logger.debug("poll(msecs=" + msecs + ") - start");
170 
171     if (Thread.interrupted()) throw new InterruptedException();
172     if (!takeGuard_.attempt(msecs))
173       return null;
174     else {
175       try {
176         Object x = extract();
177         putGuard_.release();
178         return x;
179       }
180       catch (ClassCastException ex) {
181         takeGuard_.release();
182         throw ex;
183       }
184     }
185   }
186 
187 }