00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "object.h"
00019
00020 #include "command.h"
00021 #include "dataIStream.h"
00022 #include "dataOStream.h"
00023 #include "deltaMasterCM.h"
00024 #include "fullMasterCM.h"
00025 #include "fullSlaveCM.h"
00026 #include "log.h"
00027 #include "nullCM.h"
00028 #include "objectCM.h"
00029 #include "packets.h"
00030 #include "session.h"
00031 #include "staticMasterCM.h"
00032 #include "staticSlaveCM.h"
00033 #include "unbufferedMasterCM.h"
00034
00035 #include <eq/base/scopedMutex.h>
00036 #include <iostream>
00037
00038 using namespace eq::base;
00039 using namespace std;
00040
00041 namespace eq
00042 {
00043 namespace net
00044 {
00045 Object::Object()
00046 : _session ( 0 )
00047 , _id ( EQ_ID_INVALID )
00048 , _instanceID ( EQ_ID_INVALID )
00049 , _cm ( ObjectCM::ZERO )
00050 , _threadSafe ( false )
00051 {
00052 }
00053
00054 Object::Object( const Object& object )
00055 : Dispatcher( object )
00056 , _session ( 0 )
00057 , _id ( EQ_ID_INVALID )
00058 , _instanceID ( EQ_ID_INVALID )
00059 , _cm ( ObjectCM::ZERO )
00060 , _threadSafe ( object._threadSafe )
00061 {
00062 }
00063
00064
00065 Object::~Object()
00066 {
00067 if( _session )
00068 EQERROR << "Object " << _id << " is still registered in session "
00069 << _session->getID() << " in destructor" << endl;
00070
00071 if( _cm != ObjectCM::ZERO )
00072 delete _cm;
00073 _cm = 0;
00074 }
00075
00076 void Object::attachToSession( const uint32_t id, const uint32_t instanceID,
00077 Session* session )
00078 {
00079 EQASSERT( id != EQ_ID_INVALID );
00080 EQASSERT( instanceID != EQ_ID_INVALID );
00081 EQASSERT( session );
00082
00083 _id = id;
00084 _instanceID = instanceID;
00085 _session = session;
00086
00087 CommandQueue* queue = session->getCommandThreadQueue();
00088
00089 registerCommand( CMD_OBJECT_INSTANCE_DATA,
00090 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00091 registerCommand( CMD_OBJECT_INSTANCE,
00092 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00093 registerCommand( CMD_OBJECT_DELTA_DATA,
00094 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00095 registerCommand( CMD_OBJECT_DELTA,
00096 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00097 registerCommand( CMD_OBJECT_COMMIT,
00098 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00099 registerCommand( CMD_OBJECT_NEW_MASTER,
00100 CommandFunc<Object>( this, &Object::_cmdNewMaster ),queue);
00101 registerCommand( CMD_OBJECT_VERSION,
00102 CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00103
00104 EQINFO << _id << '.' << _instanceID << ": " << typeid( *this ).name()
00105 << (isMaster() ? " master" : " slave") << std::endl;
00106 }
00107
00108 void Object::detachFromSession()
00109 {
00110
00111 if( isMaster( ))
00112 _setChangeManager( ObjectCM::ZERO );
00113
00114 _id = EQ_ID_INVALID;
00115 _instanceID = EQ_ID_INVALID;
00116 _session = 0;
00117 }
00118
00119 void Object::_setChangeManager( ObjectCM* cm )
00120 {
00121 if( _cm != ObjectCM::ZERO )
00122 {
00123 EQVERB << "Overriding existing object change manager, obj "
00124 << typeid( *this ).name() << ", old cm "
00125 << typeid( *_cm ).name() << ", new cm "
00126 << typeid( *cm ).name() << endl;
00127 delete _cm;
00128 }
00129
00130 if( _threadSafe )
00131 cm->makeThreadSafe();
00132
00133 _cm = cm;
00134 EQLOG( LOG_OBJECTS ) << "set new change manager " << typeid( *cm ).name()
00135 << " for " << typeid( *this ).name() << endl;
00136 }
00137
00138 void Object::makeThreadSafe()
00139 {
00140 EQASSERT( _id == EQ_ID_INVALID );
00141 if( _threadSafe )
00142 return;
00143
00144 _threadSafe = true;
00145 _cm->makeThreadSafe();
00146 }
00147
00148 NodePtr Object::getLocalNode()
00149 {
00150 return _session ? _session->getLocalNode() : 0;
00151 }
00152
00153 bool Object::send( NodePtr node, ObjectPacket& packet )
00154 {
00155 EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00156 packet.sessionID = _session->getID();
00157 packet.objectID = _id;
00158 return node->send( packet );
00159 }
00160
00161 bool Object::send( NodePtr node, ObjectPacket& packet,
00162 const std::string& string )
00163 {
00164 EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00165 packet.sessionID = _session->getID();
00166 packet.objectID = _id;
00167 return node->send( packet, string );
00168 }
00169
00170 bool Object::send( NodePtr node, ObjectPacket& packet,
00171 const void* data, const uint64_t size )
00172 {
00173 EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00174 packet.sessionID = _session->getID();
00175 packet.objectID = _id;
00176 return node->send( packet, data, size );
00177 }
00178
00179 void Object::becomeMaster()
00180 {
00181 EQASSERT( _session );
00182 EQASSERT( !isMaster( ));
00183 EQASSERT( _id != EQ_ID_INVALID );
00184
00185
00186 Session* session = _session;
00187 const NodeID& masterNodeID = session->getIDMaster( _id );
00188 EQASSERT( masterNodeID != NodeID::ZERO );
00189
00190 NodePtr localNode = session->getLocalNode();
00191 NodePtr master = localNode->getNode( masterNodeID );
00192 EQASSERT( master.isValid( ));
00193
00194 const uint32_t masterID = _id;
00195 const uint32_t masterInstanceID = getMasterInstanceID();
00196 EQASSERT( masterInstanceID != EQ_ID_INVALID );
00197
00198
00199 session->unmapObject( this );
00200 sync();
00201 session->registerObject( this );
00202
00203 EQINFO << "became master " << masterID << '.' << masterInstanceID << " to "
00204 << _id << '.' << _instanceID << std::endl;
00205
00206
00207 ObjectNewMasterPacket packet;
00208 packet.sessionID = session->getID();
00209 packet.objectID = masterID;
00210 packet.instanceID = masterInstanceID;
00211 packet.newMasterID = _id;
00212 packet.newMasterInstanceID = _instanceID;
00213 packet.changeType = getChangeType();
00214
00215 master->send( packet );
00216
00217
00218 _cm->addOldMaster( master, masterInstanceID );
00219 }
00220
00221 uint32_t Object::commit()
00222 {
00223 if( !isDirty( ))
00224 return getVersion();
00225
00226 const uint32_t requestID = commitNB();
00227 return commitSync( requestID );
00228 }
00229
00230 void Object::setupChangeManager( const Object::ChangeType type,
00231 const bool master,
00232 const uint32_t masterInstanceID )
00233 {
00234 switch( type )
00235 {
00236 case Object::STATIC:
00237 if( master )
00238 _setChangeManager( new StaticMasterCM( this ));
00239 else
00240 _setChangeManager( new StaticSlaveCM( this ));
00241 break;
00242 case Object::INSTANCE:
00243 if( master )
00244 _setChangeManager( new FullMasterCM( this ));
00245 else
00246 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00247 break;
00248 case Object::DELTA:
00249 if( master )
00250 _setChangeManager( new DeltaMasterCM( this ));
00251 else
00252 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00253 break;
00254 case Object::UNBUFFERED:
00255 if( master )
00256 _setChangeManager( new UnbufferedMasterCM( this ));
00257 else
00258 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00259 break;
00260
00261 default: EQUNIMPLEMENTED;
00262 }
00263 }
00264
00265
00266
00267
00268
00269 bool Object::isMaster() const
00270 {
00271 return _cm->isMaster();
00272 }
00273
00274 uint32_t Object::commitNB()
00275 {
00276 return _cm->commitNB();
00277 }
00278
00279 uint32_t Object::commitSync( const uint32_t commitID )
00280 {
00281 return _cm->commitSync( commitID );
00282 }
00283
00284 void Object::obsolete( const uint32_t version )
00285 {
00286 _cm->obsolete( version );
00287 }
00288
00289 void Object::setAutoObsolete( const uint32_t count, const uint32_t flags )
00290 {
00291 _cm->setAutoObsolete( count, flags );
00292 }
00293
00294 uint32_t Object::getAutoObsoleteCount() const
00295 {
00296 return _cm->getAutoObsoleteCount();
00297 }
00298
00299 uint32_t Object::sync( const uint32_t version )
00300 {
00301 return _cm->sync( version );
00302 }
00303
00304 uint32_t Object::getHeadVersion() const
00305 {
00306 return _cm->getHeadVersion();
00307 }
00308
00309 uint32_t Object::getVersion() const
00310 {
00311 return _cm->getVersion();
00312 }
00313
00314 uint32_t Object::getOldestVersion() const
00315 {
00316 return _cm->getOldestVersion();
00317 }
00318
00319 uint32_t Object::getMasterInstanceID() const
00320 {
00321 return _cm->getMasterInstanceID();
00322 }
00323
00324 void Object::addSlave( NodePtr node, const uint32_t instanceID,
00325 const uint32_t version )
00326 {
00327 _cm->addSlave( node, instanceID, version );
00328 }
00329
00330 void Object::removeSlave( NodePtr node )
00331 {
00332 _cm->removeSlave( node );
00333 }
00334
00335 CommandResult Object::_cmdForward( Command& command )
00336 {
00337 return _cm->invokeCommand( command );
00338 }
00339
00340 CommandResult Object::_cmdNewMaster( Command& command )
00341 {
00342 ObjectNewMasterPacket* packet = command.getPacket<ObjectNewMasterPacket>();
00343 EQVERB << "become slave " << _id << '.' << _instanceID << " to "
00344 << packet->newMasterID << '.' << packet->newMasterInstanceID
00345 << std::endl;
00346 EQASSERT( isMaster( ));
00347
00348 const uint32_t instanceID = _instanceID;
00349 Session* session = getSession();
00350 session->deregisterObject( this );
00351
00352 setupChangeManager( static_cast< Object::ChangeType >( packet->changeType ),
00353 false, packet->newMasterInstanceID );
00354 session->attachObject( this, packet->newMasterID, instanceID );
00355
00356 return COMMAND_HANDLED;
00357 }
00358
00359 }
00360 }