lib/net/node.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "node.h"
00019 
00020 #include "command.h"
00021 #include "connectionSet.h"
00022 #include "global.h"
00023 #include "pipeConnection.h"
00024 #include "session.h"
00025 #include "socketConnection.h"
00026 
00027 #include <eq/base/base.h>
00028 #include <eq/base/launcher.h>
00029 #include <eq/base/rng.h>
00030 
00031 #include <errno.h>
00032 #include <sstream>
00033 #include <sys/stat.h>
00034 #include <sys/types.h>
00035 
00036 #ifdef WIN32_API
00037 #  include <direct.h>  // for chdir
00038 #  define chdir _chdir
00039 #endif
00040 
00041 using namespace eq::base;
00042 using namespace std;
00043 
00044 namespace eq
00045 {
00046 namespace net
00047 {
00048 //----------------------------------------------------------------------
00049 // State management
00050 //----------------------------------------------------------------------
00051 Node::Node()
00052         : _requestHandler( true )
00053         , _autoLaunch( false )
00054         , _id( true )
00055         , _state( STATE_STOPPED )
00056         , _launchID( EQ_ID_INVALID )
00057         , _programName( Global::getProgramName( ))
00058         , _workDir( Global::getWorkDir( ))
00059         , _hasSendToken( true )
00060 {
00061     _receiverThread = new ReceiverThread( this );
00062     _commandThread  = new CommandThread( this );
00063 
00064     registerCommand( CMD_NODE_STOP, 
00065                      CommandFunc<Node>( this, &Node::_cmdStop ),
00066                      &_commandThreadQueue );
00067     registerCommand( CMD_NODE_REGISTER_SESSION, 
00068                      CommandFunc<Node>( this, &Node::_cmdRegisterSession ),
00069                      &_commandThreadQueue );
00070     registerCommand( CMD_NODE_REGISTER_SESSION_REPLY,
00071                      CommandFunc<Node>( this, &Node::_cmdRegisterSessionReply ),
00072                      &_commandThreadQueue );
00073     registerCommand( CMD_NODE_MAP_SESSION, 
00074                      CommandFunc<Node>( this, &Node::_cmdMapSession ),
00075                      &_commandThreadQueue );
00076     registerCommand( CMD_NODE_MAP_SESSION_REPLY,
00077                      CommandFunc<Node>( this, &Node::_cmdMapSessionReply ),
00078                      &_commandThreadQueue );
00079     registerCommand( CMD_NODE_UNMAP_SESSION, 
00080                      CommandFunc<Node>( this, &Node::_cmdUnmapSession ),
00081                      &_commandThreadQueue );
00082     registerCommand( CMD_NODE_UNMAP_SESSION_REPLY,
00083                      CommandFunc<Node>( this, &Node::_cmdUnmapSessionReply ),
00084                      &_commandThreadQueue );
00085     registerCommand( CMD_NODE_CONNECT,
00086                      CommandFunc<Node>( this, &Node::_cmdConnect ), 0 );
00087     registerCommand( CMD_NODE_CONNECT_REPLY,
00088                      CommandFunc<Node>( this, &Node::_cmdConnectReply ), 0 );
00089     registerCommand( CMD_NODE_DISCONNECT,
00090                      CommandFunc<Node>( this, &Node::_cmdDisconnect ), 0 );
00091     registerCommand( CMD_NODE_GET_NODE_DATA,
00092                      CommandFunc<Node>( this, &Node::_cmdGetNodeData),
00093                      &_commandThreadQueue );
00094     registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
00095                      CommandFunc<Node>( this, &Node::_cmdGetNodeDataReply ),
00096                      &_commandThreadQueue );
00097     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
00098                      CommandFunc<Node>( this, &Node::_cmdAcquireSendToken ), 0);
00099     registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
00100                      CommandFunc<Node>( this, &Node::_cmdAcquireSendTokenReply )
00101                      , 0 );
00102     registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
00103                      CommandFunc<Node>( this, &Node::_cmdReleaseSendToken ), 0);
00104 
00105     EQINFO << "New Node @" << (void*)this << " " << _id << endl;
00106 }
00107 
00108 Node::~Node()
00109 {
00110     EQINFO << "Delete Node @" << (void*)this << " " << _id << endl;
00111     EQASSERT( _connection == 0 );
00112     EQASSERT( _connectionSet.empty( ));
00113     EQASSERT( _connectionNodes.empty( ));
00114     EQASSERT( _pendingCommands.empty( ));
00115     EQASSERT( _nodes.empty( ));
00116     EQASSERT( _requestHandler.isEmpty( ));
00117 
00118 #ifndef NDEBUG
00119     if( !_sessions.empty( ))
00120     {
00121         EQINFO << _sessions.size() << " mapped sessions" << endl;
00122         
00123         for( SessionHash::const_iterator i = _sessions.begin();
00124              i != _sessions.end(); ++i )
00125         {
00126             const Session* session = i->second;
00127             EQINFO << "    Session " << session->getID() << std::endl;
00128         }
00129     }
00130 #endif
00131 
00132     EQASSERT( _sessions.empty( ));
00133 
00134     EQASSERT( !_commandThread->isRunning( ));
00135     delete _commandThread;
00136     _commandThread = 0;
00137 
00138     EQASSERT( !_receiverThread->isRunning( ));
00139     delete _receiverThread;
00140     _receiverThread = 0;
00141 
00142     _connectionDescriptions.clear();
00143 }
00144 
00145 bool Node::operator == ( const Node* node ) const
00146 { 
00147     EQASSERTINFO( _id != node->_id || this == node,
00148                   "Two node instances with the same ID found "
00149                   << (void*)this << " and " << (void*)node );
00150 
00151     return ( this == node );
00152 }
00153 
00154 void Node::setProgramName( const std::string& name )
00155 {
00156     _programName = name;
00157 }
00158 
00159 void Node::setWorkDir( const std::string& name )
00160 {
00161     _workDir = name;
00162 }
00163 
00164 const ConnectionDescriptionVector& Node::getConnectionDescriptions() const 
00165 {
00166     return _connectionDescriptions;
00167 }
00168 
00169 bool Node::initLocal( const int argc, char** argv )
00170 {
00171 #ifndef NDEBUG
00172     EQINFO << disableFlush << "args: ";
00173     for( int i=0; i<argc; i++ )
00174          EQINFO << argv[i] << ", ";
00175     EQINFO << endl << enableFlush;
00176 #endif
00177 
00178     // We do not use getopt_long because it really does not work due to the
00179     // following aspects:
00180     // - reordering of arguments
00181     // - different behaviour of GNU and BSD implementations
00182     // - incomplete man pages
00183     bool   isClient   = false;
00184     bool   isResident = false;
00185     string clientOpts;
00186 
00187     for( int i=1; i<argc; ++i )
00188     {
00189         if( string( "--eq-listen" ) == argv[i] )
00190         {
00191             if( i<argc && argv[i+1][0] != '-' )
00192             {
00193                 string                        data = argv[++i];
00194                 ConnectionDescriptionPtr desc = new ConnectionDescription;
00195                 desc->TCPIP.port = Global::getDefaultPort();
00196 
00197                 if( desc->fromString( data ))
00198                 {
00199                     addConnectionDescription( desc );
00200                     EQASSERTINFO( data.empty(), data );
00201                 }
00202                 else
00203                     EQWARN << "Ignoring listen option: " << argv[i] << endl;
00204             }
00205         }
00206         else if( string( "--eq-client" ) == argv[i] )
00207         {
00208             isClient = true;
00209             if( i < argc-1 && argv[i+1][0] != '-' ) // server-started client
00210             {
00211                 clientOpts = argv[++i];
00212 
00213                 if( !deserialize( clientOpts ))
00214                     EQWARN << "Failed to parse client listen port parameters"
00215                            << endl;
00216                 EQASSERT( !clientOpts.empty( ));
00217             }
00218             else // resident render client
00219                 isResident = true;
00220         }
00221     }
00222     
00223     if( _connectionDescriptions.empty( )) // add default listener
00224     {
00225         ConnectionDescriptionPtr connDesc = new ConnectionDescription;
00226         connDesc->type       = CONNECTIONTYPE_TCPIP;
00227         connDesc->TCPIP.port = Global::getDefaultPort();
00228         addConnectionDescription( connDesc );
00229     }
00230 
00231     EQINFO << "Listener data: " << serialize() << endl;
00232 
00233     if( !listen( ))
00234     {
00235         EQWARN << "Can't setup listener(s)" << endl; 
00236         return false;
00237     }
00238     
00239     if( isClient )
00240     {
00241         EQINFO << "Client node started from command line with option " 
00242                << clientOpts << endl;
00243 
00244         bool ret = (isResident ? clientLoop() : runClient( clientOpts ));
00245 
00246         EQINFO << "Exit node process " << getRefCount() << endl;
00247         ret &= exitClient();
00248         ::exit( ret ? EXIT_SUCCESS : EXIT_FAILURE );
00249     }
00250 
00251     return true;
00252 }
00253 
00254 bool Node::listen()
00255 {
00256     if( _state != STATE_STOPPED )
00257         return false;
00258 
00259     if( !_connectSelf( ))
00260         return false;
00261 
00262     for( ConnectionDescriptionVector::const_iterator i =
00263              _connectionDescriptions.begin();
00264          i != _connectionDescriptions.end(); ++i )
00265     {
00266         ConnectionDescriptionPtr description = *i;
00267         ConnectionPtr            connection = Connection::create( description );
00268 
00269         if( !connection->listen( ))
00270         {
00271             EQWARN << "Can't create listener connection: " << description
00272                    << endl;
00273             return false;
00274         }
00275 
00276         _connectionNodes[ connection ] = this;
00277         _connectionSet.addConnection( connection );
00278         EQVERB << "Added node " << _id << " using " << connection << std::endl;
00279         
00280         connection->acceptNB();
00281     }
00282 
00283     _state = STATE_LISTENING;
00284 
00285     EQVERB << typeid(*this).name() << " starting command and receiver thread "
00286            << endl;
00287     _receiverThread->start();
00288 
00289     EQINFO << this << " listening." << endl;
00290     return true;
00291 }
00292 
00293 bool Node::stopListening()
00294 {
00295     if( _state != STATE_LISTENING )
00296         return false;
00297 
00298     NodeStopPacket packet;
00299     send( packet );
00300 
00301     EQCHECK( _receiverThread->join( ));
00302     _cleanup();
00303 
00304     EQINFO << _connectionSet.size() << " connections open after stopListening"
00305            << endl;
00306 #ifndef NDEBUG
00307     const ConnectionVector& connections = _connectionSet.getConnections();
00308     for( ConnectionVector::const_iterator i = connections.begin();
00309          i != connections.end(); ++i )
00310     {
00311         EQINFO << "    " << *i << endl;
00312     }
00313 #endif
00314 
00315     EQASSERT( _requestHandler.isEmpty( ));
00316     return true;
00317 }
00318 
00319 void Node::_addConnection( ConnectionPtr connection )
00320 {
00321     _connectionSet.addConnection( connection );
00322     connection->recvNB( new uint64_t, sizeof( uint64_t ));
00323 }
00324 
00325 void Node::_removeConnection( ConnectionPtr connection )
00326 {
00327     EQASSERT( connection.isValid( ));
00328 
00329     _connectionSet.removeConnection( connection );
00330 
00331     void* buffer( 0 );
00332     if( !connection->isListening( ))
00333     {
00334         uint64_t bytes( 0 );
00335         connection->getRecvData( &buffer, &bytes );
00336         EQASSERT( buffer );
00337         EQASSERT( bytes == sizeof( uint64_t ));
00338     }
00339 
00340     if( !connection->isClosed( ))
00341         connection->close(); // cancels pending IO's
00342     delete reinterpret_cast< uint64_t* >( buffer );
00343 }
00344 
00345 void Node::_cleanup()
00346 {
00347     EQVERB << "Clean up stopped node" << endl;
00348     EQASSERTINFO( _state == STATE_STOPPED, _state );
00349 
00350     _removeConnection( _connection );
00351     _connectionNodes.erase( _connection );
00352     _connection = 0;
00353 
00354     const ConnectionVector& connections = _connectionSet.getConnections();
00355     while( !connections.empty( ))
00356     {
00357         ConnectionPtr connection = connections.back();
00358         NodePtr       node       = _connectionNodes[ connection ];
00359 
00360         node->_state      = STATE_STOPPED;
00361         node->_connection = 0;
00362         _connectionNodes.erase( connection );
00363         _nodes.erase( node->_id );
00364 
00365         _removeConnection( connection );
00366     }
00367 
00368     if( !_connectionNodes.empty( ))
00369         EQINFO << _connectionNodes.size() << " open connections during cleanup"
00370                << endl;
00371 #ifndef NDEBUG
00372     for( ConnectionNodeHash::const_iterator i = _connectionNodes.begin();
00373          i != _connectionNodes.end(); ++i )
00374     {
00375         NodePtr node = i->second;
00376         EQINFO << "    " << i->first << " : " << node << endl;
00377         EQINFO << "    Node ref count " << node->getRefCount() - 1 
00378                << ' ' << node->_connection << ' ' << node->_state
00379                << ( node == this ? " self" : "" ) << endl;
00380     }
00381 #endif
00382 
00383     _connectionNodes.clear();
00384 
00385     if( !_nodes.empty( ))
00386         EQINFO << _nodes.size() << " nodes connected during cleanup" << endl;
00387 
00388 #ifndef NDEBUG
00389     for( base::UUIDHash< NodePtr >::const_iterator i = _nodes.begin();
00390          i != _nodes.end(); ++i )
00391     {
00392         NodePtr node = i->second;
00393         EQINFO << "    " << node << " ref count " << node->getRefCount() - 1 
00394                << ' ' << node->_connection << ' ' << node->_state
00395                << ( node == this ? " self" : "" ) << endl;
00396     }
00397 #endif
00398 
00399     _nodes.clear();
00400 }
00401 
00402 bool Node::_connectSelf()
00403 {
00404     // setup local connection to myself
00405     _connection = new PipeConnection;
00406     if( !_connection->connect())
00407     {
00408         EQERROR << "Could not create local connection to receiver thread."
00409                 << endl;
00410         _connection = 0;
00411         return false;
00412     }
00413 
00414     // add to connection set
00415     EQASSERT( _connection->getDescription().isValid( )); 
00416     EQASSERT( _connectionNodes.find( _connection ) == _connectionNodes.end( ));
00417 
00418     _connectionNodes[ _connection ] = this;
00419     _nodes[ _id ] = this;
00420 
00421     _addConnection( _connection );
00422 
00423     EQVERB << "Added node " << _id << " using " << _connection << std::endl;
00424     return true;
00425 }
00426 
00427 bool Node::connect( NodePtr node, ConnectionPtr connection )
00428 {
00429     EQASSERT( connection.isValid( ));
00430 
00431     if( !node.isValid() || _state != STATE_LISTENING ||
00432         !connection->isConnected() || node->_state != STATE_STOPPED )
00433     {
00434         return false;
00435     }
00436 
00437     _addConnection( connection );
00438 
00439     // send connect packet to peer
00440     NodeConnectPacket packet;
00441     packet.requestID = _requestHandler.registerRequest( node.get( ));
00442     packet.nodeID    = _id;
00443     packet.nodeID.convertToNetwork();
00444     packet.type      = getType();
00445     packet.launchID  = node->_launchID;
00446     node->_launchID  = EQ_ID_INVALID;
00447     connection->send( packet, serialize( ));
00448 
00449     bool connected = false;
00450     _requestHandler.waitRequest( packet.requestID, connected );
00451     if( !connected )
00452         return false;
00453 
00454     EQASSERT( node->_id != NodeID::ZERO );
00455     EQASSERTINFO( node->_id != _id, _id );
00456     EQINFO << node << " connected to " << this << endl;
00457     return true;
00458 }
00459 
00460 NodePtr Node::getNode( const NodeID& id ) const
00461 { 
00462     base::UUIDHash< NodePtr >::const_iterator iter = _nodes.find( id );
00463     if( iter == _nodes.end( ))
00464         return 0;
00465     return iter->second;
00466 }
00467 
00468 bool Node::disconnect( NodePtr node )
00469 {
00470     if( !node || _state != STATE_LISTENING )
00471         return false;
00472 
00473     if( node->_state != STATE_CONNECTED )
00474         return true;
00475 
00476     EQASSERT( !inCommandThread( ));
00477 
00478     NodeDisconnectPacket packet;
00479     packet.requestID = _requestHandler.registerRequest( node.get( ));
00480     send( packet );
00481 
00482     _requestHandler.waitRequest( packet.requestID );
00483     return true;
00484 }
00485 
00486 void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
00487 {
00488     _connectionDescriptions.push_back( cd ); 
00489 }
00490 
00491 //----------------------------------------------------------------------
00492 // Node functionality
00493 //----------------------------------------------------------------------
00494 void Node::_addSession( Session* session, NodePtr server,
00495                        const uint32_t sessionID )
00496 {
00497     CHECK_THREAD( _thread );
00498 
00499     session->_server    = server;
00500     session->_id        = sessionID;
00501     session->_isMaster  = ( server==this && isLocal( ));
00502     session->_setLocalNode( this );
00503     if( session->_isMaster )
00504         session->_idPool.freeIDs( 1, IDPool::MAX_CAPACITY );
00505 
00506     EQASSERTINFO( _sessions.find( sessionID ) == _sessions.end(),
00507                   "Session " << sessionID << " @" << (void*)session
00508                   << " already mapped on " << this << " @"
00509                   <<  (void*)_sessions[ sessionID ] );
00510     _sessions[ sessionID ] = session;
00511 
00512     EQINFO << (session->_isMaster ? "master" : "client") << " session, id "
00513            << sessionID << ", served by " << server.get() << ", mapped on "
00514            << this << endl;
00515 }
00516 
00517 void Node::_removeSession( Session* session )
00518 {
00519     CHECK_THREAD( _thread );
00520     EQASSERT( _sessions.find( session->getID( )) != _sessions.end( ));
00521 
00522     _sessions.erase( session->getID( ));
00523     EQINFO << "Erased session " << session->getID() << ", " << _sessions.size()
00524            << " left" << std::endl;
00525 
00526     session->_setLocalNode( 0 );
00527     session->_server    = 0;
00528     session->_id        = EQ_ID_INVALID;
00529     session->_isMaster  = false;
00530 }
00531 
00532 bool Node::registerSession( Session* session )
00533 {
00534     EQASSERT( isLocal( ));
00535     EQASSERT( !inCommandThread( ));
00536 
00537     NodeRegisterSessionPacket packet;
00538     packet.requestID  = _requestHandler.registerRequest( session );
00539     send( packet );
00540 
00541     uint32_t sessionID = EQ_ID_INVALID;
00542     _requestHandler.waitRequest( packet.requestID, sessionID );
00543     return (sessionID == session->getID( ));
00544 }
00545 
00546 bool Node::mapSession( NodePtr server, Session* session, const uint32_t id )
00547 {
00548     EQASSERT( isLocal( ));
00549     EQASSERT( id != EQ_ID_INVALID );
00550     EQASSERT( server != this );
00551 
00552     NodeMapSessionPacket packet;
00553     packet.requestID = _requestHandler.registerRequest( session );
00554     packet.sessionID = id;
00555     server->send( packet );
00556 
00557     uint32_t sessionID = EQ_ID_INVALID;
00558     _requestHandler.waitRequest( packet.requestID, sessionID );
00559     return (sessionID == session->getID( ));
00560 }
00561 
00562 bool Node::unmapSession( Session* session )
00563 {
00564     EQASSERT( isLocal( ));
00565 
00566     NodeUnmapSessionPacket packet;
00567     packet.requestID = _requestHandler.registerRequest( session );
00568     packet.sessionID = session->getID();
00569     session->getServer()->send( packet );
00570 
00571     bool ret = false;
00572     _requestHandler.waitRequest( packet.requestID, ret );
00573     return ret;
00574 }
00575 
00576 Session* Node::getSession( const uint32_t id )
00577 {
00578     SessionHash::const_iterator i = _sessions.find( id );
00579     
00580     EQASSERT( i != _sessions.end( ));
00581     if( i == _sessions.end( ))
00582         return 0;
00583 
00584     return i->second;
00585 }
00586 
00587 uint32_t Node::_generateSessionID()
00588 {
00589     CHECK_THREAD( _thread );
00590     RNG      rng;
00591     uint32_t id  = rng.get<uint32_t>();
00592 
00593     while( id == EQ_ID_INVALID || _sessions.find( id ) != _sessions.end( ))
00594         id = rng.get<uint32_t>();
00595 
00596     return id;  
00597 }
00598 
00599 #define SEPARATOR '#'
00600 
00601 std::string Node::serialize() const
00602 {
00603     ostringstream data;
00604     data << _id << SEPARATOR << _connectionDescriptions.size() << SEPARATOR;
00605 
00606     for( vector< ConnectionDescriptionPtr >::const_iterator i = 
00607              _connectionDescriptions.begin(); 
00608          i != _connectionDescriptions.end(); ++i )
00609     {
00610         ConnectionDescriptionPtr desc = *i;
00611         desc->serialize( data );
00612     }
00613     
00614     return data.str();
00615 }
00616  
00617 bool Node::deserialize( std::string& data )
00618 {
00619     EQASSERT( getState() == STATE_STOPPED || getState() == STATE_LAUNCHED );
00620 
00621     EQINFO << "Node data: " << data << endl;
00622     if( !_connectionDescriptions.empty( ))
00623         EQWARN << "Node already holds data while deserializing it" << endl;
00624 
00625     // node id
00626     size_t nextPos = data.find( SEPARATOR );
00627     if( nextPos == string::npos || nextPos == 0 )
00628     {
00629         EQERROR << "Could not parse node data" << endl;
00630         return false;
00631     }
00632 
00633     _id = data.substr( 0, nextPos );
00634     data = data.substr( nextPos + 1 );
00635 
00636     // num connection descriptions
00637     nextPos = data.find( SEPARATOR );
00638     if( nextPos == string::npos || nextPos == 0 )
00639     {
00640         EQERROR << "Could not parse node data" << endl;
00641         return false;
00642     }
00643 
00644     const string sizeStr = data.substr( 0, nextPos );
00645     if( !isdigit( sizeStr[0] ))
00646     {
00647         EQERROR << "Could not parse node data" << endl;
00648         return false;
00649     }
00650 
00651     const size_t nDesc = atoi( sizeStr.c_str( ));
00652     data = data.substr( nextPos + 1 );
00653 
00654     // connection descriptions
00655     for( size_t i = 0; i<nDesc; ++i )
00656     {
00657         ConnectionDescriptionPtr desc = new ConnectionDescription;
00658         if( !desc->fromString( data ))
00659         {
00660             EQERROR << "Error during node connection data parsing" << endl;
00661             return false;
00662         }
00663         addConnectionDescription( desc );
00664     }
00665 
00666     return true;
00667 }
00668 
00669 NodePtr Node::createNode( const uint32_t type )
00670 {
00671     EQASSERTINFO( type == TYPE_EQNET_NODE, type );
00672     return new Node();
00673 }
00674 
00675 
00676 void Node::acquireSendToken( NodePtr node )
00677 {
00678     NodeAcquireSendTokenPacket packet;
00679     packet.requestID = _requestHandler.registerRequest();
00680     node->send( packet );
00681     _requestHandler.waitRequest( packet.requestID );
00682 }
00683 
00684 void Node::releaseSendToken( NodePtr node )
00685 {
00686     NodeReleaseSendTokenPacket packet;
00687     node->send( packet );
00688 }
00689 
00690 //----------------------------------------------------------------------
00691 // receiver thread functions
00692 //----------------------------------------------------------------------
00693 void* Node::_runReceiverThread()
00694 {
00695     EQINFO << "Entered receiver thread of " << typeid( *this ).name() << endl;
00696     _commandThread->start();
00697 
00698     int nErrors = 0;
00699     while( _state == STATE_LISTENING )
00700     {
00701         const ConnectionSet::Event result = _connectionSet.select();
00702         switch( result )
00703         {
00704             case ConnectionSet::EVENT_CONNECT:
00705                 _handleConnect();
00706                 break;
00707 
00708             case ConnectionSet::EVENT_DATA:      
00709                 _handleData();
00710                 break;
00711 
00712             case ConnectionSet::EVENT_DISCONNECT:
00713             case ConnectionSet::EVENT_INVALID_HANDLE:
00714             {
00715                 _handleDisconnect();
00716                 EQVERB << &_connectionSet << endl;
00717                 break;
00718             } 
00719 
00720             case ConnectionSet::EVENT_TIMEOUT:   
00721                 EQINFO << "select timeout" << endl;
00722                 break;
00723 
00724             case ConnectionSet::EVENT_ERROR:      
00725                 ++nErrors;
00726                 EQWARN << "Connection signalled error during select" << endl;
00727                 if( nErrors > 100 )
00728                 {
00729                     EQWARN << "Too many errors in a row, capping connection" 
00730                            << endl;
00731                     _handleDisconnect();
00732                 }
00733                 break;
00734 
00735             case ConnectionSet::EVENT_SELECT_ERROR:      
00736                 EQWARN << "Error during select" << endl;
00737                 ++nErrors;
00738                 if( nErrors > 10 )
00739                 {
00740                     EQWARN << "Too many errors in a row" << endl;
00741                     EQUNIMPLEMENTED;
00742                 }
00743                 break;
00744 
00745             case ConnectionSet::EVENT_INTERRUPT:
00746                 _redispatchCommands();
00747                 break;
00748 
00749             default:
00750                 EQUNIMPLEMENTED;
00751         }
00752         if( result != ConnectionSet::EVENT_ERROR && 
00753             result != ConnectionSet::EVENT_SELECT_ERROR )
00754 
00755             nErrors = 0;
00756     }
00757 
00758     if( !_pendingCommands.empty( ))
00759         EQWARN << _pendingCommands.size() 
00760                << " commands pending while leaving command thread" << endl;
00761 
00762     for( CommandList::const_iterator i = _pendingCommands.begin();
00763          i != _pendingCommands.end(); ++i )
00764     {
00765         Command* command = *i;
00766         command->release();
00767     }
00768 
00769     EQCHECK( _commandThread->join( ));
00770     _pendingCommands.clear();
00771     _commandCache.flush();
00772 
00773     EQINFO << "Leaving receiver thread of " << typeid( *this ).name() << endl;
00774     return EXIT_SUCCESS;
00775 }
00776 
00777 void Node::_handleConnect()
00778 {
00779     ConnectionPtr connection = _connectionSet.getConnection();
00780     ConnectionPtr newConn    = connection->acceptSync();
00781     connection->acceptNB();
00782 
00783     if( !newConn )
00784     {
00785         EQINFO << "Received connect event, but accept() failed" << endl;
00786         return;
00787     }
00788 
00789     _addConnection( newConn );
00790     // Node will be created when receiving NodeConnectPacket from other side
00791 }
00792 
00793 void Node::_handleDisconnect()
00794 {
00795     while( _handleData( )) ; // read remaining data off connection
00796 
00797     ConnectionPtr connection = _connectionSet.getConnection();
00798     NodePtr node;
00799     ConnectionNodeHash::const_iterator i = _connectionNodes.find( connection );
00800     if( i != _connectionNodes.end( ))
00801         node = i->second;
00802 
00803     if( node.isValid( ))
00804     {
00805         node->_state      = STATE_STOPPED;
00806         node->_connection = 0;
00807         _connectionNodes.erase( connection );
00808         _nodes.erase( node->_id );
00809     }
00810 
00811     _removeConnection( connection );
00812 
00813     EQINFO << node << " disconnected from " << this << " connection used " 
00814            << connection->getRefCount() << endl;
00815 }
00816 
00817 bool Node::_handleData()
00818 {
00819     ConnectionPtr connection = _connectionSet.getConnection();
00820     EQASSERT( connection.isValid( ));
00821 
00822     NodePtr node;
00823     ConnectionNodeHash::const_iterator i = _connectionNodes.find( connection );
00824     if( i != _connectionNodes.end( ))
00825         node = i->second;
00826     EQASSERTINFO( !node || node->_connection == connection, 
00827                   typeid( *node.get( )).name( ));
00828 
00829     EQVERB << "Handle data from " << node << endl;
00830 
00831     void* sizePtr( 0 );
00832     uint64_t bytes( 0 );
00833     const bool gotSize = connection->recvSync( &sizePtr, &bytes );
00834 
00835     if( !gotSize ) // Some systems signal data on dead connections.
00836     {
00837         connection->recvNB( sizePtr, sizeof( uint64_t ));
00838         return false;
00839     }
00840 
00841     EQASSERT( sizePtr );
00842     const uint64_t size = *reinterpret_cast< uint64_t* >( sizePtr );
00843     EQASSERT( size );
00844     EQASSERT( bytes == sizeof( uint64_t ));
00845     EQASSERT( size > sizeof( size ));
00846 
00847     Command& command = _commandCache.alloc( node, this, size );
00848     uint8_t* ptr = reinterpret_cast< uint8_t* >( command.getPacket()) +
00849                                                  sizeof( uint64_t );
00850 
00851     connection->recvNB( ptr, size - sizeof( uint64_t ));
00852     const bool gotData = connection->recvSync( 0, 0 );    
00853 
00854     EQASSERT( gotData );
00855     EQASSERT( command.isValid( ));
00856 
00857     // start next receive
00858     connection->recvNB( sizePtr, sizeof( uint64_t ));
00859 
00860     if( !gotData )
00861     {
00862         EQERROR << "Incomplete packet read: " << command << std::endl;
00863         return false;
00864     }
00865 
00866     // This is one of the initial packets during the connection handshake, at
00867     // this point the remote node is not yet available.
00868     EQASSERTINFO( node.isValid() ||
00869                  ( command->datatype == DATATYPE_EQNET_NODE &&
00870                   ( command->command == CMD_NODE_CONNECT  || 
00871                     command->command == CMD_NODE_CONNECT_REPLY)),
00872                   command << " connection " << connection );
00873 
00874     _dispatchCommand( command );
00875     return true;
00876 }
00877 
00878 void Node::_dispatchCommand( Command& command )
00879 {
00880     EQASSERT( command.isValid( ));
00881 
00882     const bool dispatched = dispatchCommand( command );
00883  
00884     _redispatchCommands();
00885   
00886     if( !dispatched )
00887     {
00888         command.retain();
00889         _pendingCommands.push_back( &command );
00890     }
00891 }
00892 
00893 bool Node::dispatchCommand( Command& command )
00894 {
00895     EQVERB << "dispatch " << command << " by " << _id << endl;
00896     EQASSERT( command.isValid( ));
00897 
00898     const uint32_t datatype = command->datatype;
00899     switch( datatype )
00900     {
00901         case DATATYPE_EQNET_NODE:
00902             return Dispatcher::dispatchCommand( command );
00903 
00904         case DATATYPE_EQNET_SESSION:
00905         case DATATYPE_EQNET_OBJECT:
00906         {
00907             const SessionPacket* sessionPacket = 
00908                 static_cast<SessionPacket*>( command.getPacket( ));
00909             const uint32_t       id            = sessionPacket->sessionID;
00910 
00911             EQASSERTINFO( _sessions.find( id ) != _sessions.end(), 
00912                           "Can't find session for " << sessionPacket );
00913             Session*             session       = _sessions[ id ];
00914             EQASSERT( session );
00915             
00916             return session->dispatchCommand( command );
00917         }
00918 
00919         default:
00920             EQABORT( "Unknown datatype " << datatype << " for " << command );
00921             return true;
00922     }
00923 }
00924 
00925 void Node::_redispatchCommands()
00926 {
00927     bool changes = true;
00928     while( changes && !_pendingCommands.empty( ))
00929     {
00930         changes = false;
00931 
00932         for( CommandList::iterator i = _pendingCommands.begin();
00933              i != _pendingCommands.end(); ++i )
00934         {
00935             Command* command = *i;
00936             EQASSERT( command->isValid( ));
00937 
00938             if( dispatchCommand( *command ))
00939             {
00940                 _pendingCommands.erase( i );
00941                 command->release();
00942                 changes = true;
00943                 break;
00944             }
00945         }
00946     }
00947 
00948 #ifndef NDEBUG
00949     if( !_pendingCommands.empty( ))
00950         EQVERB << _pendingCommands.size() << " undispatched commands" << endl;
00951     EQASSERT( _pendingCommands.size() < 1000 );
00952 #endif
00953 }
00954 
00955 //----------------------------------------------------------------------
00956 // command thread functions
00957 //----------------------------------------------------------------------
00958 void* Node::_runCommandThread()
00959 {
00960     EQINFO << "Entered command thread of " << typeid( *this ).name() << endl;
00961 
00962     while( _state == STATE_LISTENING )
00963     {
00964         Command* command = _commandThreadQueue.pop();
00965         EQASSERT( command->isValid( ));
00966 
00967         const CommandResult result  = invokeCommand( *command );
00968         switch( result )
00969         {
00970             case COMMAND_ERROR:
00971                 EQABORT( "Error handling " << *command );
00972                 break;
00973 
00974             case COMMAND_HANDLED:
00975             case COMMAND_DISCARD:
00976                 break;
00977 
00978             default:
00979                 EQUNIMPLEMENTED;
00980         }
00981 
00982         command->release();
00983     }
00984  
00985     _commandThreadQueue.flush();
00986     EQINFO << "Leaving command thread of " << typeid( *this ).name() << endl;
00987     return EXIT_SUCCESS;
00988 }
00989 
00990 CommandResult Node::invokeCommand( Command& command )
00991 {
00992     EQVERB << "dispatch " << command << " by " << _id << endl;
00993     EQASSERT( command.isValid( ));
00994 
00995     const uint32_t datatype = command->datatype;
00996     switch( datatype )
00997     {
00998         case DATATYPE_EQNET_NODE:
00999             return Dispatcher::invokeCommand( command );
01000 
01001         case DATATYPE_EQNET_SESSION:
01002         case DATATYPE_EQNET_OBJECT:
01003         {
01004             const SessionPacket* sessionPacket = 
01005                 static_cast<SessionPacket*>( command.getPacket( ));
01006             const uint32_t id = sessionPacket->sessionID;
01007 
01008             EQASSERTINFO( _sessions.find( id ) != _sessions.end( ),
01009                           "Can't find session for " << sessionPacket );
01010 
01011             Session* session = _sessions[ id ];
01012             if( !session )
01013                 return COMMAND_ERROR;
01014 
01015             return session->invokeCommand( command );
01016         }
01017 
01018         default:
01019             EQABORT( "Unknown datatype " << datatype << " for " << command );
01020             return COMMAND_ERROR;
01021     }
01022 }
01023 
/**
 * Handles the stop command.
 *
 * Switches this node to STATE_STOPPED, which ends the loop in
 * _runCommandThread(). It also interrupts the connection set, presumably to
 * wake a thread blocked on it (confirm against ConnectionSet::interrupt).
 */
