unbufferedMasterCM.cpp

00001 
00002 /* Copyright (c) 2007-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "unbufferedMasterCM.h"
00019 
00020 #include "command.h"
00021 #include "commands.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "object.h"
00025 #include "objectDeltaDataOStream.h"
00026 #include "packets.h"
00027 #include "session.h"
00028 
00029 using namespace eq::base;
00030 using namespace std;
00031 
00032 namespace eq
00033 {
00034 namespace net
00035 {
00036 UnbufferedMasterCM::UnbufferedMasterCM( Object* object )
00037         : _object( object )
00038         , _version( Object::VERSION_NONE )
00039 {
00040     registerCommand( CMD_OBJECT_COMMIT, 
00041        CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdCommit ),
00042                      0 );
00043     // sync commands are send to any instance, even the master gets the command
00044     registerCommand( CMD_OBJECT_DELTA_DATA, 
00045       CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdDiscard ),
00046                      0 );
00047     registerCommand( CMD_OBJECT_DELTA, 
00048       CommandFunc<UnbufferedMasterCM>( this, &UnbufferedMasterCM::_cmdDiscard ),
00049                      0 );
00050 }
00051 
00052 UnbufferedMasterCM::~UnbufferedMasterCM()
00053 {
00054     if( !_slaves.empty( ))
00055         EQWARN << _slaves.size() 
00056                << " slave nodes subscribed during deregisterObject of "
00057                << _object->getID() << '.' << _object->getInstanceID() << endl;
00058     _slaves.clear();
00059 }
00060 
00061 uint32_t UnbufferedMasterCM::commitNB()
00062 {
00063     EQASSERTINFO( _object->getChangeType() == Object::UNBUFFERED,
00064                   "Object type " << typeid(*this).name( ));
00065 
00066     ObjectCommitPacket packet;
00067     packet.instanceID = _object->_instanceID;
00068     packet.requestID  = _requestHandler.registerRequest();
00069 
00070     _object->send( _object->getLocalNode(), packet );
00071     return packet.requestID;
00072 }
00073 
00074 uint32_t UnbufferedMasterCM::commitSync( const uint32_t commitID )
00075 {
00076     uint32_t version = Object::VERSION_NONE;
00077     _requestHandler.waitRequest( commitID, version );
00078     return version;
00079 }
00080 
00081 void UnbufferedMasterCM::addSlave( NodePtr node, const uint32_t instanceID, 
00082                                    const uint32_t version )
00083 {
00084     CHECK_THREAD( _thread );
00085     EQASSERT( version == Object::VERSION_OLDEST ||
00086               version == Object::VERSION_NONE   ||
00087               version == _version );
00088 
00089     // add to subscribers
00090     ++_slavesCount[ node->getNodeID() ];
00091     _slaves.push_back( node );
00092     stde::usort( _slaves );
00093 
00094     EQLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
00095                          << ", instantiate on " << node->getNodeID() << endl;
00096 
00097     // send instance data
00098     ObjectInstanceDataOStream os( _object );
00099     os.setVersion( _version );
00100     os.setInstanceID( instanceID );
00101     
00102     if( version != Object::VERSION_NONE ) // send current data
00103     {
00104         os.enable( node );
00105         _object->getInstanceData( os );
00106         os.disable();
00107     }
00108 
00109     if( !os.hasSentData( )) // if no data, send empty packet to set version
00110     {
00111         os.enable( node );
00112         os.writeOnce( 0, 0 );
00113         os.disable();
00114     }
00115 }
00116 
00117 void UnbufferedMasterCM::removeSlave( NodePtr node )
00118 {
00119     CHECK_THREAD( _thread );
00120     // remove from subscribers
00121     const NodeID& nodeID = node->getNodeID();
00122     EQASSERT( _slavesCount[ nodeID ] != 0 );
00123 
00124     --_slavesCount[ nodeID ];
00125     if( _slavesCount[ nodeID ] == 0 )
00126     {
00127         NodeVector::iterator i = find( _slaves.begin(), _slaves.end(), node );
00128         EQASSERT( i != _slaves.end( ));
00129         _slaves.erase( i );
00130         _slavesCount.erase( nodeID );
00131     }
00132 }
00133 
00134 void UnbufferedMasterCM::addOldMaster( NodePtr node, const uint32_t instanceID )
00135 {
00136     // add to subscribers
00137     ++_slavesCount[ node->getNodeID() ];
00138     _slaves.push_back( node );
00139     stde::usort( _slaves );
00140 
00141     ObjectVersionPacket packet;
00142     packet.instanceID = instanceID;
00143     packet.version    = _version;
00144     _object->send( node, packet );
00145 }
00146 
00147 
00148 //---------------------------------------------------------------------------
00149 // command handlers
00150 //---------------------------------------------------------------------------
00151 CommandResult UnbufferedMasterCM::_cmdCommit( Command& command )
00152 {
00153     CHECK_THREAD( _thread );
00154     const ObjectCommitPacket* packet = command.getPacket<ObjectCommitPacket>();
00155     EQLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command << endl;
00156     if( _slaves.empty( ))
00157     {
00158         _requestHandler.serveRequest( packet->requestID, _version );
00159         return COMMAND_HANDLED;
00160     }
00161 
00162     ObjectDeltaDataOStream os( _object );
00163     os.setVersion( _version + 1 );
00164     os.enable( _slaves );
00165     _object->pack( os );
00166     os.disable();
00167 
00168     if( os.hasSentData( ))
00169     {
00170         ++_version;
00171         EQASSERT( _version );
00172         EQLOG( LOG_OBJECTS ) << "Committed v" << _version << ", id " 
00173                              << _object->getID() << endl;
00174     }
00175 
00176     _requestHandler.serveRequest( packet->requestID, _version );
00177     return COMMAND_HANDLED;
00178 }
00179 }
00180 }
Generated on Mon Aug 10 18:58:41 2009 for Equalizer 0.9 by  doxygen 1.5.8