00001
00002
00003
00004
00005 #include "deltaMasterCM.h"
00006
00007 #include "command.h"
00008 #include "commands.h"
00009 #include "log.h"
00010 #include "node.h"
00011 #include "object.h"
00012 #include "objectDeltaDataOStream.h"
00013 #include "packets.h"
00014 #include "session.h"
00015
00016 using namespace eq::base;
00017 using namespace std;
00018
00019 namespace eq
00020 {
00021 namespace net
00022 {
00023 DeltaMasterCM::DeltaMasterCM( Object* object )
00024 : _object( object ),
00025 _version( Object::VERSION_NONE ),
00026 _commitCount( 0 ),
00027 _nVersions( 0 ),
00028 _obsoleteFlags( Object::AUTO_OBSOLETE_COUNT_VERSIONS )
00029 {
00030 InstanceData* data = _newInstanceData();
00031 data->os.setVersion( 1 );
00032 data->os.enableSave();
00033
00034 data->os.enable();
00035 _object->getInstanceData( data->os );
00036 data->os.disable();
00037
00038 _instanceDatas.push_front( data );
00039 ++_version;
00040 ++_commitCount;
00041 }
00042
00043 DeltaMasterCM::~DeltaMasterCM()
00044 {
00045 if( !_slaves.empty( ))
00046 EQWARN << _slaves.size()
00047 << " slave nodes subscribed during deregisterObject" << endl;
00048 _slaves.clear();
00049
00050 for( std::deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00051 i != _instanceDatas.end(); ++i )
00052
00053 delete *i;
00054
00055 _instanceDatas.clear();
00056
00057 for( std::deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00058 i != _deltaDatas.end(); ++i )
00059
00060 delete *i;
00061
00062 _deltaDatas.clear();
00063
00064 for(std::vector<InstanceData*>::const_iterator i=_instanceDataCache.begin();
00065 i != _instanceDataCache.end(); ++i )
00066
00067 delete *i;
00068
00069 _instanceDataCache.clear();
00070
00071 for( std::vector<DeltaData*>::const_iterator i =_deltaDataCache.begin();
00072 i != _deltaDataCache.end(); ++i )
00073
00074 delete *i;
00075
00076 _deltaDataCache.clear();
00077 }
00078
00079 void DeltaMasterCM::notifyAttached()
00080 {
00081 Session* session = _object->getSession();
00082 EQASSERT( session );
00083 CommandQueue* queue = session->getCommandThreadQueue();
00084
00085 registerCommand( CMD_OBJECT_COMMIT,
00086 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdCommit ),
00087 queue );
00088
00089 registerCommand( CMD_OBJECT_DELTA_DATA,
00090 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00091 queue );
00092 registerCommand( CMD_OBJECT_DELTA,
00093 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00094 queue );
00095 }
00096
00097 uint32_t DeltaMasterCM::commitNB()
00098 {
00099 EQASSERTINFO( _object->getChangeType() == Object::DELTA,
00100 "Object type " << typeid(*this).name( ));
00101
00102 ObjectCommitPacket packet;
00103 packet.instanceID = _object->_instanceID;
00104 packet.requestID = _requestHandler.registerRequest();
00105
00106 _object->send( _object->getLocalNode(), packet );
00107 return packet.requestID;
00108 }
00109
00110 uint32_t DeltaMasterCM::commitSync( const uint32_t commitID )
00111 {
00112 uint32_t version = Object::VERSION_NONE;
00113 _requestHandler.waitRequest( commitID, version );
00114 return version;
00115 }
00116
00117
00118
00119 void DeltaMasterCM::_obsolete()
00120 {
00121 if( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS )
00122 {
00123 InstanceData* lastInstanceData = _instanceDatas.back();
00124 if( lastInstanceData->commitCount < (_commitCount - _nVersions) &&
00125 _instanceDatas.size() > 1 )
00126 {
00127 EQASSERT( !_deltaDatas.empty( ));
00128 _instanceDataCache.push_back( lastInstanceData );
00129 _instanceDatas.pop_back();
00130
00131 _deltaDataCache.push_back( _deltaDatas.back( ));
00132 _deltaDatas.pop_back();
00133 }
00134 _checkConsistency();
00135 return;
00136 }
00137
00138 while( _instanceDatas.size() > (_nVersions+1) )
00139 {
00140 _instanceDataCache.push_back( _instanceDatas.back( ));
00141 _instanceDatas.pop_back();
00142
00143 if( _nVersions > 0 )
00144 {
00145 EQASSERT( !_deltaDatas.empty( ));
00146 _deltaDataCache.push_back( _deltaDatas.back( ));
00147 _deltaDatas.pop_back();
00148 }
00149 _checkConsistency();
00150 }
00151 }
00152
00153 uint32_t DeltaMasterCM::getOldestVersion() const
00154 {
00155 if( _version == Object::VERSION_NONE )
00156 return Object::VERSION_NONE;
00157
00158 return _instanceDatas.back()->os.getVersion();
00159 }
00160
00161 void DeltaMasterCM::addSlave( NodePtr node, const uint32_t instanceID,
00162 const uint32_t inVersion )
00163 {
00164 CHECK_THREAD( _thread );
00165 EQASSERT( _version != Object::VERSION_NONE );
00166 _checkConsistency();
00167
00168
00169 ++_slavesCount[ node->getNodeID() ];
00170 _slaves.push_back( node );
00171 stde::usort( _slaves );
00172
00173 const uint32_t version = (inVersion == Object::VERSION_NONE) ?
00174 getOldestVersion() : inVersion;
00175 EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00176 << ", instantiate on " << node->getNodeID()
00177 << " with v" << version << endl;
00178
00179 EQASSERT( version >= getOldestVersion( ));
00180
00181
00182 deque< InstanceData* >::reverse_iterator i = _instanceDatas.rbegin();
00183 while( (*i)->os.getVersion() < version && i != _instanceDatas.rend( ))
00184 ++i;
00185
00186 InstanceData* data = (i == _instanceDatas.rend( )) ?
00187 _instanceDatas.back() : *i;
00188 EQASSERT( data );
00189 EQASSERT( data->os.getVersion() <= version );
00190
00191 data->os.setInstanceID( instanceID );
00192 data->os.resend( node );
00193
00194 if( i == _instanceDatas.rend( ))
00195 return;
00196
00197
00198 const uint32_t deltaVersion = data->os.getVersion() + 1;
00199 for( deque< DeltaData* >::reverse_iterator j = _deltaDatas.rbegin();
00200 j != _deltaDatas.rend(); ++j )
00201 {
00202 DeltaData* deltaData = *j;
00203 if( deltaData->getVersion() < deltaVersion )
00204 continue;
00205
00206 deltaData->setInstanceID( instanceID );
00207 deltaData->resend( node );
00208 }
00209 }
00210
00211 void DeltaMasterCM::removeSlave( NodePtr node )
00212 {
00213 CHECK_THREAD( _thread );
00214 _checkConsistency();
00215
00216
00217 const NodeID& nodeID = node->getNodeID();
00218 EQASSERT( _slavesCount[ nodeID ] != 0 );
00219
00220 --_slavesCount[ nodeID ];
00221 if( _slavesCount[ nodeID ] == 0 )
00222 {
00223 NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00224 EQASSERT( i != _slaves.end( ));
00225 _slaves.erase( i );
00226 _slavesCount.erase( nodeID );
00227 }
00228 }
00229
00230 void DeltaMasterCM::_checkConsistency() const
00231 {
00232 #ifndef NDEBUG
00233 EQASSERT( _object->_id != EQ_ID_INVALID );
00234 EQASSERT( _object->getChangeType() == Object::DELTA );
00235 if( _version == Object::VERSION_NONE )
00236 return;
00237
00238 EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00239
00240 if( !( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS ))
00241 {
00242 if( _version > _nVersions )
00243 {
00244 EQASSERT( _deltaDatas.size() == _nVersions );
00245 }
00246 else
00247 {
00248 EQASSERT( _instanceDatas.size() == _version );
00249 }
00250 }
00251
00252 uint32_t version = _version;
00253 for( deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00254 i != _instanceDatas.end(); ++i )
00255 {
00256 const InstanceData* data = *i;
00257 EQASSERT( data->os.getVersion() == version );
00258 EQASSERT( data->os.getVersion() > 0 );
00259 --version;
00260 }
00261
00262 version = _version;
00263 for( deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00264 i != _deltaDatas.end(); ++i )
00265 {
00266 const DeltaData* data = *i;
00267 EQASSERT( data->getVersion() == version );
00268 EQASSERT( data->getVersion() > 0 );
00269 --version;
00270 }
00271 #endif
00272 }
00273
00274
00275
00276
00277 DeltaMasterCM::InstanceData* DeltaMasterCM::_newInstanceData()
00278 {
00279 InstanceData* instanceData;
00280
00281 if( _instanceDataCache.empty( ))
00282 instanceData = new InstanceData( _object );
00283 else
00284 {
00285 instanceData = _instanceDataCache.back();
00286 _instanceDataCache.pop_back();
00287 }
00288
00289 instanceData->os.disableSave();
00290 instanceData->os.enableBuffering();
00291 instanceData->os.setInstanceID( EQ_ID_ANY );
00292 return instanceData;
00293 }
00294
00295 DeltaMasterCM::DeltaData* DeltaMasterCM::_newDeltaData()
00296 {
00297 DeltaData* deltaData;
00298
00299 if( _deltaDataCache.empty( ))
00300 deltaData = new DeltaData( _object );
00301 else
00302 {
00303 deltaData = _deltaDataCache.back();
00304 _deltaDataCache.pop_back();
00305 }
00306
00307 deltaData->disableSave();
00308 deltaData->enableBuffering();
00309 deltaData->setInstanceID( EQ_ID_ANY );
00310 return deltaData;
00311 }
00312
00313
00314
00315
00316
00317 CommandResult DeltaMasterCM::_cmdCommit( Command& command )
00318 {
00319 CHECK_THREAD( _thread );
00320 const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00321 EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00322
00323 EQASSERT( _version != Object::VERSION_NONE );
00324 EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00325
00326 ++_commitCount;
00327
00328 DeltaData* deltaData = _newDeltaData();
00329 const bool saveDelta = ( _nVersions > 0 );
00330 if( saveDelta ) deltaData->enableSave();
00331
00332 deltaData->setVersion( _version + 1 );
00333 deltaData->enable( _slaves );
00334 _object->pack( *deltaData );
00335 deltaData->disable();
00336
00337 if( !deltaData->hasSentData( ))
00338 {
00339 _deltaDataCache.push_back( deltaData );
00340 _obsolete();
00341 _checkConsistency();
00342
00343 _requestHandler.serveRequest( packet->requestID, _version );
00344 return COMMAND_HANDLED;
00345 }
00346
00347 ++_version;
00348 EQASSERT( _version );
00349
00350 if( saveDelta )
00351 _deltaDatas.push_front( deltaData );
00352 else
00353 _deltaDataCache.push_back( deltaData );
00354
00355
00356 InstanceData* instanceData = _newInstanceData();
00357 instanceData->os.enableSave();
00358 instanceData->os.setVersion( _version );
00359
00360 instanceData->os.enable();
00361 _object->getInstanceData( instanceData->os );
00362 instanceData->os.disable();
00363
00364 instanceData->commitCount = _commitCount;
00365 _instanceDatas.push_front( instanceData );
00366
00367 _obsolete();
00368 _checkConsistency();
00369
00370 EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id "
00371 << _object->getID() << endl;
00372 _requestHandler.serveRequest( packet->requestID, _version );
00373 return COMMAND_HANDLED;
00374 }
00375 }
00376 }