CommandResult Node::_cmdStop( Command& command )
{
    EQINFO << "Cmd stop " << this << endl;
    EQASSERT( _state == STATE_LISTENING );

    _state = STATE_STOPPED;
    _connectionSet.interrupt();

    return COMMAND_HANDLED;
}
01034 
01035 CommandResult Node::_cmdRegisterSession( Command& command )
01036 {
01037     EQASSERT( getState() == STATE_LISTENING );
01038 
01039     const NodeRegisterSessionPacket* packet = 
01040         command.getPacket<NodeRegisterSessionPacket>();
01041     EQVERB << "Cmd register session: " << packet << endl;
01042     CHECK_THREAD( _thread );
01043     
01044     Session* session = static_cast< Session* >( 
01045         _requestHandler.getRequestData( packet->requestID ));
01046 
01047     EQASSERTINFO( command.getNode() == this, 
01048                   command.getNode() << " != " << this );
01049     EQASSERT( session );
01050 
01051     const uint32_t sessionID = _generateSessionID();
01052     _addSession( session, this, sessionID );
01053 
01054     NodeRegisterSessionReplyPacket reply( packet );
01055     reply.sessionID = session->getID();
01056     send( reply );
01057         
01058     return COMMAND_HANDLED;
01059 }
01060 
01061 CommandResult Node::_cmdRegisterSessionReply( Command& command)
01062 {
01063     const NodeRegisterSessionReplyPacket* packet = 
01064         command.getPacket<NodeRegisterSessionReplyPacket>();
01065     EQVERB << "Cmd register session reply: " << packet << endl;
01066     CHECK_THREAD( _thread );
01067 
01068     EQASSERT( command.getNode() == this );
01069 
01070     _requestHandler.serveRequest( packet->requestID, packet->sessionID );
01071     return COMMAND_HANDLED;
01072 }
01073 
01074 
01075 CommandResult Node::_cmdMapSession( Command& command )
01076 {
01077     EQASSERT( getState() == STATE_LISTENING );
01078 
01079     const NodeMapSessionPacket* packet = 
01080         command.getPacket<NodeMapSessionPacket>();
01081     EQVERB << "Cmd map session: " << packet << endl;
01082     CHECK_THREAD( _thread );
01083     
01084     NodePtr node = command.getNode();
01085     NodeMapSessionReplyPacket reply( packet );
01086 
01087     if( node == this )
01088     {
01089         EQASSERTINFO( node == this, 
01090                       "Can't map a session using myself as server " );
01091         reply.sessionID = EQ_ID_INVALID;
01092     }
01093     else
01094     {
01095         const uint32_t sessionID = packet->sessionID;
01096         SessionHash::const_iterator i = _sessions.find( sessionID );
01097         
01098         if( i == _sessions.end( ))
01099             reply.sessionID = EQ_ID_INVALID;
01100     }
01101 
01102     node->send( reply );
01103     return COMMAND_HANDLED;
01104 }
01105 
01106 CommandResult Node::_cmdMapSessionReply( Command& command)
01107 {
01108     const NodeMapSessionReplyPacket* packet = 
01109         command.getPacket<NodeMapSessionReplyPacket>();
01110     EQVERB << "Cmd map session reply: " << packet << endl;
01111     CHECK_THREAD( _thread );
01112 
01113     const uint32_t requestID = packet->requestID;
01114     if( packet->sessionID != EQ_ID_INVALID )
01115     {
01116         NodePtr  node    = command.getNode(); 
01117         Session* session = static_cast< Session* >( 
01118             _requestHandler.getRequestData( requestID ));
01119         EQASSERT( session );
01120         EQASSERT( node != this );
01121 
01122         _addSession( session, node, packet->sessionID );
01123     }
01124 
01125     _requestHandler.serveRequest( requestID, packet->sessionID );
01126     return COMMAND_HANDLED;
01127 }
01128 
01129 CommandResult Node::_cmdUnmapSession( Command& command )
01130 {
01131     const NodeUnmapSessionPacket* packet =
01132         command.getPacket<NodeUnmapSessionPacket>();
01133     EQVERB << "Cmd unmap session: " << packet << endl;
01134     CHECK_THREAD( _thread );
01135     
01136     const uint32_t sessionID = packet->sessionID;
01137     SessionHash::const_iterator i = _sessions.find( sessionID );
01138     Session* session = (i == _sessions.end() ? 0 : i->second );
01139 
01140     NodeUnmapSessionReplyPacket reply( packet );
01141     reply.result = (session != 0);
01142 
01143 #if 0
01144     if( session && session->_server == this )
01145         ;// TODO: unmap all session slave instances.
01146 #endif
01147 
01148     command.getNode()->send( reply );
01149     return COMMAND_HANDLED;
01150 }
01151 
01152 CommandResult Node::_cmdUnmapSessionReply( Command& command)
01153 {
01154     const NodeUnmapSessionReplyPacket* packet = 
01155         command.getPacket<NodeUnmapSessionReplyPacket>();
01156     EQVERB << "Cmd unmap session reply: " << packet << endl;
01157     CHECK_THREAD( _thread );
01158 
01159     const uint32_t requestID = packet->requestID;
01160     Session* session = static_cast< Session* >(
01161         _requestHandler.getRequestData( requestID ));
01162     EQASSERT( session );
01163 
01164     if( session )
01165     {
01166         _removeSession( session ); // TODO use session existence as return value
01167         _requestHandler.serveRequest( requestID, true );
01168     }
01169     else
01170         _requestHandler.serveRequest( requestID, false );
01171 
01172     // packet->result is false if server-side session was already unmapped
01173     return COMMAND_HANDLED;
01174 }
01175 
/**
 * Handles an incoming connect request from a peer (receiver thread only).
 *
 * If a node with the sender's ID is already known, the request is refused:
 * a reply is sent and the connection is removed. Otherwise the remote node is
 * taken from the pending launch request (packet->launchID) or created via
 * createNode(). It is initialised from the packet's node data, registered in
 * _connectionNodes and _nodes, and answered with this node's ID, type and
 * serialized data. A pending launch request is served last.
 */
