00001
00002
00003
00004
00005 #include "connectionSet.h"
00006
00007 #include "connection.h"
00008 #include "node.h"
00009 #include "pipeConnection.h"
00010
00011 #include <eq/base/base.h>
00012 #include <eq/base/stdExt.h>
00013 #include <eq/base/thread.h>
00014
00015 #include <algorithm>
00016 #include <errno.h>
00017
00018 #ifdef WIN32
00019 # define EQ_SOCKET_ERROR getErrorString( _error ) << '(' << _error << ')'
00020 #else
00021 # define EQ_SOCKET_ERROR strerror( _error )
00022 #endif
00023
00024 using namespace eq::base;
00025 using namespace std;
00026
00027 namespace eq
00028 {
00029 namespace net
00030 {
00031
00032 ConnectionSet::ConnectionSet()
00033 : _error(0)
00034 , _dirty( true )
00035 {
00036
00037
00038
00039
00040 _selfConnection = new PipeConnection;
00041 if( !_selfConnection->connect( ))
00042 {
00043 EQERROR << "Could not create connection" << endl;
00044 return;
00045 }
00046 }
00047
00048 ConnectionSet::~ConnectionSet()
00049 {
00050 _connection = 0;
00051
00052 _selfConnection->close();
00053 _selfConnection = 0;
00054 }
00055
00056
00057 void ConnectionSet::_dirtyFDSet()
00058 {
00059 if( _dirty )
00060 return;
00061
00062 EQINFO << "FD set modified, restarting select" << endl;
00063 _dirty = true;
00064 if( !_selfConnection->hasData( ))
00065 {
00066 const char c = SELF_INTERRUPT;
00067 _selfConnection->send( &c, 1, true );
00068 }
00069 }
00070
00071 void ConnectionSet::interrupt()
00072 {
00073 if( !_selfConnection->hasData( ))
00074 {
00075 const char c = SELF_INTERRUPT;
00076 _selfConnection->send( &c, 1, true );
00077 }
00078 }
00079
00080 void ConnectionSet::addConnection( ConnectionPtr connection )
00081 {
00082 EQASSERT( connection->isConnected() || connection->isListening( ));
00083
00084 _mutex.set();
00085 _connections.push_back( connection );
00086 connection->addListener( this );
00087 _mutex.unset();
00088 _dirtyFDSet();
00089 }
00090
00091 bool ConnectionSet::removeConnection( ConnectionPtr connection )
00092 {
00093 {
00094 ScopedMutex< SpinLock > mutex( _mutex );
00095 ConnectionVector::iterator i = find( _connections.begin(),
00096 _connections.end(), connection );
00097 if( i == _connections.end( ))
00098 return false;
00099
00100 connection->removeListener( this );
00101 _connections.erase( i );
00102 }
00103
00104 if( _connection == connection )
00105 _connection = 0;
00106
00107 _dirtyFDSet();
00108 return true;
00109 }
00110
00111 void ConnectionSet::clear()
00112 {
00113 _connection = 0;
00114 for( ConnectionVector::iterator i = _connections.begin();
00115 i != _connections.end(); ++i )
00116
00117 (*i)->removeListener( this );
00118
00119 _connections.clear();
00120 _dirtyFDSet();
00121 _fdSet.clear();
00122 _fdSetConnections.clear();
00123 }
00124
00125 ConnectionSet::Event ConnectionSet::select( const int timeout )
00126 {
00127 while( true )
00128 {
00129 _connection = 0;
00130 _error = 0;
00131
00132 if( !_setupFDSet( ))
00133 return EVENT_INVALID_HANDLE;
00134
00135
00136 #ifdef WIN32
00137 const DWORD waitTime = timeout > 0 ? timeout : INFINITE;
00138 const DWORD ret = WaitForMultipleObjectsEx( _fdSet.size, _fdSet.data,
00139 FALSE, waitTime, TRUE );
00140 #else
00141 const int ret = poll( _fdSet.data, _fdSet.size, timeout );
00142 #endif
00143 switch( ret )
00144 {
00145 case Connection::SELECT_TIMEOUT:
00146 return EVENT_TIMEOUT;
00147
00148 case Connection::SELECT_ERROR:
00149 #ifdef WIN32
00150 _error = GetLastError();
00151 if( _error == WSA_INVALID_HANDLE )
00152 {
00153 _dirty = true;
00154 break;
00155 }
00156 #else
00157 if( errno == EINTR )
00158 break;
00159
00160 _error = errno;
00161 #endif
00162
00163 EQERROR << "Error during select: " << EQ_SOCKET_ERROR << endl;
00164 return EVENT_SELECT_ERROR;
00165
00166 default:
00167 {
00168 Event event = _getSelectResult( ret );
00169
00170 if( event == EVENT_NONE )
00171 break;
00172
00173 if( _connection == _selfConnection.get( ))
00174 {
00175 EQASSERT( event == EVENT_DATA );
00176 event = _handleSelfCommand();
00177 if( event == EVENT_NONE )
00178 break;
00179 return event;
00180 }
00181
00182 if( event == EVENT_DATA && _connection->isListening( ))
00183 event = EVENT_CONNECT;
00184
00185 EQVERB << "selected connection " << _connection << " of "
00186 << _fdSetConnections.size << ", event " << event
00187 << endl;
00188 return event;
00189 }
00190 }
00191 }
00192 }
00193
00194 ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t index )
00195 {
00196 #ifdef WIN32
00197 const uint32_t i = index - WAIT_OBJECT_0;
00198 _connection = _fdSetConnections[i];
00199
00200 EQASSERT( _fdSet[i] == _connection->getReadNotifier( ));
00201 return EVENT_DATA;
00202 #else
00203 for( size_t i = 0; i < _fdSet.size; ++i )
00204 {
00205 const pollfd& pollFD = _fdSet[i];
00206 if( pollFD.revents == 0 )
00207 continue;
00208
00209 const int pollEvents = pollFD.revents;
00210 EQASSERT( pollFD.fd > 0 );
00211
00212 _connection = _fdSetConnections[i];
00213 EQASSERT( _connection.isValid( ));
00214
00215 EQVERB << "Got event on connection @" << (void*)_connection.get()<<endl;
00216
00217 if( pollEvents & POLLERR )
00218 {
00219 EQINFO << "Error during poll()" << endl;
00220 return EVENT_ERROR;
00221 }
00222
00223 if( pollEvents & POLLHUP )
00224 return EVENT_DISCONNECT;
00225
00226 if( pollEvents & POLLNVAL )
00227 return EVENT_DISCONNECT;
00228
00229
00230
00231
00232
00233 if( pollEvents & POLLIN || pollEvents & POLLPRI )
00234 return EVENT_DATA;
00235
00236 EQERROR << "Unhandled poll event(s): " << pollEvents <<endl;
00237 ::abort();
00238 }
00239 return EVENT_NONE;
00240 #endif
00241 }
00242
00243 ConnectionSet::Event ConnectionSet::_handleSelfCommand()
00244 {
00245 char c = 0;
00246 _connection->recv( &c, 1 );
00247 _connection = 0;
00248
00249 switch( c )
00250 {
00251 case SELF_INTERRUPT:
00252 return EVENT_INTERRUPT;
00253
00254 default:
00255 EQUNIMPLEMENTED;
00256 return EVENT_NONE;
00257 }
00258 }
00259
00260 bool ConnectionSet::_setupFDSet()
00261 {
00262 if( !_dirty )
00263 {
00264 #ifndef WIN32
00265
00266
00267 _fdSet = _fdSetCopy;
00268 #endif
00269 return true;
00270 }
00271
00272 _dirty = false;
00273 _fdSet.size = 0;
00274 _fdSetConnections.size = 0;
00275
00276 #ifdef WIN32
00277
00278 HANDLE readHandle = _selfConnection->getReadNotifier();
00279 EQASSERT( readHandle );
00280
00281 _fdSet.append( readHandle );
00282 _fdSetConnections.append( _selfConnection.get( ));
00283
00284
00285 _mutex.set();
00286 for( vector< ConnectionPtr >::const_iterator i = _connections.begin();
00287 i != _connections.end(); ++i )
00288 {
00289 ConnectionPtr connection = *i;
00290 readHandle = connection->getReadNotifier();
00291
00292 if( !readHandle )
00293 {
00294 EQWARN << "Cannot select connection " << connection
00295 << ", connection does not provide a read handle" << endl;
00296 _connection = connection;
00297 _mutex.unset();
00298 return false;
00299 }
00300
00301 _fdSet.append( readHandle );
00302 _fdSetConnections.append( connection.get( ));
00303 }
00304 _mutex.unset();
00305 #else
00306 pollfd fd;
00307 fd.events = POLLIN;
00308
00309
00310 fd.fd = _selfConnection->getReadNotifier();
00311 EQASSERT( fd.fd > 0 );
00312 fd.revents = 0;
00313
00314 _fdSet.append( fd );
00315 _fdSetConnections.append( _selfConnection.get( ));
00316
00317
00318 _mutex.set();
00319 for( vector< ConnectionPtr >::const_iterator i = _connections.begin();
00320 i != _connections.end(); ++i )
00321 {
00322 ConnectionPtr connection = *i;
00323 fd.fd = connection->getReadNotifier();
00324
00325 if( fd.fd <= 0 )
00326 {
00327 EQWARN << "Cannot select connection " << connection
00328 << ", connection " << typeid( *connection.get( )).name()
00329 << " does not use a file descriptor" << endl;
00330 _connection = connection;
00331 _mutex.unset();
00332 return false;
00333 }
00334
00335 EQVERB << "Listening on " << typeid( *connection.get( )).name()
00336 << " @" << (void*)connection.get() << endl;
00337 fd.revents = 0;
00338
00339 _fdSet.append( fd );
00340 _fdSetConnections.append( connection.get( ));
00341 }
00342 _mutex.unset();
00343 _fdSetCopy = _fdSet;
00344 #endif
00345
00346 return true;
00347 }
00348
00349 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00350 const ConnectionSet* set)
00351 {
00352 const ConnectionVector& connections = set->getConnections();
00353
00354 os << "connection set " << (void*)set << ", " << connections.size()
00355 << " connections";
00356
00357 for( ConnectionVector::const_iterator i = connections.begin();
00358 i != connections.end(); ++i )
00359 {
00360 os << endl << " " << (*i).get();
00361 }
00362
00363 return os;
00364 }
00365
00366 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00367 const ConnectionSet::Event event )
00368 {
00369 if( event >= ConnectionSet::EVENT_ALL )
00370 os << "unknown (" << static_cast<unsigned>( event ) << ')';
00371 else
00372 os << ( event == ConnectionSet::EVENT_NONE ? "none" :
00373 event == ConnectionSet::EVENT_CONNECT ? "connect" :
00374 event == ConnectionSet::EVENT_DISCONNECT ? "disconnect" :
00375 event == ConnectionSet::EVENT_DATA ? "data" :
00376 event == ConnectionSet::EVENT_TIMEOUT ? "timeout" :
00377 event == ConnectionSet::EVENT_INTERRUPT ? "interrupted" :
00378 event == ConnectionSet::EVENT_ERROR ? "error" :
00379 event == ConnectionSet::EVENT_SELECT_ERROR ? "select error" :
00380 event == ConnectionSet::EVENT_INVALID_HANDLE ? "invalid handle":
00381 "ERROR" );
00382
00383 return os;
00384 }
00385
00386 }
00387 }