00001
00002
00003
00004
00005 #include "dataOStream.h"
00006
00007 #include "node.h"
00008 #include "types.h"
00009
00010 using namespace eq::base;
00011
00012 namespace eq
00013 {
00014 namespace net
00015 {
00016
00017 uint64_t DataOStream::_highWaterMark = 4096;
00018
00019 DataOStream::DataOStream()
00020 : _bufferStart( 0 )
00021 , _enabled( false )
00022 , _dataSent( false )
00023 , _buffered( true )
00024 , _save( false )
00025 {
00026 }
00027
00028 DataOStream::~DataOStream()
00029 {
00030
00031 EQASSERT( !_enabled );
00032 }
00033
00034 void DataOStream::enable( const NodeVector& receivers )
00035 {
00036 for( NodeVector::const_iterator i = receivers.begin();
00037 i != receivers.end(); ++i )
00038 {
00039 NodePtr node = *i;
00040 ConnectionPtr connection = node->getConnection();
00041
00042 connection->lockSend();
00043 _connections.push_back( connection );
00044 }
00045
00046 enable();
00047 }
00048
00049 void DataOStream::enable( const NodePtr node )
00050 {
00051 ConnectionPtr connection = node->getConnection();
00052
00053 connection->lockSend();
00054 _connections.push_back( connection );
00055 enable();
00056 }
00057
00058 void DataOStream::enable( const ConnectionVector& receivers )
00059 {
00060 for( ConnectionVector::const_iterator i = receivers.begin();
00061 i != receivers.end(); ++i )
00062 {
00063 ConnectionPtr connection = *i;
00064
00065 connection->lockSend();
00066 _connections.push_back( connection );
00067 }
00068
00069 enable();
00070 }
00071
00072 void DataOStream::enable()
00073 {
00074 EQASSERT( !_enabled );
00075
00076 _bufferStart = 0;
00077 _dataSent = false;
00078 _buffered = true;
00079 _buffer.size = 0;
00080 _enabled = true;
00081 }
00082
00083 void DataOStream::resend( const NodePtr node )
00084 {
00085 EQASSERT( !_enabled );
00086 EQASSERT( _connections.empty( ));
00087 EQASSERT( _save );
00088
00089 ConnectionPtr connection = node->getConnection();
00090 connection->lockSend();
00091 _connections.push_back( connection );
00092
00093 sendSingle( _buffer.data, _buffer.size );
00094
00095 _connections.clear();
00096 connection->unlockSend();
00097 }
00098
00099 void DataOStream::disable()
00100 {
00101 if( !_enabled )
00102 return;
00103
00104 if( _dataSent )
00105 {
00106 if( !_connections.empty( ))
00107 sendFooter( _buffer.data + _bufferStart,
00108 _buffer.size - _bufferStart );
00109
00110 _dataSent = true;
00111 }
00112 else if( _buffer.size > 0 )
00113 {
00114 EQASSERT( _bufferStart == 0 );
00115 if( !_connections.empty( ))
00116 sendSingle( _buffer.data, _buffer.size );
00117
00118 _dataSent = true;
00119 }
00120
00121 _resetStart();
00122 _enabled = false;
00123 _unlockConnections();
00124 }
00125
00126 void DataOStream::_unlockConnections()
00127 {
00128 for( ConnectionVector::const_iterator i = _connections.begin();
00129 i != _connections.end(); ++i )
00130 {
00131 ConnectionPtr connection = *i;
00132 connection->unlockSend();
00133 }
00134 _connections.clear();
00135 }
00136
00137 void DataOStream::enableBuffering()
00138 {
00139 _buffered = true;
00140 }
00141
00142 void DataOStream::disableBuffering()
00143 {
00144 if( !_buffered )
00145 return;
00146
00147 _buffered = false;
00148 flush();
00149 }
00150
00151 void DataOStream::enableSave()
00152 {
00153 EQASSERTINFO( !_enabled || ( !_dataSent && _buffer.size == 0 ),
00154 "Can't enable saving after data has been written" );
00155 _save = true;
00156 }
00157
00158 void DataOStream::disableSave()
00159 {
00160 EQASSERTINFO( !_enabled || (!_dataSent && _buffer.size == 0 ),
00161 "Can't disable saving after data has been written" );
00162 _save = false;
00163 }
00164
00165 void DataOStream::write( const void* data, uint64_t size )
00166 {
00167 EQASSERT( _enabled );
00168 if( _buffered || _save )
00169 _buffer.append( static_cast< const uint8_t* >( data ), size );
00170
00171 if( !_buffered )
00172 {
00173 _sendBuffer( data, size );
00174 return;
00175 }
00176
00177 if( _buffer.size - _bufferStart > _highWaterMark )
00178 flush();
00179 }
00180
00181 void DataOStream::writeOnce( const void* data, uint64_t size )
00182 {
00183 EQASSERT( _enabled );
00184 EQASSERT( !_dataSent );
00185 EQASSERT( _bufferStart == 0 );
00186
00187 if( _save )
00188 _buffer.append( static_cast< const uint8_t* >( data ), size );
00189
00190 if( !_connections.empty( ))
00191 sendSingle( data, size );
00192
00193 _resetStart();
00194 _enabled = false;
00195 _dataSent = true;
00196 _unlockConnections();
00197 }
00198
00199 void DataOStream::flush()
00200 {
00201 EQASSERT( _enabled );
00202 _sendBuffer( _buffer.data + _bufferStart, _buffer.size - _bufferStart );
00203 _resetStart();
00204 }
00205
00206 void DataOStream::_resetStart()
00207 {
00208 if( _save )
00209 _bufferStart = _buffer.size;
00210 else
00211 {
00212 _bufferStart = 0;
00213 _buffer.size = 0;
00214 }
00215 }
00216
00217 void DataOStream::_sendBuffer( const void* data, const uint64_t size )
00218 {
00219 EQASSERT( _enabled );
00220 if( size == 0 )
00221 return;
00222
00223 if( !_dataSent )
00224 {
00225 if( !_connections.empty( ))
00226 sendHeader( data, size );
00227 _dataSent = true;
00228 return;
00229 }
00230
00231 if( !_connections.empty( ))
00232 sendBuffer( data, size );
00233 }
00234 }
00235 }