fullMasterCM.cpp

00001 
00002 /* Copyright (c) 2007-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "fullMasterCM.h"
00019 
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "packets.h"
00026 #include "session.h"
00027 
00028 using namespace eq::base;
00029 using namespace std;
00030 
00031 namespace eq
00032 {
00033 namespace net
00034 {
00035 typedef CommandFunc<FullMasterCM> CmdFunc;
00036 
00037 FullMasterCM::FullMasterCM( Object* object )
00038         : _object( object ),
00039           _version( Object::VERSION_NONE ),
00040           _commitCount( 0 ),
00041           _nVersions( 0 ),
00042           _obsoleteFlags( Object::AUTO_OBSOLETE_COUNT_VERSIONS )
00043 {
00044     DeltaData* data = _newDeltaData();
00045     data->os.setVersion( 1 );
00046 
00047     data->os.enable();
00048     _object->getInstanceData( data->os );
00049     data->os.disable();
00050         
00051     _deltaDatas.push_front( data );
00052     ++_version;
00053     ++_commitCount;
00054 
00055     registerCommand( CMD_OBJECT_COMMIT, 
00056                      CmdFunc( this, &FullMasterCM::_cmdCommit ), 0 );
00057     // sync commands are send to any instance, even the master gets the command
00058     registerCommand( CMD_OBJECT_DELTA_DATA,
00059                      CmdFunc( this, &FullMasterCM::_cmdDiscard ), 0 );
00060     registerCommand( CMD_OBJECT_DELTA,
00061                      CmdFunc( this, &FullMasterCM::_cmdDiscard ), 0 );
00062 }
00063 
00064 FullMasterCM::~FullMasterCM()
00065 {
00066     if( !_slaves.empty( ))
00067         EQWARN << _slaves.size() 
00068                << " slave nodes subscribed during deregisterObject of "
00069                << typeid( *_object ).name() << std::endl;
00070     _slaves.clear();
00071 
00072     for( std::deque< DeltaData* >::const_iterator i = _deltaDatas.begin();
00073          i != _deltaDatas.end(); ++i )
00074 
00075         delete *i;
00076 
00077     _deltaDatas.clear();
00078 
00079     for(std::vector<DeltaData*>::const_iterator i=_deltaDataCache.begin();
00080          i != _deltaDataCache.end(); ++i )
00081 
00082         delete *i;
00083     
00084     _deltaDataCache.clear();
00085 }
00086 
00087 uint32_t FullMasterCM::commitNB()
00088 {
00089     EQASSERTINFO( _object->getChangeType() == Object::INSTANCE,
00090                   "Object type " << typeid(*this).name( ));
00091 
00092     ObjectCommitPacket packet;
00093     packet.instanceID = _object->_instanceID;
00094     packet.requestID  = _requestHandler.registerRequest();
00095 
00096     _object->send( _object->getLocalNode(), packet );
00097     return packet.requestID;
00098 }
00099 
00100 uint32_t FullMasterCM::commitSync( const uint32_t commitID )
00101 {
00102     uint32_t version = Object::VERSION_NONE;
00103     _requestHandler.waitRequest( commitID, version );
00104     return version;
00105 }
00106 
00107 // Obsoletes old changes based on number of commits or number of versions,
00108 // depending on the obsolete flags.
00109 void FullMasterCM::_obsolete()
00110 {
00111     if( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS )
00112     {
00113         DeltaData* lastDeltaData = _deltaDatas.back();
00114         if( lastDeltaData->commitCount < (_commitCount - _nVersions) &&
00115             _deltaDatas.size() > 1 )
00116         {
00117             _deltaDataCache.push_back( lastDeltaData );
00118             _deltaDatas.pop_back();
00119         }
00120         _checkConsistency();
00121         return;
00122     }
00123     // else count versions
00124     while( _deltaDatas.size() > (_nVersions+1) )
00125     {
00126         _deltaDataCache.push_back( _deltaDatas.back( ));
00127         _deltaDatas.pop_back();
00128         _checkConsistency();
00129     }
00130 }
00131 
00132 uint32_t FullMasterCM::getOldestVersion() const
00133 {
00134     if( _version == Object::VERSION_NONE )
00135         return Object::VERSION_NONE;
00136 
00137     return _deltaDatas.back()->os.getVersion();
00138 }
00139 
00140 void FullMasterCM::addSlave( NodePtr node, const uint32_t instanceID, 
00141                              const uint32_t inVersion )
00142 {
00143     CHECK_THREAD( _thread );
00144     EQASSERT( _version != Object::VERSION_NONE );
00145     _checkConsistency();
00146 
00147     // add to subscribers
00148     ++_slavesCount[ node->getNodeID() ];
00149     _slaves.push_back( node );
00150     stde::usort( _slaves );
00151 
00152     if( inVersion == Object::VERSION_NONE ) // no data to send
00153     {
00154         ObjectInstancePacket instPacket;
00155         instPacket.instanceID = instanceID;
00156         instPacket.dataSize   = 0;
00157         instPacket.version    = _version;
00158         instPacket.sequence   = 0;
00159 
00160         _object->send( node, instPacket );
00161         return;
00162     }
00163 
00164     const uint32_t version = (inVersion == Object::VERSION_OLDEST) ?
00165                                  getOldestVersion() : inVersion;
00166     EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00167                          << ", instantiate on " << node->getNodeID() 
00168                          << " with v" << version << endl;
00169 
00170     EQASSERT( _object->getChangeType() == Object::INSTANCE );
00171     EQASSERT( version >= getOldestVersion( ));
00172 
00173     deque< DeltaData* >::reverse_iterator i = _deltaDatas.rbegin();
00174     while( (*i)->os.getVersion() < version && i != _deltaDatas.rend( ))
00175         ++i;
00176 
00177     const DeltaData* data = (i == _deltaDatas.rend()) ? _deltaDatas.back() : *i;
00178          
00179     // first packet has to be an instance packet, to be applied immediately
00180     const Bufferb&       buffer     = data->os.getSaveBuffer();
00181     ObjectInstancePacket instPacket;
00182     instPacket.instanceID = instanceID;
00183     instPacket.dataSize   = buffer.getSize();
00184     instPacket.version    = data->os.getVersion();
00185     instPacket.sequence   = 0;
00186 
00187     _object->send( node, instPacket, buffer.getData(), buffer.getSize() );
00188 
00189     if( i == _deltaDatas.rend( ))
00190         return;
00191 
00192     // versions oldest-1..newest are delta packets
00193     for( ++i; i != _deltaDatas.rend(); ++i )
00194     {
00195         DeltaData* deltaData = *i;
00196         deltaData->os.setInstanceID( instanceID );
00197         deltaData->os.resend( node );
00198         EQASSERT( ++instPacket.version == deltaData->os.getVersion( ));
00199     }
00200     EQASSERT( instPacket.version == _version );
00201 }
00202 
00203 void FullMasterCM::removeSlave( NodePtr node )
00204 {
00205     CHECK_THREAD( _thread );
00206     _checkConsistency();
00207 
00208     // remove from subscribers
00209     const NodeID& nodeID = node->getNodeID();
00210     EQASSERT( _slavesCount[ nodeID ] != 0 );
00211 
00212     --_slavesCount[ nodeID ];
00213     if( _slavesCount[ nodeID ] == 0 )
00214     {
00215         NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00216         EQASSERT( i != _slaves.end( ));
00217         _slaves.erase( i );
00218         _slavesCount.erase( nodeID );
00219     }
00220 }
00221 
00222 void FullMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00223 {
00224     EQASSERT( _version != Object::VERSION_NONE );
00225 
00226     // add to subscribers
00227     ++_slavesCount[ node->getNodeID() ];
00228     _slaves.push_back( node );
00229     stde::usort( _slaves );
00230 
00231     ObjectVersionPacket packet;
00232     packet.instanceID = instanceID;
00233     packet.version    = _version;
00234     _object->send( node, packet );
00235 }
00236 
00237 void FullMasterCM::_checkConsistency() const
00238 {
00239 #ifndef NDEBUG
00240     EQASSERT( _object->_id != EQ_ID_INVALID );
00241     EQASSERT( _object->getChangeType() == Object::INSTANCE );
00242 
00243     if( _version == Object::VERSION_NONE )
00244         return;
00245 
00246     if( !( _obsoleteFlags & Object::AUTO_OBSOLETE_COUNT_COMMITS ))
00247     {   // count versions
00248         if( _version <= _nVersions )
00249             EQASSERT( _deltaDatas.size() == _version );
00250     }
00251 #endif
00252 }
00253 
00254 //---------------------------------------------------------------------------
00255 // cache handling
00256 //---------------------------------------------------------------------------
00257 FullMasterCM::DeltaData* FullMasterCM::_newDeltaData()
00258 {
00259     DeltaData* deltaData;
00260 
00261     if( _deltaDataCache.empty( ))
00262         deltaData = new DeltaData( _object );
00263     else
00264     {
00265         deltaData = _deltaDataCache.back();
00266         _deltaDataCache.pop_back();
00267     }
00268 
00269     deltaData->os.enableSave();
00270     deltaData->os.enableBuffering();
00271     deltaData->os.setInstanceID( EQ_ID_ANY );
00272     return deltaData;
00273 }
00274 
00275 //---------------------------------------------------------------------------
00276 // command handlers
00277 //---------------------------------------------------------------------------
00278 CommandResult FullMasterCM::_cmdCommit( Command& command )
00279 {
00280     CHECK_THREAD( _thread );
00281     const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00282     EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00283 
00284     EQASSERT( _version != Object::VERSION_NONE );
00285 
00286     ++_commitCount;
00287 
00288     DeltaData* deltaData = _newDeltaData();
00289 
00290     deltaData->commitCount = _commitCount;
00291     deltaData->os.setVersion( _version + 1 );
00292 
00293     deltaData->os.enable( _slaves );
00294     _object->pack( deltaData->os );
00295     deltaData->os.disable();
00296 
00297     if( deltaData->os.hasSentData( ))
00298     {
00299         ++_version;
00300         EQASSERT( _version );
00301     
00302         _deltaDatas.push_front( deltaData );
00303         EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id " 
00304                              << _object->getID() << endl;
00305     }
00306     else
00307         _deltaDataCache.push_back( deltaData );
00308 
00309     _obsolete();
00310     _checkConsistency();
00311     _requestHandler.serveRequest( packet->requestID, _version );
00312     return COMMAND_HANDLED;
00313 }
00314 
00315 }
00316 }
Generated on Mon Aug 10 18:58:33 2009 for Equalizer 0.9 by  doxygen 1.5.8