CommandResult Node::_cmdConnect( Command& command )
{
    // The sender is not known yet; this packet is part of the handshake.
    EQASSERT( !command.getNode().isValid( ));
    EQASSERT( inReceiverThread( ));

    const NodeConnectPacket* packet = command.getPacket<NodeConnectPacket>();
    // Presumably the connection the packet was read from -- confirm against
    // ConnectionSet::getConnection().
    ConnectionPtr        connection = _connectionSet.getConnection();

    // Node IDs travel in network byte order.
    NodeID nodeID = packet->nodeID;
    nodeID.convertToHost();

    EQINFO << "handle connect " << packet << endl;
    EQASSERT( nodeID != _id );
    EQASSERT( _connectionNodes.find( connection ) == _connectionNodes.end( ));

    if( _nodes.find( nodeID ) != _nodes.end( ))
    {   // Node exists, probably simultaneous connect from peer
        EQASSERT( packet->launchID == EQ_ID_INVALID );
        EQINFO << "Already got node " << nodeID << ", refusing connect"
               << endl;

        // refuse connection
        // reply.nodeID is not set here; presumably it stays NodeID::ZERO,
        // which _cmdConnectReply() treats as a refusal -- confirm.
        NodeConnectReplyPacket reply( packet );
        connection->send( reply, serialize( ));

        // NOTE: There used to be no close() here. If deadlocks occur, it is
        // likely that the reply packet above can't be received by the peer
        // because the connection is closed by us. In that case, do not close
        // the connection here. Take care to cancel the pending IO for the next
        // packet size, and to delete the memory for the packet size.
        _removeConnection( connection );
        return COMMAND_HANDLED;
    }

    // create and add connected node
    NodePtr remoteNode;
    if( packet->launchID != EQ_ID_INVALID )
    {
        // The peer was launched by us: reuse the Node object stored as the
        // launch request's data (presumably registered by _launch()).
        void* ptr = _requestHandler.getRequestData( packet->launchID );
        EQASSERT( dynamic_cast< Node* >( (Dispatcher*)ptr ));
        remoteNode = static_cast< Node* >( ptr );
        remoteNode->_connectionDescriptions.clear(); //use actual data from peer
    }
    else
        remoteNode = createNode( packet->type );

    // Initialise the node from the data the peer sent; all of it is expected
    // to be consumed.
    string data = packet->nodeData;
    if( !remoteNode->deserialize( data ))
        EQWARN << "Error during node initialization" << endl;
    EQASSERT( data.empty( ));
    EQASSERTINFO( remoteNode->_id == nodeID,
                  remoteNode->_id << "!=" << nodeID );

    remoteNode->_connection = connection;
    remoteNode->_state      = STATE_CONNECTED;
    
    _connectionNodes[ connection ] = remoteNode;
    _nodes[ remoteNode->_id ]      = remoteNode;
    EQVERB << "Added node " << nodeID << " using " << connection << std::endl;

    // send our information as reply
    NodeConnectReplyPacket reply( packet );
    reply.nodeID    = _id;
    reply.nodeID.convertToNetwork();

    reply.type      = getType();

    connection->send( reply, serialize( ));

    // Wake up whoever waits for the launched node (see syncConnect()).
    if( packet->launchID != EQ_ID_INVALID )
        _requestHandler.serveRequest( packet->launchID );
    
    return COMMAND_HANDLED;
}
01250 
/**
 * Handles the peer's answer to our connect request (receiver thread only).
 *
 * A reply with a zero node ID (refused), or for a node that is already
 * known, removes the connection and serves the pending request with false.
 * Otherwise the remote node is taken from the request data, or created via
 * createNode(), and initialised from the reply's node data. It is then
 * registered in _connectionNodes and _nodes, and the request is served with
 * true.
 */
