View Javadoc
1   /*
2    *
3    * The DbUnit Database Testing Framework
4    * Copyright (C)2002-2004, DbUnit.org
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 2.1 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, write to the Free Software
18   * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19   *
20   */
21  package org.dbunit.dataset.stream;
22  
23  import org.dbunit.dataset.AbstractTable;
24  import org.dbunit.dataset.DataSetException;
25  import org.dbunit.dataset.ITable;
26  import org.dbunit.dataset.ITableIterator;
27  import org.dbunit.dataset.ITableMetaData;
28  import org.dbunit.dataset.RowOutOfBoundsException;
29  import org.dbunit.util.concurrent.BoundedBuffer;
30  import org.dbunit.util.concurrent.Channel;
31  import org.dbunit.util.concurrent.Puttable;
32  import org.dbunit.util.concurrent.Takable;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * Asynchronous table iterator that uses a new Thread for asynchronous processing.
38   * 
39   * @author Manuel Laflamme
40   * @author Last changed by: $Author$
41   * @version $Revision$ $Date$
42   * @since Apr 17, 2003
43   */
44  public class StreamingIterator implements ITableIterator
45  {
46  
47      /**
48       * Logger for this class
49       */
50      private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);
51  
52      private static final Object EOD = new Object(); // end of dataset marker
53  
54      private final Takable _channel;
55      private StreamingTable _activeTable;
56      private Object _taken = null;
57      private boolean _eod = false;
58      /**
59       * Variable to store an exception that might occur in the asynchronous consumer
60       */
61  	private Exception _asyncException;
62  
63  	
64      /**
65       * Iterator that creates a table iterator by reading the input from
66       * the given source in an asynchronous way. Therefore a Thread is
67       * created.
68       * @param source The source of the data
69       * @throws DataSetException
70       */
71      public StreamingIterator(IDataSetProducer source) throws DataSetException
72      {
73          Channel channel = new BoundedBuffer(30);
74          _channel = channel;
75  
76          AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
77          Thread thread = new Thread(consumer, "StreamingIterator");
78          thread.setDaemon(true);
79          thread.start();
80  
81          // Take first element from asynchronous handler
82          try
83          {
84              _taken = _channel.take();
85          }
86          catch (InterruptedException e)
87          {
88          	logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
89          	throw resolveException(e);
90          }
91      }
92  
93      private DataSetException resolveException(InterruptedException cause) throws DataSetException 
94      {
95      	String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
96      	if(this._asyncException != null)
97      	{
98              return new DataSetException(msg, this._asyncException);
99      	}
100     	else 
101     	{
102     		return new DataSetException(msg, cause);
103     	}
104 	}
105 
106 	////////////////////////////////////////////////////////////////////////////
107     // ITableIterator interface
108 
109     public boolean next() throws DataSetException
110     {
111         logger.debug("next() - start");
112 
113         // End of dataset has previously been reach
114         if (_eod)
115         {
116             return false;
117         }
118 
119         // Iterate to the end of current table.
120         while (_activeTable != null && _activeTable.next())
121             ;
122 
123         // End of dataset is reach
124         if (_taken == EOD)
125         {
126             _eod = true;
127             _activeTable = null;
128 
129             logger.debug("End of iterator.");
130             return false;
131         }
132 
133         // New table
134         if (_taken instanceof ITableMetaData)
135         {
136             _activeTable = new StreamingTable((ITableMetaData)_taken);
137             return true;
138         }
139 
140         throw new IllegalStateException(
141                 "Unexpected object taken from asyncronous handler: " + _taken);
142     }
143 
144     public ITableMetaData getTableMetaData() throws DataSetException
145     {
146         logger.debug("getTableMetaData() - start");
147 
148         return _activeTable.getTableMetaData();
149     }
150 
151     public ITable getTable() throws DataSetException
152     {
153         logger.debug("getTable() - start");
154 
155         return _activeTable;
156     }
157 
158 	private void handleException(Exception e)
159 	{
160 		// Is invoked when the asynchronous thread reports an exception
161 		this._asyncException = e;
162 	}
163 
164     ////////////////////////////////////////////////////////////////////////////
165     // StreamingTable class
166 
167     private class StreamingTable extends AbstractTable
168     {
169 
170         /**
171          * Logger for this class
172          */
173         private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);
174 
175         private ITableMetaData _metaData;
176         private int _lastRow = -1;
177         private boolean _eot = false;
178         private Object[] _rowValues;
179 
180         public StreamingTable(ITableMetaData metaData)
181         {
182             _metaData = metaData;
183         }
184 
185         boolean next() throws DataSetException
186         {
187             logger.debug("next() - start");
188 
189             // End of table has previously been reach
190             if (_eot)
191             {
192                 return false;
193             }
194 
195             try
196             {
197                 _taken = _channel.take();
198                 if (!(_taken instanceof Object[]))
199                 {
200                     _eot = true;
201                     return false;
202                 }
203 
204                 _lastRow++;
205                 _rowValues = (Object[])_taken;
206                 return true;
207             }
208             catch (InterruptedException e)
209             {
210             	throw resolveException(e);
211             }
212         }
213 
214         ////////////////////////////////////////////////////////////////////////
215         // ITable interface
216 
217         public ITableMetaData getTableMetaData()
218         {
219             logger.debug("getTableMetaData() - start");
220 
221             return _metaData;
222         }
223 
224         public int getRowCount()
225         {
226             logger.debug("getRowCount() - start");
227 
228             throw new UnsupportedOperationException();
229         }
230 
231         public Object getValue(int row, String columnName) throws DataSetException
232         {
233             if(logger.isDebugEnabled())
234                 logger.debug("getValue(row={}, columnName={}) - start", Integer.toString(row), columnName);
235 
236             // Iterate up to specified row
237             while (!_eot && row > _lastRow)
238             {
239                 next();
240             }
241 
242             if (row < _lastRow)
243             {
244                 throw new UnsupportedOperationException("Cannot go backward!");
245             }
246 
247             if (_eot || row > _lastRow)
248             {
249                 throw new RowOutOfBoundsException(row + " > " + _lastRow);
250             }
251 
252             return _rowValues[getColumnIndex(columnName)];
253         }
254 
255         public String toString()
256         {
257             StringBuilder sb = new StringBuilder();
258             sb.append(getClass().getName()).append("[");
259             sb.append("_metaData=")
260                     .append(this._metaData == null ? "null" : this._metaData
261                             .toString());
262             sb.append(", _eot=").append(this._eot);
263             sb.append(", _lastRow=").append(this._lastRow);
264             sb.append(", _rowValues=").append(
265                     this._rowValues == null ? "null" : this._rowValues
266                             .toString());
267             sb.append("]");
268             return sb.toString();
269         }
270     }
271 
272     ////////////////////////////////////////////////////////////////////////////
273     // AsynchronousConsumer class
274 
275     private static class AsynchronousConsumer implements Runnable, IDataSetConsumer
276     {
277 
278         /**
279          * Logger for this class
280          */
281         private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);
282 
283         private final IDataSetProducer _producer;
284         private final Puttable _channel;
285         private final StreamingIterator _exceptionHandler;
286         private final Thread _invokerThread;
287 
288         public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler)
289         {
290             _producer = source;
291             _channel = channel;
292             _exceptionHandler = exceptionHandler;
293             _invokerThread = Thread.currentThread();
294         }
295 
296         ////////////////////////////////////////////////////////////////////////
297         // Runnable interface
298 
299         public void run()
300         {
301             logger.debug("run() - start");
302 
303             try
304             {
305                 _producer.setConsumer(this);
306                 _producer.produce();
307             }
308             catch (Exception e)
309             {
310             	_exceptionHandler.handleException(e);
311             	// Since the invoker thread probably waits tell it that we have finished here
312             	_invokerThread.interrupt();
313             }
314             
315             logger.debug("End of thread " + Thread.currentThread());
316         }
317 
318         ////////////////////////////////////////////////////////////////////////
319         // IDataSetConsumer interface
320 
321         public void startDataSet() throws DataSetException
322         {
323         }
324 
325         public void endDataSet() throws DataSetException
326         {
327             logger.debug("endDataSet() - start");
328 
329             try
330             {
331                 _channel.put(EOD);
332             }
333             catch (InterruptedException e)
334             {
335                 throw new DataSetException("Operation was interrupted");
336             }
337         }
338 
339         public void startTable(ITableMetaData metaData) throws DataSetException
340         {
341             logger.debug("startTable(metaData={}) - start", metaData);
342 
343             try
344             {
345                 _channel.put(metaData);
346             }
347             catch (InterruptedException e)
348             {
349                 throw new DataSetException("Operation was interrupted");
350             }
351         }
352 
353         public void endTable() throws DataSetException
354         {
355         }
356 
357         public void row(Object[] values) throws DataSetException
358         {
359             logger.debug("row(values={}) - start", values);
360 
361             try
362             {
363                 _channel.put(values);
364             }
365             catch (InterruptedException e)
366             {
367                 throw new DataSetException("Operation was interrupted");
368             }
369         }
370     }
371 
372 }