unbufferedMasterCM.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "unbufferedMasterCM.h"
00019
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "objectDeltaDataOStream.h"
00026 #include "packets.h"
00027 #include "session.h"
00028
00029 using namespace eq::base;
00030 using namespace std;
00031
00032 namespace eq
00033 {
00034 namespace net
00035 {
00036 UnbufferedMasterCM::UnbufferedMasterCM( Object* object )
00037 : _object( object )
00038 , _version( Object::VERSION_NONE )
00039 {
00040 registerCommand( CMD_OBJECT_COMMIT,
00041 CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdCommit ),
00042 0 );
00043
00044 registerCommand( CMD_OBJECT_DELTA_DATA,
00045 CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdDiscard ),
00046 0 );
00047 registerCommand( CMD_OBJECT_DELTA,
00048 CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdDiscard ),
00049 0 );
00050 }
00051
00052 UnbufferedMasterCM::~UnbufferedMasterCM()
00053 {
00054 if( !_slaves.empty( ))
00055 EQWARN << _slaves.size()
00056 << " slave nodes subscribed during deregisterObject of "
00057 << _object->getID() << '.' << _object->getInstanceID() << endl;
00058 _slaves.clear();
00059 }
00060
00061 uint32_t UnbufferedMasterCM::commitNB()
00062 {
00063 EQASSERTINFO( _object->getChangeType() == Object::UNBUFFERED,
00064 "Object type " << typeid(*this).name( ));
00065
00066 ObjectCommitPacket packet;
00067 packet.instanceID = _object->_instanceID;
00068 packet.requestID = _requestHandler.registerRequest();
00069
00070 _object->send( _object->getLocalNode(), packet );
00071 return packet.requestID;
00072 }
00073
00074 uint32_t UnbufferedMasterCM::commitSync( const uint32_t commitID )
00075 {
00076 uint32_t version = Object::VERSION_NONE;
00077 _requestHandler.waitRequest( commitID, version );
00078 return version;
00079 }
00080
00081 void UnbufferedMasterCM::addSlave( NodePtr node, const uint32_t instanceID,
00082 const uint32_t version )
00083 {
00084 CHECK_THREAD( _thread );
00085 EQASSERT( version == Object::VERSION_OLDEST ||
00086 version == Object::VERSION_NONE ||
00087 version == _version );
00088
00089
00090 ++_slavesCount[ node->getNodeID() ];
00091 _slaves.push_back( node );
00092 stde::usort( _slaves );
00093
00094 EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00095 << ", instantiate on " << node->getNodeID() << endl;
00096
00097
00098 ObjectInstanceDataOStream os( _object );
00099 os.setVersion( _version );
00100 os.setInstanceID( instanceID );
00101
00102 if( version != Object::VERSION_NONE )
00103 {
00104 os.enable( node );
00105 _object->getInstanceData( os );
00106 os.disable();
00107 }
00108
00109 if( !os.hasSentData( ))
00110 {
00111 os.enable( node );
00112 os.writeOnce( 0, 0 );
00113 os.disable();
00114 }
00115 }
00116
00117 void UnbufferedMasterCM::removeSlave( NodePtr node )
00118 {
00119 CHECK_THREAD( _thread );
00120
00121 const NodeID& nodeID = node->getNodeID();
00122 EQASSERT( _slavesCount[ nodeID ] != 0 );
00123
00124 --_slavesCount[ nodeID ];
00125 if( _slavesCount[ nodeID ] == 0 )
00126 {
00127 NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00128 EQASSERT( i != _slaves.end( ));
00129 _slaves.erase( i );
00130 _slavesCount.erase( nodeID );
00131 }
00132 }
00133
00134 void UnbufferedMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00135 {
00136
00137 ++_slavesCount[ node->getNodeID() ];
00138 _slaves.push_back( node );
00139 stde::usort( _slaves );
00140
00141 ObjectVersionPacket packet;
00142 packet.instanceID = instanceID;
00143 packet.version = _version;
00144 _object->send( node, packet );
00145 }
00146
00147
00148
00149
00150
00151 CommandResult UnbufferedMasterCM::_cmdCommit( Command& command )
00152 {
00153 CHECK_THREAD( _thread );
00154 const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00155 EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00156 if( _slaves.empty( ))
00157 {
00158 _requestHandler.serveRequest( packet->requestID, _version );
00159 return COMMAND_HANDLED;
00160 }
00161
00162 ObjectDeltaDataOStream os( _object );
00163 os.setVersion( _version + 1 );
00164 os.enable( _slaves );
00165 _object->pack( os );
00166 os.disable();
00167
00168 if( os.hasSentData( ))
00169 {
00170 ++_version;
00171 EQASSERT( _version );
00172 EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id "
00173 << _object->getID() << endl;
00174 }
00175
00176 _requestHandler.serveRequest( packet->requestID, _version );
00177 return COMMAND_HANDLED;
00178 }
00179 }
00180 }