CommandResult Node::_cmdConnectReply( Command& command )
{
    // The sender is not known yet; this packet is part of the handshake.
    EQASSERT( !command.getNode( ));
    EQASSERT( inReceiverThread( ));

    const NodeConnectReplyPacket* packet = 
        command.getPacket<NodeConnectReplyPacket>();
    // Presumably the connection the packet was read from -- confirm.
    ConnectionPtr connection = _connectionSet.getConnection();

    // Node IDs travel in network byte order.
    NodeID nodeID = packet->nodeID;
    nodeID.convertToHost();

    EQINFO << "handle connect reply " << packet << endl;
    EQASSERT( _connectionNodes.find( connection ) == _connectionNodes.end( ));

    if( nodeID == NodeID::ZERO ||               // connection refused
        _nodes.find( nodeID ) != _nodes.end( )) // Node exists, probably
                                                // simultaneous connect
    {
        EQINFO << "ignoring connect reply, node already connected" << endl;
        _removeConnection( connection );
        
        if( packet->requestID != EQ_ID_INVALID )
            _requestHandler.serveRequest( packet->requestID, false );
        
        return COMMAND_HANDLED;
    }

    // create and add node
    NodePtr remoteNode;
    if( packet->requestID != EQ_ID_INVALID )
    {
        // Reuse the Node object stored as the connect request's data.
        void* ptr = _requestHandler.getRequestData( packet->requestID );
        EQASSERT( dynamic_cast< Node* >( (Dispatcher*)ptr ));
        remoteNode = static_cast< Node* >( ptr );
        remoteNode->_connectionDescriptions.clear(); //get actual data from peer
    }

    if( !remoteNode )
        remoteNode = createNode( packet->type );

    EQASSERT( remoteNode->getType() == packet->type );
    EQASSERT( remoteNode->getState() == STATE_STOPPED );

    // Initialise the node from the data the peer sent; all of it is expected
    // to be consumed.
    string data = packet->nodeData;
    if( !remoteNode->deserialize( data ))
        EQWARN << "Error during node initialization" << endl;
    EQASSERT( data.empty( ));
    EQASSERT( remoteNode->_id == nodeID );

    remoteNode->_connection = connection;
    remoteNode->_state      = STATE_CONNECTED;
    
    _connectionNodes[ connection ] = remoteNode;
    _nodes[ remoteNode->_id ]      = remoteNode;
    EQVERB << "Added node " << nodeID << " using " << connection << std::endl;

    if( packet->requestID != EQ_ID_INVALID )
        _requestHandler.serveRequest( packet->requestID, true );
    return COMMAND_HANDLED;
}
01312 
/**
 * Disconnects the node stored as data of the packet's request (receiver
 * thread only).
 *
 * If the node still has a connection, it is set to STATE_STOPPED. Its
 * connection is removed, and it is erased from _connectionNodes and _nodes.
 * The request is served in every case.
 */
