deltaMasterCM.cpp

00001 
00002 /* Copyright (c) 2007-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "deltaMasterCM.h"
00019 
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "objectDeltaDataOStream.h"
00026 #include "packets.h"
00027 #include "session.h"
00028 
00029 using namespace eq::base;
00030 using namespace std;
00031 
00032 namespace eq
00033 {
00034 namespace net
00035 {
00036 DeltaMasterCM::DeltaMasterCM( Object* object )
00037         : _object( object ),
00038           _version( Object::VERSION_NONE ),
00039           _commitCount( 0 ),
00040           _nVersions( 0 ),
00041           _obsoleteFlags( Object::AUTO_OBSOLETE_COUNT_VERSIONS )
00042 {
00043     InstanceData* data = _newInstanceData();
00044     data->os.setVersion( 1 );
00045     data->os.enableSave();
00046 
00047     data->os.enable();
00048     _object->getInstanceData( data->os );
00049     data->os.disable();
00050         
00051     _instanceDatas.push_front( data );
00052     ++_version;
00053     ++_commitCount;
00054 
00055     registerCommand( CMD_OBJECT_COMMIT, 
00056                  CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdCommit ),
00057                      0 );
00058     // sync commands are send to all instances, even the master gets it
00059     registerCommand( CMD_OBJECT_DELTA_DATA, 
00060                 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00061                      0 );
00062     registerCommand( CMD_OBJECT_DELTA, 
00063                 CommandFunc<DeltaMasterCM>( this, &DeltaMasterCM::_cmdDiscard ),
00064                      0 );
00065 }
00066 
00067 DeltaMasterCM::~DeltaMasterCM()
00068 {
00069     if( !_slaves.empty( ))
00070         EQWARN << _slaves.size() 
00071                << " slave nodes subscribed during deregisterObject" << endl;
00072     _slaves.clear();
00073 
00074     for( std::deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00075          i != _instanceDatas.end(); ++i )
00076 
00077         delete *i;
00078 
00079     _instanceDatas.clear();
00080 
00081     for( std::deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00082          i != _deltaDatas.end(); ++i )
00083 
00084         delete *i;
00085 
00086     _deltaDatas.clear();
00087 
00088     for(std::vector<InstanceData*>::const_iterator i=_instanceDataCache.begin();
00089          i != _instanceDataCache.end(); ++i )
00090 
00091         delete *i;
00092     
00093     _instanceDataCache.clear();
00094 
00095     for( std::vector<DeltaData*>::const_iterator i =_deltaDataCache.begin();
00096          i != _deltaDataCache.end(); ++i )
00097 
00098         delete *i;
00099     
00100     _deltaDataCache.clear();
00101 }
00102 
00103 uint32_t DeltaMasterCM::commitNB()
00104 {
00105     EQASSERTINFO( _object->getChangeType() == Object::DELTA,
00106                   "Object type " << typeid(*this).name( ));
00107 
00108     ObjectCommitPacket packet;
00109     packet.instanceID = _object->_instanceID;
00110     packet.requestID  = _requestHandler.registerRequest();
00111 
00112     _object->send( _object->getLocalNode(), packet );
00113     return packet.requestID;
00114 }
00115 
00116 uint32_t DeltaMasterCM::commitSync( const uint32_t commitID )
00117 {
00118     uint32_t version = Object::VERSION_NONE;
00119     _requestHandler.waitRequest( commitID, version );
00120     return version;
00121 }
00122 
00123 // Obsoletes old changes based on number of commits or number of versions,
00124 // depending on the obsolete flags.
00125 void DeltaMasterCM::_obsolete()
00126 {
00127     if( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS )
00128     {
00129         InstanceData* lastInstanceData = _instanceDatas.back();
00130         if( lastInstanceData->commitCount < (_commitCount - _nVersions) &&
00131             _instanceDatas.size() > 1 )
00132         {
00133             EQASSERT( !_deltaDatas.empty( ));
00134             _instanceDataCache.push_back( lastInstanceData );
00135             _instanceDatas.pop_back();
00136 
00137             _deltaDataCache.push_back( _deltaDatas.back( ));        
00138             _deltaDatas.pop_back();
00139         }
00140         _checkConsistency();
00141         return;
00142     }
00143     // else count versions
00144     while( _instanceDatas.size() > (_nVersions+1) )
00145     {
00146         _instanceDataCache.push_back( _instanceDatas.back( ));
00147         _instanceDatas.pop_back();
00148         
00149         if( _nVersions > 0 )
00150         {
00151             EQASSERT( !_deltaDatas.empty( ));
00152             _deltaDataCache.push_back( _deltaDatas.back( ));        
00153             _deltaDatas.pop_back();
00154         }
00155         _checkConsistency();
00156     }
00157 }
00158 
00159 uint32_t DeltaMasterCM::getOldestVersion() const
00160 {
00161     if( _version == Object::VERSION_NONE )
00162         return Object::VERSION_NONE;
00163 
00164     return _instanceDatas.back()->os.getVersion();
00165 }
00166 
00167 void DeltaMasterCM::addSlave( NodePtr node, const uint32_t instanceID,
00168                               const uint32_t inVersion )
00169 {
00170     CHECK_THREAD( _thread );
00171     EQASSERT( _version != Object::VERSION_NONE );
00172     _checkConsistency();
00173 
00174     // add to subscribers
00175     ++_slavesCount[ node->getNodeID() ];
00176     _slaves.push_back( node );
00177     stde::usort( _slaves );
00178 
00179     if( inVersion == Object::VERSION_NONE ) // no data to send
00180     {
00181         ObjectInstancePacket instPacket;
00182         instPacket.instanceID = instanceID;
00183         instPacket.dataSize   = 0;
00184         instPacket.version    = _version;
00185         instPacket.sequence   = 0;
00186 
00187         _object->send( node, instPacket );
00188         return;
00189     }
00190 
00191     const uint32_t version = (inVersion == Object::VERSION_OLDEST) ?
00192                                  getOldestVersion() : inVersion;
00193     EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00194                          << ", instantiate on " << node->getNodeID() 
00195                          << " with v" << version << endl;
00196 
00197     EQASSERT( version >= getOldestVersion( ));
00198 
00199     // send initial instance data
00200     deque< InstanceData* >::reverse_iterator i = _instanceDatas.rbegin();
00201     while( (*i)->os.getVersion() < version && i != _instanceDatas.rend( ))
00202         ++i;
00203 
00204     InstanceData* data = (i == _instanceDatas.rend( )) ? 
00205                              _instanceDatas.back() : *i;
00206     EQASSERT( data );
00207     EQASSERT( data->os.getVersion() <= version );
00208 
00209     data->os.setInstanceID( instanceID );
00210     data->os.resend( node );
00211 
00212     if( i == _instanceDatas.rend( ))
00213         return;
00214 
00215     // send all deltas since initial instance data
00216     const uint32_t deltaVersion = data->os.getVersion() + 1;
00217     for( deque< DeltaData* >::reverse_iterator j = _deltaDatas.rbegin();
00218          j != _deltaDatas.rend(); ++j )
00219     {
00220         DeltaData* deltaData = *j;
00221         if( deltaData->getVersion() < deltaVersion )
00222             continue;
00223 
00224         deltaData->setInstanceID( instanceID );
00225         deltaData->resend( node );
00226     }
00227 }
00228 
00229 void DeltaMasterCM::removeSlave( NodePtr node )
00230 {
00231     CHECK_THREAD( _thread );
00232     _checkConsistency();
00233 
00234     // remove from subscribers
00235     const NodeID& nodeID = node->getNodeID();
00236     EQASSERT( _slavesCount[ nodeID ] != 0 );
00237 
00238     --_slavesCount[ nodeID ];
00239     if( _slavesCount[ nodeID ] == 0 )
00240     {
00241         NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00242         EQASSERT( i != _slaves.end( ));
00243         _slaves.erase( i );
00244         _slavesCount.erase( nodeID );
00245     }
00246 }
00247 
00248 void DeltaMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00249 {
00250     EQASSERT( _version != Object::VERSION_NONE );
00251 
00252     // add to subscribers
00253     ++_slavesCount[ node->getNodeID() ];
00254     _slaves.push_back( node );
00255     stde::usort( _slaves );
00256 
00257     ObjectVersionPacket packet;
00258     packet.instanceID = instanceID;
00259     packet.version    = _version;
00260     _object->send( node, packet );
00261 }
00262 
00263 void DeltaMasterCM::_checkConsistency() const
00264 {
00265 #ifndef NDEBUG
00266     EQASSERT( _object->_id != EQ_ID_INVALID );
00267     EQASSERT( _object->getChangeType() == Object::DELTA );
00268     if( _version == Object::VERSION_NONE )
00269         return;
00270 
00271     EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00272 
00273     if( !( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS ))
00274     {   // count versions
00275         if( _version > _nVersions )
00276         {
00277             EQASSERT( _deltaDatas.size() == _nVersions );
00278         }
00279         else
00280         {
00281             EQASSERT( _instanceDatas.size() == _version );
00282         }
00283     }
00284 
00285     uint32_t version = _version;
00286     for( deque< InstanceData* >::const_iterator i = _instanceDatas.begin();
00287          i != _instanceDatas.end(); ++i )
00288     {
00289         const InstanceData* data = *i;
00290         EQASSERT( data->os.getVersion() == version );
00291         EQASSERT( data->os.getVersion() > 0 );
00292         --version;
00293     }
00294 
00295     version = _version;
00296     for( deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00297          i != _deltaDatas.end(); ++i )
00298     {
00299         const DeltaData* data = *i;
00300         EQASSERT( data->getVersion() == version );
00301         EQASSERT( data->getVersion() > 0 );
00302         --version;
00303     }
00304 #endif
00305 }
00306 
00307 //---------------------------------------------------------------------------
00308 // cache handling
00309 //---------------------------------------------------------------------------
00310 DeltaMasterCM::InstanceData* DeltaMasterCM::_newInstanceData()
00311 {
00312     InstanceData* instanceData;
00313 
00314     if( _instanceDataCache.empty( ))
00315         instanceData = new InstanceData( _object );
00316     else
00317     {
00318         instanceData = _instanceDataCache.back();
00319         _instanceDataCache.pop_back();
00320     }
00321 
00322     instanceData->os.disableSave();
00323     instanceData->os.enableBuffering();
00324     instanceData->os.setInstanceID( EQ_ID_ANY );
00325     return instanceData;
00326 }
00327 
00328 DeltaMasterCM::DeltaData* DeltaMasterCM::_newDeltaData()
00329 {
00330     DeltaData* deltaData;
00331 
00332     if( _deltaDataCache.empty( ))
00333         deltaData = new DeltaData( _object );
00334     else
00335     {
00336         deltaData = _deltaDataCache.back();
00337         _deltaDataCache.pop_back();
00338     }
00339 
00340     deltaData->disableSave();
00341     deltaData->enableBuffering();
00342     deltaData->setInstanceID( EQ_ID_ANY );
00343     return deltaData;
00344 }
00345 
00346 
00347 //---------------------------------------------------------------------------
00348 // command handlers
00349 //---------------------------------------------------------------------------
00350 CommandResult DeltaMasterCM::_cmdCommit( Command& command )
00351 {
00352     CHECK_THREAD( _thread );
00353     const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00354     EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00355 
00356     EQASSERT( _version != Object::VERSION_NONE );
00357     EQASSERT( _instanceDatas.size() == _deltaDatas.size() + 1 );
00358 
00359     ++_commitCount;
00360 
00361     DeltaData* deltaData = _newDeltaData();
00362     const bool  saveDelta  = ( _nVersions > 0 );
00363     if( saveDelta ) deltaData->enableSave();
00364 
00365     deltaData->setVersion( _version + 1 );
00366     deltaData->enable( _slaves );
00367     _object->pack( *deltaData );
00368     deltaData->disable();
00369 
00370     if( !deltaData->hasSentData( ))
00371     {
00372         _deltaDataCache.push_back( deltaData );
00373         _obsolete();
00374         _checkConsistency();
00375 
00376         _requestHandler.serveRequest( packet->requestID, _version );
00377         return COMMAND_HANDLED;
00378     }
00379 
00380     ++_version;
00381     EQASSERT( _version );
00382     
00383     if( saveDelta )
00384         _deltaDatas.push_front( deltaData );
00385     else
00386         _deltaDataCache.push_back( deltaData );
00387 
00388     // save instance data
00389     InstanceData* instanceData = _newInstanceData();
00390     instanceData->os.enableSave();
00391     instanceData->os.setVersion( _version );
00392 
00393     instanceData->os.enable();
00394     _object->getInstanceData( instanceData->os );
00395     instanceData->os.disable();
00396 
00397     instanceData->commitCount = _commitCount;
00398     _instanceDatas.push_front( instanceData );
00399     
00400     _obsolete();
00401     _checkConsistency();
00402 
00403     EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id " 
00404                          << _object->getID() << endl;
00405     _requestHandler.serveRequest( packet->requestID, _version );
00406     return COMMAND_HANDLED;
00407 }
00408 }
00409 }
Generated on Mon Aug 10 18:58:32 2009 for Equalizer 0.9 by  doxygen 1.5.8