connectionSet.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "connectionSet.h"
00019
00020 #include "connection.h"
00021 #include "node.h"
00022 #include "pipeConnection.h"
00023
00024 #include <eq/base/base.h>
00025 #include <eq/base/stdExt.h>
00026 #include <eq/base/thread.h>
00027
00028 #include <algorithm>
00029 #include <errno.h>
00030
00031 #ifdef WIN32
00032 # define EQ_SOCKET_ERROR getErrorString( _error ) << '(' << _error << ')'
00033 #else
00034 # define EQ_SOCKET_ERROR strerror( _error )
00035 #endif
00036
00037 using namespace eq::base;
00038
00039 #define SELF_INTERRUPT 42
00040
00041 #ifdef WIN32
00042 # define SELECT_TIMEOUT WAIT_TIMEOUT
00043 # define SELECT_ERROR WAIT_FAILED
00044 #else
00045 # define SELECT_TIMEOUT 0
00046 # define SELECT_ERROR -1
00047 #endif
00048
00049 namespace eq
00050 {
00051 namespace net
00052 {
00053
00054 ConnectionSet::ConnectionSet()
00055 : _selfCommand( 0 )
00056 , _error( 0 )
00057 , _dirty( true )
00058 {
00059
00060
00061
00062
00063 _selfConnection = new PipeConnection;
00064 if( !_selfConnection->connect( ))
00065 {
00066 EQERROR << "Could not create connection" << std::endl;
00067 return;
00068 }
00069 _selfConnection->recvNB( &_selfCommand, sizeof( _selfCommand ));
00070 }
00071
00072 ConnectionSet::~ConnectionSet()
00073 {
00074 _connection = 0;
00075
00076 _selfConnection->close();
00077 _selfConnection = 0;
00078 }
00079
00080
00081 void ConnectionSet::_dirtyFDSet()
00082 {
00083 if( _dirty )
00084 return;
00085
00086 EQINFO << "FD set modified, restarting select" << std::endl;
00087 _dirty = true;
00088
00089 interrupt();
00090 }
00091
00092 void ConnectionSet::interrupt()
00093 {
00094 const char c = SELF_INTERRUPT;
00095 _selfConnection->send( &c, 1, true );
00096 }
00097
00098 void ConnectionSet::addConnection( ConnectionPtr connection )
00099 {
00100 EQASSERT( connection->isConnected() || connection->isListening( ));
00101 {
00102 base::ScopedMutex mutex( _mutex );
00103 EQASSERTINFO( _connections.size() < 63,
00104 "Connection set can't handle more than 63 connections" );
00105 _connections.push_back( connection );
00106 connection->addListener( this );
00107 }
00108 _dirtyFDSet();
00109 }
00110
00111 bool ConnectionSet::removeConnection( ConnectionPtr connection )
00112 {
00113 {
00114 ScopedMutex mutex( _mutex );
00115 ConnectionVector::iterator i = find( _connections.begin(),
00116 _connections.end(), connection );
00117 if( i == _connections.end( ))
00118 return false;
00119
00120 connection->removeListener( this );
00121 _connections.erase( i );
00122 }
00123
00124 if( _connection == connection )
00125 _connection = 0;
00126
00127 _dirtyFDSet();
00128 return true;
00129 }
00130
00131 void ConnectionSet::clear()
00132 {
00133 _connection = 0;
00134 for( ConnectionVector::iterator i = _connections.begin();
00135 i != _connections.end(); ++i )
00136
00137 (*i)->removeListener( this );
00138
00139 _connections.clear();
00140 _dirtyFDSet();
00141 _fdSet.clear();
00142 _fdSetConnections.clear();
00143 }
00144
00145 ConnectionSet::Event ConnectionSet::select( const int timeout )
00146 {
00147 while( true )
00148 {
00149 _connection = 0;
00150 _error = 0;
00151
00152 if( !_setupFDSet( ))
00153 return EVENT_INVALID_HANDLE;
00154
00155
00156 #ifdef WIN32
00157 const DWORD waitTime = timeout > 0 ? timeout : INFINITE;
00158 const DWORD ret = WaitForMultipleObjectsEx( _fdSet.getSize(),
00159 _fdSet.getData(),
00160 FALSE, waitTime, TRUE );
00161 #else
00162 const int ret = poll( _fdSet.getData(), _fdSet.getSize(), timeout );
00163 #endif
00164 switch( ret )
00165 {
00166 case SELECT_TIMEOUT:
00167 return EVENT_TIMEOUT;
00168
00169 case SELECT_ERROR:
00170 #ifdef WIN32
00171 _error = GetLastError();
00172 if( _error == WSA_INVALID_HANDLE )
00173 {
00174 _dirty = true;
00175 break;
00176 }
00177 #else
00178 if( errno == EINTR )
00179 break;
00180
00181 _error = errno;
00182 #endif
00183
00184 EQERROR << "Error during select: " << EQ_SOCKET_ERROR << std::endl;
00185 return EVENT_SELECT_ERROR;
00186
00187 default:
00188 {
00189 Event event = _getSelectResult( ret );
00190
00191 if( event == EVENT_NONE )
00192 break;
00193
00194 if( _connection == _selfConnection )
00195 {
00196 EQASSERT( event == EVENT_DATA );
00197 event = _handleSelfCommand();
00198 if( event == EVENT_NONE )
00199 break;
00200 return event;
00201 }
00202
00203 if( event == EVENT_DATA && _connection->isListening( ))
00204 event = EVENT_CONNECT;
00205
00206 EQVERB << "selected connection " << _connection << " of "
00207 << _fdSetConnections.getSize() << ", event " << event
00208 << std::endl;
00209 return event;
00210 }
00211 }
00212 }
00213 }
00214
00215 ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t index )
00216 {
00217 #ifdef WIN32
00218 const uint32_t i = index - WAIT_OBJECT_0;
00219 if( i >= MAXIMUM_WAIT_OBJECTS )
00220 return EVENT_INTERRUPT;
00221
00222 _connection = _fdSetConnections[i];
00223 EQASSERT( _fdSet[i] == _connection->getNotifier( ));
00224
00225 return EVENT_DATA;
00226 #else
00227 for( size_t i = 0; i < _fdSet.getSize(); ++i )
00228 {
00229 const pollfd& pollFD = _fdSet[i];
00230 if( pollFD.revents == 0 )
00231 continue;
00232
00233 const int pollEvents = pollFD.revents;
00234 EQASSERT( pollFD.fd > 0 );
00235
00236 _connection = _fdSetConnections[i];
00237 EQASSERT( _connection.isValid( ));
00238
00239 EQVERB << "Got event on connection @" << (void*)_connection.get()
00240 << std::endl;
00241
00242 if( pollEvents & POLLERR )
00243 {
00244 EQINFO << "Error during poll()" << std::endl;
00245 return EVENT_ERROR;
00246 }
00247
00248 if( pollEvents & POLLHUP )
00249 return EVENT_DISCONNECT;
00250
00251 if( pollEvents & POLLNVAL )
00252 return EVENT_DISCONNECT;
00253
00254
00255
00256
00257
00258 if( pollEvents & POLLIN || pollEvents & POLLPRI )
00259 return EVENT_DATA;
00260
00261 EQERROR << "Unhandled poll event(s): " << pollEvents << std::endl;
00262 ::abort();
00263 }
00264 return EVENT_NONE;
00265 #endif
00266 }
00267
00268 ConnectionSet::Event ConnectionSet::_handleSelfCommand()
00269 {
00270 EQASSERT( _connection == _selfConnection );
00271 _connection = 0;
00272
00273 _selfConnection->recvSync( 0, 0 );
00274 const uint8_t command( _selfCommand );
00275 _selfConnection->recvNB( &_selfCommand, sizeof( _selfCommand ));
00276
00277 switch( command )
00278 {
00279 case SELF_INTERRUPT:
00280 return EVENT_INTERRUPT;
00281
00282 default:
00283 EQUNIMPLEMENTED;
00284 return EVENT_NONE;
00285 }
00286 }
00287
00288 bool ConnectionSet::_setupFDSet()
00289 {
00290 if( !_dirty )
00291 {
00292 #ifndef WIN32
00293
00294
00295 _fdSet = _fdSetCopy;
00296 #endif
00297 return true;
00298 }
00299
00300 _dirty = false;
00301 _fdSet.setSize( 0 );
00302 _fdSetConnections.setSize( 0 );
00303
00304 #ifdef WIN32
00305
00306 HANDLE readHandle = _selfConnection->getNotifier();
00307 EQASSERT( readHandle );
00308
00309 _fdSet.append( readHandle );
00310 _fdSetConnections.append( _selfConnection.get( ));
00311
00312
00313 _mutex.set();
00314 for( ConnectionVector::const_iterator i = _connections.begin();
00315 i != _connections.end(); ++i )
00316 {
00317 ConnectionPtr connection = *i;
00318 readHandle = connection->getNotifier();
00319
00320 if( !readHandle )
00321 {
00322 EQINFO << "Cannot select connection " << connection
00323 << ", connection does not provide a read handle" << std::endl;
00324 _connection = connection;
00325 _mutex.unset();
00326 return false;
00327 }
00328
00329 _fdSet.append( readHandle );
00330 _fdSetConnections.append( connection.get( ));
00331 }
00332 _mutex.unset();
00333 #else
00334 pollfd fd;
00335 fd.events = POLLIN;
00336
00337
00338 fd.fd = _selfConnection->getNotifier();
00339 EQASSERT( fd.fd > 0 );
00340 fd.revents = 0;
00341
00342 _fdSet.append( fd );
00343 _fdSetConnections.append( _selfConnection.get( ));
00344
00345
00346 _mutex.set();
00347 for( ConnectionVector::const_iterator i = _connections.begin();
00348 i != _connections.end(); ++i )
00349 {
00350 ConnectionPtr connection = *i;
00351 fd.fd = connection->getNotifier();
00352
00353 if( fd.fd <= 0 )
00354 {
00355 EQINFO << "Cannot select connection " << connection
00356 << ", connection " << typeid( *connection.get( )).name()
00357 << " does not use a file descriptor" << std::endl;
00358 _connection = connection;
00359 _mutex.unset();
00360 return false;
00361 }
00362
00363 EQVERB << "Listening on " << typeid( *connection.get( )).name()
00364 << " @" << (void*)connection.get() << std::endl;
00365 fd.revents = 0;
00366
00367 _fdSet.append( fd );
00368 _fdSetConnections.append( connection.get( ));
00369 }
00370 _mutex.unset();
00371 _fdSetCopy = _fdSet;
00372 #endif
00373
00374 return true;
00375 }
00376
00377 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00378 const ConnectionSet* set)
00379 {
00380 const ConnectionVector& connections = set->getConnections();
00381
00382 os << "connection set " << (void*)set << ", " << connections.size()
00383 << " connections";
00384
00385 for( ConnectionVector::const_iterator i = connections.begin();
00386 i != connections.end(); ++i )
00387 {
00388 os << std::endl << " " << (*i).get();
00389 }
00390
00391 return os;
00392 }
00393
00394 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00395 const ConnectionSet::Event event )
00396 {
00397 if( event >= ConnectionSet::EVENT_ALL )
00398 os << "unknown (" << static_cast<unsigned>( event ) << ')';
00399 else
00400 os << ( event == ConnectionSet::EVENT_NONE ? "none" :
00401 event == ConnectionSet::EVENT_CONNECT ? "connect" :
00402 event == ConnectionSet::EVENT_DISCONNECT ? "disconnect" :
00403 event == ConnectionSet::EVENT_DATA ? "data" :
00404 event == ConnectionSet::EVENT_TIMEOUT ? "timeout" :
00405 event == ConnectionSet::EVENT_INTERRUPT ? "interrupted" :
00406 event == ConnectionSet::EVENT_ERROR ? "error" :
00407 event == ConnectionSet::EVENT_SELECT_ERROR ? "select error" :
00408 event == ConnectionSet::EVENT_INVALID_HANDLE ? "invalid handle":
00409 "ERROR" );
00410
00411 return os;
00412 }
00413
00414 }
00415 }