deltaMasterCM.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "deltaMasterCM.h"
00019
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "objectDeltaDataOStream.h"
00026 #include "packets.h"
00027 #include "session.h"
00028
00029 using namespace eq::base;
00030 using namespace std;
00031
00032 namespace eq
00033 {
00034 namespace net
00035 {
00036 DeltaMasterCM::DeltaMasterCM( Object* object )
00037 : _object( object ),
00038 _version( Object::VERSION_NONE ),
00039 _commitCount( 0 ),
00040 _nVersions( 0 ),
00041 _obsoleteFlags( Object::AUTO_OBSOLETE_COUNT_VERSIONS )
00042 {
00043 InstanceData* data = _newInstanceData();
00044 data->os.setVersion( 1 );
00045 data->os.enableSave();
00046
00047 data->os.enable();
00048 _object->getInstanceData( data->os );
00049 data->os.disable();
00050
00051 _instanceDatas.push_front( data );
00052 ++_version;
00053 ++_commitCount;
00054
00055 registerCommand( CMD_OBJECT_COMMIT,
00056 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdCommit ),
00057 0 );
00058
00059 registerCommand( CMD_OBJECT_DELTA_DATA,
00060 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00061 0 );
00062 registerCommand( CMD_OBJECT_DELTA,
00063 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00064 0 );
00065 }
00066
00067 DeltaMasterCM::~DeltaMasterCM()
00068 {
00069 if( !_slaves.empty( ))
00070 EQWARN << _slaves.size()
00071 << " slave nodes subscribed during deregisterObject" << endl;
00072 _slaves.clear();
00073
00074 for( std::deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00075 i != _instanceDatas.end(); ++i )
00076
00077 delete *i;
00078
00079 _instanceDatas.clear();
00080
00081 for( std::deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00082 i != _deltaDatas.end(); ++i )
00083
00084 delete *i;
00085
00086 _deltaDatas.clear();
00087
00088 for(std::vector<InstanceData*>::const_iterator i=_instanceDataCache.begin();
00089 i != _instanceDataCache.end(); ++i )
00090
00091 delete *i;
00092
00093 _instanceDataCache.clear();
00094
00095 for( std::vector<DeltaData*>::const_iterator i =_deltaDataCache.begin();
00096 i != _deltaDataCache.end(); ++i )
00097
00098 delete *i;
00099
00100 _deltaDataCache.clear();
00101 }
00102
00103 uint32_t DeltaMasterCM::commitNB()
00104 {
00105 EQASSERTINFO( _object->getChangeType() == Object::DELTA,
00106 "Object type " << typeid(*this).name( ));
00107
00108 ObjectCommitPacket packet;
00109 packet.instanceID = _object->_instanceID;
00110 packet.requestID = _requestHandler.registerRequest();
00111
00112 _object->send( _object->getLocalNode(), packet );
00113 return packet.requestID;
00114 }
00115
00116 uint32_t DeltaMasterCM::commitSync( const uint32_t commitID )
00117 {
00118 uint32_t version = Object::VERSION_NONE;
00119 _requestHandler.waitRequest( commitID, version );
00120 return version;
00121 }
00122
00123
00124
00125 void DeltaMasterCM::_obsolete()
00126 {
00127 if( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS )
00128 {
00129 InstanceData* lastInstanceData = _instanceDatas.back();
00130 if( lastInstanceData->commitCount < (_commitCount - _nVersions) &&
00131 _instanceDatas.size() > 1 )
00132 {
00133 EQASSERT( !_deltaDatas.empty( ));
00134 _instanceDataCache.push_back( lastInstanceData );
00135 _instanceDatas.pop_back();
00136
00137 _deltaDataCache.push_back( _deltaDatas.back( ));
00138 _deltaDatas.pop_back();
00139 }
00140 _checkConsistency();
00141 return;
00142 }
00143
00144 while( _instanceDatas.size() > (_nVersions+1) )
00145 {
00146 _instanceDataCache.push_back( _instanceDatas.back( ));
00147 _instanceDatas.pop_back();
00148
00149 if( _nVersions > 0 )
00150 {
00151 EQASSERT( !_deltaDatas.empty( ));
00152 _deltaDataCache.push_back( _deltaDatas.back( ));
00153 _deltaDatas.pop_back();
00154 }
00155 _checkConsistency();
00156 }
00157 }
00158
00159 uint32_t DeltaMasterCM::getOldestVersion() const
00160 {
00161 if( _version == Object::VERSION_NONE )
00162 return Object::VERSION_NONE;
00163
00164 return _instanceDatas.back()->os.getVersion();
00165 }
00166
00167 void DeltaMasterCM::addSlave( NodePtr node, const uint32_t instanceID,
00168 const uint32_t inVersion )
00169 {
00170 CHECK_THREAD( _thread );
00171 EQASSERT( _version != Object::VERSION_NONE );
00172 _checkConsistency();
00173
00174
00175 ++_slavesCount[ node->getNodeID() ];
00176 _slaves.push_back( node );
00177 stde::usort( _slaves );
00178
00179 if( inVersion == Object::VERSION_NONE )
00180 {
00181 ObjectInstancePacket instPacket;
00182 instPacket.instanceID = instanceID;
00183 instPacket.dataSize = 0;
00184 instPacket.version = _version;
00185 instPacket.sequence = 0;
00186
00187 _object->send( node, instPacket );
00188 return;
00189 }
00190
00191 const uint32_t version = (inVersion == Object::VERSION_OLDEST) ?
00192 getOldestVersion() : inVersion;
00193 EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00194 << ", instantiate on " << node->getNodeID()
00195 << " with v" << version << endl;
00196
00197 EQASSERT( version >= getOldestVersion( ));
00198
00199
00200 deque< InstanceData* >::reverse_iterator i = _instanceDatas.rbegin();
00201 while( (*i)->os.getVersion() < version && i != _instanceDatas.rend( ))
00202 ++i;
00203
00204 InstanceData* data = (i == _instanceDatas.rend( )) ?
00205 _instanceDatas.back() : *i;
00206 EQASSERT( data );
00207 EQASSERT( data->os.getVersion() <= version );
00208
00209 data->os.setInstanceID( instanceID );
00210 data->os.resend( node );
00211
00212 if( i == _instanceDatas.rend( ))
00213 return;
00214
00215
00216 const uint32_t deltaVersion = data->os.getVersion() + 1;
00217 for( deque< DeltaData* >::reverse_iterator j = _deltaDatas.rbegin();
00218 j != _deltaDatas.rend(); ++j )
00219 {
00220 DeltaData* deltaData = *j;
00221 if( deltaData->getVersion() < deltaVersion )
00222 continue;
00223
00224 deltaData->setInstanceID( instanceID );
00225 deltaData->resend( node );
00226 }
00227 }
00228
00229 void DeltaMasterCM::removeSlave( NodePtr node )
00230 {
00231 CHECK_THREAD( _thread );
00232 _checkConsistency();
00233
00234
00235 const NodeID& nodeID = node->getNodeID();
00236 EQASSERT( _slavesCount[ nodeID ] != 0 );
00237
00238 --_slavesCount[ nodeID ];
00239 if( _slavesCount[ nodeID ] == 0 )
00240 {
00241 NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00242 EQASSERT( i != _slaves.end( ));
00243 _slaves.erase( i );
00244 _slavesCount.erase( nodeID );
00245 }
00246 }
00247
00248 void DeltaMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00249 {
00250 EQASSERT( _version != Object::VERSION_NONE );
00251
00252
00253 ++_slavesCount[ node->getNodeID() ];
00254 _slaves.push_back( node );
00255 stde::usort( _slaves );
00256
00257 ObjectVersionPacket packet;
00258 packet.instanceID = instanceID;
00259 packet.version = _version;
00260 _object->send( node, packet );
00261 }
00262
00263 void DeltaMasterCM::_checkConsistency() const
00264 {
00265 #ifndef NDEBUG
00266 EQASSERT( _object->_id != EQ_ID_INVALID );
00267 EQASSERT( _object->getChangeType() == Object::DELTA );
00268 if( _version == Object::VERSION_NONE )
00269 return;
00270
00271 EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00272
00273 if( !( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS ))
00274 {
00275 if( _version > _nVersions )
00276 {
00277 EQASSERT( _deltaDatas.size() == _nVersions );
00278 }
00279 else
00280 {
00281 EQASSERT( _instanceDatas.size() == _version );
00282 }
00283 }
00284
00285 uint32_t version = _version;
00286 for( deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00287 i != _instanceDatas.end(); ++i )
00288 {
00289 const InstanceData* data = *i;
00290 EQASSERT( data->os.getVersion() == version );
00291 EQASSERT( data->os.getVersion() > 0 );
00292 --version;
00293 }
00294
00295 version = _version;
00296 for( deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00297 i != _deltaDatas.end(); ++i )
00298 {
00299 const DeltaData* data = *i;
00300 EQASSERT( data->getVersion() == version );
00301 EQASSERT( data->getVersion() > 0 );
00302 --version;
00303 }
00304 #endif
00305 }
00306
00307
00308
00309
00310 DeltaMasterCM::InstanceData* DeltaMasterCM::_newInstanceData()
00311 {
00312 InstanceData* instanceData;
00313
00314 if( _instanceDataCache.empty( ))
00315 instanceData = new InstanceData( _object );
00316 else
00317 {
00318 instanceData = _instanceDataCache.back();
00319 _instanceDataCache.pop_back();
00320 }
00321
00322 instanceData->os.disableSave();
00323 instanceData->os.enableBuffering();
00324 instanceData->os.setInstanceID( EQ_ID_ANY );
00325 return instanceData;
00326 }
00327
00328 DeltaMasterCM::DeltaData* DeltaMasterCM::_newDeltaData()
00329 {
00330 DeltaData* deltaData;
00331
00332 if( _deltaDataCache.empty( ))
00333 deltaData = new DeltaData( _object );
00334 else
00335 {
00336 deltaData = _deltaDataCache.back();
00337 _deltaDataCache.pop_back();
00338 }
00339
00340 deltaData->disableSave();
00341 deltaData->enableBuffering();
00342 deltaData->setInstanceID( EQ_ID_ANY );
00343 return deltaData;
00344 }
00345
00346
00347
00348
00349
00350 CommandResult DeltaMasterCM::_cmdCommit( Command& command )
00351 {
00352 CHECK_THREAD( _thread );
00353 const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00354 EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00355
00356 EQASSERT( _version != Object::VERSION_NONE );
00357 EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00358
00359 ++_commitCount;
00360
00361 DeltaData* deltaData = _newDeltaData();
00362 const bool saveDelta = ( _nVersions > 0 );
00363 if( saveDelta ) deltaData->enableSave();
00364
00365 deltaData->setVersion( _version + 1 );
00366 deltaData->enable( _slaves );
00367 _object->pack( *deltaData );
00368 deltaData->disable();
00369
00370 if( !deltaData->hasSentData( ))
00371 {
00372 _deltaDataCache.push_back( deltaData );
00373 _obsolete();
00374 _checkConsistency();
00375
00376 _requestHandler.serveRequest( packet->requestID, _version );
00377 return COMMAND_HANDLED;
00378 }
00379
00380 ++_version;
00381 EQASSERT( _version );
00382
00383 if( saveDelta )
00384 _deltaDatas.push_front( deltaData );
00385 else
00386 _deltaDataCache.push_back( deltaData );
00387
00388
00389 InstanceData* instanceData = _newInstanceData();
00390 instanceData->os.enableSave();
00391 instanceData->os.setVersion( _version );
00392
00393 instanceData->os.enable();
00394 _object->getInstanceData( instanceData->os );
00395 instanceData->os.disable();
00396
00397 instanceData->commitCount = _commitCount;
00398 _instanceDatas.push_front( instanceData );
00399
00400 _obsolete();
00401 _checkConsistency();
00402
00403 EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id "
00404 << _object->getID() << endl;
00405 _requestHandler.serveRequest( packet->requestID, _version );
00406 return COMMAND_HANDLED;
00407 }
00408 }
00409 }