00001
00002
00003
00004
00005 #include "connection.h"
00006
00007 #include "connectionDescription.h"
00008 #include "connectionListener.h"
00009 #include "log.h"
00010 #include "node.h"
00011 #include "pipeConnection.h"
00012 #include "socketConnection.h"
00013
00014 #include <errno.h>
00015
00016 #ifdef WIN32
00017 # define EQ_SOCKET_ERROR getErrorString( WSAGetLastError( ))
00018 # include <malloc.h>
00019 #else
00020 # define EQ_SOCKET_ERROR strerror( errno )
00021 # include <alloca.h>
00022 #endif
00023
00024 using namespace eq::base;
00025 using namespace std;
00026
00027 namespace eq
00028 {
00029 namespace net
00030 {
00031
00032 Connection::Connection()
00033 : _state( STATE_CLOSED )
00034 {
00035 EQINFO << "New Connection @" << (void*)this << endl;
00036 }
00037
00038 Connection::Connection( const Connection& from )
00039 : Referenced( from )
00040 , _state( from._state )
00041 , _description( from._description )
00042 {
00043 EQINFO << "New Connection copy @" << (void*)this << endl;
00044 }
00045
00046 Connection::~Connection()
00047 {
00048 _state = STATE_CLOSED;
00049 EQINFO << "Delete Connection @" << (void*)this << endl;
00050 }
00051
00052 ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
00053 {
00054 ConnectionPtr connection;
00055 switch( description->type )
00056 {
00057 case CONNECTIONTYPE_TCPIP:
00058 case CONNECTIONTYPE_SDP:
00059 connection = new SocketConnection( description->type );
00060 break;
00061
00062 case CONNECTIONTYPE_PIPE:
00063 connection = new PipeConnection();
00064 break;
00065
00066 default:
00067 EQWARN << "Connection type not implemented" << endl;
00068 return connection;
00069 }
00070
00071 connection->setDescription( description );
00072 return connection;
00073 }
00074
00075 ConnectionPtr Connection::accept( const int timeout )
00076 {
00077 if( _state != STATE_LISTENING )
00078 return 0;
00079
00080
00081 const ReadNotifier notifier = getReadNotifier();
00082
00083 if( notifier == 0 )
00084 {
00085
00086 EQWARN << "Cannot accept on connection, does not use a file descriptor"
00087 << endl;
00088 return 0;
00089 }
00090
00091 #ifdef WIN32
00092 const DWORD waitTime = timeout > 0 ? timeout : INFINITE;
00093 const DWORD ret = WaitForSingleObject( notifier, waitTime );
00094 #else
00095 const ReadNotifier fd = getReadNotifier();
00096 fd_set fdSet;
00097 FD_ZERO( &fdSet );
00098 FD_SET( fd, &fdSet );
00099
00100
00101 timeval tv;
00102 tv.tv_sec = timeout / 1000 ;
00103 tv.tv_usec = (timeout - tv.tv_sec*1000) * 1000;
00104
00105 const int ret = ::select( fd+1, &fdSet, 0, 0, timeout ? &tv : 0 );
00106 #endif
00107 switch( ret )
00108 {
00109 case SELECT_TIMEOUT:
00110 return 0;
00111
00112 case SELECT_ERROR:
00113 EQWARN << "Error during select(): " << EQ_SOCKET_ERROR << endl;
00114 return 0;
00115
00116 default:
00117 return accept();
00118 }
00119 }
00120
00121 void Connection::addListener( ConnectionListener* listener )
00122 {
00123 _listeners.push_back( listener );
00124 }
00125
00126 void Connection::removeListener( ConnectionListener* listener )
00127 {
00128 vector< ConnectionListener* >::iterator i = find( _listeners.begin(),
00129 _listeners.end(),
00130 listener );
00131 if( i != _listeners.end( ))
00132 _listeners.erase( i );
00133 }
00134
00135 void Connection::_fireStateChanged()
00136 {
00137 for( vector<ConnectionListener*>::const_iterator i= _listeners.begin();
00138 i != _listeners.end(); ++i )
00139
00140 (*i)->notifyStateChanged( this );
00141 }
00142
00143
00144
00145
00146
00147 bool Connection::recv( void* buffer, const uint64_t bytes )
00148 {
00149 EQLOG( LOG_WIRE ) << "Receiving " << bytes << " bytes on " << this << endl;
00150 if( _state != STATE_CONNECTED )
00151 return false;
00152
00153 if( bytes > 1048576 )
00154 EQLOG( LOG_NETPERF ) << "Start receive " << bytes << " bytes" << endl;
00155
00156 unsigned char* ptr = static_cast<unsigned char*>(buffer);
00157 uint64_t bytesLeft = bytes;
00158
00159 while( bytesLeft )
00160 {
00161 int64_t got = this->read( ptr, bytesLeft );
00162
00163 if( got == -1 )
00164 {
00165 if( bytes == bytesLeft )
00166 EQINFO << "Read on dead connection" << endl;
00167 else
00168 EQERROR << "Error during read after " << bytes - bytesLeft
00169 << " bytes on " << typeid(*this).name() << endl;
00170 return false;
00171 }
00172 else if( got == 0 )
00173 {
00174
00175
00176 if( bytesLeft == bytes )
00177 return false;
00178
00179 EQVERB << "Zero bytes read" << endl;
00180 }
00181
00182 if( bytes > 1048576 )
00183 EQLOG( LOG_NETPERF ) << "Got " << got << " bytes" << endl;
00184
00185 bytesLeft -= got;
00186 ptr += got;
00187 }
00188
00189 if( bytes > 1048576 )
00190 EQLOG( LOG_NETPERF ) << "End receive " << bytes << " bytes" << endl;
00191
00192 if( Log::topics & LOG_WIRE )
00193 {
00194 EQLOG( LOG_WIRE ) << disableFlush << "Received " << bytes << " bytes: ";
00195 const uint32_t printBytes = EQ_MIN( bytes, 256 );
00196 unsigned char* data = static_cast<unsigned char*>(buffer);
00197
00198 for( uint32_t i=0; i<printBytes; i++ )
00199 {
00200 if( i%4 )
00201 EQLOG( LOG_WIRE ) << " ";
00202 else if( i )
00203 EQLOG( LOG_WIRE ) << "|";
00204
00205 EQLOG( LOG_WIRE ) << static_cast<int>(data[i]);
00206 }
00207 if( printBytes < bytes )
00208 EQLOG( LOG_WIRE ) << "|...";
00209 EQLOG( LOG_WIRE ) << endl << enableFlush;
00210 }
00211
00212 return true;
00213 }
00214
00215
00216
00217
00218 bool Connection::send( const void* buffer, const uint64_t bytes,
00219 const bool isLocked ) const
00220 {
00221 if( _state != STATE_CONNECTED )
00222 return false;
00223
00224
00225
00226
00227
00228
00229 ScopedMutex<SpinLock> mutex( isLocked ? 0 : &_sendLock );
00230
00231 const unsigned char* ptr = static_cast<const unsigned char*>(buffer);
00232
00233 if( Log::topics & LOG_WIRE )
00234 {
00235 EQLOG( LOG_WIRE ) << disableFlush << "Sending " << bytes
00236 << " bytes on " << (void*)this << ":";
00237 const uint32_t printBytes = EQ_MIN( bytes, 256 );
00238
00239 for( uint32_t i=0; i<printBytes; ++i )
00240 {
00241 if( i%4 )
00242 EQLOG( LOG_WIRE ) << " ";
00243 else if( i )
00244 EQLOG( LOG_WIRE ) << "|";
00245
00246 EQLOG( LOG_WIRE ) << static_cast<int>(ptr[i]);
00247 }
00248 if( printBytes < bytes )
00249 EQLOG( LOG_WIRE ) << "|...";
00250 EQLOG( LOG_WIRE ) << endl << enableFlush;
00251 }
00252
00253 if( bytes > 1048576 )
00254 EQLOG( LOG_NETPERF ) << "Start transmit " << bytes << " bytes" << endl;
00255
00256 uint64_t bytesLeft = bytes;
00257 while( bytesLeft )
00258 {
00259 const int64_t wrote = this->write( ptr, bytesLeft );
00260 if( bytes > 1048576 )
00261 EQLOG( LOG_NETPERF ) << "Wrote " << wrote << " bytes" << endl;
00262
00263 if( wrote == -1 )
00264 {
00265 EQERROR << "Error during write after " << bytes - bytesLeft
00266 << " bytes" << endl;
00267 return false;
00268 }
00269 else if( wrote == 0 )
00270 EQWARN << "Zero bytes write" << endl;
00271
00272 bytesLeft -= wrote;
00273 ptr += wrote;
00274 }
00275
00276 if( bytes > 1048576 )
00277 EQLOG( LOG_NETPERF ) << "End transmit " << bytes << " bytes" << endl;
00278 return true;
00279 }
00280
00281 bool Connection::send( Packet& packet, const void* data,
00282 const uint64_t dataSize ) const
00283 {
00284 if( dataSize == 0 )
00285 return send( packet );
00286
00287 if( dataSize <= 8 )
00288 {
00289 memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00290 return send( packet );
00291 }
00292
00293
00294 const uint64_t headerSize = packet.size - 8;
00295 const uint64_t size = headerSize + dataSize;
00296 if( size > ASSEMBLE_THRESHOLD )
00297 {
00298
00299 packet.size = size;
00300
00301 lockSend();
00302 const bool ret = ( send( &packet, headerSize, true ) &&
00303 send( data, dataSize, true ));
00304 unlockSend();
00305 return ret;
00306 }
00307
00308
00309 char* buffer = (char*)alloca( size );
00310
00311 memcpy( buffer, &packet, headerSize );
00312 memcpy( buffer + headerSize, data, dataSize );
00313
00314 ((Packet*)buffer)->size = size;
00315 return send( buffer, size );
00316 }
00317
00318 bool Connection::send( const ConnectionVector& connections,
00319 const Packet& packet, const bool isLocked )
00320 {
00321 if( connections.empty( ))
00322 return true;
00323
00324 for( ConnectionVector::const_iterator i= connections.begin();
00325 i<connections.end(); ++i )
00326 {
00327 if( !(*i)->send( &packet, packet.size, isLocked ))
00328 return false;
00329 }
00330 return true;
00331 }
00332
00333 bool Connection::send( const ConnectionVector& connections, Packet& packet,
00334 const void* data, const uint64_t dataSize,
00335 const bool isLocked )
00336 {
00337 if( connections.empty( ))
00338 return true;
00339
00340 if( dataSize <= 8 )
00341 {
00342 if( dataSize != 0 )
00343 memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00344 return send( connections, packet, isLocked );
00345 }
00346
00347 const uint64_t headerSize = packet.size - 8;
00348 const uint64_t size = headerSize + dataSize;
00349
00350 if( size > ASSEMBLE_THRESHOLD )
00351 {
00352
00353 packet.size = size;
00354
00355 for( ConnectionVector::const_iterator i= connections.begin();
00356 i<connections.end(); ++i )
00357 {
00358 ConnectionPtr connection = *i;
00359
00360 if( !isLocked )
00361 connection->lockSend();
00362 const bool ok = (connection->send( &packet, headerSize, true ) &&
00363 connection->send( data, dataSize, true ));
00364 if( !isLocked )
00365 connection->unlockSend();
00366 if( !ok )
00367 return false;
00368 }
00369 return true;
00370 }
00371
00372 char* buffer = (char*)alloca( size );
00373 memcpy( buffer, &packet, packet.size-8 );
00374 memcpy( buffer + packet.size-8, data, dataSize );
00375
00376 ((Packet*)buffer)->size = size;
00377
00378 for( ConnectionVector::const_iterator i= connections.begin();
00379 i<connections.end(); ++i )
00380 {
00381 if( !(*i)->send( buffer, size, isLocked ))
00382 return false;
00383 }
00384
00385 return true;
00386 }
00387
00388
00389 ConnectionDescriptionPtr Connection::getDescription() const
00390 {
00391 return _description;
00392 }
00393
00394 void Connection::setDescription( ConnectionDescriptionPtr description )
00395 {
00396 EQASSERT( description.isValid( ));
00397 EQASSERTINFO( _description->type == description->type,
00398 "Wrong connection type in description" );
00399 _description = description;
00400 }
00401
00402 std::ostream& operator << ( std::ostream& os, const Connection* connection )
00403 {
00404 if( !connection )
00405 {
00406 os << "NULL connection";
00407 return os;
00408 }
00409
00410 Connection::State state = connection->getState();
00411 ConnectionDescriptionPtr desc = connection->getDescription();
00412
00413 os << "Connection " << (void*)connection << " type "
00414 << typeid(*connection).name() << " state "
00415 << ( state == Connection::STATE_CLOSED ? "closed" :
00416 state == Connection::STATE_CONNECTING ? "connecting" :
00417 state == Connection::STATE_CONNECTED ? "connected" :
00418 state == Connection::STATE_LISTENING ? "listening" :
00419 "unknown state" );
00420 if( desc.isValid( ))
00421 os << " description " << desc->toString();
00422
00423 return os;
00424 }
00425 }
00426 }