dataOStream.cpp

00001 
00002 /* Copyright (c) 2007-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "dataOStream.h"
00019 
00020 #include "node.h"
00021 #include "types.h"
00022 
00023 using namespace eq::base;
00024 
00025 namespace eq
00026 {
00027 namespace net
00028 {
00029 
00030 uint64_t DataOStream::_highWaterMark = 4096;
00031 
00032 DataOStream::DataOStream()
00033         : _bufferStart( 0 )
00034         , _enabled( false )
00035         , _dataSent( false )
00036         , _buffered( true )
00037         , _save( false )
00038 {
00039 }
00040 
00041 DataOStream::~DataOStream()
00042 {
00043     // Can't call disable() from destructor since it uses virtual functions
00044     EQASSERT( !_enabled );
00045 }
00046 
00047 void DataOStream::enable( const NodeVector& receivers )
00048 {
00049     for( NodeVector::const_iterator i = receivers.begin(); 
00050          i != receivers.end(); ++i )
00051     {
00052         NodePtr       node       = *i;
00053         ConnectionPtr connection = node->getConnection();
00054         
00055         connection->lockSend();
00056         _connections.push_back( connection );
00057     }
00058 
00059     enable();
00060 }
00061 
00062 void DataOStream::enable( const NodePtr node )
00063 {
00064     ConnectionPtr connection = node->getConnection();
00065         
00066     connection->lockSend();
00067     _connections.push_back( connection );
00068     enable();
00069 }
00070 
00071 void DataOStream::enable( const ConnectionVector& receivers )
00072 {
00073     for( ConnectionVector::const_iterator i = receivers.begin(); 
00074          i != receivers.end(); ++i )
00075     {
00076         ConnectionPtr connection = *i;
00077         
00078         connection->lockSend();
00079         _connections.push_back( connection );
00080     }
00081 
00082     enable();
00083 }
00084 
00085 void DataOStream::enable()
00086 {
00087     EQASSERT( !_enabled );
00088 
00089     _bufferStart = 0;
00090     _dataSent = false;
00091     _buffered = true;
00092     _buffer.setSize( 0 );
00093     _enabled  = true;
00094 }
00095 
00096 void DataOStream::resend( const NodePtr node )
00097 {
00098     EQASSERT( !_enabled );
00099     EQASSERT( _connections.empty( ));
00100     EQASSERT( _save );
00101     
00102     ConnectionPtr connection = node->getConnection();        
00103     connection->lockSend();
00104     _connections.push_back( connection );
00105 
00106     sendSingle( _buffer.getData(), _buffer.getSize() );
00107 
00108     _connections.clear();
00109     connection->unlockSend();
00110 }
00111 
00112 void DataOStream::disable()
00113 {
00114     if( !_enabled )
00115         return;
00116 
00117     if( _dataSent )
00118     {
00119         if( !_connections.empty( ))
00120             sendFooter( _buffer.getData() + _bufferStart, 
00121                         _buffer.getSize() - _bufferStart );
00122 
00123         _dataSent = true;
00124     }
00125     else if( _buffer.getSize() > 0 )
00126     {
00127         EQASSERT( _bufferStart == 0 );
00128         if( !_connections.empty( ))
00129             sendSingle( _buffer.getData(), _buffer.getSize() );
00130 
00131         _dataSent = true;
00132     }
00133 
00134     _resetStart();
00135     _enabled = false;
00136     _unlockConnections();
00137 }
00138 
00139 void DataOStream::_unlockConnections()
00140 {
00141     for( ConnectionVector::const_iterator i = _connections.begin(); 
00142          i != _connections.end(); ++i )
00143     {
00144         ConnectionPtr connection = *i;
00145         connection->unlockSend();
00146     }
00147     _connections.clear();
00148 }
00149 
00150 void DataOStream::enableBuffering()
00151 {
00152     _buffered = true;
00153 }
00154 
00155 void DataOStream::disableBuffering()
00156 {
00157     if( !_buffered )
00158         return;
00159 
00160     _buffered = false;
00161     flush();
00162 }
00163 
00164 void DataOStream::enableSave()
00165 {
00166     EQASSERTINFO( !_enabled || ( !_dataSent && _buffer.getSize() == 0 ),
00167                   "Can't enable saving after data has been written" );
00168     _save = true;
00169 }
00170 
00171 void DataOStream::disableSave()
00172 {
00173     EQASSERTINFO( !_enabled || (!_dataSent && _buffer.getSize() == 0 ),
00174                   "Can't disable saving after data has been written" );
00175     _save = false;
00176 }
00177 
00178 void DataOStream::write( const void* data, uint64_t size )
00179 {
00180     EQASSERT( _enabled );
00181     if( _buffered || _save )
00182         _buffer.append( static_cast< const uint8_t* >( data ), size );
00183 
00184     if( !_buffered )
00185     {
00186         _sendBuffer( data, size );
00187         return;
00188     }
00189 
00190     if( _buffer.getSize() - _bufferStart > _highWaterMark )
00191         flush();
00192 }
00193 
00194 void DataOStream::writeOnce( const void* data, uint64_t size )
00195 {
00196     EQASSERT( _enabled );
00197     EQASSERT( !_dataSent );
00198     EQASSERT( _bufferStart == 0 );
00199 
00200     if( _save )
00201         _buffer.append( static_cast< const uint8_t* >( data ), size );
00202 
00203     if( !_connections.empty( ))
00204         sendSingle( data, size );
00205 
00206     _resetStart();
00207     _enabled = false;
00208     _dataSent = true;
00209     _unlockConnections();
00210 }
00211 
00212 void DataOStream::flush()
00213 {
00214     EQASSERT( _enabled );
00215     _sendBuffer( _buffer.getData() + _bufferStart, 
00216                  _buffer.getSize() - _bufferStart );
00217     _resetStart();
00218 }
00219 
00220 void DataOStream::_resetStart()
00221 {
00222     if( _save )
00223         _bufferStart = _buffer.getSize();
00224     else
00225     {
00226         _bufferStart = 0;
00227         _buffer.setSize( 0 );
00228     }
00229 }
00230 
00231 void DataOStream::_sendBuffer( const void* data, const uint64_t size )
00232 {
00233     EQASSERT( _enabled );
00234     if( size == 0 )
00235         return;
00236 
00237     if( !_dataSent )
00238     {
00239         if( !_connections.empty( ))
00240             sendHeader( data, size );
00241         _dataSent = true;
00242         return;
00243     }
00244 
00245     if( !_connections.empty( ))
00246         sendBuffer( data, size );
00247 }
00248 }
00249 }
Generated on Mon Aug 10 18:58:32 2009 for Equalizer 0.9 by  doxygen 1.5.8