00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "node.h"
00019
00020 #include "command.h"
00021 #include "connectionSet.h"
00022 #include "global.h"
00023 #include "pipeConnection.h"
00024 #include "session.h"
00025 #include "socketConnection.h"
00026
00027 #include <eq/base/base.h>
00028 #include <eq/base/launcher.h>
00029 #include <eq/base/rng.h>
00030
00031 #include <errno.h>
00032 #include <sstream>
00033 #include <sys/stat.h>
00034 #include <sys/types.h>
00035
00036 #ifdef WIN32_API
00037 # include <direct.h>
00038 # define chdir _chdir
00039 #endif
00040
00041 using namespace eq::base;
00042 using namespace std;
00043
00044 namespace eq
00045 {
00046 namespace net
00047 {
00048
00049
00050
00051 Node::Node()
00052 : _requestHandler( true )
00053 , _autoLaunch( false )
00054 , _id( true )
00055 , _state( STATE_STOPPED )
00056 , _launchID( EQ_ID_INVALID )
00057 , _programName( Global::getProgramName( ))
00058 , _workDir( Global::getWorkDir( ))
00059 , _hasSendToken( true )
00060 {
00061 _receiverThread = new ReceiverThread( this );
00062 _commandThread = new CommandThread( this );
00063
00064 registerCommand( CMD_NODE_STOP,
00065 CommandFunc<Node>( this, &Node::_cmdStop ),
00066 &_commandThreadQueue );
00067 registerCommand( CMD_NODE_REGISTER_SESSION,
00068 CommandFunc<Node>( this, &Node::_cmdRegisterSession ),
00069 &_commandThreadQueue );
00070 registerCommand( CMD_NODE_REGISTER_SESSION_REPLY,
00071 CommandFunc<Node>( this, &Node::_cmdRegisterSessionReply ),
00072 &_commandThreadQueue );
00073 registerCommand( CMD_NODE_MAP_SESSION,
00074 CommandFunc<Node>( this, &Node::_cmdMapSession ),
00075 &_commandThreadQueue );
00076 registerCommand( CMD_NODE_MAP_SESSION_REPLY,
00077 CommandFunc<Node>( this, &Node::_cmdMapSessionReply ),
00078 &_commandThreadQueue );
00079 registerCommand( CMD_NODE_UNMAP_SESSION,
00080 CommandFunc<Node>( this, &Node::_cmdUnmapSession ),
00081 &_commandThreadQueue );
00082 registerCommand( CMD_NODE_UNMAP_SESSION_REPLY,
00083 CommandFunc<Node>( this, &Node::_cmdUnmapSessionReply ),
00084 &_commandThreadQueue );
00085 registerCommand( CMD_NODE_CONNECT,
00086 CommandFunc<Node>( this, &Node::_cmdConnect ), 0 );
00087 registerCommand( CMD_NODE_CONNECT_REPLY,
00088 CommandFunc<Node>( this, &Node::_cmdConnectReply ), 0 );
00089 registerCommand( CMD_NODE_DISCONNECT,
00090 CommandFunc<Node>( this, &Node::_cmdDisconnect ), 0 );
00091 registerCommand( CMD_NODE_GET_NODE_DATA,
00092 CommandFunc<Node>( this, &Node::_cmdGetNodeData),
00093 &_commandThreadQueue );
00094 registerCommand( CMD_NODE_GET_NODE_DATA_REPLY,
00095 CommandFunc<Node>( this, &Node::_cmdGetNodeDataReply ),
00096 &_commandThreadQueue );
00097 registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN,
00098 CommandFunc<Node>( this, &Node::_cmdAcquireSendToken ), 0);
00099 registerCommand( CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
00100 CommandFunc<Node>( this, &Node::_cmdAcquireSendTokenReply )
00101 , 0 );
00102 registerCommand( CMD_NODE_RELEASE_SEND_TOKEN,
00103 CommandFunc<Node>( this, &Node::_cmdReleaseSendToken ), 0);
00104
00105 EQINFO << "New Node @" << (void*)this << " " << _id << endl;
00106 }
00107
00108 Node::~Node()
00109 {
00110 EQINFO << "Delete Node @" << (void*)this << " " << _id << endl;
00111 EQASSERT( _connection == 0 );
00112 EQASSERT( _connectionSet.empty( ));
00113 EQASSERT( _connectionNodes.empty( ));
00114 EQASSERT( _pendingCommands.empty( ));
00115 EQASSERT( _nodes.empty( ));
00116 EQASSERT( _requestHandler.isEmpty( ));
00117
00118 #ifndef NDEBUG
00119 if( !_sessions.empty( ))
00120 {
00121 EQINFO << _sessions.size() << " mapped sessions" << endl;
00122
00123 for( SessionHash::const_iterator i = _sessions.begin();
00124 i != _sessions.end(); ++i )
00125 {
00126 const Session* session = i->second;
00127 EQINFO << " Session " << session->getID() << std::endl;
00128 }
00129 }
00130 #endif
00131
00132 EQASSERT( _sessions.empty( ));
00133
00134 EQASSERT( !_commandThread->isRunning( ));
00135 delete _commandThread;
00136 _commandThread = 0;
00137
00138 EQASSERT( !_receiverThread->isRunning( ));
00139 delete _receiverThread;
00140 _receiverThread = 0;
00141
00142 _connectionDescriptions.clear();
00143 }
00144
00145 bool Node::operator == ( const Node* node ) const
00146 {
00147 EQASSERTINFO( _id != node->_id || this == node,
00148 "Two node instances with the same ID found "
00149 << (void*)this << " and " << (void*)node );
00150
00151 return ( this == node );
00152 }
00153
00154 void Node::setProgramName( const std::string& name )
00155 {
00156 _programName = name;
00157 }
00158
00159 void Node::setWorkDir( const std::string& name )
00160 {
00161 _workDir = name;
00162 }
00163
00164 const ConnectionDescriptionVector& Node::getConnectionDescriptions() const
00165 {
00166 return _connectionDescriptions;
00167 }
00168
00169 bool Node::initLocal( const int argc, char** argv )
00170 {
00171 #ifndef NDEBUG
00172 EQINFO << disableFlush << "args: ";
00173 for( int i=0; i<argc; i++ )
00174 EQINFO << argv[i] << ", ";
00175 EQINFO << endl << enableFlush;
00176 #endif
00177
00178
00179
00180
00181
00182
00183 bool isClient = false;
00184 bool isResident = false;
00185 string clientOpts;
00186
00187 for( int i=1; i<argc; ++i )
00188 {
00189 if( string( "--eq-listen" ) == argv[i] )
00190 {
00191 if( i<argc && argv[i+1][0] != '-' )
00192 {
00193 string data = argv[++i];
00194 ConnectionDescriptionPtr desc = new ConnectionDescription;
00195 desc->TCPIP.port = Global::getDefaultPort();
00196
00197 if( desc->fromString( data ))
00198 {
00199 addConnectionDescription( desc );
00200 EQASSERTINFO( data.empty(), data );
00201 }
00202 else
00203 EQWARN << "Ignoring listen option: " << argv[i] << endl;
00204 }
00205 }
00206 else if( string( "--eq-client" ) == argv[i] )
00207 {
00208 isClient = true;
00209 if( i < argc-1 && argv[i+1][0] != '-' )
00210 {
00211 clientOpts = argv[++i];
00212
00213 if( !deserialize( clientOpts ))
00214 EQWARN << "Failed to parse client listen port parameters"
00215 << endl;
00216 EQASSERT( !clientOpts.empty( ));
00217 }
00218 else
00219 isResident = true;
00220 }
00221 }
00222
00223 if( _connectionDescriptions.empty( ))
00224 {
00225 ConnectionDescriptionPtr connDesc = new ConnectionDescription;
00226 connDesc->type = CONNECTIONTYPE_TCPIP;
00227 connDesc->TCPIP.port = Global::getDefaultPort();
00228 addConnectionDescription( connDesc );
00229 }
00230
00231 EQINFO << "Listener data: " << serialize() << endl;
00232
00233 if( !listen( ))
00234 {
00235 EQWARN << "Can't setup listener(s)" << endl;
00236 return false;
00237 }
00238
00239 if( isClient )
00240 {
00241 EQINFO << "Client node started from command line with option "
00242 << clientOpts << endl;
00243
00244 bool ret = (isResident ? clientLoop() : runClient( clientOpts ));
00245
00246 EQINFO << "Exit node process " << getRefCount() << endl;
00247 ret &= exitClient();
00248 ::exit( ret ? EXIT_SUCCESS : EXIT_FAILURE );
00249 }
00250
00251 return true;
00252 }
00253
00254 bool Node::listen()
00255 {
00256 if( _state != STATE_STOPPED )
00257 return false;
00258
00259 if( !_connectSelf( ))
00260 return false;
00261
00262 for( ConnectionDescriptionVector::const_iterator i =
00263 _connectionDescriptions.begin();
00264 i != _connectionDescriptions.end(); ++i )
00265 {
00266 ConnectionDescriptionPtr description = *i;
00267 ConnectionPtr connection = Connection::create( description );
00268
00269 if( !connection->listen( ))
00270 {
00271 EQWARN << "Can't create listener connection: " << description
00272 << endl;
00273 return false;
00274 }
00275
00276 _connectionNodes[ connection ] = this;
00277 _connectionSet.addConnection( connection );
00278 EQVERB << "Added node " << _id << " using " << connection << std::endl;
00279
00280 connection->acceptNB();
00281 }
00282
00283 _state = STATE_LISTENING;
00284
00285 EQVERB << typeid(*this).name() << " starting command and receiver thread "
00286 << endl;
00287 _receiverThread->start();
00288
00289 EQINFO << this << " listening." << endl;
00290 return true;
00291 }
00292
00293 bool Node::stopListening()
00294 {
00295 if( _state != STATE_LISTENING )
00296 return false;
00297
00298 NodeStopPacket packet;
00299 send( packet );
00300
00301 EQCHECK( _receiverThread->join( ));
00302 _cleanup();
00303
00304 EQINFO << _connectionSet.size() << " connections open after stopListening"
00305 << endl;
00306 #ifndef NDEBUG
00307 const ConnectionVector& connections = _connectionSet.getConnections();
00308 for( ConnectionVector::const_iterator i = connections.begin();
00309 i != connections.end(); ++i )
00310 {
00311 EQINFO << " " << *i << endl;
00312 }
00313 #endif
00314
00315 EQASSERT( _requestHandler.isEmpty( ));
00316 return true;
00317 }
00318
00319 void Node::_addConnection( ConnectionPtr connection )
00320 {
00321 _connectionSet.addConnection( connection );
00322 connection->recvNB( new uint64_t, sizeof( uint64_t ));
00323 }
00324
00325 void Node::_removeConnection( ConnectionPtr connection )
00326 {
00327 EQASSERT( connection.isValid( ));
00328
00329 _connectionSet.removeConnection( connection );
00330
00331 void* buffer( 0 );
00332 if( !connection->isListening( ))
00333 {
00334 uint64_t bytes( 0 );
00335 connection->getRecvData( &buffer, &bytes );
00336 EQASSERT( buffer );
00337 EQASSERT( bytes == sizeof( uint64_t ));
00338 }
00339
00340 if( !connection->isClosed( ))
00341 connection->close();
00342 delete reinterpret_cast< uint64_t* >( buffer );
00343 }
00344
00345 void Node::_cleanup()
00346 {
00347 EQVERB << "Clean up stopped node" << endl;
00348 EQASSERTINFO( _state == STATE_STOPPED, _state );
00349
00350 _removeConnection( _connection );
00351 _connectionNodes.erase( _connection );
00352 _connection = 0;
00353
00354 const ConnectionVector& connections = _connectionSet.getConnections();
00355 while( !connections.empty( ))
00356 {
00357 ConnectionPtr connection = connections.back();
00358 NodePtr node = _connectionNodes[ connection ];
00359
00360 node->_state = STATE_STOPPED;
00361 node->_connection = 0;
00362 _connectionNodes.erase( connection );
00363 _nodes.erase( node->_id );
00364
00365 _removeConnection( connection );
00366 }
00367
00368 if( !_connectionNodes.empty( ))
00369 EQINFO << _connectionNodes.size() << " open connections during cleanup"
00370 << endl;
00371 #ifndef NDEBUG
00372 for( ConnectionNodeHash::const_iterator i = _connectionNodes.begin();
00373 i != _connectionNodes.end(); ++i )
00374 {
00375 NodePtr node = i->second;
00376 EQINFO << " " << i->first << " : " << node << endl;
00377 EQINFO << " Node ref count " << node->getRefCount() - 1
00378 << ' ' << node->_connection << ' ' << node->_state
00379 << ( node == this ? " self" : "" ) << endl;
00380 }
00381 #endif
00382
00383 _connectionNodes.clear();
00384
00385 if( !_nodes.empty( ))
00386 EQINFO << _nodes.size() << " nodes connected during cleanup" << endl;
00387
00388 #ifndef NDEBUG
00389 for( base::UUIDHash< NodePtr >::const_iterator i = _nodes.begin();
00390 i != _nodes.end(); ++i )
00391 {
00392 NodePtr node = i->second;
00393 EQINFO << " " << node << " ref count " << node->getRefCount() - 1
00394 << ' ' << node->_connection << ' ' << node->_state
00395 << ( node == this ? " self" : "" ) << endl;
00396 }
00397 #endif
00398
00399 _nodes.clear();
00400 }
00401
00402 bool Node::_connectSelf()
00403 {
00404
00405 _connection = new PipeConnection;
00406 if( !_connection->connect())
00407 {
00408 EQERROR << "Could not create local connection to receiver thread."
00409 << endl;
00410 _connection = 0;
00411 return false;
00412 }
00413
00414
00415 EQASSERT( _connection->getDescription().isValid( ));
00416 EQASSERT( _connectionNodes.find( _connection ) == _connectionNodes.end( ));
00417
00418 _connectionNodes[ _connection ] = this;
00419 _nodes[ _id ] = this;
00420
00421 _addConnection( _connection );
00422
00423 EQVERB << "Added node " << _id << " using " << _connection << std::endl;
00424 return true;
00425 }
00426
00427 bool Node::connect( NodePtr node, ConnectionPtr connection )
00428 {
00429 EQASSERT( connection.isValid( ));
00430
00431 if( !node.isValid() || _state != STATE_LISTENING ||
00432 !connection->isConnected() || node->_state != STATE_STOPPED )
00433 {
00434 return false;
00435 }
00436
00437 _addConnection( connection );
00438
00439
00440 NodeConnectPacket packet;
00441 packet.requestID = _requestHandler.registerRequest( node.get( ));
00442 packet.nodeID = _id;
00443 packet.nodeID.convertToNetwork();
00444 packet.type = getType();
00445 packet.launchID = node->_launchID;
00446 node->_launchID = EQ_ID_INVALID;
00447 connection->send( packet, serialize( ));
00448
00449 bool connected = false;
00450 _requestHandler.waitRequest( packet.requestID, connected );
00451 if( !connected )
00452 return false;
00453
00454 EQASSERT( node->_id != NodeID::ZERO );
00455 EQASSERTINFO( node->_id != _id, _id );
00456 EQINFO << node << " connected to " << this << endl;
00457 return true;
00458 }
00459
00460 NodePtr Node::getNode( const NodeID& id ) const
00461 {
00462 base::UUIDHash< NodePtr >::const_iterator iter = _nodes.find( id );
00463 if( iter == _nodes.end( ))
00464 return 0;
00465 return iter->second;
00466 }
00467
00468 bool Node::disconnect( NodePtr node )
00469 {
00470 if( !node || _state != STATE_LISTENING )
00471 return false;
00472
00473 if( node->_state != STATE_CONNECTED )
00474 return true;
00475
00476 EQASSERT( !inCommandThread( ));
00477
00478 NodeDisconnectPacket packet;
00479 packet.requestID = _requestHandler.registerRequest( node.get( ));
00480 send( packet );
00481
00482 _requestHandler.waitRequest( packet.requestID );
00483 return true;
00484 }
00485
00486 void Node::addConnectionDescription( ConnectionDescriptionPtr cd )
00487 {
00488 _connectionDescriptions.push_back( cd );
00489 }
00490
00491
00492
00493
00494 void Node::_addSession( Session* session, NodePtr server,
00495 const uint32_t sessionID )
00496 {
00497 CHECK_THREAD( _thread );
00498
00499 session->_server = server;
00500 session->_id = sessionID;
00501 session->_isMaster = ( server==this && isLocal( ));
00502 session->_setLocalNode( this );
00503 if( session->_isMaster )
00504 session->_idPool.freeIDs( 1, IDPool::MAX_CAPACITY );
00505
00506 EQASSERTINFO( _sessions.find( sessionID ) == _sessions.end(),
00507 "Session " << sessionID << " @" << (void*)session
00508 << " already mapped on " << this << " @"
00509 << (void*)_sessions[ sessionID ] );
00510 _sessions[ sessionID ] = session;
00511
00512 EQINFO << (session->_isMaster ? "master" : "client") << " session, id "
00513 << sessionID << ", served by " << server.get() << ", mapped on "
00514 << this << endl;
00515 }
00516
00517 void Node::_removeSession( Session* session )
00518 {
00519 CHECK_THREAD( _thread );
00520 EQASSERT( _sessions.find( session->getID( )) != _sessions.end( ));
00521
00522 _sessions.erase( session->getID( ));
00523 EQINFO << "Erased session " << session->getID() << ", " << _sessions.size()
00524 << " left" << std::endl;
00525
00526 session->_setLocalNode( 0 );
00527 session->_server = 0;
00528 session->_id = EQ_ID_INVALID;
00529 session->_isMaster = false;
00530 }
00531
00532 bool Node::registerSession( Session* session )
00533 {
00534 EQASSERT( isLocal( ));
00535 EQASSERT( !inCommandThread( ));
00536
00537 NodeRegisterSessionPacket packet;
00538 packet.requestID = _requestHandler.registerRequest( session );
00539 send( packet );
00540
00541 uint32_t sessionID = EQ_ID_INVALID;
00542 _requestHandler.waitRequest( packet.requestID, sessionID );
00543 return (sessionID == session->getID( ));
00544 }
00545
00546 bool Node::mapSession( NodePtr server, Session* session, const uint32_t id )
00547 {
00548 EQASSERT( isLocal( ));
00549 EQASSERT( id != EQ_ID_INVALID );
00550 EQASSERT( server != this );
00551
00552 NodeMapSessionPacket packet;
00553 packet.requestID = _requestHandler.registerRequest( session );
00554 packet.sessionID = id;
00555 server->send( packet );
00556
00557 uint32_t sessionID = EQ_ID_INVALID;
00558 _requestHandler.waitRequest( packet.requestID, sessionID );
00559 return (sessionID == session->getID( ));
00560 }
00561
00562 bool Node::unmapSession( Session* session )
00563 {
00564 EQASSERT( isLocal( ));
00565
00566 NodeUnmapSessionPacket packet;
00567 packet.requestID = _requestHandler.registerRequest( session );
00568 packet.sessionID = session->getID();
00569 session->getServer()->send( packet );
00570
00571 bool ret = false;
00572 _requestHandler.waitRequest( packet.requestID, ret );
00573 return ret;
00574 }
00575
00576 Session* Node::getSession( const uint32_t id )
00577 {
00578 SessionHash::const_iterator i = _sessions.find( id );
00579
00580 EQASSERT( i != _sessions.end( ));
00581 if( i == _sessions.end( ))
00582 return 0;
00583
00584 return i->second;
00585 }
00586
00587 uint32_t Node::_generateSessionID()
00588 {
00589 CHECK_THREAD( _thread );
00590 RNG rng;
00591 uint32_t id = rng.get<uint32_t>();
00592
00593 while( id == EQ_ID_INVALID || _sessions.find( id ) != _sessions.end( ))
00594 id = rng.get<uint32_t>();
00595
00596 return id;
00597 }
00598
00599 #define SEPARATOR '#'
00600
00601 std::string Node::serialize() const
00602 {
00603 ostringstream data;
00604 data << _id << SEPARATOR << _connectionDescriptions.size() << SEPARATOR;
00605
00606 for( vector< ConnectionDescriptionPtr >::const_iterator i =
00607 _connectionDescriptions.begin();
00608 i != _connectionDescriptions.end(); ++i )
00609 {
00610 ConnectionDescriptionPtr desc = *i;
00611 desc->serialize( data );
00612 }
00613
00614 return data.str();
00615 }
00616
00617 bool Node::deserialize( std::string& data )
00618 {
00619 EQASSERT( getState() == STATE_STOPPED || getState() == STATE_LAUNCHED );
00620
00621 EQINFO << "Node data: " << data << endl;
00622 if( !_connectionDescriptions.empty( ))
00623 EQWARN << "Node already holds data while deserializing it" << endl;
00624
00625
00626 size_t nextPos = data.find( SEPARATOR );
00627 if( nextPos == string::npos || nextPos == 0 )
00628 {
00629 EQERROR << "Could not parse node data" << endl;
00630 return false;
00631 }
00632
00633 _id = data.substr( 0, nextPos );
00634 data = data.substr( nextPos + 1 );
00635
00636
00637 nextPos = data.find( SEPARATOR );
00638 if( nextPos == string::npos || nextPos == 0 )
00639 {
00640 EQERROR << "Could not parse node data" << endl;
00641 return false;
00642 }
00643
00644 const string sizeStr = data.substr( 0, nextPos );
00645 if( !isdigit( sizeStr[0] ))
00646 {
00647 EQERROR << "Could not parse node data" << endl;
00648 return false;
00649 }
00650
00651 const size_t nDesc = atoi( sizeStr.c_str( ));
00652 data = data.substr( nextPos + 1 );
00653
00654
00655 for( size_t i = 0; i<nDesc; ++i )
00656 {
00657 ConnectionDescriptionPtr desc = new ConnectionDescription;
00658 if( !desc->fromString( data ))
00659 {
00660 EQERROR << "Error during node connection data parsing" << endl;
00661 return false;
00662 }
00663 addConnectionDescription( desc );
00664 }
00665
00666 return true;
00667 }
00668
00669 NodePtr Node::createNode( const uint32_t type )
00670 {
00671 EQASSERTINFO( type == TYPE_EQNET_NODE, type );
00672 return new Node();
00673 }
00674
00675
00676 void Node::acquireSendToken( NodePtr node )
00677 {
00678 NodeAcquireSendTokenPacket packet;
00679 packet.requestID = _requestHandler.registerRequest();
00680 node->send( packet );
00681 _requestHandler.waitRequest( packet.requestID );
00682 }
00683
00684 void Node::releaseSendToken( NodePtr node )
00685 {
00686 NodeReleaseSendTokenPacket packet;
00687 node->send( packet );
00688 }
00689
00690
00691
00692
00693 void* Node::_runReceiverThread()
00694 {
00695 EQINFO << "Entered receiver thread of " << typeid( *this ).name() << endl;
00696 _commandThread->start();
00697
00698 int nErrors = 0;
00699 while( _state == STATE_LISTENING )
00700 {
00701 const ConnectionSet::Event result = _connectionSet.select();
00702 switch( result )
00703 {
00704 case ConnectionSet::EVENT_CONNECT:
00705 _handleConnect();
00706 break;
00707
00708 case ConnectionSet::EVENT_DATA:
00709 _handleData();
00710 break;
00711
00712 case ConnectionSet::EVENT_DISCONNECT:
00713 case ConnectionSet::EVENT_INVALID_HANDLE:
00714 {
00715 _handleDisconnect();
00716 EQVERB << &_connectionSet << endl;
00717 break;
00718 }
00719
00720 case ConnectionSet::EVENT_TIMEOUT:
00721 EQINFO << "select timeout" << endl;
00722 break;
00723
00724 case ConnectionSet::EVENT_ERROR:
00725 ++nErrors;
00726 EQWARN << "Connection signalled error during select" << endl;
00727 if( nErrors > 100 )
00728 {
00729 EQWARN << "Too many errors in a row, capping connection"
00730 << endl;
00731 _handleDisconnect();
00732 }
00733 break;
00734
00735 case ConnectionSet::EVENT_SELECT_ERROR:
00736 EQWARN << "Error during select" << endl;
00737 ++nErrors;
00738 if( nErrors > 10 )
00739 {
00740 EQWARN << "Too many errors in a row" << endl;
00741 EQUNIMPLEMENTED;
00742 }
00743 break;
00744
00745 case ConnectionSet::EVENT_INTERRUPT:
00746 _redispatchCommands();
00747 break;
00748
00749 default:
00750 EQUNIMPLEMENTED;
00751 }
00752 if( result != ConnectionSet::EVENT_ERROR &&
00753 result != ConnectionSet::EVENT_SELECT_ERROR )
00754
00755 nErrors = 0;
00756 }
00757
00758 if( !_pendingCommands.empty( ))
00759 EQWARN << _pendingCommands.size()
00760 << " commands pending while leaving command thread" << endl;
00761
00762 for( CommandList::const_iterator i = _pendingCommands.begin();
00763 i != _pendingCommands.end(); ++i )
00764 {
00765 Command* command = *i;
00766 command->release();
00767 }
00768
00769 EQCHECK( _commandThread->join( ));
00770 _pendingCommands.clear();
00771 _commandCache.flush();
00772
00773 EQINFO << "Leaving receiver thread of " << typeid( *this ).name() << endl;
00774 return EXIT_SUCCESS;
00775 }
00776
00777 void Node::_handleConnect()
00778 {
00779 ConnectionPtr connection = _connectionSet.getConnection();
00780 ConnectionPtr newConn = connection->acceptSync();
00781 connection->acceptNB();
00782
00783 if( !newConn )
00784 {
00785 EQINFO << "Received connect event, but accept() failed" << endl;
00786 return;
00787 }
00788
00789 _addConnection( newConn );
00790
00791 }
00792
00793 void Node::_handleDisconnect()
00794 {
00795 while( _handleData( )) ;
00796
00797 ConnectionPtr connection = _connectionSet.getConnection();
00798 NodePtr node;
00799 ConnectionNodeHash::const_iterator i = _connectionNodes.find( connection );
00800 if( i != _connectionNodes.end( ))
00801 node = i->second;
00802
00803 if( node.isValid( ))
00804 {
00805 node->_state = STATE_STOPPED;
00806 node->_connection = 0;
00807 _connectionNodes.erase( connection );
00808 _nodes.erase( node->_id );
00809 }
00810
00811 _removeConnection( connection );
00812
00813 EQINFO << node << " disconnected from " << this << " connection used "
00814 << connection->getRefCount() << endl;
00815 }
00816
00817 bool Node::_handleData()
00818 {
00819 ConnectionPtr connection = _connectionSet.getConnection();
00820 EQASSERT( connection.isValid( ));
00821
00822 NodePtr node;
00823 ConnectionNodeHash::const_iterator i = _connectionNodes.find( connection );
00824 if( i != _connectionNodes.end( ))
00825 node = i->second;
00826 EQASSERTINFO( !node || node->_connection == connection,
00827 typeid( *node.get( )).name( ));
00828
00829 EQVERB << "Handle data from " << node << endl;
00830
00831 void* sizePtr( 0 );
00832 uint64_t bytes( 0 );
00833 const bool gotSize = connection->recvSync( &sizePtr, &bytes );
00834
00835 if( !gotSize )
00836 {
00837 connection->recvNB( sizePtr, sizeof( uint64_t ));
00838 return false;
00839 }
00840
00841 EQASSERT( sizePtr );
00842 const uint64_t size = *reinterpret_cast< uint64_t* >( sizePtr );
00843 EQASSERT( size );
00844 EQASSERT( bytes == sizeof( uint64_t ));
00845 EQASSERT( size > sizeof( size ));
00846
00847 Command& command = _commandCache.alloc( node, this, size );
00848 uint8_t* ptr = reinterpret_cast< uint8_t* >( command.getPacket()) +
00849 sizeof( uint64_t );
00850
00851 connection->recvNB( ptr, size - sizeof( uint64_t ));
00852 const bool gotData = connection->recvSync( 0, 0 );
00853
00854 EQASSERT( gotData );
00855 EQASSERT( command.isValid( ));
00856
00857
00858 connection->recvNB( sizePtr, sizeof( uint64_t ));
00859
00860 if( !gotData )
00861 {
00862 EQERROR << "Incomplete packet read: " << command << std::endl;
00863 return false;
00864 }
00865
00866
00867
00868 EQASSERTINFO( node.isValid() ||
00869 ( command->datatype == DATATYPE_EQNET_NODE &&
00870 ( command->command == CMD_NODE_CONNECT ||
00871 command->command == CMD_NODE_CONNECT_REPLY)),
00872 command << " connection " << connection );
00873
00874 _dispatchCommand( command );
00875 return true;
00876 }
00877
00878 void Node::_dispatchCommand( Command& command )
00879 {
00880 EQASSERT( command.isValid( ));
00881
00882 const bool dispatched = dispatchCommand( command );
00883
00884 _redispatchCommands();
00885
00886 if( !dispatched )
00887 {
00888 command.retain();
00889 _pendingCommands.push_back( &command );
00890 }
00891 }
00892
00893 bool Node::dispatchCommand( Command& command )
00894 {
00895 EQVERB << "dispatch " << command << " by " << _id << endl;
00896 EQASSERT( command.isValid( ));
00897
00898 const uint32_t datatype = command->datatype;
00899 switch( datatype )
00900 {
00901 case DATATYPE_EQNET_NODE:
00902 return Dispatcher::dispatchCommand( command );
00903
00904 case DATATYPE_EQNET_SESSION:
00905 case DATATYPE_EQNET_OBJECT:
00906 {
00907 const SessionPacket* sessionPacket =
00908 static_cast<SessionPacket*>( command.getPacket( ));
00909 const uint32_t id = sessionPacket->sessionID;
00910
00911 EQASSERTINFO( _sessions.find( id ) != _sessions.end(),
00912 "Can't find session for " << sessionPacket );
00913 Session* session = _sessions[ id ];
00914 EQASSERT( session );
00915
00916 return session->dispatchCommand( command );
00917 }
00918
00919 default:
00920 EQABORT( "Unknown datatype " << datatype << " for " << command );
00921 return true;
00922 }
00923 }
00924
00925 void Node::_redispatchCommands()
00926 {
00927 bool changes = true;
00928 while( changes && !_pendingCommands.empty( ))
00929 {
00930 changes = false;
00931
00932 for( CommandList::iterator i = _pendingCommands.begin();
00933 i != _pendingCommands.end(); ++i )
00934 {
00935 Command* command = *i;
00936 EQASSERT( command->isValid( ));
00937
00938 if( dispatchCommand( *command ))
00939 {
00940 _pendingCommands.erase( i );
00941 command->release();
00942 changes = true;
00943 break;
00944 }
00945 }
00946 }
00947
00948 #ifndef NDEBUG
00949 if( !_pendingCommands.empty( ))
00950 EQVERB << _pendingCommands.size() << " undispatched commands" << endl;
00951 EQASSERT( _pendingCommands.size() < 1000 );
00952 #endif
00953 }
00954
00955
00956
00957
00958 void* Node::_runCommandThread()
00959 {
00960 EQINFO << "Entered command thread of " << typeid( *this ).name() << endl;
00961
00962 while( _state == STATE_LISTENING )
00963 {
00964 Command* command = _commandThreadQueue.pop();
00965 EQASSERT( command->isValid( ));
00966
00967 const CommandResult result = invokeCommand( *command );
00968 switch( result )
00969 {
00970 case COMMAND_ERROR:
00971 EQABORT( "Error handling " << *command );
00972 break;
00973
00974 case COMMAND_HANDLED:
00975 case COMMAND_DISCARD:
00976 break;
00977
00978 default:
00979 EQUNIMPLEMENTED;
00980 }
00981
00982 command->release();
00983 }
00984
00985 _commandThreadQueue.flush();
00986 EQINFO << "Leaving command thread of " << typeid( *this ).name() << endl;
00987 return EXIT_SUCCESS;
00988 }
00989
00990 CommandResult Node::invokeCommand( Command& command )
00991 {
00992 EQVERB << "dispatch " << command << " by " << _id << endl;
00993 EQASSERT( command.isValid( ));
00994
00995 const uint32_t datatype = command->datatype;
00996 switch( datatype )
00997 {
00998 case DATATYPE_EQNET_NODE:
00999 return Dispatcher::invokeCommand( command );
01000
01001 case DATATYPE_EQNET_SESSION:
01002 case DATATYPE_EQNET_OBJECT:
01003 {
01004 const SessionPacket* sessionPacket =
01005 static_cast<SessionPacket*>( command.getPacket( ));
01006 const uint32_t id = sessionPacket->sessionID;
01007
01008 EQASSERTINFO( _sessions.find( id ) != _sessions.end( ),
01009 "Can't find session for " << sessionPacket );
01010
01011 Session* session = _sessions[ id ];
01012 if( !session )
01013 return COMMAND_ERROR;
01014
01015 return session->invokeCommand( command );
01016 }
01017
01018 default:
01019 EQABORT( "Unknown datatype " << datatype << " for " << command );
01020 return COMMAND_ERROR;
01021 }
01022 }
01023
01024 CommandResult Node::_cmdStop( Command& command )
01025 {
01026 EQINFO << "Cmd stop " << this << endl;
01027 EQASSERT( _state == STATE_LISTENING );
01028
01029 _state = STATE_STOPPED;
01030 _connectionSet.interrupt();
01031
01032 return COMMAND_HANDLED;
01033 }
01034
01035 CommandResult Node::_cmdRegisterSession( Command& command )
01036 {
01037 EQASSERT( getState() == STATE_LISTENING );
01038
01039 const NodeRegisterSessionPacket* packet =
01040 command.getPacket<NodeRegisterSessionPacket>();
01041 EQVERB << "Cmd register session: " << packet << endl;
01042 CHECK_THREAD( _thread );
01043
01044 Session* session = static_cast< Session* >(
01045 _requestHandler.getRequestData( packet->requestID ));
01046
01047 EQASSERTINFO( command.getNode() == this,
01048 command.getNode() << " != " << this );
01049 EQASSERT( session );
01050
01051 const uint32_t sessionID = _generateSessionID();
01052 _addSession( session, this, sessionID );
01053
01054 NodeRegisterSessionReplyPacket reply( packet );
01055 reply.sessionID = session->getID();
01056 send( reply );
01057
01058 return COMMAND_HANDLED;
01059 }
01060
01061 CommandResult Node::_cmdRegisterSessionReply( Command& command)
01062 {
01063 const NodeRegisterSessionReplyPacket* packet =
01064 command.getPacket<NodeRegisterSessionReplyPacket>();
01065 EQVERB << "Cmd register session reply: " << packet << endl;
01066 CHECK_THREAD( _thread );
01067
01068 EQASSERT( command.getNode() == this );
01069
01070 _requestHandler.serveRequest( packet->requestID, packet->sessionID );
01071 return COMMAND_HANDLED;
01072 }
01073
01074
01075 CommandResult Node::_cmdMapSession( Command& command )
01076 {
01077 EQASSERT( getState() == STATE_LISTENING );
01078
01079 const NodeMapSessionPacket* packet =
01080 command.getPacket<NodeMapSessionPacket>();
01081 EQVERB << "Cmd map session: " << packet << endl;
01082 CHECK_THREAD( _thread );
01083
01084 NodePtr node = command.getNode();
01085 NodeMapSessionReplyPacket reply( packet );
01086
01087 if( node == this )
01088 {
01089 EQASSERTINFO( node == this,
01090 "Can't map a session using myself as server " );
01091 reply.sessionID = EQ_ID_INVALID;
01092 }
01093 else
01094 {
01095 const uint32_t sessionID = packet->sessionID;
01096 SessionHash::const_iterator i = _sessions.find( sessionID );
01097
01098 if( i == _sessions.end( ))
01099 reply.sessionID = EQ_ID_INVALID;
01100 }
01101
01102 node->send( reply );
01103 return COMMAND_HANDLED;
01104 }
01105
01106 CommandResult Node::_cmdMapSessionReply( Command& command)
01107 {
01108 const NodeMapSessionReplyPacket* packet =
01109 command.getPacket<NodeMapSessionReplyPacket>();
01110 EQVERB << "Cmd map session reply: " << packet << endl;
01111 CHECK_THREAD( _thread );
01112
01113 const uint32_t requestID = packet->requestID;
01114 if( packet->sessionID != EQ_ID_INVALID )
01115 {
01116 NodePtr node = command.getNode();
01117 Session* session = static_cast< Session* >(
01118 _requestHandler.getRequestData( requestID ));
01119 EQASSERT( session );
01120 EQASSERT( node != this );
01121
01122 _addSession( session, node, packet->sessionID );
01123 }
01124
01125 _requestHandler.serveRequest( requestID, packet->sessionID );
01126 return COMMAND_HANDLED;
01127 }
01128
01129 CommandResult Node::_cmdUnmapSession( Command& command )
01130 {
01131 const NodeUnmapSessionPacket* packet =
01132 command.getPacket<NodeUnmapSessionPacket>();
01133 EQVERB << "Cmd unmap session: " << packet << endl;
01134 CHECK_THREAD( _thread );
01135
01136 const uint32_t sessionID = packet->sessionID;
01137 SessionHash::const_iterator i = _sessions.find( sessionID );
01138 Session* session = (i == _sessions.end() ? 0 : i->second );
01139
01140 NodeUnmapSessionReplyPacket reply( packet );
01141 reply.result = (session != 0);
01142
01143 #if 0
01144 if( session && session->_server == this )
01145 ;
01146 #endif
01147
01148 command.getNode()->send( reply );
01149 return COMMAND_HANDLED;
01150 }
01151
01152 CommandResult Node::_cmdUnmapSessionReply( Command& command)
01153 {
01154 const NodeUnmapSessionReplyPacket* packet =
01155 command.getPacket<NodeUnmapSessionReplyPacket>();
01156 EQVERB << "Cmd unmap session reply: " << packet << endl;
01157 CHECK_THREAD( _thread );
01158
01159 const uint32_t requestID = packet->requestID;
01160 Session* session = static_cast< Session* >(
01161 _requestHandler.getRequestData( requestID ));
01162 EQASSERT( session );
01163
01164 if( session )
01165 {
01166 _removeSession( session );
01167 _requestHandler.serveRequest( requestID, true );
01168 }
01169 else
01170 _requestHandler.serveRequest( requestID, false );
01171
01172
01173 return COMMAND_HANDLED;
01174 }
01175
01176 CommandResult Node::_cmdConnect( Command& command )
01177 {
01178 EQASSERT( !command.getNode().isValid( ));
01179 EQASSERT( inReceiverThread( ));
01180
01181 const NodeConnectPacket* packet = command.getPacket<NodeConnectPacket>();
01182 ConnectionPtr connection = _connectionSet.getConnection();
01183
01184 NodeID nodeID = packet->nodeID;
01185 nodeID.convertToHost();
01186
01187 EQINFO << "handle connect " << packet << endl;
01188 EQASSERT( nodeID != _id );
01189 EQASSERT( _connectionNodes.find( connection ) == _connectionNodes.end( ));
01190
01191 if( _nodes.find( nodeID ) != _nodes.end( ))
01192 {
01193 EQASSERT( packet->launchID == EQ_ID_INVALID );
01194 EQINFO << "Already got node " << nodeID << ", refusing connect"
01195 << endl;
01196
01197
01198 NodeConnectReplyPacket reply( packet );
01199 connection->send( reply, serialize( ));
01200
01201
01202
01203
01204
01205
01206 _removeConnection( connection );
01207 return COMMAND_HANDLED;
01208 }
01209
01210
01211 NodePtr remoteNode;
01212 if( packet->launchID != EQ_ID_INVALID )
01213 {
01214 void* ptr = _requestHandler.getRequestData( packet->launchID );
01215 EQASSERT( dynamic_cast< Node* >( (Dispatcher*)ptr ));
01216 remoteNode = static_cast< Node* >( ptr );
01217 remoteNode->_connectionDescriptions.clear();
01218 }
01219 else
01220 remoteNode = createNode( packet->type );
01221
01222 string data = packet->nodeData;
01223 if( !remoteNode->deserialize( data ))
01224 EQWARN << "Error during node initialization" << endl;
01225 EQASSERT( data.empty( ));
01226 EQASSERTINFO( remoteNode->_id == nodeID,
01227 remoteNode->_id << "!=" << nodeID );
01228
01229 remoteNode->_connection = connection;
01230 remoteNode->_state = STATE_CONNECTED;
01231
01232 _connectionNodes[ connection ] = remoteNode;
01233 _nodes[ remoteNode->_id ] = remoteNode;
01234 EQVERB << "Added node " << nodeID << " using " << connection << std::endl;
01235
01236
01237 NodeConnectReplyPacket reply( packet );
01238 reply.nodeID = _id;
01239 reply.nodeID.convertToNetwork();
01240
01241 reply.type = getType();
01242
01243 connection->send( reply, serialize( ));
01244
01245 if( packet->launchID != EQ_ID_INVALID )
01246 _requestHandler.serveRequest( packet->launchID );
01247
01248 return COMMAND_HANDLED;
01249 }
01250
01251 CommandResult Node::_cmdConnectReply( Command& command )
01252 {
01253 EQASSERT( !command.getNode( ));
01254 EQASSERT( inReceiverThread( ));
01255
01256 const NodeConnectReplyPacket* packet =
01257 command.getPacket<NodeConnectReplyPacket>();
01258 ConnectionPtr connection = _connectionSet.getConnection();
01259
01260 NodeID nodeID = packet->nodeID;
01261 nodeID.convertToHost();
01262
01263 EQINFO << "handle connect reply " << packet << endl;
01264 EQASSERT( _connectionNodes.find( connection ) == _connectionNodes.end( ));
01265
01266 if( nodeID == NodeID::ZERO ||
01267 _nodes.find( nodeID ) != _nodes.end( ))
01268
01269 {
01270 EQINFO << "ignoring connect reply, node already connected" << endl;
01271 _removeConnection( connection );
01272
01273 if( packet->requestID != EQ_ID_INVALID )
01274 _requestHandler.serveRequest( packet->requestID, false );
01275
01276 return COMMAND_HANDLED;
01277 }
01278
01279
01280 NodePtr remoteNode;
01281 if( packet->requestID != EQ_ID_INVALID )
01282 {
01283 void* ptr = _requestHandler.getRequestData( packet->requestID );
01284 EQASSERT( dynamic_cast< Node* >( (Dispatcher*)ptr ));
01285 remoteNode = static_cast< Node* >( ptr );
01286 remoteNode->_connectionDescriptions.clear();
01287 }
01288
01289 if( !remoteNode )
01290 remoteNode = createNode( packet->type );
01291
01292 EQASSERT( remoteNode->getType() == packet->type );
01293 EQASSERT( remoteNode->getState() == STATE_STOPPED );
01294
01295 string data = packet->nodeData;
01296 if( !remoteNode->deserialize( data ))
01297 EQWARN << "Error during node initialization" << endl;
01298 EQASSERT( data.empty( ));
01299 EQASSERT( remoteNode->_id == nodeID );
01300
01301 remoteNode->_connection = connection;
01302 remoteNode->_state = STATE_CONNECTED;
01303
01304 _connectionNodes[ connection ] = remoteNode;
01305 _nodes[ remoteNode->_id ] = remoteNode;
01306 EQVERB << "Added node " << nodeID << " using " << connection << std::endl;
01307
01308 if( packet->requestID != EQ_ID_INVALID )
01309 _requestHandler.serveRequest( packet->requestID, true );
01310 return COMMAND_HANDLED;
01311 }
01312
01313 CommandResult Node::_cmdDisconnect( Command& command )
01314 {
01315 EQASSERT( inReceiverThread( ));
01316
01317 const NodeDisconnectPacket* packet =
01318 command.getPacket<NodeDisconnectPacket>();
01319
01320 NodePtr node = static_cast<Node*>(
01321 _requestHandler.getRequestData( packet->requestID ));
01322 EQASSERT( node.isValid( ));
01323
01324 ConnectionPtr connection = node->_connection;
01325 if( connection.isValid( ))
01326 {
01327 node->_state = STATE_STOPPED;
01328 node->_connection = 0;
01329
01330 _removeConnection( connection );
01331 EQASSERT( _connectionNodes.find( connection )!=_connectionNodes.end( ));
01332
01333 _connectionNodes.erase( connection );
01334 _nodes.erase( node->_id );
01335
01336 EQINFO << node << " disconnected from " << this << " connection used "
01337 << connection->getRefCount() << endl;
01338 }
01339
01340 EQASSERT( node->_state == STATE_STOPPED );
01341 _requestHandler.serveRequest( packet->requestID );
01342 return COMMAND_HANDLED;
01343 }
01344
01345 CommandResult Node::_cmdGetNodeData( Command& command)
01346 {
01347 const NodeGetNodeDataPacket* packet =
01348 command.getPacket<NodeGetNodeDataPacket>();
01349 EQINFO << "cmd get node data: " << packet << endl;
01350
01351 NodeID nodeID = packet->nodeID;
01352 nodeID.convertToHost();
01353
01354 NodePtr descNode = getNode( nodeID );
01355
01356 NodeGetNodeDataReplyPacket reply( packet );
01357
01358 string nodeData;
01359 if( descNode.isValid( ))
01360 {
01361 reply.type = descNode->getType();
01362 nodeData = descNode->serialize();
01363 }
01364 else
01365 {
01366 EQINFO << "Node " << nodeID << " unknown" << endl;
01367 reply.type = TYPE_EQNET_INVALID;
01368 }
01369
01370 NodePtr node = command.getNode();
01371 node->send( reply, nodeData );
01372 EQINFO << "Sent node data " << nodeData << " to " << node << endl;
01373 return COMMAND_HANDLED;
01374 }
01375
01376 CommandResult Node::_cmdGetNodeDataReply( Command& command )
01377 {
01378 NodeGetNodeDataReplyPacket* packet =
01379 command.getPacket<NodeGetNodeDataReplyPacket>();
01380 EQINFO << "cmd get node data reply: " << packet << endl;
01381
01382 const uint32_t requestID = packet->requestID;
01383
01384 NodeID nodeID = packet->nodeID;
01385 nodeID.convertToHost();
01386
01387 if( _nodes.find( nodeID ) != _nodes.end( ))
01388 {
01389
01390 NodePtr node = _nodes[ nodeID ];
01391 EQASSERT( node->isConnected( ));
01392
01393 node.ref();
01394 _requestHandler.serveRequest( requestID, node.get( ));
01395 return COMMAND_HANDLED;
01396 }
01397
01398 if( packet->type == TYPE_EQNET_INVALID )
01399 {
01400 _requestHandler.serveRequest( requestID, (void*)0 );
01401 return COMMAND_HANDLED;
01402 }
01403
01404 NodePtr node = createNode( packet->type );
01405 EQASSERT( node.isValid( ));
01406
01407 string data = packet->nodeData;
01408 if( !node->deserialize( data ))
01409 EQWARN << "Failed to initialize node data" << endl;
01410 EQASSERT( data.empty( ));
01411
01412 node->setAutoLaunch( false );
01413 node.ref();
01414 _requestHandler.serveRequest( requestID, node.get( ));
01415 return COMMAND_HANDLED;
01416 }
01417
01418 CommandResult Node::_cmdAcquireSendToken( Command& command )
01419 {
01420 NodeAcquireSendTokenPacket* packet =
01421 command.getPacket<NodeAcquireSendTokenPacket>();
01422
01423 if( !_hasSendToken )
01424
01425 return COMMAND_ERROR;
01426
01427 _hasSendToken = false;
01428
01429 NodeAcquireSendTokenReplyPacket reply( packet );
01430 command.getNode()->send( reply );
01431 return COMMAND_HANDLED;
01432 }
01433
01434 CommandResult Node::_cmdAcquireSendTokenReply( Command& command )
01435 {
01436 NodeAcquireSendTokenReplyPacket* packet =
01437 command.getPacket<NodeAcquireSendTokenReplyPacket>();
01438
01439 _requestHandler.serveRequest( packet->requestID );
01440 return COMMAND_HANDLED;
01441 }
01442
01443 CommandResult Node::_cmdReleaseSendToken( Command& command )
01444 {
01445 EQASSERT( !_hasSendToken );
01446 _hasSendToken = true;
01447 flushCommands();
01448 return COMMAND_HANDLED;
01449 }
01450
01451
01452
01453
01454 bool Node::connect( NodePtr node )
01455 {
01456 if( node->getState() == STATE_CONNECTED ||
01457 node->getState() == STATE_LISTENING )
01458
01459 return true;
01460
01461 if( !initConnect( node ))
01462 {
01463 EQERROR << "Connection initialization to " << node << " failed."
01464 << endl;
01465 return false;
01466 }
01467
01468 return syncConnect( node );
01469 }
01470
01471 bool Node::initConnect( NodePtr node )
01472 {
01473 EQASSERTINFO( _state == STATE_LISTENING, _state );
01474 if( node->getState() == STATE_CONNECTED ||
01475 node->getState() == STATE_LISTENING )
01476 {
01477 return true;
01478 }
01479
01480 EQASSERT( node->getState() == STATE_STOPPED );
01481
01482
01483 const ConnectionDescriptionVector& cds = node->getConnectionDescriptions();
01484 for( ConnectionDescriptionVector::const_iterator i = cds.begin();
01485 i != cds.end(); ++i )
01486 {
01487 ConnectionDescriptionPtr description = *i;
01488 ConnectionPtr connection = Connection::create( description );
01489
01490 if( !connection->connect( ))
01491 continue;
01492
01493 if( !connect( node, connection ))
01494 return false;
01495
01496 return true;
01497 }
01498
01499 EQINFO << "Node could not be connected." << endl;
01500 if( !node->_autoLaunch )
01501 return false;
01502
01503 EQINFO << "Attempting to launch node." << endl;
01504 for( ConnectionDescriptionVector::const_iterator i = cds.begin();
01505 i != cds.end(); ++i )
01506 {
01507 ConnectionDescriptionPtr description = *i;
01508
01509 if( _launch( node, description ))
01510 return true;
01511 }
01512
01513 return false;
01514 }
01515
01516 bool Node::syncConnect( NodePtr node )
01517 {
01518 if( node->_launchID == EQ_ID_INVALID )
01519 return ( node->getState() == STATE_CONNECTED );
01520
01521 void* ret;
01522 const float time = -( node->_launchTimeout.getTimef( ));
01523 const uint32_t timeout = static_cast< uint32_t >((time > 0) ? time : 0);
01524
01525 if( _requestHandler.waitRequest( node->_launchID, ret, timeout ))
01526 {
01527 EQASSERT( node->getState() == STATE_CONNECTED );
01528 node->_launchID = EQ_ID_INVALID;
01529 return true;
01530 }
01531
01532 node->_state = STATE_STOPPED;
01533 _requestHandler.unregisterRequest( node->_launchID );
01534 node->_launchID = EQ_ID_INVALID;
01535 return false;
01536 }
01537
01538 NodePtr Node::connect( const NodeID& nodeID )
01539 {
01540 EQASSERT( nodeID != NodeID::ZERO );
01541
01542
01543 NodeVector nodes;
01544 for( base::UUIDHash< NodePtr >::const_iterator i = _nodes.begin();
01545 i != _nodes.end(); ++i )
01546 {
01547 NodePtr node = i->second;
01548
01549 if( node->getNodeID() == nodeID )
01550 return node;
01551
01552 nodes.push_back( node );
01553 }
01554
01555 for( NodeVector::const_iterator i = nodes.begin(); i != nodes.end(); ++i )
01556 {
01557 NodePtr result = _connect( nodeID, *i );
01558 if( result.isValid( ))
01559 return result;
01560 }
01561
01562 return 0;
01563 }
01564
01565 NodePtr Node::_connect( const NodeID& nodeID, NodePtr server )
01566 {
01567 EQASSERT( nodeID != NodeID::ZERO );
01568
01569 base::UUIDHash< NodePtr >::const_iterator iter = _nodes.find( nodeID );
01570 if( iter != _nodes.end( ))
01571 return iter->second;
01572
01573
01574
01575
01576
01577
01578
01579 ScopedMutex mutex( _connectMutex );
01580 EQINFO << "Connecting node " << nodeID << endl;
01581
01582 iter = _nodes.find( nodeID );
01583 if( iter != _nodes.end( ))
01584 return iter->second;
01585
01586 EQASSERT( _id != nodeID );
01587 NodeGetNodeDataPacket packet;
01588 packet.requestID = _requestHandler.registerRequest();
01589 packet.nodeID = nodeID;
01590 packet.nodeID.convertToNetwork();
01591
01592 server->send( packet );
01593
01594 void* result = 0;
01595 _requestHandler.waitRequest( packet.requestID, result );
01596
01597 if( !result )
01598 {
01599 EQWARN << "Node not found on server" << endl;
01600 return 0;
01601 }
01602
01603 EQASSERT( dynamic_cast< Node* >( (Dispatcher*)result ));
01604 NodePtr node = static_cast< Node* >( result );
01605 node.unref();
01606
01607 if( node->isConnected( ))
01608 return node;
01609
01610 if( !connect( node ))
01611 {
01612
01613 iter = _nodes.find( nodeID );
01614 if( iter != _nodes.end( ))
01615 {
01616 node = iter->second;
01617 EQASSERT( node->isConnected( ));
01618 return node;
01619 }
01620
01621 EQWARN << "Node connection failed" << endl;
01622 return 0;
01623 }
01624
01625 return node;
01626 }
01627
01628 bool Node::_launch( NodePtr node,
01629 ConnectionDescriptionPtr description )
01630 {
01631 EQASSERT( node->getState() == STATE_STOPPED );
01632
01633 node->_launchID = _requestHandler.registerRequest( node.get() );
01634 node->_launchTimeout.setAlarm(
01635 static_cast< float >( description->launchTimeout ));
01636
01637 const string launchCommand = _createLaunchCommand( node, description );
01638
01639 if( !Launcher::run( launchCommand ))
01640 {
01641 EQWARN << "Could not launch node using '" << launchCommand << "'"
01642 << endl;
01643 _requestHandler.unregisterRequest( node->_launchID );
01644 node->_launchID = EQ_ID_INVALID;
01645 return false;
01646 }
01647
01648 node->_state = STATE_LAUNCHED;
01649 return true;
01650 }
01651
01652 string Node::_createLaunchCommand( NodePtr node,
01653 ConnectionDescriptionPtr description )
01654 {
01655 const string& launchCommand = description->getLaunchCommand();
01656 const size_t launchCommandLen = launchCommand.size();
01657 const char quote = description->launchCommandQuote;
01658
01659 bool commandFound = false;
01660 size_t lastPos = 0;
01661 string result;
01662
01663 for( size_t percentPos = launchCommand.find( '%' );
01664 percentPos != string::npos;
01665 percentPos = launchCommand.find( '%', percentPos+1 ))
01666 {
01667 ostringstream replacement;
01668 switch( launchCommand[percentPos+1] )
01669 {
01670 case 'c':
01671 {
01672 replacement << _createRemoteCommand( node, quote );
01673 commandFound = true;
01674 break;
01675 }
01676 case 'h':
01677 replacement << description->getHostname();
01678 break;
01679
01680 case 'n':
01681 replacement << node->getNodeID();
01682 break;
01683
01684 default:
01685 EQWARN << "Unknown token " << launchCommand[percentPos+1]
01686 << endl;
01687 }
01688
01689 result += launchCommand.substr( lastPos, percentPos-lastPos );
01690 if( !replacement.str().empty( ))
01691 result += replacement.str();
01692
01693 lastPos = percentPos+2;
01694 }
01695
01696 result += launchCommand.substr( lastPos, launchCommandLen-lastPos );
01697
01698 if( !commandFound )
01699 result += " " + _createRemoteCommand( node, quote );
01700
01701 EQINFO << "Launch command: " << result << endl;
01702 return result;
01703 }
01704
01705 string Node::_createRemoteCommand( NodePtr node, const char quote )
01706 {
01707 if( getState() != STATE_LISTENING )
01708 {
01709 EQERROR << "Node is not listening, can't launch " << this << endl;
01710 return "";
01711 }
01712
01713 ostringstream stringStream;
01714
01715
01716 #ifndef WIN32
01717 # ifdef Darwin
01718 const char libPath[] = "DYLD_LIBRARY_PATH";
01719 # else
01720 const char libPath[] = "LD_LIBRARY_PATH";
01721 # endif
01722
01723 stringStream << "env ";
01724 char* env = getenv( libPath );
01725 if( env )
01726 stringStream << libPath << "=" << env << " ";
01727
01728 for( int i=0; environ[i] != 0; i++ )
01729 if( strlen( environ[i] ) > 2 && strncmp( environ[i], "EQ_", 3 ) == 0 )
01730 stringStream << environ[i] << " ";
01731
01732 stringStream << "EQ_LOG_LEVEL=" << Log::getLogLevelString() << " ";
01733 if( eq::base::Log::topics != 0 )
01734 stringStream << "EQ_LOG_TOPICS=" << Log::topics << " ";
01735 #endif // WIN32
01736
01737
01738 string program = node->_programName;
01739 #ifdef WIN32
01740 EQASSERT( program.length() > 2 );
01741 if( !( program[1] == ':' && (program[2] == '/' || program[2] == '\\' )) &&
01742
01743 !( program[0] == '/' || program[0] == '\\' ))
01744
01745
01746 program = node->_workDir + '/' + program;
01747 #else
01748 if( program[0] != '/' )
01749 program = node->_workDir + '/' + program;
01750 #endif
01751
01752 const string ownData = serialize();
01753 const string remoteData = node->serialize();
01754
01755 stringStream
01756 << quote << program << quote << " -- --eq-client " << quote
01757 << remoteData << node->_launchID << SEPARATOR << node->_workDir
01758 << SEPARATOR << node->_id << SEPARATOR << getType() << SEPARATOR
01759 << ownData << quote;
01760
01761 return stringStream.str();
01762 }
01763
01764 bool Node::runClient( const std::string& clientArgs )
01765 {
01766 EQASSERT( _state == STATE_LISTENING );
01767
01768 size_t nextPos = clientArgs.find( SEPARATOR );
01769 if( nextPos == string::npos )
01770 {
01771 EQERROR << "Could not parse request identifier: " << clientArgs << endl;
01772 return false;
01773 }
01774
01775 const string request = clientArgs.substr( 0, nextPos );
01776 string description = clientArgs.substr( nextPos + 1 );
01777 const uint32_t launchID = strtoul( request.c_str(), 0, 10 );
01778
01779 nextPos = description.find( SEPARATOR );
01780 if( nextPos == string::npos )
01781 {
01782 EQERROR << "Could not parse working directory: " << description
01783 << " is left from " << clientArgs << endl;
01784 return false;
01785 }
01786
01787 const string workDir = description.substr( 0, nextPos );
01788 description = description.substr( nextPos + 1 );
01789
01790 Global::setWorkDir( workDir );
01791 if( !workDir.empty() && chdir( workDir.c_str( )) == -1 )
01792 EQWARN << "Can't change working directory to " << workDir << ": "
01793 << strerror( errno ) << endl;
01794
01795 EQINFO << "Launching node with launch ID=" << launchID << ", cwd="
01796 << workDir << endl;
01797
01798 nextPos = description.find( SEPARATOR );
01799 if( nextPos == string::npos )
01800 {
01801 EQERROR << "Could not parse node identifier: " << description
01802 << " is left from " << clientArgs << endl;
01803 return false;
01804 }
01805 _id = description.substr( 0, nextPos );
01806 description = description.substr( nextPos + 1 );
01807
01808 nextPos = description.find( SEPARATOR );
01809 if( nextPos == string::npos )
01810 {
01811 EQERROR << "Could not parse server node type: " << description
01812 << " is left from " << clientArgs << endl;
01813 return false;
01814 }
01815 const string nodeType = description.substr( 0, nextPos );
01816 description = description.substr( nextPos + 1 );
01817 const uint32_t type = atoi( nodeType.c_str( ));
01818
01819 RefPtr< Node > node = createNode( type );
01820 if( !node )
01821 {
01822 EQERROR << "Can't create server node" << endl;
01823 return false;
01824 }
01825
01826 node->setAutoLaunch( false );
01827 node->_launchID = launchID;
01828
01829 if( !node->deserialize( description ))
01830 EQWARN << "Can't parse node data" << endl;
01831
01832 if( !connect( node ))
01833 {
01834 EQERROR << "Can't connect node" << endl;
01835 return false;
01836 }
01837
01838 return clientLoop();
01839 }
01840
01841
01842 EQ_EXPORT std::ostream& operator << ( std::ostream& os, const Node::State state)
01843 {
01844 os << ( state == Node::STATE_STOPPED ? "stopped" :
01845 state == Node::STATE_LAUNCHED ? "launched" :
01846 state == Node::STATE_CONNECTED ? "connected" :
01847 state == Node::STATE_LISTENING ? "listening" : "ERROR" );
01848 return os;
01849 }
01850
01851 }
01852 }