net/object.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "object.h"
00019 
00020 #include "command.h"
00021 #include "dataIStream.h"
00022 #include "dataOStream.h"
00023 #include "deltaMasterCM.h"
00024 #include "fullMasterCM.h"
00025 #include "fullSlaveCM.h"
00026 #include "log.h"
00027 #include "nullCM.h"
00028 #include "objectCM.h"
00029 #include "packets.h"
00030 #include "session.h"
00031 #include "staticMasterCM.h"
00032 #include "staticSlaveCM.h"
00033 #include "unbufferedMasterCM.h"
00034 
00035 #include <eq/base/scopedMutex.h>
00036 #include <iostream>
00037 
00038 using namespace eq::base;
00039 using namespace std;
00040 
00041 namespace eq
00042 {
00043 namespace net
00044 {
00045 Object::Object()
00046         : _session          ( 0 )
00047         , _id               ( EQ_ID_INVALID )
00048         , _instanceID       ( EQ_ID_INVALID )
00049         , _cm               ( ObjectCM::ZERO )
00050         , _threadSafe       ( false )
00051 {
00052 }
00053 
00054 Object::Object( const Object& object )
00055         : Dispatcher( object )
00056         , _session          ( 0 )
00057         , _id               ( EQ_ID_INVALID )
00058         , _instanceID       ( EQ_ID_INVALID )
00059         , _cm               ( ObjectCM::ZERO )
00060         , _threadSafe       ( object._threadSafe )
00061 {
00062 }
00063 
00064 
00065 Object::~Object()
00066 {
00067     if( _session ) // Still registered
00068         EQERROR << "Object " << _id << " is still registered in session "
00069                 << _session->getID() << " in destructor" << endl;
00070 
00071     if( _cm != ObjectCM::ZERO )
00072         delete _cm;
00073     _cm = 0;
00074 }
00075 
00076 void Object::attachToSession( const uint32_t id, const uint32_t instanceID, 
00077                               Session* session )
00078 {
00079     EQASSERT( id != EQ_ID_INVALID );
00080     EQASSERT( instanceID != EQ_ID_INVALID );
00081     EQASSERT( session );
00082 
00083     _id         = id;
00084     _instanceID = instanceID;
00085     _session    = session;
00086 
00087     CommandQueue* queue = session->getCommandThreadQueue();
00088 
00089     registerCommand( CMD_OBJECT_INSTANCE_DATA,
00090                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00091     registerCommand( CMD_OBJECT_INSTANCE,
00092                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00093     registerCommand( CMD_OBJECT_DELTA_DATA, 
00094                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00095     registerCommand( CMD_OBJECT_DELTA, 
00096                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00097     registerCommand( CMD_OBJECT_COMMIT, 
00098                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00099     registerCommand( CMD_OBJECT_NEW_MASTER, 
00100                      CommandFunc<Object>( this, &Object::_cmdNewMaster ),queue);
00101     registerCommand( CMD_OBJECT_VERSION, 
00102                      CommandFunc<Object>( this, &Object::_cmdForward ), queue );
00103 
00104     EQINFO << _id << '.' << _instanceID << ": " << typeid( *this ).name()
00105            << (isMaster() ? " master" : " slave") << std::endl;
00106 }
00107 
00108 void Object::detachFromSession()
00109 {
00110     // Slave objects keep their cm to be able to sync queued versions
00111     if( isMaster( )) 
00112         _setChangeManager( ObjectCM::ZERO );
00113 
00114     _id         = EQ_ID_INVALID;
00115     _instanceID = EQ_ID_INVALID;
00116     _session    = 0;
00117 }
00118 
00119 void Object::_setChangeManager( ObjectCM* cm )
00120 {
00121     if( _cm != ObjectCM::ZERO )
00122     {
00123         EQVERB << "Overriding existing object change manager, obj "
00124                << typeid( *this ).name() << ", old cm " 
00125                << typeid( *_cm ).name() << ", new cm "
00126                << typeid( *cm ).name() << endl;
00127         delete _cm;
00128     }
00129 
00130     if( _threadSafe )
00131         cm->makeThreadSafe();
00132 
00133     _cm = cm;
00134     EQLOG( LOG_OBJECTS ) << "set new change manager " << typeid( *cm ).name()
00135                          << " for " << typeid( *this ).name() << endl;
00136 }
00137 
00138 void Object::makeThreadSafe()
00139 {
00140     EQASSERT( _id == EQ_ID_INVALID );
00141     if( _threadSafe )
00142         return;
00143 
00144     _threadSafe = true;
00145     _cm->makeThreadSafe();
00146 }
00147 
00148 NodePtr Object::getLocalNode()
00149 { 
00150     return _session ? _session->getLocalNode() : 0; 
00151 }
00152 
00153 bool Object::send( NodePtr node, ObjectPacket& packet )
00154 {
00155     EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00156     packet.sessionID = _session->getID();
00157     packet.objectID  = _id;
00158     return node->send( packet );
00159 }
00160 
00161 bool Object::send( NodePtr node, ObjectPacket& packet, 
00162                    const std::string& string )
00163 {
00164     EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00165     packet.sessionID = _session->getID();
00166     packet.objectID  = _id;
00167     return node->send( packet, string );
00168 }
00169 
00170 bool Object::send( NodePtr node, ObjectPacket& packet, 
00171                    const void* data, const uint64_t size )
00172 {
00173     EQASSERT( _session ); EQASSERT( _id != EQ_ID_INVALID );
00174     packet.sessionID = _session->getID();
00175     packet.objectID  = _id;
00176     return node->send( packet, data, size );
00177 }
00178 
00179 void Object::becomeMaster()
00180 {
00181     EQASSERT( _session );
00182     EQASSERT( !isMaster( ));
00183     EQASSERT( _id != EQ_ID_INVALID );
00184 
00185     // save location of master instance
00186     Session* session = _session;
00187     const NodeID& masterNodeID = session->getIDMaster( _id );
00188     EQASSERT( masterNodeID != NodeID::ZERO );
00189 
00190     NodePtr localNode = session->getLocalNode();
00191     NodePtr master    = localNode->getNode( masterNodeID );
00192     EQASSERT( master.isValid( ));
00193 
00194     const uint32_t masterID = _id;
00195     const uint32_t masterInstanceID = getMasterInstanceID();
00196     EQASSERT( masterInstanceID != EQ_ID_INVALID );
00197 
00198     // remap as master
00199     session->unmapObject( this );
00200     sync();
00201     session->registerObject( this );
00202 
00203     EQINFO << "became master " << masterID << '.' << masterInstanceID << " to "
00204            << _id << '.' << _instanceID << std::endl;
00205 
00206     // tell old master
00207     ObjectNewMasterPacket packet;
00208     packet.sessionID  = session->getID();
00209     packet.objectID   = masterID;
00210     packet.instanceID = masterInstanceID;
00211     packet.newMasterID = _id;
00212     packet.newMasterInstanceID = _instanceID;
00213     packet.changeType = getChangeType();
00214 
00215     master->send( packet );
00216 
00217     // subscribe slave (old master)
00218     _cm->addOldMaster( master, masterInstanceID );
00219 }
00220 
00221 uint32_t Object::commit()
00222 {
00223     if( !isDirty( ))
00224         return getVersion();
00225 
00226     const uint32_t requestID = commitNB();
00227     return commitSync( requestID );
00228 }
00229 
00230 void Object::setupChangeManager( const Object::ChangeType type, 
00231                                  const bool master, 
00232                                  const uint32_t masterInstanceID )
00233 {
00234     switch( type )
00235     {
00236         case Object::STATIC:
00237             if( master )
00238                 _setChangeManager( new StaticMasterCM( this ));
00239             else
00240                 _setChangeManager( new StaticSlaveCM( this ));
00241             break;
00242         case Object::INSTANCE:
00243             if( master )
00244                 _setChangeManager( new FullMasterCM( this ));
00245             else
00246                 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00247             break;
00248         case Object::DELTA:
00249             if( master )
00250                 _setChangeManager( new DeltaMasterCM( this ));
00251             else
00252                 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00253             break;
00254         case Object::UNBUFFERED:
00255             if( master )
00256                 _setChangeManager( new UnbufferedMasterCM( this ));
00257             else
00258                 _setChangeManager( new FullSlaveCM( this, masterInstanceID ));
00259             break;
00260 
00261         default: EQUNIMPLEMENTED;
00262     }
00263 }
00264 
00265 //---------------------------------------------------------------------------
00266 // ChangeManager forwarders
00267 //---------------------------------------------------------------------------
00268 
00269 bool Object::isMaster() const
00270 {
00271     return _cm->isMaster();
00272 }
00273 
00274 uint32_t Object::commitNB()
00275 {
00276     return _cm->commitNB();
00277 }
00278 
00279 uint32_t Object::commitSync( const uint32_t commitID ) 
00280 {
00281     return _cm->commitSync( commitID );
00282 }
00283 
00284 void Object::obsolete( const uint32_t version )
00285 {
00286     _cm->obsolete( version );
00287 }
00288 
00289 void Object::setAutoObsolete( const uint32_t count, const uint32_t flags )
00290 {
00291     _cm->setAutoObsolete( count, flags );
00292 }
00293 
00294 uint32_t Object::getAutoObsoleteCount() const 
00295 {
00296     return _cm->getAutoObsoleteCount();
00297 }
00298 
00299 uint32_t Object::sync( const uint32_t version )
00300 {
00301     return _cm->sync( version );
00302 }
00303 
00304 uint32_t Object::getHeadVersion() const
00305 {
00306     return _cm->getHeadVersion();
00307 }
00308 
00309 uint32_t Object::getVersion() const
00310 {
00311     return _cm->getVersion();
00312 }
00313 
00314 uint32_t Object::getOldestVersion() const
00315 {
00316     return _cm->getOldestVersion();
00317 }
00318 
00319 uint32_t Object::getMasterInstanceID() const
00320 {
00321     return _cm->getMasterInstanceID();
00322 }
00323 
00324 void Object::addSlave( NodePtr node, const uint32_t instanceID, 
00325                        const uint32_t version )
00326 {
00327     _cm->addSlave( node, instanceID, version );
00328 }
00329 
00330 void Object::removeSlave( NodePtr node )
00331 {
00332     _cm->removeSlave( node );
00333 }
00334 
00335 CommandResult Object::_cmdForward( Command& command )
00336 {
00337     return _cm->invokeCommand( command );
00338 }
00339 
00340 CommandResult Object::_cmdNewMaster( Command& command )
00341 {
00342     ObjectNewMasterPacket* packet = command.getPacket<ObjectNewMasterPacket>();
00343     EQVERB << "become slave " << _id << '.' << _instanceID << " to "
00344            << packet->newMasterID << '.' << packet->newMasterInstanceID
00345            << std::endl;
00346     EQASSERT( isMaster( ));
00347 
00348     const uint32_t instanceID = _instanceID; // save - reset during deregister
00349     Session* session = getSession();
00350     session->deregisterObject( this );
00351 
00352     setupChangeManager( static_cast< Object::ChangeType >( packet->changeType ),
00353                         false, packet->newMasterInstanceID );
00354     session->attachObject( this, packet->newMasterID, instanceID );
00355 
00356     return COMMAND_HANDLED;
00357 }
00358 
00359 }
00360 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8