00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "connection.h"
00019
00020 #include "connectionDescription.h"
00021 #include "connectionListener.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "pipeConnection.h"
00025 #include "socketConnection.h"
00026 #include "namedPipeConnection.h"
00027 #include <errno.h>
00028
00029 #ifdef WIN32
00030 # define EQ_SOCKET_ERROR getErrorString( WSAGetLastError( ))
00031 # include <malloc.h>
00032 #else
00033 # define EQ_SOCKET_ERROR strerror( errno )
00034 # include <alloca.h>
00035 #endif
00036
00037 using namespace eq::base;
00038 using namespace std;
00039
00040 namespace eq
00041 {
00042 namespace net
00043 {
00044
00045 Connection::Connection()
00046 : _state( STATE_CLOSED )
00047 , _aioBuffer( 0 )
00048 , _aioBytes( 0 )
00049 {
00050 EQVERB << "New Connection @" << (void*)this << endl;
00051 }
00052
00053 Connection::~Connection()
00054 {
00055 _state = STATE_CLOSED;
00056 EQVERB << "Delete Connection @" << (void*)this << endl;
00057 }
00058
00059 ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
00060 {
00061 ConnectionPtr connection;
00062 switch( description->type )
00063 {
00064 case CONNECTIONTYPE_TCPIP:
00065 case CONNECTIONTYPE_SDP:
00066 connection = new SocketConnection( description->type );
00067 break;
00068
00069 case CONNECTIONTYPE_PIPE:
00070 connection = new PipeConnection();
00071 break;
00072
00073 case CONNECTIONTYPE_NAMEDPIPE:
00074 connection = new NamedPipeConnection();
00075 break;
00076 default:
00077 EQWARN << "Connection type not implemented" << endl;
00078 return connection;
00079 }
00080
00081 connection->setDescription( description );
00082 return connection;
00083 }
00084
00085 void Connection::addListener( ConnectionListener* listener )
00086 {
00087 _listeners.push_back( listener );
00088 }
00089
00090 void Connection::removeListener( ConnectionListener* listener )
00091 {
00092 vector< ConnectionListener* >::iterator i = find( _listeners.begin(),
00093 _listeners.end(),
00094 listener );
00095 if( i != _listeners.end( ))
00096 _listeners.erase( i );
00097 }
00098
00099 void Connection::_fireStateChanged()
00100 {
00101 for( vector<ConnectionListener*>::const_iterator i= _listeners.begin();
00102 i != _listeners.end(); ++i )
00103
00104 (*i)->notifyStateChanged( this );
00105 }
00106
00107
00108
00109
00110
00111 void Connection::recvNB( void* buffer, const uint64_t bytes )
00112 {
00113 EQASSERT( !_aioBuffer );
00114 EQASSERT( !_aioBytes );
00115 EQASSERT( buffer );
00116 EQASSERT( bytes );
00117
00118 _aioBuffer = buffer;
00119 _aioBytes = bytes;
00120 readNB( buffer, bytes );
00121 }
00122
00123 bool Connection::recvSync( void** outBuffer, uint64_t* outBytes )
00124 {
00125 EQASSERT( _aioBuffer );
00126 EQASSERT( _aioBytes );
00127
00128 if( outBuffer )
00129 *outBuffer = _aioBuffer;
00130 if( outBytes )
00131 *outBytes = _aioBytes;
00132
00133 void* buffer( _aioBuffer );
00134 const uint64_t bytes( _aioBytes );
00135 _aioBuffer = 0;
00136 _aioBytes = 0;
00137
00138 if( _state != STATE_CONNECTED || !buffer || !bytes )
00139 return false;
00140
00141 uint8_t* ptr = static_cast< uint8_t* >( buffer );
00142 uint64_t bytesLeft = bytes;
00143
00144 while( bytesLeft )
00145 {
00146 const int64_t got = readSync( ptr, bytesLeft );
00147
00148 if( got < 0 )
00149 {
00150 if( outBytes )
00151 *outBytes -= bytesLeft;
00152 if( bytes == bytesLeft )
00153 EQINFO << "Read on dead connection" << endl;
00154 else
00155 EQERROR << "Error during read after " << bytes - bytesLeft
00156 << " bytes on " << typeid(*this).name() << endl;
00157 return false;
00158 }
00159 else if( got == 0 )
00160 {
00161
00162
00163 if( bytes == bytesLeft )
00164 {
00165 if( outBytes )
00166 *outBytes = 0;
00167 return false;
00168 }
00169 EQVERB << "Zero bytes read" << endl;
00170 }
00171
00172 if( bytesLeft > static_cast< uint64_t >( got ))
00173 {
00174 ptr += got;
00175 bytesLeft -= got;
00176
00177 readNB( ptr, bytesLeft );
00178 }
00179 else
00180 {
00181 EQASSERT( static_cast< uint64_t >( got ) == bytesLeft );
00182 bytesLeft = 0;
00183 }
00184 }
00185
00186 return true;
00187 }
00188
00189
00190
00191
00192 bool Connection::send( const void* buffer, const uint64_t bytes,
00193 const bool isLocked )
00194 {
00195 if( _state != STATE_CONNECTED )
00196 return false;
00197
00198
00199
00200
00201
00202
00203 ScopedMutex mutex( isLocked ? 0 : &_sendLock );
00204
00205 const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
00206 uint64_t bytesLeft = bytes;
00207 while( bytesLeft )
00208 {
00209 const int64_t wrote = this->write( ptr, bytesLeft );
00210
00211 if( wrote == -1 )
00212 {
00213 EQERROR << "Error during write after " << bytes - bytesLeft
00214 << " bytes" << endl;
00215 return false;
00216 }
00217 else if( wrote == 0 )
00218 EQWARN << "Zero bytes write" << endl;
00219
00220 bytesLeft -= wrote;
00221 ptr += wrote;
00222 }
00223 return true;
00224 }
00225
00226 bool Connection::send( Packet& packet, const void* data,
00227 const uint64_t dataSize )
00228 {
00229 if( dataSize == 0 )
00230 return send( packet );
00231
00232 if( dataSize <= 8 )
00233 {
00234 memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00235 return send( packet );
00236 }
00237
00238
00239 const uint64_t headerSize = packet.size - 8;
00240 const uint64_t size = headerSize + dataSize;
00241 if( size > ASSEMBLE_THRESHOLD )
00242 {
00243
00244 packet.size = size;
00245
00246 lockSend();
00247 const bool ret = ( send( &packet, headerSize, true ) &&
00248 send( data, dataSize, true ));
00249 unlockSend();
00250 return ret;
00251 }
00252
00253
00254 char* buffer = (char*)alloca( size );
00255
00256 memcpy( buffer, &packet, headerSize );
00257 memcpy( buffer + headerSize, data, dataSize );
00258
00259 ((Packet*)buffer)->size = size;
00260 return send( buffer, size );
00261 }
00262
00263 bool Connection::send( const ConnectionVector& connections,
00264 const Packet& packet, const bool isLocked )
00265 {
00266 if( connections.empty( ))
00267 return true;
00268
00269 for( ConnectionVector::const_iterator i= connections.begin();
00270 i<connections.end(); ++i )
00271 {
00272 ConnectionPtr connection = *i;
00273 if( !connection->send( &packet, packet.size, isLocked ))
00274 return false;
00275 }
00276 return true;
00277 }
00278
00279 bool Connection::send( const ConnectionVector& connections, Packet& packet,
00280 const void* data, const uint64_t dataSize,
00281 const bool isLocked )
00282 {
00283 if( connections.empty( ))
00284 return true;
00285
00286 if( dataSize <= 8 )
00287 {
00288 if( dataSize != 0 )
00289 memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00290 return send( connections, packet, isLocked );
00291 }
00292
00293 const uint64_t headerSize = packet.size - 8;
00294 const uint64_t size = headerSize + dataSize;
00295
00296 if( size > ASSEMBLE_THRESHOLD )
00297 {
00298
00299 packet.size = size;
00300
00301 for( ConnectionVector::const_iterator i= connections.begin();
00302 i<connections.end(); ++i )
00303 {
00304 ConnectionPtr connection = *i;
00305
00306 if( !isLocked )
00307 connection->lockSend();
00308 const bool ok = (connection->send( &packet, headerSize, true ) &&
00309 connection->send( data, dataSize, true ));
00310 if( !isLocked )
00311 connection->unlockSend();
00312 if( !ok )
00313 return false;
00314 }
00315 return true;
00316 }
00317
00318 char* buffer = (char*)alloca( size );
00319 memcpy( buffer, &packet, packet.size-8 );
00320 memcpy( buffer + packet.size-8, data, dataSize );
00321
00322 ((Packet*)buffer)->size = size;
00323
00324 for( ConnectionVector::const_iterator i= connections.begin();
00325 i<connections.end(); ++i )
00326 {
00327 ConnectionPtr connection = *i;
00328 if( !connection->send( buffer, size, isLocked ))
00329 return false;
00330 }
00331
00332 return true;
00333 }
00334
00335
00336 ConnectionDescriptionPtr Connection::getDescription() const
00337 {
00338 return _description;
00339 }
00340
00341 void Connection::setDescription( ConnectionDescriptionPtr description )
00342 {
00343 EQASSERT( description.isValid( ));
00344 EQASSERTINFO( _description->type == description->type,
00345 "Wrong connection type in description" );
00346 _description = description;
00347
00348 if( _description->bandwidth > 0 )
00349 return;
00350
00351 switch( description->type )
00352 {
00353 case CONNECTIONTYPE_NAMEDPIPE:
00354 _description->bandwidth = 768000;
00355 break;
00356 case CONNECTIONTYPE_SDP:
00357 _description->bandwidth = 819200;
00358 break;
00359 case CONNECTIONTYPE_TCPIP:
00360 default:
00361 _description->bandwidth = 102400;
00362 break;
00363 }
00364 }
00365
00366 std::ostream& operator << ( std::ostream& os, const Connection* connection )
00367 {
00368 if( !connection )
00369 {
00370 os << "NULL connection";
00371 return os;
00372 }
00373
00374 Connection::State state = connection->getState();
00375 ConnectionDescriptionPtr desc = connection->getDescription();
00376
00377 os << "Connection " << (void*)connection << " type "
00378 << typeid(*connection).name() << " state "
00379 << ( state == Connection::STATE_CLOSED ? "closed" :
00380 state == Connection::STATE_CONNECTING ? "connecting" :
00381 state == Connection::STATE_CONNECTED ? "connected" :
00382 state == Connection::STATE_LISTENING ? "listening" :
00383 "unknown state" );
00384 if( desc.isValid( ))
00385 os << " description " << desc->toString();
00386
00387 return os;
00388 }
00389 }
00390 }