dataOStream.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "dataOStream.h"
00019
00020 #include "node.h"
00021 #include "types.h"
00022
00023 using namespace eq::base;
00024
00025 namespace eq
00026 {
00027 namespace net
00028 {
00029
00030 uint64_t DataOStream::_highWaterMark = 4096;
00031
00032 DataOStream::DataOStream()
00033 : _bufferStart( 0 )
00034 , _enabled( false )
00035 , _dataSent( false )
00036 , _buffered( true )
00037 , _save( false )
00038 {
00039 }
00040
00041 DataOStream::~DataOStream()
00042 {
00043
00044 EQASSERT( !_enabled );
00045 }
00046
00047 void DataOStream::enable( const NodeVector& receivers )
00048 {
00049 for( NodeVector::const_iterator i = receivers.begin();
00050 i != receivers.end(); ++i )
00051 {
00052 NodePtr node = *i;
00053 ConnectionPtr connection = node->getConnection();
00054
00055 connection->lockSend();
00056 _connections.push_back( connection );
00057 }
00058
00059 enable();
00060 }
00061
00062 void DataOStream::enable( const NodePtr node )
00063 {
00064 ConnectionPtr connection = node->getConnection();
00065
00066 connection->lockSend();
00067 _connections.push_back( connection );
00068 enable();
00069 }
00070
00071 void DataOStream::enable( const ConnectionVector& receivers )
00072 {
00073 for( ConnectionVector::const_iterator i = receivers.begin();
00074 i != receivers.end(); ++i )
00075 {
00076 ConnectionPtr connection = *i;
00077
00078 connection->lockSend();
00079 _connections.push_back( connection );
00080 }
00081
00082 enable();
00083 }
00084
00085 void DataOStream::enable()
00086 {
00087 EQASSERT( !_enabled );
00088
00089 _bufferStart = 0;
00090 _dataSent = false;
00091 _buffered = true;
00092 _buffer.setSize( 0 );
00093 _enabled = true;
00094 }
00095
00096 void DataOStream::resend( const NodePtr node )
00097 {
00098 EQASSERT( !_enabled );
00099 EQASSERT( _connections.empty( ));
00100 EQASSERT( _save );
00101
00102 ConnectionPtr connection = node->getConnection();
00103 connection->lockSend();
00104 _connections.push_back( connection );
00105
00106 sendSingle( _buffer.getData(), _buffer.getSize() );
00107
00108 _connections.clear();
00109 connection->unlockSend();
00110 }
00111
00112 void DataOStream::disable()
00113 {
00114 if( !_enabled )
00115 return;
00116
00117 if( _dataSent )
00118 {
00119 if( !_connections.empty( ))
00120 sendFooter( _buffer.getData() + _bufferStart,
00121 _buffer.getSize() - _bufferStart );
00122
00123 _dataSent = true;
00124 }
00125 else if( _buffer.getSize() > 0 )
00126 {
00127 EQASSERT( _bufferStart == 0 );
00128 if( !_connections.empty( ))
00129 sendSingle( _buffer.getData(), _buffer.getSize() );
00130
00131 _dataSent = true;
00132 }
00133
00134 _resetStart();
00135 _enabled = false;
00136 _unlockConnections();
00137 }
00138
00139 void DataOStream::_unlockConnections()
00140 {
00141 for( ConnectionVector::const_iterator i = _connections.begin();
00142 i != _connections.end(); ++i )
00143 {
00144 ConnectionPtr connection = *i;
00145 connection->unlockSend();
00146 }
00147 _connections.clear();
00148 }
00149
00150 void DataOStream::enableBuffering()
00151 {
00152 _buffered = true;
00153 }
00154
00155 void DataOStream::disableBuffering()
00156 {
00157 if( !_buffered )
00158 return;
00159
00160 _buffered = false;
00161 flush();
00162 }
00163
00164 void DataOStream::enableSave()
00165 {
00166 EQASSERTINFO( !_enabled || ( !_dataSent && _buffer.getSize() == 0 ),
00167 "Can't enable saving after data has been written" );
00168 _save = true;
00169 }
00170
00171 void DataOStream::disableSave()
00172 {
00173 EQASSERTINFO( !_enabled || (!_dataSent && _buffer.getSize() == 0 ),
00174 "Can't disable saving after data has been written" );
00175 _save = false;
00176 }
00177
00178 void DataOStream::write( const void* data, uint64_t size )
00179 {
00180 EQASSERT( _enabled );
00181 if( _buffered || _save )
00182 _buffer.append( static_cast< const uint8_t* >( data ), size );
00183
00184 if( !_buffered )
00185 {
00186 _sendBuffer( data, size );
00187 return;
00188 }
00189
00190 if( _buffer.getSize() - _bufferStart > _highWaterMark )
00191 flush();
00192 }
00193
00194 void DataOStream::writeOnce( const void* data, uint64_t size )
00195 {
00196 EQASSERT( _enabled );
00197 EQASSERT( !_dataSent );
00198 EQASSERT( _bufferStart == 0 );
00199
00200 if( _save )
00201 _buffer.append( static_cast< const uint8_t* >( data ), size );
00202
00203 if( !_connections.empty( ))
00204 sendSingle( data, size );
00205
00206 _resetStart();
00207 _enabled = false;
00208 _dataSent = true;
00209 _unlockConnections();
00210 }
00211
00212 void DataOStream::flush()
00213 {
00214 EQASSERT( _enabled );
00215 _sendBuffer( _buffer.getData() + _bufferStart,
00216 _buffer.getSize() - _bufferStart );
00217 _resetStart();
00218 }
00219
00220 void DataOStream::_resetStart()
00221 {
00222 if( _save )
00223 _bufferStart = _buffer.getSize();
00224 else
00225 {
00226 _bufferStart = 0;
00227 _buffer.setSize( 0 );
00228 }
00229 }
00230
00231 void DataOStream::_sendBuffer( const void* data, const uint64_t size )
00232 {
00233 EQASSERT( _enabled );
00234 if( size == 0 )
00235 return;
00236
00237 if( !_dataSent )
00238 {
00239 if( !_connections.empty( ))
00240 sendHeader( data, size );
00241 _dataSent = true;
00242 return;
00243 }
00244
00245 if( !_connections.empty( ))
00246 sendBuffer( data, size );
00247 }
00248 }
00249 }