CommandResult Node::_cmdDisconnect( Command& command )
{
    EQASSERT( inReceiverThread( ));

    const NodeDisconnectPacket* packet =
        command.getPacket<NodeDisconnectPacket>();

    // NOTE(review): a null request data pointer is only asserted; the
    // dereference below would crash in builds without assertions.
    NodePtr node = static_cast<Node*>( 
        _requestHandler.getRequestData( packet->requestID ));
    EQASSERT( node.isValid( ));

    // Keep a local reference: node->_connection is reset below.
    ConnectionPtr connection = node->_connection;
    if( connection.isValid( ))
    {
        node->_state      = STATE_STOPPED;
        node->_connection = 0;

        _removeConnection( connection );
        EQASSERT( _connectionNodes.find( connection )!=_connectionNodes.end( ));

        _connectionNodes.erase( connection );
        _nodes.erase( node->_id );

        EQINFO << node << " disconnected from " << this << " connection used " 
               << connection->getRefCount() << endl;
    }

    EQASSERT( node->_state == STATE_STOPPED );
    _requestHandler.serveRequest( packet->requestID );
    return COMMAND_HANDLED;
}
01344 
01345 CommandResult Node::_cmdGetNodeData( Command& command)
01346 {
01347     const NodeGetNodeDataPacket* packet = 
01348         command.getPacket<NodeGetNodeDataPacket>();
01349     EQINFO << "cmd get node data: " << packet << endl;
01350 
01351     NodeID nodeID = packet->nodeID;
01352     nodeID.convertToHost();
01353 
01354     NodePtr descNode = getNode( nodeID );
01355     
01356     NodeGetNodeDataReplyPacket reply( packet );
01357 
01358     string nodeData;
01359     if( descNode.isValid( ))
01360     {
01361         reply.type = descNode->getType();
01362         nodeData   = descNode->serialize();
01363     }
01364     else
01365     {
01366         EQINFO << "Node " << nodeID << " unknown" << endl;
01367         reply.type = TYPE_EQNET_INVALID;
01368     }
01369 
01370     NodePtr node = command.getNode();
01371     node->send( reply, nodeData );
01372     EQINFO << "Sent node data " << nodeData << " to " << node << endl;
01373     return COMMAND_HANDLED;
01374 }
01375 
01376 CommandResult Node::_cmdGetNodeDataReply( Command& command )
01377 {
01378     NodeGetNodeDataReplyPacket* packet = 
01379         command.getPacket<NodeGetNodeDataReplyPacket>();
01380     EQINFO << "cmd get node data reply: " << packet << endl;
01381 
01382     const uint32_t requestID = packet->requestID;
01383 
01384     NodeID nodeID = packet->nodeID;
01385     nodeID.convertToHost();
01386 
01387     if( _nodes.find( nodeID ) != _nodes.end( ))
01388     {   
01389         // Requested node connected to us in the meantime
01390         NodePtr node = _nodes[ nodeID ];
01391         EQASSERT( node->isConnected( ));
01392 
01393         node.ref();
01394         _requestHandler.serveRequest( requestID, node.get( ));
01395         return COMMAND_HANDLED;
01396     }
01397 
01398     if( packet->type == TYPE_EQNET_INVALID )
01399     {
01400         _requestHandler.serveRequest( requestID, (void*)0 );
01401         return COMMAND_HANDLED;
01402     }
01403         
01404     NodePtr node = createNode( packet->type );
01405     EQASSERT( node.isValid( ));
01406 
01407     string data = packet->nodeData;
01408     if( !node->deserialize( data ))
01409         EQWARN << "Failed to initialize node data" << endl;
01410     EQASSERT( data.empty( ));
01411 
01412     node->setAutoLaunch( false );
01413     node.ref();
01414     _requestHandler.serveRequest( requestID, node.get( ));
01415     return COMMAND_HANDLED;
01416 }
01417 
01418 CommandResult Node::_cmdAcquireSendToken( Command& command )
01419 {
01420     NodeAcquireSendTokenPacket* packet = 
01421         command.getPacket<NodeAcquireSendTokenPacket>();
01422 
01423     if( !_hasSendToken ) // no token available
01424         // HACK: returning not COMMAND_HANDLED causes redispatch, see base.cpp
01425         return COMMAND_ERROR;
01426 
01427     _hasSendToken = false;
01428 
01429     NodeAcquireSendTokenReplyPacket reply( packet );
01430     command.getNode()->send( reply );
01431     return COMMAND_HANDLED;
01432 }
01433 
01434 CommandResult Node::_cmdAcquireSendTokenReply( Command& command )
01435 {
01436     NodeAcquireSendTokenReplyPacket* packet = 
01437         command.getPacket<NodeAcquireSendTokenReplyPacket>();
01438 
01439     _requestHandler.serveRequest( packet->requestID );
01440     return COMMAND_HANDLED;
01441 }
01442 
/**
 * Returns the send token to this node.
 *
 * After marking the token as available, pending commands are flushed. This
 * presumably lets acquire requests that were rejected by
 * _cmdAcquireSendToken() be retried -- confirm flushCommands() semantics.
 */
