connectionSet.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "connectionSet.h"
00019 
00020 #include "connection.h"
00021 #include "node.h"
00022 #include "pipeConnection.h"
00023 
00024 #include <eq/base/base.h>
00025 #include <eq/base/stdExt.h>
00026 #include <eq/base/thread.h>
00027 
00028 #include <algorithm>
00029 #include <errno.h>
00030 
// EQ_SOCKET_ERROR expands to a streamable description of the error code stored
// in a variable named _error at the point of use. On WIN32 the textual
// description is followed by the numeric code in parentheses.
#if defined( WIN32 )
#  define EQ_SOCKET_ERROR getErrorString( _error ) << '(' << _error << ')'
#else
#  define EQ_SOCKET_ERROR strerror( _error )
#endif
00036 
00037 using namespace eq::base;
00038 
// Command byte written to the self connection by interrupt() and recognized by
// _handleSelfCommand().
#define SELF_INTERRUPT 42

// Return values of the platform wait call which select() tests for:
// WaitForMultipleObjectsEx() on WIN32, poll() otherwise.
#if defined( WIN32 )
#  define SELECT_TIMEOUT WAIT_TIMEOUT
#  define SELECT_ERROR   WAIT_FAILED
#else
#  define SELECT_TIMEOUT  0
#  define SELECT_ERROR   -1
#endif
00048 
00049 namespace eq
00050 {
00051 namespace net
00052 {
00053 
00054 ConnectionSet::ConnectionSet()
00055         : _selfCommand( 0 )
00056         , _error( 0 )
00057         , _dirty( true )
00058 {
00059     // Whenever another threads modifies the connection list while the
00060     // connection set is waiting in a select, the select is interrupted by
00061     // sending a character through this connection. select() will recognize
00062     // this and restart with the modified fd set.
00063     _selfConnection = new PipeConnection;
00064     if( !_selfConnection->connect( ))
00065     {
00066         EQERROR << "Could not create connection" << std::endl;
00067         return;
00068     }
00069     _selfConnection->recvNB( &_selfCommand, sizeof( _selfCommand ));
00070 }
00071 
00072 ConnectionSet::~ConnectionSet()
00073 {
00074     _connection = 0;
00075 
00076     _selfConnection->close();
00077     _selfConnection = 0;
00078 }
00079 
00080 
00081 void ConnectionSet::_dirtyFDSet()
00082 {
00083     if( _dirty )
00084         return;
00085 
00086     EQINFO << "FD set modified, restarting select" << std::endl;
00087     _dirty = true;
00088 
00089     interrupt();
00090 }
00091 
00092 void ConnectionSet::interrupt()
00093 {
00094     const char c = SELF_INTERRUPT;
00095     _selfConnection->send( &c, 1, true );
00096 }
00097 
00098 void ConnectionSet::addConnection( ConnectionPtr connection )
00099 {
00100     EQASSERT( connection->isConnected() || connection->isListening( ));
00101     {
00102         base::ScopedMutex mutex( _mutex );
00103         EQASSERTINFO( _connections.size() < 63,
00104             "Connection set can't handle more than 63 connections" );
00105         _connections.push_back( connection );
00106         connection->addListener( this );
00107     }
00108     _dirtyFDSet();
00109 }
00110 
00111 bool ConnectionSet::removeConnection( ConnectionPtr connection )
00112 {
00113     {
00114         ScopedMutex mutex( _mutex );
00115         ConnectionVector::iterator i = find( _connections.begin(),
00116                                              _connections.end(), connection );
00117         if( i == _connections.end( ))
00118             return false;
00119 
00120         connection->removeListener( this );
00121         _connections.erase( i );
00122     }
00123 
00124     if( _connection == connection )
00125         _connection = 0;
00126 
00127     _dirtyFDSet();
00128     return true;
00129 }
00130 
00131 void ConnectionSet::clear()
00132 {
00133     _connection = 0;
00134     for( ConnectionVector::iterator i = _connections.begin(); 
00135          i != _connections.end(); ++i )
00136 
00137         (*i)->removeListener( this );
00138 
00139     _connections.clear();
00140     _dirtyFDSet();
00141     _fdSet.clear();
00142     _fdSetConnections.clear();
00143 }
00144         
00145 ConnectionSet::Event ConnectionSet::select( const int timeout )
00146 {
00147     while( true )
00148     {
00149         _connection = 0;
00150         _error      = 0;
00151 
00152         if( !_setupFDSet( ))
00153             return EVENT_INVALID_HANDLE;
00154 
00155         // poll for a result
00156 #ifdef WIN32
00157         const DWORD waitTime = timeout > 0 ? timeout : INFINITE;
00158         const DWORD ret = WaitForMultipleObjectsEx( _fdSet.getSize(),
00159                                                     _fdSet.getData(),
00160                                                     FALSE, waitTime, TRUE );
00161 #else
00162         const int ret = poll( _fdSet.getData(), _fdSet.getSize(), timeout );
00163 #endif
00164         switch( ret )
00165         {
00166             case SELECT_TIMEOUT:
00167                 return EVENT_TIMEOUT;
00168 
00169             case SELECT_ERROR:
00170 #ifdef WIN32
00171                 _error = GetLastError();
00172                 if( _error == WSA_INVALID_HANDLE )
00173                 {
00174                     _dirty = true;
00175                     break;
00176                 }
00177 #else
00178                 if( errno == EINTR ) // Interrupted system call (gdb) - ignore
00179                     break;
00180 
00181                 _error = errno;
00182 #endif
00183 
00184                 EQERROR << "Error during select: " << EQ_SOCKET_ERROR << std::endl;
00185                 return EVENT_SELECT_ERROR;
00186 
00187             default: // SUCCESS
00188                 {
00189                     Event event = _getSelectResult( ret );
00190 
00191                     if( event == EVENT_NONE )
00192                          break;
00193 
00194                     if( _connection == _selfConnection )
00195                     {
00196                         EQASSERT( event == EVENT_DATA );
00197                         event = _handleSelfCommand();
00198                         if( event == EVENT_NONE )
00199                             break;
00200                         return event;
00201                     }
00202                     
00203                     if( event == EVENT_DATA && _connection->isListening( ))
00204                         event = EVENT_CONNECT;
00205 
00206                     EQVERB << "selected connection " << _connection << " of "
00207                            << _fdSetConnections.getSize() << ", event " << event
00208                            << std::endl;
00209                     return event;
00210                 }
00211         }
00212     }
00213 }
00214      
00215 ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t index )
00216 {
00217 #ifdef WIN32
00218     const uint32_t i = index - WAIT_OBJECT_0;
00219     if( i >= MAXIMUM_WAIT_OBJECTS )
00220         return EVENT_INTERRUPT;
00221 
00222     _connection = _fdSetConnections[i];
00223     EQASSERT( _fdSet[i] == _connection->getNotifier( ));
00224 
00225     return EVENT_DATA;
00226 #else
00227     for( size_t i = 0; i < _fdSet.getSize(); ++i )
00228     {
00229         const pollfd& pollFD = _fdSet[i];
00230         if( pollFD.revents == 0 )
00231             continue;
00232 
00233         const int pollEvents = pollFD.revents;
00234         EQASSERT( pollFD.fd > 0 );
00235 
00236         _connection = _fdSetConnections[i];
00237         EQASSERT( _connection.isValid( ));
00238 
00239         EQVERB << "Got event on connection @" << (void*)_connection.get()
00240                << std::endl;
00241 
00242         if( pollEvents & POLLERR )
00243         {
00244             EQINFO << "Error during poll()" << std::endl;
00245             return EVENT_ERROR;
00246         }
00247 
00248         if( pollEvents & POLLHUP ) // disconnect happened
00249             return EVENT_DISCONNECT;
00250 
00251         if( pollEvents & POLLNVAL ) // disconnected connection
00252             return EVENT_DISCONNECT;
00253 
00254         // Note: Intuitively I would handle the read before HUP to
00255         // read remaining data of the connection, but at least on
00256         // OS X both events happen simultaneously and no more data
00257         // can be read.
00258         if( pollEvents & POLLIN || pollEvents & POLLPRI )
00259             return EVENT_DATA;
00260 
00261         EQERROR << "Unhandled poll event(s): " << pollEvents << std::endl;
00262         ::abort();
00263     }
00264     return EVENT_NONE;
00265 #endif
00266 }
00267 
00268 ConnectionSet::Event ConnectionSet::_handleSelfCommand()
00269 {
00270     EQASSERT( _connection == _selfConnection );
00271     _connection = 0;
00272     
00273     _selfConnection->recvSync( 0, 0 );
00274     const uint8_t command( _selfCommand );
00275     _selfConnection->recvNB( &_selfCommand, sizeof( _selfCommand ));
00276 
00277     switch( command ) 
00278     {
00279         case SELF_INTERRUPT:
00280             return EVENT_INTERRUPT;
00281 
00282         default:
00283             EQUNIMPLEMENTED;
00284             return EVENT_NONE;
00285     }
00286 }
00287 
00288 bool ConnectionSet::_setupFDSet()
00289 {
00290     if( !_dirty )
00291     {
00292 #ifndef WIN32
00293         // TODO: verify that poll() really modifies _fdSet, and remove the copy
00294         // if it doesn't. The man page seems to hint that poll changes fds.
00295         _fdSet = _fdSetCopy;
00296 #endif
00297         return true;
00298     }
00299 
00300     _dirty = false;
00301     _fdSet.setSize( 0 );
00302     _fdSetConnections.setSize( 0 );
00303 
00304 #ifdef WIN32
00305     // add self connection
00306     HANDLE readHandle = _selfConnection->getNotifier();
00307     EQASSERT( readHandle );
00308 
00309     _fdSet.append( readHandle );
00310     _fdSetConnections.append( _selfConnection.get( ));
00311 
00312     // add regular connections
00313     _mutex.set();
00314     for( ConnectionVector::const_iterator i = _connections.begin();
00315          i != _connections.end(); ++i )
00316     {
00317         ConnectionPtr connection = *i;
00318         readHandle = connection->getNotifier();
00319 
00320         if( !readHandle )
00321         {
00322             EQINFO << "Cannot select connection " << connection
00323                  << ", connection does not provide a read handle" << std::endl;
00324             _connection = connection;
00325             _mutex.unset();
00326             return false;
00327         }
00328         
00329         _fdSet.append( readHandle );
00330         _fdSetConnections.append( connection.get( ));
00331     }
00332     _mutex.unset();
00333 #else
00334     pollfd fd;
00335     fd.events = POLLIN; // | POLLPRI;
00336 
00337     // add self 'connection'
00338     fd.fd      = _selfConnection->getNotifier();
00339     EQASSERT( fd.fd > 0 );
00340     fd.revents = 0;
00341 
00342     _fdSet.append( fd );
00343     _fdSetConnections.append( _selfConnection.get( ));
00344 
00345     // add regular connections
00346     _mutex.set();
00347     for( ConnectionVector::const_iterator i = _connections.begin();
00348          i != _connections.end(); ++i )
00349     {
00350         ConnectionPtr connection = *i;
00351         fd.fd = connection->getNotifier();
00352 
00353         if( fd.fd <= 0 )
00354         {
00355             EQINFO << "Cannot select connection " << connection
00356                    << ", connection " << typeid( *connection.get( )).name() 
00357                    << " does not use a file descriptor" << std::endl;
00358             _connection = connection;
00359             _mutex.unset();
00360             return false;
00361         }
00362 
00363         EQVERB << "Listening on " << typeid( *connection.get( )).name() 
00364                << " @" << (void*)connection.get() << std::endl;
00365         fd.revents = 0;
00366 
00367         _fdSet.append( fd );
00368         _fdSetConnections.append( connection.get( ));
00369     }
00370     _mutex.unset();
00371     _fdSetCopy = _fdSet;
00372 #endif
00373 
00374     return true;
00375 }   
00376 
00377 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00378                                       const ConnectionSet* set)
00379 {
00380     const ConnectionVector& connections = set->getConnections();
00381 
00382     os << "connection set " << (void*)set << ", " << connections.size()
00383        << " connections";
00384     
00385     for( ConnectionVector::const_iterator i = connections.begin(); 
00386          i != connections.end(); ++i )
00387     {
00388         os << std::endl << "    " << (*i).get();
00389     }
00390     
00391     return os;
00392 }
00393 
00394 EQ_EXPORT std::ostream& operator << ( std::ostream& os, 
00395                                       const ConnectionSet::Event event )
00396 {
00397     if( event >= ConnectionSet::EVENT_ALL )
00398         os << "unknown (" << static_cast<unsigned>( event ) << ')';
00399     else 
00400         os << ( event == ConnectionSet::EVENT_NONE ? "none" :       
00401                 event == ConnectionSet::EVENT_CONNECT ? "connect" :        
00402                 event == ConnectionSet::EVENT_DISCONNECT ? "disconnect" :     
00403                 event == ConnectionSet::EVENT_DATA ? "data" :           
00404                 event == ConnectionSet::EVENT_TIMEOUT ? "timeout" :        
00405                 event == ConnectionSet::EVENT_INTERRUPT ? "interrupted" :      
00406                 event == ConnectionSet::EVENT_ERROR ? "error" :          
00407                 event == ConnectionSet::EVENT_SELECT_ERROR ? "select error" :   
00408                 event == ConnectionSet::EVENT_INVALID_HANDLE ? "invalid handle":
00409                 "ERROR" );
00410 
00411     return os;
00412 }
00413 
00414 }
00415 }
Generated on Mon Aug 10 18:58:32 2009 for Equalizer 0.9 by  doxygen 1.5.8