lib/net/node.h
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #ifndef EQNET_NODE_H
00019 #define EQNET_NODE_H
00020
00021 #include <eq/net/dispatcher.h>
00022 #include <eq/net/commandCache.h>
00023 #include <eq/net/commandQueue.h>
00024 #include <eq/net/connectionSet.h>
00025 #include <eq/net/nodeType.h>
00026 #include <eq/net/types.h>
00027
00028 #include <eq/base/base.h>
00029 #include <eq/base/perThread.h>
00030 #include <eq/base/requestHandler.h>
00031 #include <eq/base/thread.h>
00032
00033 #include <list>
00034
// Entry points with C linkage, exported through EQSERVER_EXPORT.
// eqsStartLocalServer() takes a string argument and returns an
// eq::net::ConnectionPtr; eqsJoinLocalServer() takes and returns nothing.
// MSVC warning C4190 (C-linkage function returning a C++ user-defined type)
// is switched off around the declarations, presumably because ConnectionPtr is
// a class type.
// NOTE(review): 'warning(default : 4190)' resets the warning to its default
// behaviour instead of restoring the state in effect before this header was
// included; 'warning(push)' / 'warning(pop)' would preserve the includer's
// settings.
00035 #pragma warning(disable : 4190)
00036 extern "C" EQSERVER_EXPORT eq::net::ConnectionPtr eqsStartLocalServer( const
00037 std::string& );
00038 extern "C" EQSERVER_EXPORT void eqsJoinLocalServer();
00039 #pragma warning(default : 4190)
00040
00041 namespace eq
00042 {
00043 namespace net
00044 {
// Forward declarations. Within this header Command is only used by reference
// and Session only by pointer, so the full definitions are not needed here.
00045 class Command;
00046 class ConnectionDescription;
00047 class Session;
00048
/**
 * A node of the eq::net layer.
 *
 * Node combines the Dispatcher interface with base::Referenced. The
 * declarations below cover the node's State, the Connection used to reach it,
 * a ConnectionSet, the sessions registered with it, and a receiver and a
 * command thread whose run() methods forward to _runReceiverThread() and
 * _runCommandThread().
 *
 * Most member functions are only declared here. Their comments are derived
 * from the signatures and should be confirmed against the implementation.
 */
00059 class Node : public Dispatcher, public base::Referenced
00060 {
00061 public:
/**
 * State of the node.
 *
 * isConnected() holds for STATE_CONNECTED and STATE_LISTENING; isLocal()
 * holds for STATE_LISTENING only.
 */
00063 enum State
00064 {
00065 STATE_STOPPED,
00066 STATE_LAUNCHED,
00067 STATE_CONNECTED,
00068 STATE_LISTENING
00069 };
00070
/** Construct a node (defined out of line). */
00072 EQ_EXPORT Node();
00073
/** Compare this node with the node pointed to by @a n (defined out of line). */
00076 bool operator == ( const Node* n ) const;
00077
/** @return the current state of this node. */
00079 State getState() const { return _state; }
/** @return true if the state is STATE_CONNECTED or STATE_LISTENING. */
00080 bool isConnected() const
00081 { return (_state == STATE_CONNECTED || _state == STATE_LISTENING); }
00082
/**
 * Store the auto-launch flag.
 * NOTE(review): presumably consulted when connecting to a node that is not
 * yet running -- confirm in the implementation.
 */
00083 void setAutoLaunch( const bool autoLaunch ) { _autoLaunch = autoLaunch;}
00084
/** Set the program name of this node (defined out of line). */
00090 EQ_EXPORT void setProgramName( const std::string& name );
00091
/** Set the working directory of this node (defined out of line). */
00097 EQ_EXPORT void setWorkDir( const std::string& name );
00098
/** Look up a node by its identifier (defined out of line). */
00105 NodePtr getNode( const NodeID& id ) const;
00107
/**
 * Initialize this node as the local node from the command-line arguments
 * (defined out of line).
 *
 * @return a success flag; NOTE(review): exact semantics not visible here.
 */
00129 EQ_EXPORT virtual bool initLocal( const int argc, char** argv );
00130
/** Counterpart of initLocal(); the default forwards to stopListening(). */
00132 virtual bool exitLocal() { return stopListening(); }
00133
/** Exit a client node; the default forwards to exitLocal(). */
00135 virtual bool exitClient() { return exitLocal(); }
00136
/** Start listening (defined out of line). */
00149 EQ_EXPORT virtual bool listen();
00150
/** Stop listening (defined out of line); also used by exitLocal(). */
00160 EQ_EXPORT virtual bool stopListening();
00161
/** Connect @a node using the given, existing @a connection. */
00174 bool connect( NodePtr node, ConnectionPtr connection );
00175
/** Connect @a node (defined out of line). */
00188 EQ_EXPORT bool connect( NodePtr node );
00189
/**
 * Start connecting @a node.
 * NOTE(review): presumably the non-blocking half of connect( NodePtr ),
 * completed by syncConnect() -- confirm.
 */
00202 EQ_EXPORT bool initConnect( NodePtr node );
00203
/** Finish a connection started with initConnect() -- see the note there. */
00215 EQ_EXPORT bool syncConnect( NodePtr node );
00216
/** Connect to the node identified by @a nodeID and return it. */
00224 NodePtr connect( const NodeID& nodeID );
00225
/** Disconnect @a node (defined out of line). */
00233 EQ_EXPORT bool disconnect( NodePtr node );
00235
00236
/** @return true if the state is STATE_LISTENING. */
00247 bool isLocal() const { return (_state==STATE_LISTENING); }
00248
/** Add a connection description to this node (defined out of line). */
00254 EQ_EXPORT void addConnectionDescription( ConnectionDescriptionPtr cd );
00255
/** Remove the connection description at position @a index. */
00261 void removeConnectionDescription( const uint32_t index );
00262
/** @return the connection descriptions of this node. */
00268 EQ_EXPORT const ConnectionDescriptionVector& getConnectionDescriptions()
00269 const;
/** @return the stored connection, without checking the node state. */
00275 ConnectionPtr getConnection() const { return _connection; }
00277
/**
 * Return the connection only if the node is connected.
 *
 * The connection is copied into a local before the state is tested, and
 * that copy is what gets returned, so the caller receives exactly the
 * connection that was captured rather than a second read of the member.
 *
 * @return the captured connection if the state is STATE_CONNECTED or
 *         STATE_LISTENING, 0 otherwise.
 */
00288 ConnectionPtr checkConnection()
00289 {
00290 ConnectionPtr connection = _connection;
00291 if( _state == STATE_CONNECTED || _state == STATE_LISTENING )
00292 return connection;
00293 return 0;
00294 }
00295
/**
 * Send a packet to this node.
 *
 * @return false if checkConnection() yields no connection, otherwise the
 *         result of Connection::send( packet ).
 */
00302 bool send( const Packet& packet )
00303 {
00304 ConnectionPtr connection = checkConnection();
00305 if( !connection )
00306 return false;
00307 return connection->send( packet );
00308 }
00309
/**
 * Send a packet together with a string to this node.
 *
 * @return false if checkConnection() yields no connection, otherwise the
 *         result of Connection::send( packet, string ).
 */
00325 bool send( Packet& packet, const std::string& string )
00326 {
00327 ConnectionPtr connection = checkConnection();
00328 if( !connection )
00329 return false;
00330 return connection->send( packet, string );
00331 }
00332
/**
 * Send a packet together with a vector of @a T to this node.
 *
 * @return false if checkConnection() yields no connection, otherwise the
 *         result of Connection::send( packet, data ).
 */
00346 template< class T >
00347 bool send( Packet& packet, const std::vector<T>& data )
00348 {
00349 ConnectionPtr connection = checkConnection();
00350 if( !connection )
00351 return false;
00352 return connection->send( packet, data );
00353 }
00354
/**
 * Send a packet together with a raw buffer of @a size bytes to this node.
 *
 * @return false if checkConnection() yields no connection, otherwise the
 *         result of Connection::send( packet, data, size ).
 */
00371 bool send( Packet& packet, const void* data, const uint64_t size )
00372 {
00373 ConnectionPtr connection = checkConnection();
00374 if( !connection )
00375 return false;
00376 return connection->send( packet, data, size );
00377 }
00378
/** Interrupt the connection set of this node. */
00385 void flushCommands() { _connectionSet.interrupt(); }
00386
/**
 * Acquire / release the send token for @a toNode.
 * NOTE(review): token semantics are not visible in this header.
 */
00387 void acquireSendToken( NodePtr toNode );
00388 void releaseSendToken( NodePtr toNode );
00390
/** Register @a session with this node (defined out of line). */
00404 EQ_EXPORT bool registerSession( Session* session );
00405
/** Deregister @a session; forwards to unmapSession(). */
00407 bool deregisterSession( Session* session )
00408 { return unmapSession( session ); }
00409
/** Map @a session, identified by @a id, from the node @a server. */
00422 bool mapSession( NodePtr server, Session* session,
00423 const uint32_t id );
00424
/** Unmap @a session (defined out of line). */
00432 EQ_EXPORT bool unmapSession( Session* session );
00433
/** @return the session with the identifier @a id. */
00435 Session* getSession( const uint32_t id );
00436
/** @return true if the session hash is not empty. */
00437 bool hasSessions() const { return !_sessions.empty(); }
00439
/** Run this node as a client using @a clientArgs (defined out of line). */
00446 EQ_EXPORT virtual bool runClient( const std::string& clientArgs );
00447
/** @return the command thread queue; asserts that the node isLocal(). */
00449 CommandQueue* getCommandThreadQueue()
00450 { EQASSERT( isLocal( )); return &_commandThreadQueue; }
00451
/**
 * @return whether the command thread, respectively the receiver thread,
 *         reports itself as the current thread.
 */
00456 bool inCommandThread() const { return _commandThread->isCurrent(); }
00457 bool inReceiverThread() const { return _receiverThread->isCurrent(); }
00458
/** @return the identifier of this node. */
00459 const NodeID& getNodeID() const { return _id; }
00460
/** Serialize the node information into a string (defined out of line). */
00462 EQ_EXPORT std::string serialize() const;
/**
 * Counterpart of serialize(). @a data is taken by non-const reference;
 * NOTE(review): presumably the consumed part is removed from it -- confirm.
 */
00464 EQ_EXPORT bool deserialize( std::string& data );
00465
00466 protected:
/** Protected destructor: the class derives from base::Referenced. */
00468 EQ_EXPORT virtual ~Node();
00469
/** Dispatch @a command (defined out of line). */
00477 EQ_EXPORT virtual bool dispatchCommand( Command& command );
00478
/** Invoke @a command and return its CommandResult (defined out of line). */
00486 EQ_EXPORT virtual CommandResult invokeCommand( Command& command );
00487
/** Client loop hook; the default does nothing and returns true. */
00493 virtual bool clientLoop() { return true; }
00494
/** @return the type of this node, TYPE_EQNET_NODE by default. */
00496 virtual uint32_t getType() const { return TYPE_EQNET_NODE; }
00497
/** Factory for a node of the given @a type (defined out of line). */
00505 EQ_EXPORT virtual NodePtr createNode( const uint32_t type );
00506
/** Request handler, accessible to subclasses. */
00508 base::RequestHandler _requestHandler;
00509
00510 private:
/** Flag written by setAutoLaunch(). */
00512 bool _autoLaunch;
00513
/** Identifier returned by getNodeID(). */
00515 NodeID _id;
00516
/** Current state, see State. */
00518 State _state;
00519
/** Session hash; hasSessions() tests it for emptiness. */
00521 SessionHash _sessions;
00522
/** Connection returned by getConnection() and checkConnection(). */
00524 ConnectionPtr _connection;
00525
/** Connection set; flushCommands() interrupts it. */
00527 ConnectionSet _connectionSet;
/** Grants the global eqsStartLocalServer() access to private members. */
00528 friend eq::net::ConnectionPtr (::eqsStartLocalServer(const
00529 std::string& ));
00530
/** Hash of NodePtr, keyed by UUID according to its type. */
00532 base::UUIDHash< NodePtr > _nodes;
00533
/** Hash associating a Connection with a NodePtr. */
00535 typedef base::RefPtrHash< Connection, NodePtr > ConnectionNodeHash;
00536 ConnectionNodeHash _connectionNodes;
00537
/** Queue returned by getCommandThreadQueue(). */
00539 CommandQueue _commandThreadQueue;
00540
/** Lock; NOTE(review): presumably serializes connect operations. */
00542 base::Lock _connectMutex;
00543
/** NOTE(review): presumably identifies a pending node launch -- confirm. */
00545 uint32_t _launchID;
00546
/** NOTE(review): presumably measures the time spent waiting for a launch. */
00548 base::Clock _launchTimeout;
00549
/** NOTE(review): presumably commands awaiting _redispatchCommands(). */
00551 CommandList _pendingCommands;
00552
/** Command cache of this node. */
00554 CommandCache _commandCache;
00555
/** Connection descriptions of this node. */
00557 ConnectionDescriptionVector _connectionDescriptions;
00558
/** Program name and working directory strings. */
00560 std::string _programName;
00562 std::string _workDir;
00563
/** Thread whose run() calls Node::_runReceiverThread() on the given node. */
00565 class ReceiverThread : public base::Thread
00566 {
00567 public:
00568 ReceiverThread( Node* node )
00569 : _node( node )
00570 {}
00571
00572 virtual void* run(){ return _node->_runReceiverThread(); }
00573
00574 private:
00575 Node* _node;
00576 };
00577 ReceiverThread* _receiverThread;
00578
/** Thread whose run() calls Node::_runCommandThread() on the given node. */
00580 class CommandThread : public base::Thread
00581 {
00582 public:
00583 CommandThread( Node* node )
00584 : _node( node )
00585 {}
00586
00587 virtual void* run(){ return _node->_runCommandThread(); }
00588
00589 private:
00590 Node* _node;
00591 };
00592 CommandThread* _commandThread;
00593
/**
 * Send token flag.
 * NOTE(review): presumably the token handled by acquireSendToken() and
 * releaseSendToken() -- confirm.
 */
00595 bool _hasSendToken;
00596
/* Connection bookkeeping helpers (defined out of line). */
00597 bool _connectSelf();
00598 EQ_EXPORT void _addConnection( ConnectionPtr connection );
00599 void _removeConnection( ConnectionPtr connection );
00600 void _cleanup();
00601
/** Internal, non-virtual dispatch of @a command. */
00602 void _dispatchCommand( Command& command );
00603
/** Launch @a node using the given connection @a description. */
00612 bool _launch( NodePtr node, ConnectionDescriptionPtr description );
00613
/** Build the command strings used for launching @a node. */
00623 std::string _createLaunchCommand( NodePtr node,
00624 ConnectionDescriptionPtr description);
00625 std::string _createRemoteCommand( NodePtr node, const char quote );
00626
/** Find a connected node for the given connection description string. */
00633 NodePtr _findConnectedNode( const char* connectionDescription );
00634
/** Add @a session with the given server node and session identifier. */
00642 void _addSession( Session* session, NodePtr server,
00643 const uint32_t sessionID );
00644
/** Remove @a session. */
00650 void _removeSession( Session* session );
00651
/** @return a newly generated session identifier. */
00653 uint32_t _generateSessionID();
00654
/** Connect to the node @a nodeID with the help of @a server. */
00655 NodePtr _connect( const NodeID& nodeID, NodePtr server );
00656
/* Receiver thread entry point (called by ReceiverThread::run()) and helpers. */
00657 void* _runReceiverThread();
00658 void _handleConnect();
00659 void _handleDisconnect();
00660 bool _handleData();
00661
/* Command thread entry point (called by CommandThread::run()) and helper. */
00662 void* _runCommandThread();
00663 void _redispatchCommands();
00664
/* Command handlers, one per command; each returns a CommandResult. */
00666 CommandResult _cmdStop( Command& command );
00667 CommandResult _cmdRegisterSession( Command& command );
00668 CommandResult _cmdRegisterSessionReply( Command& command );
00669 CommandResult _cmdMapSession( Command& command );
00670 CommandResult _cmdMapSessionReply( Command& command );
00671 CommandResult _cmdUnmapSession( Command& command );
00672 CommandResult _cmdUnmapSessionReply( Command& command );
00673 CommandResult _cmdConnect( Command& command );
00674 CommandResult _cmdConnectReply( Command& command );
00675 CommandResult _cmdDisconnect( Command& command );
00676 CommandResult _cmdGetNodeData( Command& command );
00677 CommandResult _cmdGetNodeDataReply( Command& command );
00678 CommandResult _cmdAcquireSendToken( Command& command );
00679 CommandResult _cmdAcquireSendTokenReply( Command& command );
00680 CommandResult _cmdReleaseSendToken( Command& command );
00681
/* NOTE(review): macro from the project headers; presumably declares the
 * state used for thread-affinity checks -- confirm. */
00682 CHECK_THREAD_DECLARE( _thread );
00683 };
00684
/**
 * Stream a short description of a node pointer.
 *
 * Writes "NULL node" for a null pointer, otherwise "node " followed by the
 * identifier returned by Node::getNodeID().
 *
 * @return the stream @a os.
 */
00685 inline std::ostream& operator << ( std::ostream& os, const Node* node )
00686 {
00687 if( !node )
00688 os << "NULL node";
00689 else
00690 os << "node " << node->getNodeID();
00691
00692 return os;
00693 }
00694
/** Stream output for a Node::State value; exported and defined out of line. */
00695 EQ_EXPORT std::ostream& operator << ( std::ostream& os,
00696 const Node::State state );
00697 }
00698 }
00699
00700 #endif // EQNET_NODE_H