StreamingIterator.java

  1. /*
  2.  *
  3.  * The DbUnit Database Testing Framework
  4.  * Copyright (C)2002-2004, DbUnit.org
  5.  *
  6.  * This library is free software; you can redistribute it and/or
  7.  * modify it under the terms of the GNU Lesser General Public
  8.  * License as published by the Free Software Foundation; either
  9.  * version 2.1 of the License, or (at your option) any later version.
  10.  *
  11.  * This library is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14.  * Lesser General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU Lesser General Public
  17.  * License along with this library; if not, write to the Free Software
  18.  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  19.  *
  20.  */
  21. package org.dbunit.dataset.stream;

  22. import org.dbunit.dataset.AbstractTable;
  23. import org.dbunit.dataset.DataSetException;
  24. import org.dbunit.dataset.ITable;
  25. import org.dbunit.dataset.ITableIterator;
  26. import org.dbunit.dataset.ITableMetaData;
  27. import org.dbunit.dataset.RowOutOfBoundsException;
  28. import org.dbunit.util.concurrent.BoundedBuffer;
  29. import org.dbunit.util.concurrent.Channel;
  30. import org.dbunit.util.concurrent.Puttable;
  31. import org.dbunit.util.concurrent.Takable;
  32. import org.slf4j.Logger;
  33. import org.slf4j.LoggerFactory;

  34. /**
  35.  * Asynchronous table iterator that uses a new Thread for asynchronous processing.
  36.  *
  37.  * @author Manuel Laflamme
  38.  * @author Last changed by: $Author$
  39.  * @version $Revision$ $Date$
  40.  * @since Apr 17, 2003
  41.  */
  42. public class StreamingIterator implements ITableIterator
  43. {

  44.     /**
  45.      * Logger for this class
  46.      */
  47.     private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);

  48.     private static final Object EOD = new Object(); // end of dataset marker

  49.     private final Takable _channel;
  50.     private StreamingTable _activeTable;
  51.     private Object _taken = null;
  52.     private boolean _eod = false;
  53.     /**
  54.      * Variable to store an exception that might occur in the asynchronous consumer
  55.      */
  56.     private Exception _asyncException;

  57.    
  58.     /**
  59.      * Iterator that creates a table iterator by reading the input from
  60.      * the given source in an asynchronous way. Therefore a Thread is
  61.      * created.
  62.      * @param source The source of the data
  63.      * @throws DataSetException
  64.      */
  65.     public StreamingIterator(IDataSetProducer source) throws DataSetException
  66.     {
  67.         Channel channel = new BoundedBuffer(30);
  68.         _channel = channel;

  69.         AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
  70.         Thread thread = new Thread(consumer, "StreamingIterator");
  71.         thread.setDaemon(true);
  72.         thread.start();

  73.         // Take first element from asynchronous handler
  74.         try
  75.         {
  76.             _taken = _channel.take();
  77.         }
  78.         catch (InterruptedException e)
  79.         {
  80.             logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
  81.             throw resolveException(e);
  82.         }
  83.     }

  84.     private DataSetException resolveException(InterruptedException cause) throws DataSetException
  85.     {
  86.         String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
  87.         if(this._asyncException != null)
  88.         {
  89.             return new DataSetException(msg, this._asyncException);
  90.         }
  91.         else
  92.         {
  93.             return new DataSetException(msg, cause);
  94.         }
  95.     }

  96.     ////////////////////////////////////////////////////////////////////////////
  97.     // ITableIterator interface

  98.     public boolean next() throws DataSetException
  99.     {
  100.         logger.debug("next() - start");

  101.         // End of dataset has previously been reach
  102.         if (_eod)
  103.         {
  104.             return false;
  105.         }

  106.         // Iterate to the end of current table.
  107.         while (_activeTable != null && _activeTable.next())
  108.             ;

  109.         // End of dataset is reach
  110.         if (_taken == EOD)
  111.         {
  112.             _eod = true;
  113.             _activeTable = null;

  114.             logger.debug("End of iterator.");
  115.             return false;
  116.         }

  117.         // New table
  118.         if (_taken instanceof ITableMetaData)
  119.         {
  120.             _activeTable = new StreamingTable((ITableMetaData)_taken);
  121.             return true;
  122.         }

  123.         throw new IllegalStateException(
  124.                 "Unexpected object taken from asyncronous handler: " + _taken);
  125.     }

  126.     public ITableMetaData getTableMetaData() throws DataSetException
  127.     {
  128.         logger.debug("getTableMetaData() - start");

  129.         return _activeTable.getTableMetaData();
  130.     }

  131.     public ITable getTable() throws DataSetException
  132.     {
  133.         logger.debug("getTable() - start");

  134.         return _activeTable;
  135.     }

  136.     private void handleException(Exception e)
  137.     {
  138.         // Is invoked when the asynchronous thread reports an exception
  139.         this._asyncException = e;
  140.     }

  141.     ////////////////////////////////////////////////////////////////////////////
  142.     // StreamingTable class

  143.     private class StreamingTable extends AbstractTable
  144.     {

  145.         /**
  146.          * Logger for this class
  147.          */
  148.         private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);

  149.         private ITableMetaData _metaData;
  150.         private int _lastRow = -1;
  151.         private boolean _eot = false;
  152.         private Object[] _rowValues;

  153.         public StreamingTable(ITableMetaData metaData)
  154.         {
  155.             _metaData = metaData;
  156.         }

  157.         boolean next() throws DataSetException
  158.         {
  159.             logger.debug("next() - start");

  160.             // End of table has previously been reach
  161.             if (_eot)
  162.             {
  163.                 return false;
  164.             }

  165.             try
  166.             {
  167.                 _taken = _channel.take();
  168.                 if (!(_taken instanceof Object[]))
  169.                 {
  170.                     _eot = true;
  171.                     return false;
  172.                 }

  173.                 _lastRow++;
  174.                 _rowValues = (Object[])_taken;
  175.                 return true;
  176.             }
  177.             catch (InterruptedException e)
  178.             {
  179.                 throw resolveException(e);
  180.             }
  181.         }

  182.         ////////////////////////////////////////////////////////////////////////
  183.         // ITable interface

  184.         public ITableMetaData getTableMetaData()
  185.         {
  186.             logger.debug("getTableMetaData() - start");

  187.             return _metaData;
  188.         }

  189.         public int getRowCount()
  190.         {
  191.             logger.debug("getRowCount() - start");

  192.             throw new UnsupportedOperationException();
  193.         }

  194.         public Object getValue(int row, String columnName) throws DataSetException
  195.         {
  196.             if(logger.isDebugEnabled())
  197.                 logger.debug("getValue(row={}, columnName={}) - start", Integer.toString(row), columnName);

  198.             // Iterate up to specified row
  199.             while (!_eot && row > _lastRow)
  200.             {
  201.                 next();
  202.             }

  203.             if (row < _lastRow)
  204.             {
  205.                 throw new UnsupportedOperationException("Cannot go backward!");
  206.             }

  207.             if (_eot || row > _lastRow)
  208.             {
  209.                 throw new RowOutOfBoundsException(row + " > " + _lastRow);
  210.             }

  211.             return _rowValues[getColumnIndex(columnName)];
  212.         }

  213.         public String toString()
  214.         {
  215.             StringBuilder sb = new StringBuilder();
  216.             sb.append(getClass().getName()).append("[");
  217.             sb.append("_metaData=")
  218.                     .append(this._metaData == null ? "null" : this._metaData
  219.                             .toString());
  220.             sb.append(", _eot=").append(this._eot);
  221.             sb.append(", _lastRow=").append(this._lastRow);
  222.             sb.append(", _rowValues=").append(
  223.                     this._rowValues == null ? "null" : this._rowValues
  224.                             .toString());
  225.             sb.append("]");
  226.             return sb.toString();
  227.         }
  228.     }

  229.     ////////////////////////////////////////////////////////////////////////////
  230.     // AsynchronousConsumer class

  231.     private static class AsynchronousConsumer implements Runnable, IDataSetConsumer
  232.     {

  233.         /**
  234.          * Logger for this class
  235.          */
  236.         private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);

  237.         private final IDataSetProducer _producer;
  238.         private final Puttable _channel;
  239.         private final StreamingIterator _exceptionHandler;
  240.         private final Thread _invokerThread;

  241.         public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler)
  242.         {
  243.             _producer = source;
  244.             _channel = channel;
  245.             _exceptionHandler = exceptionHandler;
  246.             _invokerThread = Thread.currentThread();
  247.         }

  248.         ////////////////////////////////////////////////////////////////////////
  249.         // Runnable interface

  250.         public void run()
  251.         {
  252.             logger.debug("run() - start");

  253.             try
  254.             {
  255.                 _producer.setConsumer(this);
  256.                 _producer.produce();
  257.             }
  258.             catch (Exception e)
  259.             {
  260.                 _exceptionHandler.handleException(e);
  261.                 // Since the invoker thread probably waits tell it that we have finished here
  262.                 _invokerThread.interrupt();
  263.             }
  264.            
  265.             logger.debug("End of thread " + Thread.currentThread());
  266.         }

  267.         ////////////////////////////////////////////////////////////////////////
  268.         // IDataSetConsumer interface

  269.         public void startDataSet() throws DataSetException
  270.         {
  271.         }

  272.         public void endDataSet() throws DataSetException
  273.         {
  274.             logger.debug("endDataSet() - start");

  275.             try
  276.             {
  277.                 _channel.put(EOD);
  278.             }
  279.             catch (InterruptedException e)
  280.             {
  281.                 throw new DataSetException("Operation was interrupted");
  282.             }
  283.         }

  284.         public void startTable(ITableMetaData metaData) throws DataSetException
  285.         {
  286.             logger.debug("startTable(metaData={}) - start", metaData);

  287.             try
  288.             {
  289.                 _channel.put(metaData);
  290.             }
  291.             catch (InterruptedException e)
  292.             {
  293.                 throw new DataSetException("Operation was interrupted");
  294.             }
  295.         }

  296.         public void endTable() throws DataSetException
  297.         {
  298.         }

  299.         public void row(Object[] values) throws DataSetException
  300.         {
  301.             logger.debug("row(values={}) - start", values);

  302.             try
  303.             {
  304.                 _channel.put(values);
  305.             }
  306.             catch (InterruptedException e)
  307.             {
  308.                 throw new DataSetException("Operation was interrupted");
  309.             }
  310.         }
  311.     }

  312. }