CommandResult Node::_cmdReleaseSendToken( Command& command )
{
    EQASSERT( !_hasSendToken );
    _hasSendToken = true;
    flushCommands(); // redispatch pending commands
    return COMMAND_HANDLED;
}
01450 
01451 //----------------------------------------------------------------------
01452 // Connecting and launching a node
01453 //----------------------------------------------------------------------
01454 bool Node::connect( NodePtr node )
01455 {
01456     if( node->getState() == STATE_CONNECTED ||
01457         node->getState() == STATE_LISTENING )
01458 
01459         return true;
01460 
01461     if( !initConnect( node ))
01462     {
01463         EQERROR << "Connection initialization to " << node << " failed." 
01464                 << endl;
01465         return false;
01466     }
01467 
01468     return syncConnect( node );
01469 }
01470 
01471 bool Node::initConnect( NodePtr node )
01472 {
01473     EQASSERTINFO( _state == STATE_LISTENING, _state );
01474     if( node->getState() == STATE_CONNECTED ||
01475         node->getState() == STATE_LISTENING )
01476     {
01477         return true;
01478     }
01479 
01480     EQASSERT( node->getState() == STATE_STOPPED );
01481 
01482     // try connecting first
01483     const ConnectionDescriptionVector& cds = node->getConnectionDescriptions();
01484     for( ConnectionDescriptionVector::const_iterator i = cds.begin();
01485         i != cds.end(); ++i )
01486     {
01487         ConnectionDescriptionPtr description = *i;
01488         ConnectionPtr connection = Connection::create( description );
01489 
01490         if( !connection->connect( ))
01491             continue;
01492 
01493         if( !connect( node, connection ))
01494             return false;
01495 
01496         return true;
01497     }
01498 
01499     EQINFO << "Node could not be connected." << endl;
01500     if( !node->_autoLaunch )
01501         return false;
01502     
01503     EQINFO << "Attempting to launch node." << endl;
01504     for( ConnectionDescriptionVector::const_iterator i = cds.begin();
01505         i != cds.end(); ++i )
01506     {
01507         ConnectionDescriptionPtr description = *i;
01508 
01509         if( _launch( node, description ))
01510             return true;
01511     }
01512 
01513     return false;
01514 }
01515 
01516 bool Node::syncConnect( NodePtr node )
01517 {
01518     if( node->_launchID == EQ_ID_INVALID )
01519         return ( node->getState() == STATE_CONNECTED );
01520 
01521     void*          ret;
01522     const float    time    = -( node->_launchTimeout.getTimef( )); 
01523     const uint32_t timeout = static_cast< uint32_t >((time > 0) ? time : 0);
01524 
01525     if( _requestHandler.waitRequest( node->_launchID, ret, timeout ))
01526     {
01527         EQASSERT( node->getState() == STATE_CONNECTED );
01528         node->_launchID = EQ_ID_INVALID;
01529         return true;
01530     }
01531 
01532     node->_state = STATE_STOPPED;
01533     _requestHandler.unregisterRequest( node->_launchID );
01534     node->_launchID = EQ_ID_INVALID;
01535     return false;
01536 }
01537 
01538 NodePtr Node::connect( const NodeID& nodeID )
01539 {
01540     EQASSERT( nodeID != NodeID::ZERO );
01541     
01542     // Extract all node pointers - _nodes hash might be modified later
01543     NodeVector nodes;
01544     for( base::UUIDHash< NodePtr >::const_iterator i = _nodes.begin();
01545          i != _nodes.end(); ++i )
01546     {
01547         NodePtr node = i->second;
01548 
01549         if( node->getNodeID() == nodeID ) // early out
01550             return node;
01551 
01552         nodes.push_back( node );
01553     }
01554 
01555     for( NodeVector::const_iterator i = nodes.begin(); i != nodes.end(); ++i )
01556     {
01557         NodePtr result = _connect( nodeID, *i );
01558         if( result.isValid( ))
01559             return result;
01560     }
01561 
01562     return 0;
01563 }
01564 
/**
 * Connects to the node with the given identifier, using server to look up
 * its data.
 *
 * Sends a NodeGetNodeDataPacket to server and blocks until
 * _cmdGetNodeDataReply() serves the request. Then connects the returned node
 * unless it is already connected.
 *
 * @param nodeID identifier of the node to connect; must not be zero.
 * @param server node asked for the data of nodeID.
 * @return the connected node, or 0 if it is unknown to server or the
 *         connection failed.
 */
