fullSlaveCM.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <pthread.h>
00019 #include "fullSlaveCM.h"
00020
00021 #include "command.h"
00022 #include "commands.h"
00023 #include "log.h"
00024 #include "object.h"
00025 #include "objectDeltaDataIStream.h"
00026 #include "objectInstanceDataIStream.h"
00027 #include "session.h"
00028
00029 #include <eq/base/scopedMutex.h>
00030
00031 using namespace eq::base;
00032 using namespace std;
00033
00034 namespace eq
00035 {
00036 namespace net
00037 {
00038 typedef CommandFunc<FullSlaveCM> CmdFunc;
00039
00040 FullSlaveCM::FullSlaveCM( Object* object, uint32_t masterInstanceID )
00041 : StaticSlaveCM( object )
00042 , _version( Object::VERSION_NONE )
00043 , _mutex( 0 )
00044 , _currentDeltaStream( 0 )
00045 , _masterInstanceID( masterInstanceID )
00046 {
00047 registerCommand( CMD_OBJECT_DELTA_DATA,
00048 CmdFunc( this, &FullSlaveCM::_cmdDeltaData ), 0 );
00049 registerCommand( CMD_OBJECT_DELTA,
00050 CmdFunc( this, &FullSlaveCM::_cmdDelta ), 0 );
00051 registerCommand( CMD_OBJECT_VERSION,
00052 CmdFunc( this, &FullSlaveCM::_cmdVersion ), 0 );
00053 }
00054
00055 FullSlaveCM::~FullSlaveCM()
00056 {
00057 delete _mutex;
00058 _mutex = 0;
00059
00060 while( !_queuedVersions.isEmpty( ))
00061 delete _queuedVersions.pop();
00062
00063 delete _currentDeltaStream;
00064 _currentDeltaStream = 0;
00065
00066 _version = Object::VERSION_NONE;
00067 }
00068
00069 void FullSlaveCM::makeThreadSafe()
00070 {
00071 if( _mutex ) return;
00072
00073 _mutex = new Lock;
00074 }
00075
00076 uint32_t FullSlaveCM::sync( const uint32_t version )
00077 {
00078 EQLOG( LOG_OBJECTS ) << "sync to v" << version << ", id "
00079 << _object->getID() << "." << _object->getInstanceID()
00080 << endl;
00081 if( _version == version )
00082 return _version;
00083
00084 if( !_mutex )
00085 CHECK_THREAD( _thread );
00086
00087 ScopedMutex mutex( _mutex );
00088
00089 if( version == Object::VERSION_HEAD )
00090 {
00091 _syncToHead();
00092 return _version;
00093 }
00094
00095 EQASSERTINFO( _version <= version,
00096 "can't sync to older version of object (" << _version <<
00097 ", " << version <<")");
00098
00099 while( _version < version )
00100 {
00101 ObjectDataIStream* is = _queuedVersions.pop();
00102 _unpackOneVersion( is );
00103 EQASSERTINFO( _version == is->getVersion(), "Have version "
00104 << _version << " instead of " << is->getVersion( ));
00105 delete is;
00106 }
00107
00108 _object->getLocalNode()->flushCommands();
00109 return _version;
00110 }
00111
00112 void FullSlaveCM::_syncToHead()
00113 {
00114 if( _queuedVersions.isEmpty( ))
00115 return;
00116
00117 for( ObjectDataIStream* is = _queuedVersions.tryPop();
00118 is; is = _queuedVersions.tryPop( ))
00119 {
00120 _unpackOneVersion( is );
00121 EQASSERTINFO( _version == is->getVersion(), "Have version "
00122 << _version << " instead of " << is->getVersion( ));
00123 delete is;
00124 }
00125
00126 _object->getLocalNode()->flushCommands();
00127 }
00128
00129
00130 uint32_t FullSlaveCM::getHeadVersion() const
00131 {
00132 ObjectDataIStream* is = _queuedVersions.back();
00133 if( is )
00134 return is->getVersion();
00135
00136 return _version;
00137 }
00138
00139 void FullSlaveCM::_unpackOneVersion( ObjectDataIStream* is )
00140 {
00141 EQASSERT( is );
00142 EQASSERTINFO( _version == is->getVersion() - 1, "Expected version "
00143 << _version + 1 << ", got " << is->getVersion() );
00144
00145 _object->unpack( *is );
00146 _version = is->getVersion();
00147 EQLOG( LOG_OBJECTS ) << "applied v" << _version << ", id "
00148 << _object->getID() << "." << _object->getInstanceID()
00149 << endl;
00150
00151 if( is->getRemainingBufferSize() > 0 || is->nRemainingBuffers() > 0 )
00152 EQWARN << "Object " << typeid( *_object ).name()
00153 << " did not unpack all data" << endl;
00154 }
00155
00156
00157 void FullSlaveCM::applyMapData()
00158 {
00159 EQASSERTINFO( _currentIStream, typeid( *_object ).name() << " id " <<
00160 _object->getID() << "." << _object->getInstanceID( ));
00161
00162 _currentIStream->waitReady();
00163
00164 _object->applyInstanceData( *_currentIStream );
00165 _version = _currentIStream->getVersion();
00166
00167 delete _currentIStream;
00168 _currentIStream = 0;
00169
00170 EQLOG( LOG_OBJECTS ) << "Mapped initial data for " << _object->getID()
00171 << "." << _object->getInstanceID() << " v" << _version
00172 << " ready" << std::endl;
00173 }
00174
00175
00176
00177
00178
00179 CommandResult FullSlaveCM::_cmdDeltaData( Command& command )
00180 {
00181 if( !_currentDeltaStream )
00182 _currentDeltaStream = new ObjectDeltaDataIStream;
00183
00184 _currentDeltaStream->addDataPacket( command );
00185 return COMMAND_HANDLED;
00186 }
00187
00188 CommandResult FullSlaveCM::_cmdDelta( Command& command )
00189 {
00190 if( !_currentDeltaStream )
00191 _currentDeltaStream = new ObjectDeltaDataIStream;
00192
00193 const ObjectDeltaPacket* packet = command.getPacket<ObjectDeltaPacket>();
00194 EQLOG( LOG_OBJECTS ) << "cmd delta " << command << endl;
00195
00196 _currentDeltaStream->addDataPacket( command );
00197 _currentDeltaStream->setVersion( packet->version );
00198
00199 EQLOG( LOG_OBJECTS ) << "v" << packet->version << ", id "
00200 << _object->getID() << "." << _object->getInstanceID()
00201 << " ready" << endl;
00202
00203 _queuedVersions.push( _currentDeltaStream );
00204 _object->notifyNewHeadVersion( packet->version );
00205 _currentDeltaStream = 0;
00206
00207 return COMMAND_HANDLED;
00208 }
00209
00210 CommandResult FullSlaveCM::_cmdVersion( Command& command )
00211 {
00212 const ObjectVersionPacket* packet =
00213 command.getPacket< ObjectVersionPacket >();
00214 _version = packet->version;
00215 return COMMAND_HANDLED;
00216 }
00217
00218 }
00219 }