fullSlaveCM.cpp

00001 
00002 /* Copyright (c) 2007-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <pthread.h>
00019 #include "fullSlaveCM.h"
00020 
00021 #include "command.h"
00022 #include "commands.h"
00023 #include "log.h"
00024 #include "object.h"
00025 #include "objectDeltaDataIStream.h"
00026 #include "objectInstanceDataIStream.h"
00027 #include "session.h"
00028 
00029 #include <eq/base/scopedMutex.h>
00030 
00031 using namespace eq::base;
00032 using namespace std;
00033 
00034 namespace eq
00035 {
00036 namespace net
00037 {
00038 typedef CommandFunc<FullSlaveCM> CmdFunc;
00039 
00040 FullSlaveCM::FullSlaveCM( Object* object, uint32_t masterInstanceID )
00041         : StaticSlaveCM( object )
00042         , _version( Object::VERSION_NONE )
00043         , _mutex( 0 )
00044         , _currentDeltaStream( 0 )
00045         , _masterInstanceID( masterInstanceID )
00046 {
00047     registerCommand( CMD_OBJECT_DELTA_DATA,
00048                      CmdFunc( this, &FullSlaveCM::_cmdDeltaData ), 0 );
00049     registerCommand( CMD_OBJECT_DELTA,
00050                      CmdFunc( this, &FullSlaveCM::_cmdDelta ), 0 );
00051     registerCommand( CMD_OBJECT_VERSION,
00052                      CmdFunc( this, &FullSlaveCM::_cmdVersion ), 0 );
00053 }
00054 
00055 FullSlaveCM::~FullSlaveCM()
00056 {
00057     delete _mutex;
00058     _mutex = 0;
00059 
00060     while( !_queuedVersions.isEmpty( ))
00061         delete _queuedVersions.pop();
00062 
00063     delete _currentDeltaStream;
00064     _currentDeltaStream = 0;
00065 
00066     _version = Object::VERSION_NONE;
00067 }
00068 
00069 void FullSlaveCM::makeThreadSafe()
00070 {
00071     if( _mutex ) return;
00072 
00073     _mutex = new Lock;
00074 }
00075 
00076 uint32_t FullSlaveCM::sync( const uint32_t version )
00077 {
00078     EQLOG( LOG_OBJECTS ) << "sync to v" << version << ", id " 
00079                          << _object->getID() << "." << _object->getInstanceID()
00080                          << endl;
00081     if( _version == version )
00082         return _version;
00083 
00084     if( !_mutex )
00085         CHECK_THREAD( _thread );
00086 
00087     ScopedMutex mutex( _mutex );
00088 
00089     if( version == Object::VERSION_HEAD )
00090     {
00091         _syncToHead();
00092         return _version;
00093     }
00094 
00095     EQASSERTINFO( _version <= version,
00096                   "can't sync to older version of object (" << _version << 
00097                   ", " << version <<")");
00098 
00099     while( _version < version )
00100     {
00101         ObjectDataIStream* is = _queuedVersions.pop();
00102         _unpackOneVersion( is );
00103         EQASSERTINFO( _version == is->getVersion(), "Have version " 
00104                       << _version << " instead of " << is->getVersion( ));
00105         delete is;
00106     }
00107 
00108     _object->getLocalNode()->flushCommands();
00109     return _version;
00110 }
00111 
00112 void FullSlaveCM::_syncToHead()
00113 {
00114     if( _queuedVersions.isEmpty( ))
00115         return;
00116 
00117     for( ObjectDataIStream* is = _queuedVersions.tryPop(); 
00118          is; is = _queuedVersions.tryPop( ))
00119     {
00120         _unpackOneVersion( is );
00121         EQASSERTINFO( _version == is->getVersion(), "Have version " 
00122                       << _version << " instead of " << is->getVersion( ));
00123         delete is;
00124     }
00125 
00126     _object->getLocalNode()->flushCommands();
00127 }
00128 
00129 
00130 uint32_t FullSlaveCM::getHeadVersion() const
00131 {
00132     ObjectDataIStream* is = _queuedVersions.back();
00133     if( is )
00134         return is->getVersion();
00135 
00136     return _version;    
00137 }
00138 
00139 void FullSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
00140 {
00141     EQASSERT( is );
00142     EQASSERTINFO( _version == is->getVersion() - 1, "Expected version " 
00143                   << _version + 1 << ", got " << is->getVersion() );
00144     
00145     _object->unpack( *is );
00146     _version = is->getVersion();
00147     EQLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
00148                          << _object->getID() << "." << _object->getInstanceID()
00149                          << endl;
00150 
00151     if( is->getRemainingBufferSize() > 0 || is->nRemainingBuffers() > 0 )
00152         EQWARN << "Object " << typeid( *_object ).name() 
00153             << " did not unpack all data" << endl;
00154 }
00155 
00156 
00157 void FullSlaveCM::applyMapData()
00158 {
00159     EQASSERTINFO( _currentIStream, typeid( *_object ).name() << " id " <<
00160                   _object->getID() << "." << _object->getInstanceID( ));
00161 
00162     _currentIStream->waitReady();
00163 
00164     _object->applyInstanceData( *_currentIStream );
00165     _version = _currentIStream->getVersion();
00166 
00167     delete _currentIStream;
00168     _currentIStream = 0;
00169 
00170     EQLOG( LOG_OBJECTS ) << "Mapped initial data for " << _object->getID()
00171                          << "." << _object->getInstanceID() << " v" << _version
00172                          << " ready" << std::endl;
00173 }
00174 
00175 //---------------------------------------------------------------------------
00176 // command handlers
00177 //---------------------------------------------------------------------------
00178 
00179 CommandResult FullSlaveCM::_cmdDeltaData( Command& command )
00180 {
00181     if( !_currentDeltaStream )
00182         _currentDeltaStream = new ObjectDeltaDataIStream;
00183 
00184     _currentDeltaStream->addDataPacket( command );
00185     return COMMAND_HANDLED;
00186 }
00187 
00188 CommandResult FullSlaveCM::_cmdDelta( Command& command )
00189 {
00190     if( !_currentDeltaStream )
00191         _currentDeltaStream = new ObjectDeltaDataIStream;
00192 
00193     const ObjectDeltaPacket* packet = command.getPacket<ObjectDeltaPacket>();
00194     EQLOG( LOG_OBJECTS ) << "cmd delta " << command << endl;
00195 
00196     _currentDeltaStream->addDataPacket( command );
00197     _currentDeltaStream->setVersion( packet->version );
00198     
00199     EQLOG( LOG_OBJECTS ) << "v" << packet->version << ", id "
00200                          << _object->getID() << "." << _object->getInstanceID()
00201                          << " ready" << endl;
00202 
00203     _queuedVersions.push( _currentDeltaStream );
00204     _object->notifyNewHeadVersion( packet->version );
00205     _currentDeltaStream = 0;
00206 
00207     return COMMAND_HANDLED;
00208 }
00209 
00210 CommandResult FullSlaveCM::_cmdVersion( Command& command )
00211 {
00212     const ObjectVersionPacket* packet = 
00213         command.getPacket< ObjectVersionPacket >();
00214     _version = packet->version;
00215     return COMMAND_HANDLED;
00216 }
00217 
00218 }
00219 }
Generated on Mon Aug 10 18:58:33 2009 for Equalizer 0.9 by  doxygen 1.5.8