View Javadoc
1   /*
2     File: BoundedBuffer.java
3   
4     Originally written by Doug Lea and released into the public domain.
5     This may be used for any purposes whatsoever without acknowledgment.
6     Thanks for the assistance and support of Sun Microsystems Labs,
7     and everyone contributing, testing, and using this code.
8   
9     History:
10    Date       Who                What
11    11Jun1998  dl               Create public version
12    17Jul1998  dl               Simplified by eliminating wait counts
13    25aug1998  dl               added peek
14     5May1999  dl               replace % with conditional (slightly faster)
15  */
16  
17  package org.dbunit.util.concurrent;
18  
19  import org.slf4j.Logger;
20  import org.slf4j.LoggerFactory;
21  
22  /**
23   * Efficient array-based bounded buffer class.
24   * Adapted from CPJ, chapter 8, which describes design.
25   * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
26   * 
27   * @author Doug Lea
28   * @author Last changed by: $Author$
29   * @version $Revision$ $Date$
30   * @since ? (pre 2.1)
31   */
32  public class BoundedBuffer implements BoundedChannel {
33  
34      /**
35       * Logger for this class
36       */
37      private static final Logger logger = LoggerFactory.getLogger(BoundedBuffer.class);
38  
39    protected final Object[]  array_;      // the elements
40  
41    protected int takePtr_ = 0;            // circular indices
42    protected int putPtr_ = 0;       
43  
44    protected int usedSlots_ = 0;          // length
45    protected int emptySlots_;             // capacity - length
46  
47    /**
48     * Helper monitor to handle puts. 
49     **/
50    protected final Object putMonitor_ = new Object();
51  
52    /**
53     * Create a BoundedBuffer with the given capacity.
54     * @exception IllegalArgumentException if capacity less or equal to zero
55     **/
56    public BoundedBuffer(int capacity) throws IllegalArgumentException {
57      if (capacity <= 0) throw new IllegalArgumentException();
58      array_ = new Object[capacity];
59      emptySlots_ = capacity;
60    }
61  
62    /**
63     * Create a buffer with the current default capacity
64     **/
65  
66    public BoundedBuffer() { 
67      this(DefaultChannelCapacity.get()); 
68    }
69  
70    /** 
71     * Return the number of elements in the buffer.
72     * This is only a snapshot value, that may change
73     * immediately after returning.
74     **/
75    public synchronized int size() {
76   return usedSlots_; 
77   }
78  
79    public int capacity() {
80      return array_.length;
81   }
82  
83    protected void incEmptySlots() {
84      synchronized(putMonitor_) {
85        ++emptySlots_;
86        putMonitor_.notify();
87      }
88    }
89  
90    protected synchronized void incUsedSlots() {
91      ++usedSlots_;
92      notify();
93    }
94  
95    protected final void insert(Object x) {
96          logger.debug("insert(x={}) - start", x);
97   // mechanics of put
98      --emptySlots_;
99      array_[putPtr_] = x;
100     if (++putPtr_ >= array_.length) putPtr_ = 0;
101   }
102 
103   protected final Object extract() {
104         logger.debug("extract() - start");
105  // mechanics of take
106     --usedSlots_;
107     Object old = array_[takePtr_];
108     array_[takePtr_] = null;
109     if (++takePtr_ >= array_.length) takePtr_ = 0;
110     return old;
111   }
112 
113   public Object peek() {
114         logger.debug("peek() - start");
115 
116     synchronized(this) {
117       if (usedSlots_ > 0)
118         return array_[takePtr_];
119       else
120         return null;
121     }
122   }
123 
124 
125   public void put(Object x) throws InterruptedException {
126         logger.debug("put(x={}) - start", x);
127 
128     if (x == null) throw new IllegalArgumentException();
129     if (Thread.interrupted()) throw new InterruptedException();
130 
131     synchronized(putMonitor_) {
132       while (emptySlots_ <= 0) {
133 	try { putMonitor_.wait(); }
134         catch (InterruptedException ex) {
135           putMonitor_.notify();
136           throw ex;
137         }
138       }
139       insert(x);
140     }
141     incUsedSlots();
142   }
143 
144   public boolean offer(Object x, long msecs) throws InterruptedException {
145         logger.debug("offer(x={}, msecs={}) - start", x, new Long(msecs));
146 
147     if (x == null) throw new IllegalArgumentException();
148     if (Thread.interrupted()) throw new InterruptedException();
149 
150     synchronized(putMonitor_) {
151       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
152       long waitTime = msecs;
153       while (emptySlots_ <= 0) {
154         if (waitTime <= 0) return false;
155 	try { putMonitor_.wait(waitTime); }
156         catch (InterruptedException ex) {
157           putMonitor_.notify();
158           throw ex;
159         }
160         waitTime = msecs - (System.currentTimeMillis() - start);
161       }
162       insert(x);
163     }
164     incUsedSlots();
165     return true;
166   }
167 
168 
169 
170   public  Object take() throws InterruptedException {
171         logger.debug("take() - start");
172  
173     if (Thread.interrupted()) throw new InterruptedException();
174     Object old = null; 
175     synchronized(this) { 
176       while (usedSlots_ <= 0) {
177         try { wait(); }
178         catch (InterruptedException ex) {
179           notify();
180           throw ex; 
181         }
182       }
183       old = extract();
184     }
185     incEmptySlots();
186     return old;
187   }
188 
189   public  Object poll(long msecs) throws InterruptedException {
190         logger.debug("poll(msecs={}) - start", new Long(msecs));
191  
192     if (Thread.interrupted()) throw new InterruptedException();
193     Object old = null; 
194     synchronized(this) { 
195       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
196       long waitTime = msecs;
197       
198       while (usedSlots_ <= 0) {
199         if (waitTime <= 0) return null;
200         try { wait(waitTime); }
201         catch (InterruptedException ex) {
202           notify();
203           throw ex; 
204         }
205         waitTime = msecs - (System.currentTimeMillis() - start);
206 
207       }
208       old = extract();
209     }
210     incEmptySlots();
211     return old;
212   }
213 
214 }
215 
216