session.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "session.h"
00019 
00020 #include "barrier.h"
00021 #include "command.h"
00022 #include "connection.h"
00023 #include "connectionDescription.h"
00024 #include "log.h"
00025 #include "objectCM.h"
00026 #include "packets.h"
00027 #include "session.h"
00028 
00029 #ifndef WIN32
00030 #  include <alloca.h>
00031 #endif
00032 
00033 using namespace eq::base;
00034 using namespace std;
00035 
00036 namespace eq
00037 {
00038 namespace net
00039 {
// Lower bound for the number of identifiers allocated per CMD_SESSION_GEN_IDS
// request; _cmdGenIDs allocates max( requested range, MIN_ID_RANGE ).
#define MIN_ID_RANGE 1024
00041 
00042 Session::Session()
00043         : _requestHandler( true /* threadSafe */ )
00044         , _id(EQ_ID_INVALID)
00045         , _isMaster(false)
00046         , _idPool( 0 ) // Master pool is filled in Node::registerSession
00047         , _instanceIDs( 0 ) 
00048 {
00049     EQINFO << "New Session @" << (void*)this << endl;
00050 }
00051 
00052 Session::~Session()
00053 {
00054     EQINFO << "Delete Session @" << (void*)this << endl;
00055     EQASSERTINFO( _id == EQ_ID_INVALID, "Session still mapped during deletion");
00056 
00057     _id        = EQ_ID_INVALID;
00058     _isMaster  = false;
00059     _localNode = 0;
00060     _server    = 0;
00061     
00062 #ifndef NDEBUG
00063     if( !_objects.empty( ))
00064     {
00065         EQWARN << _objects.size() << " attached objects in destructor" << endl;
00066         
00067         for( ObjectVectorHash::const_iterator i = _objects.begin();
00068              i != _objects.end(); ++i )
00069         {
00070             const ObjectVector& objects = i->second;
00071             EQWARN << "  " << objects.size() << " objects with id " 
00072                    << i->first << endl;
00073             
00074             for( ObjectVector::const_iterator j = objects.begin();
00075                  j != objects.end(); ++j )
00076             {
00077                 const Object* object = *j;
00078                 EQINFO << "    object type " << typeid(*object).name() 
00079                        << endl;
00080             }
00081         }
00082     }
00083 #endif
00084     _objects.clear();
00085 }
00086 
00087 void Session::_setLocalNode( NodePtr node )
00088 {
00089     EQASSERT( _requestHandler.isEmpty( ));
00090 
00091     _localNode = node;
00092     if( !_localNode.isValid( ))
00093         return; // TODO deregister command functions?
00094 
00095     notifyMapped( node );
00096 }
00097 
00098 CommandQueue* Session::getCommandThreadQueue()
00099 {
00100     EQASSERT( _localNode.isValid( ));
00101     if( !_localNode )
00102         return 0;
00103 
00104     return _localNode->getCommandThreadQueue();
00105 }
00106 
00107 void Session::notifyMapped( NodePtr node )
00108 {
00109     EQASSERT( node.isValid( ));
00110 
00111     CommandQueue* queue = node->getCommandThreadQueue();
00112 
00113     registerCommand( CMD_SESSION_ACK_REQUEST, 
00114                      CommandFunc<Session>( this, &Session::_cmdAckRequest ), 0);
00115     registerCommand( CMD_SESSION_GEN_IDS, 
00116                      CommandFunc<Session>( this, &Session::_cmdGenIDs ),
00117                      queue );
00118     registerCommand( CMD_SESSION_GEN_IDS_REPLY,
00119                      CommandFunc<Session>( this, &Session::_cmdGenIDsReply ),
00120                      queue );
00121     registerCommand( CMD_SESSION_SET_ID_MASTER,
00122                      CommandFunc<Session>( this, &Session::_cmdSetIDMaster ),
00123                      queue );
00124     registerCommand( CMD_SESSION_GET_ID_MASTER, 
00125                      CommandFunc<Session>( this, &Session::_cmdGetIDMaster ),
00126                      queue );
00127     registerCommand( CMD_SESSION_GET_ID_MASTER_REPLY,
00128                   CommandFunc<Session>( this, &Session::_cmdGetIDMasterReply ),
00129                      queue );
00130     registerCommand( CMD_SESSION_ATTACH_OBJECT,
00131                      CommandFunc<Session>( this, &Session::_cmdAttachObject ),
00132                      0 );
00133     registerCommand( CMD_SESSION_DETACH_OBJECT,
00134                      CommandFunc<Session>( this, &Session::_cmdDetachObject ),
00135                      0 );
00136     registerCommand( CMD_SESSION_MAP_OBJECT,
00137                      CommandFunc<Session>( this, &Session::_cmdMapObject ), 0 );
00138     registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT,
00139                    CommandFunc<Session>( this, &Session::_cmdSubscribeObject ),
00140                      queue );
00141     registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT_SUCCESS,
00142             CommandFunc<Session>( this, &Session::_cmdSubscribeObjectSuccess ),
00143                      0 );
00144     registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT_REPLY,
00145               CommandFunc<Session>( this, &Session::_cmdSubscribeObjectReply ),
00146                      queue );
00147     registerCommand( CMD_SESSION_UNSUBSCRIBE_OBJECT,
00148                   CommandFunc<Session>( this, &Session::_cmdUnsubscribeObject ),
00149                      queue );
00150 }
00151 
00152 //---------------------------------------------------------------------------
00153 // identifier generation
00154 //---------------------------------------------------------------------------
00155 uint32_t Session::genIDs( const uint32_t range )
00156 {
00157     uint32_t id = _idPool.genIDs( range );
00158     if( id != EQ_ID_INVALID || _isMaster )
00159     {
00160         if( id == EQ_ID_INVALID )
00161             EQWARN << "Out of session identifiers" << std::endl;
00162         return id;
00163     }
00164 
00165     SessionGenIDsPacket packet;
00166     packet.requestID = _requestHandler.registerRequest();
00167     packet.range     = range;
00168 
00169     send( packet );
00170     _requestHandler.waitRequest( packet.requestID, id );
00171     
00172     if( id == EQ_ID_INVALID )
00173         EQWARN << "Out of session identifiers" << std::endl;
00174 
00175     return id;
00176 }
00177 
00178 void Session::freeIDs( const uint32_t start, const uint32_t range )
00179 {
00180     _idPool.freeIDs( start, range );
00181     // TODO: could return IDs to master sometimes ?
00182 }
00183 
00184 //---------------------------------------------------------------------------
00185 // identifier master node mapping
00186 //---------------------------------------------------------------------------
00187 void Session::setIDMaster( const uint32_t id, const NodeID& master )
00188 {
00189     CHECK_NOT_THREAD( _commandThread );
00190     _setIDMasterSync( _setIDMasterNB( id, master ));
00191 }
00192 
00193 uint32_t Session::_setIDMasterNB( const uint32_t id, const NodeID& master )
00194 {
00195     CHECK_NOT_THREAD( _commandThread );
00196 
00197     SessionSetIDMasterPacket packet;
00198     packet.id       = id;
00199     packet.masterID = master;
00200     packet.masterID.convertToNetwork();
00201     
00202     if( !_isMaster )
00203         _sendLocal( packet ); // set on our slave instance (fire&forget)
00204 
00205     packet.requestID = _requestHandler.registerRequest();
00206     send( packet );       // set on master instance (need to wait for ack)
00207     return packet.requestID;
00208 }
00209 
00210 void Session::_setIDMasterSync( const uint32_t requestID )
00211 {
00212     _requestHandler.waitRequest( requestID );
00213 }
00214 
00215 const NodeID& Session::_pollIDMaster( const uint32_t id ) const 
00216 {
00217     NodeIDHash::const_iterator i = _idMasters.find( id );
00218     if( i == _idMasters.end( ))
00219         return NodeID::ZERO;
00220 
00221     return i->second;
00222 }
00223 
00224 const NodeID& Session::getIDMaster( const uint32_t id )
00225 {
00226     _idMasterMutex.set();
00227     const NodeID& master = _pollIDMaster( id );
00228     _idMasterMutex.unset();
00229         
00230     if( master != NodeID::ZERO || _isMaster )
00231         return master;
00232 
00233     // ask session master instance
00234     SessionGetIDMasterPacket packet;
00235     packet.requestID = _requestHandler.registerRequest();
00236     packet.id        = id;
00237 
00238     send( packet );
00239     _requestHandler.waitRequest( packet.requestID );
00240 
00241     ScopedMutex mutex( _idMasterMutex );
00242     EQLOG( LOG_OBJECTS ) << "Master node for id " << id << ": " 
00243         << _pollIDMaster( id ) << endl;
00244     return _pollIDMaster( id );
00245 }
00246 
00247 //---------------------------------------------------------------------------
00248 // object mapping
00249 //---------------------------------------------------------------------------
00250 void Session::attachObject( Object* object, const uint32_t id, 
00251                             const uint32_t instanceID )
00252 {
00253     EQASSERT( object );
00254     CHECK_NOT_THREAD( _receiverThread );
00255 
00256     SessionAttachObjectPacket packet;
00257     packet.objectID = id;
00258     packet.objectInstanceID = instanceID;
00259     packet.requestID = _requestHandler.registerRequest( object );
00260 
00261     _sendLocal( packet );
00262     _requestHandler.waitRequest( packet.requestID );
00263 }
00264 
00265 void Session::_attachObject( Object* object, const uint32_t id, 
00266                              const uint32_t inInstanceID )
00267 {
00268     EQASSERT( object );
00269     CHECK_THREAD( _receiverThread );
00270 
00271     uint32_t instanceID = inInstanceID;
00272     if( inInstanceID == EQ_ID_INVALID )
00273     {
00274         _instanceIDs = ( _instanceIDs + 1 ) % IDPool::MAX_CAPACITY;
00275         instanceID = _instanceIDs;
00276     }
00277     EQASSERT( instanceID <= _instanceIDs );
00278 
00279     object->attachToSession( id, instanceID, this );
00280 
00281     _objectsMutex.set();
00282     ObjectVector& objects = _objects[ id ];
00283     objects.push_back( object );
00284     _objectsMutex.unset();
00285 
00286     getLocalNode()->flushCommands(); // redispatch pending commands
00287     EQLOG( LOG_OBJECTS ) << "Attached " << typeid( *object ).name()
00288                          << " to id " << id << endl;
00289 }
00290 
00291 void Session::detachObject( Object* object )
00292 {
00293     EQASSERT( object );
00294     CHECK_NOT_THREAD( _receiverThread );
00295 
00296     SessionDetachObjectPacket packet;
00297     packet.requestID = _requestHandler.registerRequest();
00298     packet.objectID  = object->getID();
00299     packet.objectInstanceID  = object->getInstanceID();
00300 
00301     _sendLocal( packet );
00302     _requestHandler.waitRequest( packet.requestID );
00303 }
00304 
00305 void Session::_detachObject( Object* object )
00306 {
00307     EQASSERT( object );
00308     CHECK_THREAD( _receiverThread );
00309 
00310     const uint32_t id = object->getID();
00311     EQASSERT( id != EQ_ID_INVALID );
00312     EQASSERT( _objects.find( id ) != _objects.end( ));
00313 
00314     EQLOG( LOG_OBJECTS ) << "Detach " << typeid( *object ).name() 
00315                          << " from id " << id << endl;
00316 
00317     ObjectVector&          objects = _objects[ id ];
00318     ObjectVector::iterator iter = find( objects.begin(),objects.end(), object );
00319     EQASSERT( iter != objects.end( ));
00320 
00321     _objectsMutex.set();
00322 
00323     objects.erase( iter );
00324     if( objects.empty( ))
00325         _objects.erase( id );
00326 
00327     _objectsMutex.unset();
00328     EQASSERT( object->getInstanceID() != EQ_ID_INVALID );
00329 
00330     object->detachFromSession();
00331     return;
00332 }
00333 
00334 bool Session::mapObject( Object* object, const uint32_t id,
00335                          const uint32_t version )
00336 {
00337     const uint32_t requestID = mapObjectNB( object, id, version );
00338     return mapObjectSync( requestID );
00339 }
00340 
00341 uint32_t Session::mapObjectNB( Object* object, const uint32_t id,
00342                                const uint32_t version )
00343 {
00344     EQASSERT( object );
00345     EQLOG( LOG_OBJECTS ) << "Mapping " << typeid( *object ).name() << " to id "
00346                          << id << " version " << version << endl;
00347 
00348     EQASSERT( object->getID() == EQ_ID_INVALID );
00349     EQASSERT( id != EQ_ID_INVALID );
00350     EQASSERT( !_localNode->inCommandThread( ));
00351         
00352     NodeID masterNodeID;
00353     if( !object->isMaster( ))
00354     {
00355         // Connect master node, can't do that from the command thread!
00356         masterNodeID = getIDMaster( id );
00357         if( masterNodeID == NodeID::ZERO )
00358         {
00359             EQWARN << "Can't find master node for object id " << id << endl;
00360             return EQ_ID_INVALID;
00361         }
00362 
00363         NodePtr master = _localNode->connect( masterNodeID );
00364         if( !master || master->getState() == Node::STATE_STOPPED )
00365         {
00366             EQWARN << "Can't connect master node with id " << masterNodeID
00367                    << " for object id " << id << endl;
00368             return EQ_ID_INVALID;
00369         }
00370     }
00371 
00372     SessionMapObjectPacket packet;
00373     packet.requestID    = _requestHandler.registerRequest( object );
00374     packet.objectID     = id;
00375     packet.version      = version;
00376     packet.masterNodeID = masterNodeID;
00377 
00378     _sendLocal( packet );
00379     return packet.requestID;
00380 }
00381 
00382 bool Session::mapObjectSync( const uint32_t requestID )
00383 {
00384     if( requestID == EQ_ID_INVALID )
00385         return false;
00386 
00387     void* data = _requestHandler.getRequestData( requestID );    
00388     if( data == 0 )
00389         return false;
00390 
00391     Object* object = EQSAFECAST( Object*, data );
00392 
00393     uint32_t version = Object::VERSION_NONE;
00394     _requestHandler.waitRequest( requestID, version );
00395 
00396     const bool mapped = ( object->getID() != EQ_ID_INVALID );
00397     if( mapped && !object->isMaster( ))
00398     {
00399         object->_cm->applyMapData(); // apply instance data on slave instances
00400         if( version != Object::VERSION_OLDEST && 
00401             version != Object::VERSION_NONE )
00402         {
00403             object->sync( version );
00404         }
00405     }
00406 
00407     EQLOG( LOG_OBJECTS ) << "Mapped " << typeid( *object ).name() << " to id " 
00408                          << object->getID() << endl;
00409     return mapped;
00410 }
00411 
00412 void Session::unmapObject( Object* object )
00413 {
00414     const uint32_t id = object->getID();
00415     if( id == EQ_ID_INVALID ) // not registered
00416         return;
00417 
00418     EQLOG( LOG_OBJECTS ) << "Unmap " << typeid( *object ).name() << " from id "
00419         << object->getID() << endl;
00420 
00421     // Slave: send unsubscribe to master. Master will send detach packet.
00422     if( !object->isMaster( ))
00423     {
00424         CHECK_NOT_THREAD( _commandThread );
00425         const uint32_t masterInstanceID = object->getMasterInstanceID();
00426         if( masterInstanceID != EQ_ID_INVALID )
00427         {
00428             _idMasterMutex.set();
00429             const NodeID& masterNodeID = _pollIDMaster( id );
00430             _idMasterMutex.unset();
00431 
00432             NodePtr localNode = _localNode;
00433             NodePtr master    = localNode.isValid() ? 
00434                                     localNode->getNode( masterNodeID ) : 0;
00435             if( master.isValid( ))
00436             {
00437                 SessionUnsubscribeObjectPacket packet;
00438                 packet.requestID = _requestHandler.registerRequest();
00439                 packet.objectID  = id;
00440                 packet.masterInstanceID = masterInstanceID;
00441                 packet.slaveInstanceID  = object->getInstanceID();
00442                 send( master, packet );
00443 
00444                 _requestHandler.waitRequest( packet.requestID );
00445                 return;
00446             }
00447             EQERROR << "Master node for object id " << id << " not connected"
00448                     << endl;
00449         }
00450     }
00451 
00452     // Master (or no unsubscribe sent): Detach directly
00453     detachObject( object );
00454 }
00455 
00456 bool Session::registerObject( Object* object )
00457 {
00458     EQASSERT( object->getID() == EQ_ID_INVALID );
00459 
00460     const uint32_t id = genIDs( 1 );
00461     EQASSERT( id != EQ_ID_INVALID );
00462     if( id == EQ_ID_INVALID )
00463         return false;
00464 
00465     const uint32_t requestID = _setIDMasterNB( id, _localNode->getNodeID( ));
00466     object->setupChangeManager( object->getChangeType(), true );
00467 
00468     EQCHECK( mapObject( object, id ));
00469     _setIDMasterSync( requestID ); // sync, master knows our ID now
00470     EQLOG( LOG_OBJECTS ) << "Registered " << typeid( *object ).name()
00471                          << " to id " << id << endl;
00472     return true;
00473 }
00474 
00475 void Session::deregisterObject( Object* object )
00476 {
00477     const uint32_t id = object->getID();
00478 
00479     EQLOG( LOG_OBJECTS ) << "Deregister " << typeid( *object ).name() 
00480                          << " from id " << id << endl;
00481 
00482     // TODO unsetIDMaster ?
00483     unmapObject( object );
00484     freeIDs( id, 1 );
00485 }
00486 
00487 //===========================================================================
00488 // Packet handling
00489 //===========================================================================
00490 bool Session::dispatchCommand( Command& command )
00491 {
00492     EQVERB << "dispatch " << command << endl;
00493     EQASSERT( command.isValid( ));
00494     CHECK_THREAD( _receiverThread );
00495 
00496     switch( command->datatype )
00497     {
00498         case DATATYPE_EQNET_SESSION:
00499             return Dispatcher::dispatchCommand( command );
00500 
00501         case DATATYPE_EQNET_OBJECT:
00502         {
00503             EQASSERT( command.isValid( ));
00504             const ObjectPacket* objPacket = command.getPacket<ObjectPacket>();
00505             const uint32_t      id        = objPacket->objectID;
00506 
00507 
00508             if( _objects.find( id ) == _objects.end( ))
00509             {
00510                 EQVERB << "no objects to dispatch command, redispatching " 
00511                        << objPacket << endl;
00512                 return false;
00513             }
00514             EQASSERTINFO( !_objects[id].empty(), id );
00515 
00516             Object* object = _objects[id][0];
00517             EQASSERT( object );
00518 
00519             return object->dispatchCommand( command );
00520         }
00521 
00522         default:
00523             EQABORT( "Unknown datatype " << command->datatype << " for "
00524                      << command );
00525             return true;
00526     }
00527 }
00528 
00529 CommandResult Session::invokeCommand( Command& command )
00530 {
00531     EQVERB << "invoke " << command << endl;
00532     EQASSERT( command.isValid( ));
00533 
00534     switch( command->datatype )
00535     {
00536         case DATATYPE_EQNET_SESSION:
00537             return Dispatcher::invokeCommand( command );
00538 
00539         case DATATYPE_EQNET_OBJECT:
00540             return _invokeObjectCommand( command );
00541 
00542         default:
00543             EQWARN << "Unhandled command " << command << endl;
00544             return COMMAND_ERROR;
00545     }
00546 }
00547 
00548 CommandResult Session::_invokeObjectCommand( Command& command )
00549 {
00550     EQASSERT( command.isValid( ));
00551     const ObjectPacket* objPacket = command.getPacket<ObjectPacket>();
00552     const uint32_t      id        = objPacket->objectID;
00553 
00554     _objectsMutex.set();
00555 
00556     EQASSERTINFO( _objects.find( id ) != _objects.end(), 
00557                   "No objects to handle command " << objPacket );
00558 
00559     // create copy of objects vector for thread-safety
00560     ObjectVector objects = _objects[id];
00561     EQASSERTINFO( !objects.empty(), objPacket );
00562 
00563     _objectsMutex.unset();
00564 
00565     for( ObjectVector::const_iterator i = objects.begin();
00566          i != objects.end(); ++ i )
00567     {
00568         Object* object = *i;
00569         const bool isInstance = 
00570             ( objPacket->instanceID == object->getInstanceID( ));
00571 
00572         if( objPacket->instanceID == EQ_ID_ANY || isInstance )
00573         {
00574             if( !command.isValid( ))
00575             {
00576                 // NOTE: command got invalidated (last object was a pushed
00577                 // command to another thread) . Object should push copy of
00578                 // command, or we should make it clear what invalidating a
00579                 // command means here (same as discard?)
00580                 EQERROR << "Object of type " << typeid(*object).name()
00581                         << " invalidated command send all instances" << endl;
00582                 return COMMAND_ERROR;
00583             }
00584 
00585             const CommandResult result = object->invokeCommand( command );
00586             switch( result )
00587             {
00588                 case COMMAND_DISCARD:
00589                     return COMMAND_DISCARD;
00590 
00591                 case COMMAND_ERROR:
00592                     EQERROR << "Error handling command " << objPacket
00593                             << " for object of type " << typeid(*object).name()
00594                             << endl;
00595                     return COMMAND_ERROR;
00596 
00597                 case COMMAND_HANDLED:
00598                     if( isInstance )
00599                         return result;
00600                     break;
00601 
00602                 default:
00603                     EQUNREACHABLE;
00604             }
00605         }
00606     }
00607     if( objPacket->instanceID == EQ_ID_ANY )
00608         return COMMAND_HANDLED;
00609 
00610     EQWARN << "instance not found for " << objPacket << endl;
00611     return COMMAND_ERROR;
00612 }
00613 
00614 CommandResult Session::_cmdAckRequest( Command& command )
00615 {
00616     const SessionAckRequestPacket* packet = 
00617         command.getPacket<SessionAckRequestPacket>();
00618     EQASSERT( packet->requestID != EQ_ID_INVALID );
00619 
00620     _requestHandler.serveRequest( packet->requestID );
00621     return COMMAND_HANDLED;
00622 }
00623 
00624 
00625 CommandResult Session::_cmdGenIDs( Command& command )
00626 {
00627     CHECK_THREAD( _commandThread );
00628     const SessionGenIDsPacket* packet =command.getPacket<SessionGenIDsPacket>();
00629     EQVERB << "Cmd gen IDs: " << packet << endl;
00630 
00631     SessionGenIDsReplyPacket reply( packet );
00632     const uint32_t range = EQ_MAX( packet->range, MIN_ID_RANGE );
00633 
00634     reply.id = _idPool.genIDs( range );
00635     reply.allocated = range;
00636     send( command.getNode(), reply );
00637     return COMMAND_HANDLED;
00638 }
00639 
00640 CommandResult Session::_cmdGenIDsReply( Command& command )
00641 {
00642     CHECK_THREAD( _commandThread );
00643     const SessionGenIDsReplyPacket* packet =
00644         command.getPacket<SessionGenIDsReplyPacket>();
00645     EQVERB << "Cmd gen IDs reply: " << packet << endl;
00646 
00647     _requestHandler.serveRequest( packet->requestID, packet->id );
00648 
00649     const size_t additional = packet->allocated - packet->requested;
00650     if( packet->id != EQ_ID_INVALID && additional > 0 )
00651         // Merge additional identifiers into local pool
00652         _idPool.freeIDs( packet->id + packet->requested, additional );
00653 
00654     return COMMAND_HANDLED;
00655 }
00656 
00657 CommandResult Session::_cmdSetIDMaster( Command& command )
00658 {
00659     CHECK_THREAD( _commandThread );
00660     const SessionSetIDMasterPacket* packet = 
00661         command.getPacket<SessionSetIDMasterPacket>();
00662     EQLOG( LOG_OBJECTS ) << "Cmd set ID master: " << packet << endl;
00663 
00664     NodeID nodeID = packet->masterID;
00665     nodeID.convertToHost();
00666     EQASSERT( nodeID != NodeID::ZERO );
00667 
00668     ScopedMutex mutex( _idMasterMutex );
00669     _idMasters[ packet->id ] = nodeID;
00670 
00671     if( packet->requestID != EQ_ID_INVALID ) // need to ack set operation
00672     {
00673         NodePtr node = command.getNode();
00674 
00675         if( node == _localNode ) // OPT
00676             _requestHandler.serveRequest( packet->requestID );
00677         else
00678         {
00679             SessionAckRequestPacket reply( packet->requestID );
00680             send( node, reply );
00681         }
00682     }
00683     return COMMAND_HANDLED;
00684 }
00685 
00686 CommandResult Session::_cmdGetIDMaster( Command& command )
00687 {
00688     CHECK_THREAD( _commandThread );
00689     const SessionGetIDMasterPacket* packet =
00690         command.getPacket<SessionGetIDMasterPacket>();
00691     EQLOG( LOG_OBJECTS ) << "handle get idMaster " << packet << endl;
00692 
00693     SessionGetIDMasterReplyPacket reply( packet );
00694     reply.masterID = _pollIDMaster( packet->id );
00695 
00696     send( command.getNode(), reply );
00697     return COMMAND_HANDLED;
00698 }
00699 
00700 CommandResult Session::_cmdGetIDMasterReply( Command& command )
00701 {
00702     CHECK_THREAD( _commandThread );
00703     const SessionGetIDMasterReplyPacket* packet = 
00704         command.getPacket<SessionGetIDMasterReplyPacket>();
00705     EQLOG( LOG_OBJECTS ) << "handle get idMaster reply " << packet << endl;
00706 
00707     NodeID nodeID = packet->masterID;
00708     nodeID.convertToHost();
00709 
00710     if( nodeID != NodeID::ZERO )
00711     {
00712         ScopedMutex mutex( _idMasterMutex );
00713         _idMasters[ packet->id ] = packet->masterID;
00714     }
00715     // else not found
00716 
00717     _requestHandler.serveRequest( packet->requestID );
00718     return COMMAND_HANDLED;
00719 }
00720 
00721 CommandResult Session::_cmdAttachObject( Command& command )
00722 {
00723     CHECK_THREAD( _receiverThread );
00724     const SessionAttachObjectPacket* packet = 
00725         command.getPacket<SessionAttachObjectPacket>();
00726     EQLOG( LOG_OBJECTS ) << "Cmd attach object " << packet << endl;
00727 
00728     Object* object =  static_cast<Object*>( _requestHandler.getRequestData( 
00729                                                 packet->requestID ));
00730     _attachObject( object, packet->objectID, packet->objectInstanceID );
00731     _requestHandler.serveRequest( packet->requestID );
00732     return COMMAND_HANDLED;
00733 }
00734 
00735 CommandResult Session::_cmdDetachObject( Command& command )
00736 {
00737     CHECK_THREAD( _receiverThread );
00738     const SessionDetachObjectPacket* packet = 
00739         command.getPacket<SessionDetachObjectPacket>();
00740     EQLOG( LOG_OBJECTS ) << "Cmd detach object " << packet << endl;
00741 
00742     const uint32_t id = packet->objectID;
00743     if( _objects.find( id ) != _objects.end( ))
00744     {
00745         ObjectVector& objects = _objects[id];
00746 
00747         for( ObjectVector::const_iterator i = objects.begin();
00748             i != objects.end(); ++i )
00749         {
00750             Object* object = *i;
00751             if( object->getInstanceID() == packet->objectInstanceID )
00752             {
00753                 _detachObject( object );
00754                 break;
00755             }
00756         }
00757     }
00758 
00759     if( packet->requestID != EQ_ID_INVALID )
00760         _requestHandler.serveRequest( packet->requestID );
00761 
00762     return COMMAND_HANDLED;
00763 }
00764 
00765 CommandResult Session::_cmdMapObject( Command& command )
00766 {
00767     CHECK_THREAD( _receiverThread );
00768     const SessionMapObjectPacket* packet = 
00769         command.getPacket< SessionMapObjectPacket >();
00770     EQLOG( LOG_OBJECTS ) << "Cmd map object " << packet << endl;
00771 
00772     Object* object = static_cast<Object*>( _requestHandler.getRequestData( 
00773                                                packet->requestID ));    
00774     EQASSERT( object );
00775     const uint32_t id = packet->objectID;
00776 
00777     if( !object->isMaster( ))
00778     { 
00779         EQASSERT( packet->masterNodeID != NodeID::ZERO );
00780         NodePtr master = _localNode->getNode( packet->masterNodeID );
00781 
00782         EQASSERTINFO( master.isValid(), "Master node for object id " << id
00783                       << " not connected" );
00784 
00785         _instanceIDs = ( _instanceIDs + 1 ) % IDPool::MAX_CAPACITY;
00786 
00787         // slave instantiation - subscribe first
00788         SessionSubscribeObjectPacket subscribePacket( packet );
00789         subscribePacket.instanceID = _instanceIDs;
00790 
00791         send( master, subscribePacket );
00792         return COMMAND_HANDLED;
00793     }
00794 
00795     _attachObject( object, id, EQ_ID_INVALID );
00796 
00797     EQLOG( LOG_OBJECTS ) << "mapped object id " << object->getID() << " @" 
00798                          << (void*)object << " is " << typeid(*object).name()
00799                          << endl;
00800 
00801     _requestHandler.serveRequest( packet->requestID, packet->version );
00802     return COMMAND_HANDLED;
00803 }
00804 
00805 CommandResult Session::_cmdSubscribeObject( Command& command )
00806 {
00807     CHECK_THREAD( _commandThread );
00808     SessionSubscribeObjectPacket* packet =
00809         command.getPacket<SessionSubscribeObjectPacket>();
00810     EQLOG( LOG_OBJECTS ) << "Cmd subscribe object " << packet << endl;
00811 
00812     NodePtr        node = command.getNode();
00813     const uint32_t id   = packet->objectID;
00814 
00815     Object* master = 0;
00816 
00817     _objectsMutex.set();
00818     if( _objects.find( id ) != _objects.end( ))
00819     {
00820         ObjectVector& objects = _objects[id];
00821 
00822         for( ObjectVector::const_iterator i = objects.begin();
00823              i != objects.end(); ++i )
00824         {
00825             Object* object = *i;
00826             if( object->isMaster( ))
00827             {
00828                 master = object;
00829                 break;
00830             }
00831         }
00832     }
00833     _objectsMutex.unset();
00834     
00835     SessionSubscribeObjectReplyPacket reply( packet );
00836 
00837     if( master )
00838     {
00839         // Check requested version
00840         const uint32_t version = packet->version;
00841         if( version == Object::VERSION_OLDEST || 
00842             version >= master->getOldestVersion( ))
00843         {
00844             SessionSubscribeObjectSuccessPacket successPacket( packet );
00845             successPacket.changeType       = master->getChangeType();
00846             successPacket.masterInstanceID = master->getInstanceID();
00847             send( node, successPacket );
00848         
00849             master->addSlave( node, packet->instanceID, version );
00850             reply.result = true;
00851         }
00852         else
00853         {
00854             EQWARN << "Version " << version << " no longer available" << endl;
00855             reply.result = false;
00856         }
00857     }
00858     else
00859     {
00860         EQWARN << "Can't find master object to subscribe " << id << endl;
00861         reply.result = false;
00862     }
00863 
00864     send( node, reply );
00865     return COMMAND_HANDLED;
00866 }
00867 
00868 CommandResult Session::_cmdSubscribeObjectSuccess( Command& command )
00869 {
00870     CHECK_THREAD( _receiverThread );
00871     const SessionSubscribeObjectSuccessPacket* packet = 
00872         command.getPacket<SessionSubscribeObjectSuccessPacket>();
00873     EQLOG( LOG_OBJECTS ) << "Cmd object subscribe success " << packet << endl;
00874 
00875     Object* object = static_cast<Object*>( _requestHandler.getRequestData( 
00876                                                packet->requestID ));    
00877     EQASSERT( object );
00878     EQASSERT( !object->isMaster( ));
00879 
00880     object->setupChangeManager( 
00881         static_cast< Object::ChangeType >( packet->changeType ), false, 
00882         packet->masterInstanceID );
00883 
00884     _attachObject( object, packet->objectID, packet->instanceID );
00885 
00886     EQLOG( LOG_OBJECTS ) << "subscribed object id " << object->getID() << '.'
00887                          << object->getInstanceID() << " cm " 
00888                          << typeid( *(object->_cm)).name() << " @" 
00889                          << static_cast< void* >( object ) << " is "
00890                          << typeid(*object).name() << endl;
00891     return COMMAND_HANDLED;
00892 }
00893 
00894 CommandResult Session::_cmdSubscribeObjectReply( Command& command )
00895 {
00896     CHECK_THREAD( _commandThread );
00897     const SessionSubscribeObjectReplyPacket* packet = 
00898         command.getPacket<SessionSubscribeObjectReplyPacket>();
00899     EQLOG( LOG_OBJECTS ) << "Cmd object subscribe reply " << packet << endl;
00900 
00901     EQASSERT( _requestHandler.getRequestData( packet->requestID ));
00902 
00903     if( !packet->requestID )
00904         EQWARN << "Could not subscribe object " << packet->objectID << endl;
00905 
00906     _requestHandler.serveRequest( packet->requestID, packet->version );
00907     return COMMAND_HANDLED;
00908 }
00909 
00910 CommandResult Session::_cmdUnsubscribeObject( Command& command )
00911 {
00912     CHECK_THREAD( _commandThread );
00913     SessionUnsubscribeObjectPacket* packet =
00914         command.getPacket<SessionUnsubscribeObjectPacket>();
00915     EQLOG( LOG_OBJECTS ) << "Cmd unsubscribe object  " << packet << endl;
00916 
00917     NodePtr        node = command.getNode();
00918     const uint32_t id   = packet->objectID;
00919 
00920     _objectsMutex.set();
00921     if( _objects.find( id ) != _objects.end( ))
00922     {
00923         ObjectVector& objects = _objects[id];
00924 
00925         for( ObjectVector::const_iterator i = objects.begin();
00926              i != objects.end(); ++ i )
00927         {
00928             Object* object = *i;
00929             if( object->isMaster() && 
00930                 object->getInstanceID() == packet->masterInstanceID )
00931             {
00932                 object->removeSlave( node );
00933                 break;
00934             }
00935         }   
00936     }
00937     _objectsMutex.unset();
00938 
00939     SessionDetachObjectPacket detachPacket( packet );
00940     send( node, detachPacket );
00941     return COMMAND_HANDLED;
00942 }
00943 
00944 std::ostream& operator << ( std::ostream& os, Session* session )
00945 {
00946     if( !session )
00947     {
00948         os << "NULL session";
00949         return os;
00950     }
00951     
00952     os << "session " << session->getID() << "(" << (void*)session
00953        << ")";
00954 
00955     return os;
00956 }
00957 }
00958 }
Generated on Mon Aug 10 18:58:41 2009 for Equalizer 0.9 by  doxygen 1.5.8