NodePtr Node::_connect( const NodeID& nodeID, NodePtr server )
{
    EQASSERT( nodeID != NodeID::ZERO );

    base::UUIDHash< NodePtr >::const_iterator iter = _nodes.find( nodeID );
    if( iter != _nodes.end( ))
        return iter->second;

    // Make sure that only one connection request based on the node identifier
    // is pending at a given time. Otherwise a node with the same id might be
    // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
    // mutex is to register connecting nodes with this local node, and handle
    // all cases correctly, which is far more complex. Node connections only
    // happen a lot during initialization, and are therefore not time-critical.
    ScopedMutex mutex( _connectMutex );
    EQINFO << "Connecting node " << nodeID << endl;

    // Re-check under the mutex: another caller may have connected it meanwhile.
    iter = _nodes.find( nodeID );
    if( iter != _nodes.end( ))
        return iter->second;
 
    EQASSERT( _id != nodeID );
    NodeGetNodeDataPacket packet;
    packet.requestID = _requestHandler.registerRequest();
    packet.nodeID    = nodeID;
    packet.nodeID.convertToNetwork();

    server->send( packet );

    // Served by _cmdGetNodeDataReply() with a referenced Node* or 0.
    void* result = 0;
    _requestHandler.waitRequest( packet.requestID, result );

    if( !result )
    {
        EQWARN << "Node not found on server" << endl;
        return 0;
    }

    EQASSERT( dynamic_cast< Node* >( (Dispatcher*)result ));
    NodePtr node = static_cast< Node* >( result );
    node.unref(); // ref'd before serveRequest()
    
    if( node->isConnected( ))
        return node;

    if( !connect( node ))
    {
        // connect failed - maybe simultaneous connect from peer?
        iter = _nodes.find( nodeID );
        if( iter != _nodes.end( ))
        {
            node = iter->second;
            EQASSERT( node->isConnected( ));
            return node;
        }
        
        EQWARN << "Node connection failed" << endl;
        return 0;
    }

    return node;
}
01627 
01628 bool Node::_launch( NodePtr node,
01629                     ConnectionDescriptionPtr description )
01630 {
01631     EQASSERT( node->getState() == STATE_STOPPED );
01632 
01633     node->_launchID = _requestHandler.registerRequest( node.get() );
01634     node->_launchTimeout.setAlarm( 
01635         static_cast< float >( description->launchTimeout ));
01636 
01637     const string launchCommand = _createLaunchCommand( node, description );
01638 
01639     if( !Launcher::run( launchCommand ))
01640     {
01641         EQWARN << "Could not launch node using '" << launchCommand << "'" 
01642                << endl;
01643         _requestHandler.unregisterRequest( node->_launchID );
01644         node->_launchID = EQ_ID_INVALID;
01645         return false;
01646     }
01647     
01648     node->_state = STATE_LAUNCHED;
01649     return true;
01650 }
01651 
01652 string Node::_createLaunchCommand( NodePtr node,
01653                                    ConnectionDescriptionPtr description )
01654 {
01655     const string& launchCommand    = description->getLaunchCommand();
01656     const size_t  launchCommandLen = launchCommand.size();
01657     const char    quote            = description->launchCommandQuote;
01658 
01659     bool          commandFound     = false;
01660     size_t        lastPos          = 0;
01661     string        result;
01662 
01663     for( size_t percentPos = launchCommand.find( '%' );
01664          percentPos != string::npos; 
01665          percentPos = launchCommand.find( '%', percentPos+1 ))
01666     {
01667         ostringstream replacement;
01668         switch( launchCommand[percentPos+1] )
01669         {
01670             case 'c':
01671             {
01672                 replacement << _createRemoteCommand( node, quote );
01673                 commandFound = true;
01674                 break;
01675             }
01676             case 'h':
01677                 replacement << description->getHostname();
01678                 break;
01679 
01680             case 'n':
01681                 replacement << node->getNodeID();
01682                 break;
01683 
01684             default:
01685                 EQWARN << "Unknown token " << launchCommand[percentPos+1] 
01686                        << endl;
01687         }
01688 
01689         result += launchCommand.substr( lastPos, percentPos-lastPos );
01690         if( !replacement.str().empty( ))
01691             result += replacement.str();
01692 
01693         lastPos  = percentPos+2;
01694     }
01695 
01696     result += launchCommand.substr( lastPos, launchCommandLen-lastPos );
01697 
01698     if( !commandFound )
01699         result += " " + _createRemoteCommand( node, quote );
01700 
01701     EQINFO << "Launch command: " << result << endl;
01702     return result;
01703 }
01704 
/**
 * Builds the command line that starts the given node as a client of this
 * node.
 *
 * On non-Windows platforms the command is prefixed with "env". The prefix
 * forwards the library path, all EQ_* environment variables and the current
 * log level/topics. The program name is made absolute using the node's
 * working directory. The arguments after "--eq-client" follow, enclosed in
 * quote:
 *   remote node data, launch ID, work dir, node ID, this node's type,
 *   this node's data
 * Each field after the launch ID is preceded by SEPARATOR. The fields from
 * the launch ID on appear in the order runClient() parses them. Presumably
 * the leading remote node data is consumed before runClient() sees the
 * rest -- confirm.
 *
 * @param node the node to launch.
 * @param quote quote character from the connection description.
 * @return the command line, or "" if this node is not listening.
 */
