fullMasterCM.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "fullMasterCM.h"
00019
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "packets.h"
00026 #include "session.h"
00027
00028 using namespace eq::base;
00029 using namespace std;
00030
00031 namespace eq
00032 {
00033 namespace net
00034 {
00035 typedef CommandFunc<FullMasterCM> CmdFunc;
00036
00037 FullMasterCM::FullMasterCM( Object* object )
00038 : _object( object ),
00039 _version( Object::VERSION_NONE ),
00040 _commitCount( 0 ),
00041 _nVersions( 0 ),
00042 _obsoleteFlags( Object::AUTO_OBSOLETE_COUNT_VERSIONS )
00043 {
00044 DeltaData* data = _newDeltaData();
00045 data->os.setVersion( 1 );
00046
00047 data->os.enable();
00048 _object->getInstanceData( data->os );
00049 data->os.disable();
00050
00051 _deltaDatas.push_front( data );
00052 ++_version;
00053 ++_commitCount;
00054
00055 registerCommand( CMD_OBJECT_COMMIT,
00056 CmdFunc( this, &FullMasterCM::_cmdCommit ), 0 );
00057
00058 registerCommand( CMD_OBJECT_DELTA_DATA,
00059 CmdFunc( this, &FullMasterCM::_cmdDiscard ), 0 );
00060 registerCommand( CMD_OBJECT_DELTA,
00061 CmdFunc( this, &FullMasterCM::_cmdDiscard ), 0 );
00062 }
00063
00064 FullMasterCM::~FullMasterCM()
00065 {
00066 if( !_slaves.empty( ))
00067 EQWARN << _slaves.size()
00068 << " slave nodes subscribed during deregisterObject of "
00069 << typeid( *_object ).name() << std::endl;
00070 _slaves.clear();
00071
00072 for( std::deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00073 i != _deltaDatas.end(); ++i )
00074
00075 delete *i;
00076
00077 _deltaDatas.clear();
00078
00079 for(std::vector<DeltaData*>::const_iterator i=_deltaDataCache.begin();
00080 i != _deltaDataCache.end(); ++i )
00081
00082 delete *i;
00083
00084 _deltaDataCache.clear();
00085 }
00086
00087 uint32_t FullMasterCM::commitNB()
00088 {
00089 EQASSERTINFO( _object->getChangeType() == Object::INSTANCE,
00090 "Object type " << typeid(*this).name( ));
00091
00092 ObjectCommitPacket packet;
00093 packet.instanceID = _object->_instanceID;
00094 packet.requestID = _requestHandler.registerRequest();
00095
00096 _object->send( _object->getLocalNode(), packet );
00097 return packet.requestID;
00098 }
00099
00100 uint32_t FullMasterCM::commitSync( const uint32_t commitID )
00101 {
00102 uint32_t version = Object::VERSION_NONE;
00103 _requestHandler.waitRequest( commitID, version );
00104 return version;
00105 }
00106
00107
00108
00109 void FullMasterCM::_obsolete()
00110 {
00111 if( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS )
00112 {
00113 DeltaData* lastDeltaData = _deltaDatas.back();
00114 if( lastDeltaData->commitCount < (_commitCount - _nVersions) &&
00115 _deltaDatas.size() > 1 )
00116 {
00117 _deltaDataCache.push_back( lastDeltaData );
00118 _deltaDatas.pop_back();
00119 }
00120 _checkConsistency();
00121 return;
00122 }
00123
00124 while( _deltaDatas.size() > (_nVersions+1) )
00125 {
00126 _deltaDataCache.push_back( _deltaDatas.back( ));
00127 _deltaDatas.pop_back();
00128 _checkConsistency();
00129 }
00130 }
00131
00132 uint32_t FullMasterCM::getOldestVersion() const
00133 {
00134 if( _version == Object::VERSION_NONE )
00135 return Object::VERSION_NONE;
00136
00137 return _deltaDatas.back()->os.getVersion();
00138 }
00139
00140 void FullMasterCM::addSlave( NodePtr node, const uint32_t instanceID,
00141 const uint32_t inVersion )
00142 {
00143 CHECK_THREAD( _thread );
00144 EQASSERT( _version != Object::VERSION_NONE );
00145 _checkConsistency();
00146
00147
00148 ++_slavesCount[ node->getNodeID() ];
00149 _slaves.push_back( node );
00150 stde::usort( _slaves );
00151
00152 if( inVersion == Object::VERSION_NONE )
00153 {
00154 ObjectInstancePacket instPacket;
00155 instPacket.instanceID = instanceID;
00156 instPacket.dataSize = 0;
00157 instPacket.version = _version;
00158 instPacket.sequence = 0;
00159
00160 _object->send( node, instPacket );
00161 return;
00162 }
00163
00164 const uint32_t version = (inVersion == Object::VERSION_OLDEST) ?
00165 getOldestVersion() : inVersion;
00166 EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00167 << ", instantiate on " << node->getNodeID()
00168 << " with v" << version << endl;
00169
00170 EQASSERT( _object->getChangeType() == Object::INSTANCE );
00171 EQASSERT( version >= getOldestVersion( ));
00172
00173 deque< DeltaData* >::reverse_iterator i = _deltaDatas.rbegin();
00174 while( (*i)->os.getVersion() < version && i != _deltaDatas.rend( ))
00175 ++i;
00176
00177 const DeltaData* data = (i == _deltaDatas.rend()) ? _deltaDatas.back() : *i;
00178
00179
00180 const Bufferb& buffer = data->os.getSaveBuffer();
00181 ObjectInstancePacket instPacket;
00182 instPacket.instanceID = instanceID;
00183 instPacket.dataSize = buffer.getSize();
00184 instPacket.version = data->os.getVersion();
00185 instPacket.sequence = 0;
00186
00187 _object->send( node, instPacket, buffer.getData(), buffer.getSize() );
00188
00189 if( i == _deltaDatas.rend( ))
00190 return;
00191
00192
00193 for( ++i; i != _deltaDatas.rend(); ++i )
00194 {
00195 DeltaData* deltaData = *i;
00196 deltaData->os.setInstanceID( instanceID );
00197 deltaData->os.resend( node );
00198 EQASSERT( ++instPacket.version == deltaData->os.getVersion( ));
00199 }
00200 EQASSERT( instPacket.version == _version );
00201 }
00202
00203 void FullMasterCM::removeSlave( NodePtr node )
00204 {
00205 CHECK_THREAD( _thread );
00206 _checkConsistency();
00207
00208
00209 const NodeID& nodeID = node->getNodeID();
00210 EQASSERT( _slavesCount[ nodeID ] != 0 );
00211
00212 --_slavesCount[ nodeID ];
00213 if( _slavesCount[ nodeID ] == 0 )
00214 {
00215 NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00216 EQASSERT( i != _slaves.end( ));
00217 _slaves.erase( i );
00218 _slavesCount.erase( nodeID );
00219 }
00220 }
00221
00222 void FullMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00223 {
00224 EQASSERT( _version != Object::VERSION_NONE );
00225
00226
00227 ++_slavesCount[ node->getNodeID() ];
00228 _slaves.push_back( node );
00229 stde::usort( _slaves );
00230
00231 ObjectVersionPacket packet;
00232 packet.instanceID = instanceID;
00233 packet.version = _version;
00234 _object->send( node, packet );
00235 }
00236
00237 void FullMasterCM::_checkConsistency() const
00238 {
00239 #ifndef NDEBUG
00240 EQASSERT( _object->_id != EQ_ID_INVALID );
00241 EQASSERT( _object->getChangeType() == Object::INSTANCE );
00242
00243 if( _version == Object::VERSION_NONE )
00244 return;
00245
00246 if( !( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS ))
00247 {
00248 if( _version <= _nVersions )
00249 EQASSERT( _deltaDatas.size() == _version );
00250 }
00251 #endif
00252 }
00253
00254
00255
00256
00257 FullMasterCM::DeltaData* FullMasterCM::_newDeltaData()
00258 {
00259 DeltaData* deltaData;
00260
00261 if( _deltaDataCache.empty( ))
00262 deltaData = new DeltaData( _object );
00263 else
00264 {
00265 deltaData = _deltaDataCache.back();
00266 _deltaDataCache.pop_back();
00267 }
00268
00269 deltaData->os.enableSave();
00270 deltaData->os.enableBuffering();
00271 deltaData->os.setInstanceID( EQ_ID_ANY );
00272 return deltaData;
00273 }
00274
00275
00276
00277
00278 CommandResult FullMasterCM::_cmdCommit( Command& command )
00279 {
00280 CHECK_THREAD( _thread );
00281 const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00282 EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00283
00284 EQASSERT( _version != Object::VERSION_NONE );
00285
00286 ++_commitCount;
00287
00288 DeltaData* deltaData = _newDeltaData();
00289
00290 deltaData->commitCount = _commitCount;
00291 deltaData->os.setVersion( _version + 1 );
00292
00293 deltaData->os.enable( _slaves );
00294 _object->pack( deltaData->os );
00295 deltaData->os.disable();
00296
00297 if( deltaData->os.hasSentData( ))
00298 {
00299 ++_version;
00300 EQASSERT( _version );
00301
00302 _deltaDatas.push_front( deltaData );
00303 EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id "
00304 << _object->getID() << endl;
00305 }
00306 else
00307 _deltaDataCache.push_back( deltaData );
00308
00309 _obsolete();
00310 _checkConsistency();
00311 _requestHandler.serveRequest( packet->requestID, _version );
00312 return COMMAND_HANDLED;
00313 }
00314
00315 }
00316 }