string Node::_createRemoteCommand( NodePtr node, const char quote )
{
    if( getState() != STATE_LISTENING )
    {
        EQERROR << "Node is not listening, can't launch " << this << endl;
        return "";
    }

    ostringstream stringStream;

    //----- environment
#ifndef WIN32
#  ifdef Darwin
    const char libPath[] = "DYLD_LIBRARY_PATH";
#  else
    const char libPath[] = "LD_LIBRARY_PATH";
#  endif

    stringStream << "env "; // XXX
    char* env = getenv( libPath );
    if( env )
        stringStream << libPath << "=" << env << " ";

    // Forward every EQ_* variable of this process.
    for( int i=0; environ[i] != 0; i++ )
        if( strlen( environ[i] ) > 2 && strncmp( environ[i], "EQ_", 3 ) == 0 )
            stringStream << environ[i] << " ";

    stringStream << "EQ_LOG_LEVEL=" << Log::getLogLevelString() << " ";
    if( eq::base::Log::topics != 0 )
        stringStream << "EQ_LOG_TOPICS=" << Log::topics << " ";
#endif // WIN32

    //----- program + args
    string program = node->_programName;
#ifdef WIN32
    EQASSERT( program.length() > 2 );
    if( !( program[1] == ':' && (program[2] == '/' || program[2] == '\\' )) &&
        // !( drive letter and full path present )
        !( program[0] == '/' || program[0] == '\\' ))
        // !full path without drive letter

        program = node->_workDir + '/' + program; // add _workDir to rel. path
#else
    if( program[0] != '/' )
        program = node->_workDir + '/' + program;
#endif

    const string ownData    = serialize();
    const string remoteData = node->serialize();

    // NOTE(review): remoteData, ownData and _workDir are not escaped; a quote
    // character inside them would break the generated command line.
    stringStream
        << quote << program << quote << " -- --eq-client " << quote
        << remoteData << node->_launchID << SEPARATOR << node->_workDir 
        << SEPARATOR << node->_id << SEPARATOR << getType() << SEPARATOR
        << ownData << quote;

    return stringStream.str();
}
01763 
01764 bool Node::runClient( const std::string& clientArgs )
01765 {
01766     EQASSERT( _state == STATE_LISTENING );
01767 
01768     size_t nextPos = clientArgs.find( SEPARATOR );
01769     if( nextPos == string::npos )
01770     {
01771         EQERROR << "Could not parse request identifier: " << clientArgs << endl;
01772         return false;
01773     }
01774 
01775     const string   request     = clientArgs.substr( 0, nextPos );
01776     string         description = clientArgs.substr( nextPos + 1 );
01777     const uint32_t launchID    = strtoul( request.c_str(), 0, 10 );
01778 
01779     nextPos = description.find( SEPARATOR );
01780     if( nextPos == string::npos )
01781     {
01782         EQERROR << "Could not parse working directory: " << description 
01783                 << " is left from " << clientArgs << endl;
01784         return false;
01785     }
01786 
01787     const string workDir = description.substr( 0, nextPos );
01788     description          = description.substr( nextPos + 1 );
01789 
01790     Global::setWorkDir( workDir );
01791     if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
01792         EQWARN << "Can't change working directory to " << workDir << ": "
01793                << strerror( errno ) << endl;
01794     
01795     EQINFO << "Launching node with launch ID=" << launchID << ", cwd="
01796            << workDir << endl;
01797 
01798     nextPos = description.find( SEPARATOR );
01799     if( nextPos == string::npos )
01800     {
01801         EQERROR << "Could not parse node identifier: " << description
01802                 << " is left from " << clientArgs << endl;
01803         return false;
01804     }
01805     _id         = description.substr( 0, nextPos );
01806     description = description.substr( nextPos + 1 );
01807 
01808     nextPos = description.find( SEPARATOR );
01809     if( nextPos == string::npos )
01810     {
01811         EQERROR << "Could not parse server node type: " << description
01812                 << " is left from " << clientArgs << endl;
01813         return false;
01814     }
01815     const string nodeType = description.substr( 0, nextPos );
01816     description           = description.substr( nextPos + 1 );
01817     const uint32_t   type = atoi( nodeType.c_str( ));
01818 
01819     RefPtr< Node >   node = createNode( type );
01820     if( !node )
01821     {
01822         EQERROR << "Can't create server node" << endl;
01823         return false;
01824     }
01825     
01826     node->setAutoLaunch( false );
01827     node->_launchID = launchID;
01828 
01829     if( !node->deserialize( description ))
01830         EQWARN << "Can't parse node data" << endl;
01831 
01832     if( !connect( node ))
01833     {
01834         EQERROR << "Can't connect node" << endl;
01835         return false;
01836     }
01837 
01838     return clientLoop();
01839 }
01840 
01841 
01842 EQ_EXPORT std::ostream& operator << ( std::ostream& os, const Node::State state)
01843 {
01844     os << ( state == Node::STATE_STOPPED ? "stopped" :
01845             state == Node::STATE_LAUNCHED ? "launched" :
01846             state == Node::STATE_CONNECTED ? "connected" :
01847             state == Node::STATE_LISTENING ? "listening" : "ERROR" );
01848     return os;
01849 }
01850 
01851 }
01852 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8