00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "session.h"
00019
00020 #include "barrier.h"
00021 #include "command.h"
00022 #include "connection.h"
00023 #include "connectionDescription.h"
00024 #include "log.h"
00025 #include "objectCM.h"
00026 #include "packets.h"
00027 #include "session.h"
00028
00029 #ifndef WIN32
00030 # include <alloca.h>
00031 #endif
00032
00033 using namespace eq::base;
00034 using namespace std;
00035
00036 namespace eq
00037 {
00038 namespace net
00039 {
00040 #define MIN_ID_RANGE 1024
00041
00042 Session::Session()
00043 : _requestHandler( true )
00044 , _id(EQ_ID_INVALID)
00045 , _isMaster(false)
00046 , _idPool( 0 )
00047 , _instanceIDs( 0 )
00048 {
00049 EQINFO << "New Session @" << (void*)this << endl;
00050 }
00051
00052 Session::~Session()
00053 {
00054 EQINFO << "Delete Session @" << (void*)this << endl;
00055 EQASSERTINFO( _id == EQ_ID_INVALID, "Session still mapped during deletion");
00056
00057 _id = EQ_ID_INVALID;
00058 _isMaster = false;
00059 _localNode = 0;
00060 _server = 0;
00061
00062 #ifndef NDEBUG
00063 if( !_objects.empty( ))
00064 {
00065 EQWARN << _objects.size() << " attached objects in destructor" << endl;
00066
00067 for( ObjectVectorHash::const_iterator i = _objects.begin();
00068 i != _objects.end(); ++i )
00069 {
00070 const ObjectVector& objects = i->second;
00071 EQWARN << " " << objects.size() << " objects with id "
00072 << i->first << endl;
00073
00074 for( ObjectVector::const_iterator j = objects.begin();
00075 j != objects.end(); ++j )
00076 {
00077 const Object* object = *j;
00078 EQINFO << " object type " << typeid(*object).name()
00079 << endl;
00080 }
00081 }
00082 }
00083 #endif
00084 _objects.clear();
00085 }
00086
00087 void Session::_setLocalNode( NodePtr node )
00088 {
00089 EQASSERT( _requestHandler.isEmpty( ));
00090
00091 _localNode = node;
00092 if( !_localNode.isValid( ))
00093 return;
00094
00095 notifyMapped( node );
00096 }
00097
00098 CommandQueue* Session::getCommandThreadQueue()
00099 {
00100 EQASSERT( _localNode.isValid( ));
00101 if( !_localNode )
00102 return 0;
00103
00104 return _localNode->getCommandThreadQueue();
00105 }
00106
00107 void Session::notifyMapped( NodePtr node )
00108 {
00109 EQASSERT( node.isValid( ));
00110
00111 CommandQueue* queue = node->getCommandThreadQueue();
00112
00113 registerCommand( CMD_SESSION_ACK_REQUEST,
00114 CommandFunc<Session>( this, &Session::_cmdAckRequest ), 0);
00115 registerCommand( CMD_SESSION_GEN_IDS,
00116 CommandFunc<Session>( this, &Session::_cmdGenIDs ),
00117 queue );
00118 registerCommand( CMD_SESSION_GEN_IDS_REPLY,
00119 CommandFunc<Session>( this, &Session::_cmdGenIDsReply ),
00120 queue );
00121 registerCommand( CMD_SESSION_SET_ID_MASTER,
00122 CommandFunc<Session>( this, &Session::_cmdSetIDMaster ),
00123 queue );
00124 registerCommand( CMD_SESSION_GET_ID_MASTER,
00125 CommandFunc<Session>( this, &Session::_cmdGetIDMaster ),
00126 queue );
00127 registerCommand( CMD_SESSION_GET_ID_MASTER_REPLY,
00128 CommandFunc<Session>( this, &Session::_cmdGetIDMasterReply ),
00129 queue );
00130 registerCommand( CMD_SESSION_ATTACH_OBJECT,
00131 CommandFunc<Session>( this, &Session::_cmdAttachObject ),
00132 0 );
00133 registerCommand( CMD_SESSION_DETACH_OBJECT,
00134 CommandFunc<Session>( this, &Session::_cmdDetachObject ),
00135 0 );
00136 registerCommand( CMD_SESSION_MAP_OBJECT,
00137 CommandFunc<Session>( this, &Session::_cmdMapObject ), 0 );
00138 registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT,
00139 CommandFunc<Session>( this, &Session::_cmdSubscribeObject ),
00140 queue );
00141 registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT_SUCCESS,
00142 CommandFunc<Session>( this, &Session::_cmdSubscribeObjectSuccess ),
00143 0 );
00144 registerCommand( CMD_SESSION_SUBSCRIBE_OBJECT_REPLY,
00145 CommandFunc<Session>( this, &Session::_cmdSubscribeObjectReply ),
00146 queue );
00147 registerCommand( CMD_SESSION_UNSUBSCRIBE_OBJECT,
00148 CommandFunc<Session>( this, &Session::_cmdUnsubscribeObject ),
00149 queue );
00150 }
00151
00152
00153
00154
00155 uint32_t Session::genIDs( const uint32_t range )
00156 {
00157 uint32_t id = _idPool.genIDs( range );
00158 if( id != EQ_ID_INVALID || _isMaster )
00159 {
00160 if( id == EQ_ID_INVALID )
00161 EQWARN << "Out of session identifiers" << std::endl;
00162 return id;
00163 }
00164
00165 SessionGenIDsPacket packet;
00166 packet.requestID = _requestHandler.registerRequest();
00167 packet.range = range;
00168
00169 send( packet );
00170 _requestHandler.waitRequest( packet.requestID, id );
00171
00172 if( id == EQ_ID_INVALID )
00173 EQWARN << "Out of session identifiers" << std::endl;
00174
00175 return id;
00176 }
00177
00178 void Session::freeIDs( const uint32_t start, const uint32_t range )
00179 {
00180 _idPool.freeIDs( start, range );
00181
00182 }
00183
00184
00185
00186
00187 void Session::setIDMaster( const uint32_t id, const NodeID& master )
00188 {
00189 CHECK_NOT_THREAD( _commandThread );
00190 _setIDMasterSync( _setIDMasterNB( id, master ));
00191 }
00192
00193 uint32_t Session::_setIDMasterNB( const uint32_t id, const NodeID& master )
00194 {
00195 CHECK_NOT_THREAD( _commandThread );
00196
00197 SessionSetIDMasterPacket packet;
00198 packet.id = id;
00199 packet.masterID = master;
00200 packet.masterID.convertToNetwork();
00201
00202 if( !_isMaster )
00203 _sendLocal( packet );
00204
00205 packet.requestID = _requestHandler.registerRequest();
00206 send( packet );
00207 return packet.requestID;
00208 }
00209
00210 void Session::_setIDMasterSync( const uint32_t requestID )
00211 {
00212 _requestHandler.waitRequest( requestID );
00213 }
00214
00215 const NodeID& Session::_pollIDMaster( const uint32_t id ) const
00216 {
00217 NodeIDHash::const_iterator i = _idMasters.find( id );
00218 if( i == _idMasters.end( ))
00219 return NodeID::ZERO;
00220
00221 return i->second;
00222 }
00223
00224 const NodeID& Session::getIDMaster( const uint32_t id )
00225 {
00226 _idMasterMutex.set();
00227 const NodeID& master = _pollIDMaster( id );
00228 _idMasterMutex.unset();
00229
00230 if( master != NodeID::ZERO || _isMaster )
00231 return master;
00232
00233
00234 SessionGetIDMasterPacket packet;
00235 packet.requestID = _requestHandler.registerRequest();
00236 packet.id = id;
00237
00238 send( packet );
00239 _requestHandler.waitRequest( packet.requestID );
00240
00241 ScopedMutex mutex( _idMasterMutex );
00242 EQLOG( LOG_OBJECTS ) << "Master node for id " << id << ": "
00243 << _pollIDMaster( id ) << endl;
00244 return _pollIDMaster( id );
00245 }
00246
00247
00248
00249
00250 void Session::attachObject( Object* object, const uint32_t id,
00251 const uint32_t instanceID )
00252 {
00253 EQASSERT( object );
00254 CHECK_NOT_THREAD( _receiverThread );
00255
00256 SessionAttachObjectPacket packet;
00257 packet.objectID = id;
00258 packet.objectInstanceID = instanceID;
00259 packet.requestID = _requestHandler.registerRequest( object );
00260
00261 _sendLocal( packet );
00262 _requestHandler.waitRequest( packet.requestID );
00263 }
00264
00265 void Session::_attachObject( Object* object, const uint32_t id,
00266 const uint32_t inInstanceID )
00267 {
00268 EQASSERT( object );
00269 CHECK_THREAD( _receiverThread );
00270
00271 uint32_t instanceID = inInstanceID;
00272 if( inInstanceID == EQ_ID_INVALID )
00273 {
00274 _instanceIDs = ( _instanceIDs + 1 ) % IDPool::MAX_CAPACITY;
00275 instanceID = _instanceIDs;
00276 }
00277 EQASSERT( instanceID <= _instanceIDs );
00278
00279 object->attachToSession( id, instanceID, this );
00280
00281 _objectsMutex.set();
00282 ObjectVector& objects = _objects[ id ];
00283 objects.push_back( object );
00284 _objectsMutex.unset();
00285
00286 getLocalNode()->flushCommands();
00287 EQLOG( LOG_OBJECTS ) << "Attached " << typeid( *object ).name()
00288 << " to id " << id << endl;
00289 }
00290
00291 void Session::detachObject( Object* object )
00292 {
00293 EQASSERT( object );
00294 CHECK_NOT_THREAD( _receiverThread );
00295
00296 SessionDetachObjectPacket packet;
00297 packet.requestID = _requestHandler.registerRequest();
00298 packet.objectID = object->getID();
00299 packet.objectInstanceID = object->getInstanceID();
00300
00301 _sendLocal( packet );
00302 _requestHandler.waitRequest( packet.requestID );
00303 }
00304
00305 void Session::_detachObject( Object* object )
00306 {
00307 EQASSERT( object );
00308 CHECK_THREAD( _receiverThread );
00309
00310 const uint32_t id = object->getID();
00311 EQASSERT( id != EQ_ID_INVALID );
00312 EQASSERT( _objects.find( id ) != _objects.end( ));
00313
00314 EQLOG( LOG_OBJECTS ) << "Detach " << typeid( *object ).name()
00315 << " from id " << id << endl;
00316
00317 ObjectVector& objects = _objects[ id ];
00318 ObjectVector::iterator iter = find( objects.begin(),objects.end(), object );
00319 EQASSERT( iter != objects.end( ));
00320
00321 _objectsMutex.set();
00322
00323 objects.erase( iter );
00324 if( objects.empty( ))
00325 _objects.erase( id );
00326
00327 _objectsMutex.unset();
00328 EQASSERT( object->getInstanceID() != EQ_ID_INVALID );
00329
00330 object->detachFromSession();
00331 return;
00332 }
00333
00334 bool Session::mapObject( Object* object, const uint32_t id,
00335 const uint32_t version )
00336 {
00337 const uint32_t requestID = mapObjectNB( object, id, version );
00338 return mapObjectSync( requestID );
00339 }
00340
00341 uint32_t Session::mapObjectNB( Object* object, const uint32_t id,
00342 const uint32_t version )
00343 {
00344 EQASSERT( object );
00345 EQLOG( LOG_OBJECTS ) << "Mapping " << typeid( *object ).name() << " to id "
00346 << id << " version " << version << endl;
00347
00348 EQASSERT( object->getID() == EQ_ID_INVALID );
00349 EQASSERT( id != EQ_ID_INVALID );
00350 EQASSERT( !_localNode->inCommandThread( ));
00351
00352 NodeID masterNodeID;
00353 if( !object->isMaster( ))
00354 {
00355
00356 masterNodeID = getIDMaster( id );
00357 if( masterNodeID == NodeID::ZERO )
00358 {
00359 EQWARN << "Can't find master node for object id " << id << endl;
00360 return EQ_ID_INVALID;
00361 }
00362
00363 NodePtr master = _localNode->connect( masterNodeID );
00364 if( !master || master->getState() == Node::STATE_STOPPED )
00365 {
00366 EQWARN << "Can't connect master node with id " << masterNodeID
00367 << " for object id " << id << endl;
00368 return EQ_ID_INVALID;
00369 }
00370 }
00371
00372 SessionMapObjectPacket packet;
00373 packet.requestID = _requestHandler.registerRequest( object );
00374 packet.objectID = id;
00375 packet.version = version;
00376 packet.masterNodeID = masterNodeID;
00377
00378 _sendLocal( packet );
00379 return packet.requestID;
00380 }
00381
00382 bool Session::mapObjectSync( const uint32_t requestID )
00383 {
00384 if( requestID == EQ_ID_INVALID )
00385 return false;
00386
00387 void* data = _requestHandler.getRequestData( requestID );
00388 if( data == 0 )
00389 return false;
00390
00391 Object* object = EQSAFECAST( Object*, data );
00392
00393 uint32_t version = Object::VERSION_NONE;
00394 _requestHandler.waitRequest( requestID, version );
00395
00396 const bool mapped = ( object->getID() != EQ_ID_INVALID );
00397 if( mapped && !object->isMaster( ))
00398 {
00399 object->_cm->applyMapData();
00400 if( version != Object::VERSION_OLDEST &&
00401 version != Object::VERSION_NONE )
00402 {
00403 object->sync( version );
00404 }
00405 }
00406
00407 EQLOG( LOG_OBJECTS ) << "Mapped " << typeid( *object ).name() << " to id "
00408 << object->getID() << endl;
00409 return mapped;
00410 }
00411
00412 void Session::unmapObject( Object* object )
00413 {
00414 const uint32_t id = object->getID();
00415 if( id == EQ_ID_INVALID )
00416 return;
00417
00418 EQLOG( LOG_OBJECTS ) << "Unmap " << typeid( *object ).name() << " from id "
00419 << object->getID() << endl;
00420
00421
00422 if( !object->isMaster( ))
00423 {
00424 CHECK_NOT_THREAD( _commandThread );
00425 const uint32_t masterInstanceID = object->getMasterInstanceID();
00426 if( masterInstanceID != EQ_ID_INVALID )
00427 {
00428 _idMasterMutex.set();
00429 const NodeID& masterNodeID = _pollIDMaster( id );
00430 _idMasterMutex.unset();
00431
00432 NodePtr localNode = _localNode;
00433 NodePtr master = localNode.isValid() ?
00434 localNode->getNode( masterNodeID ) : 0;
00435 if( master.isValid( ))
00436 {
00437 SessionUnsubscribeObjectPacket packet;
00438 packet.requestID = _requestHandler.registerRequest();
00439 packet.objectID = id;
00440 packet.masterInstanceID = masterInstanceID;
00441 packet.slaveInstanceID = object->getInstanceID();
00442 send( master, packet );
00443
00444 _requestHandler.waitRequest( packet.requestID );
00445 return;
00446 }
00447 EQERROR << "Master node for object id " << id << " not connected"
00448 << endl;
00449 }
00450 }
00451
00452
00453 detachObject( object );
00454 }
00455
00456 bool Session::registerObject( Object* object )
00457 {
00458 EQASSERT( object->getID() == EQ_ID_INVALID );
00459
00460 const uint32_t id = genIDs( 1 );
00461 EQASSERT( id != EQ_ID_INVALID );
00462 if( id == EQ_ID_INVALID )
00463 return false;
00464
00465 const uint32_t requestID = _setIDMasterNB( id, _localNode->getNodeID( ));
00466 object->setupChangeManager( object->getChangeType(), true );
00467
00468 EQCHECK( mapObject( object, id ));
00469 _setIDMasterSync( requestID );
00470 EQLOG( LOG_OBJECTS ) << "Registered " << typeid( *object ).name()
00471 << " to id " << id << endl;
00472 return true;
00473 }
00474
00475 void Session::deregisterObject( Object* object )
00476 {
00477 const uint32_t id = object->getID();
00478
00479 EQLOG( LOG_OBJECTS ) << "Deregister " << typeid( *object ).name()
00480 << " from id " << id << endl;
00481
00482
00483 unmapObject( object );
00484 freeIDs( id, 1 );
00485 }
00486
00487
00488
00489
00490 bool Session::dispatchCommand( Command& command )
00491 {
00492 EQVERB << "dispatch " << command << endl;
00493 EQASSERT( command.isValid( ));
00494 CHECK_THREAD( _receiverThread );
00495
00496 switch( command->datatype )
00497 {
00498 case DATATYPE_EQNET_SESSION:
00499 return Dispatcher::dispatchCommand( command );
00500
00501 case DATATYPE_EQNET_OBJECT:
00502 {
00503 EQASSERT( command.isValid( ));
00504 const ObjectPacket* objPacket = command.getPacket<ObjectPacket>();
00505 const uint32_t id = objPacket->objectID;
00506
00507
00508 if( _objects.find( id ) == _objects.end( ))
00509 {
00510 EQVERB << "no objects to dispatch command, redispatching "
00511 << objPacket << endl;
00512 return false;
00513 }
00514 EQASSERTINFO( !_objects[id].empty(), id );
00515
00516 Object* object = _objects[id][0];
00517 EQASSERT( object );
00518
00519 return object->dispatchCommand( command );
00520 }
00521
00522 default:
00523 EQABORT( "Unknown datatype " << command->datatype << " for "
00524 << command );
00525 return true;
00526 }
00527 }
00528
00529 CommandResult Session::invokeCommand( Command& command )
00530 {
00531 EQVERB << "invoke " << command << endl;
00532 EQASSERT( command.isValid( ));
00533
00534 switch( command->datatype )
00535 {
00536 case DATATYPE_EQNET_SESSION:
00537 return Dispatcher::invokeCommand( command );
00538
00539 case DATATYPE_EQNET_OBJECT:
00540 return _invokeObjectCommand( command );
00541
00542 default:
00543 EQWARN << "Unhandled command " << command << endl;
00544 return COMMAND_ERROR;
00545 }
00546 }
00547
00548 CommandResult Session::_invokeObjectCommand( Command& command )
00549 {
00550 EQASSERT( command.isValid( ));
00551 const ObjectPacket* objPacket = command.getPacket<ObjectPacket>();
00552 const uint32_t id = objPacket->objectID;
00553
00554 _objectsMutex.set();
00555
00556 EQASSERTINFO( _objects.find( id ) != _objects.end(),
00557 "No objects to handle command " << objPacket );
00558
00559
00560 ObjectVector objects = _objects[id];
00561 EQASSERTINFO( !objects.empty(), objPacket );
00562
00563 _objectsMutex.unset();
00564
00565 for( ObjectVector::const_iterator i = objects.begin();
00566 i != objects.end(); ++ i )
00567 {
00568 Object* object = *i;
00569 const bool isInstance =
00570 ( objPacket->instanceID == object->getInstanceID( ));
00571
00572 if( objPacket->instanceID == EQ_ID_ANY || isInstance )
00573 {
00574 if( !command.isValid( ))
00575 {
00576
00577
00578
00579
00580 EQERROR << "Object of type " << typeid(*object).name()
00581 << " invalidated command send all instances" << endl;
00582 return COMMAND_ERROR;
00583 }
00584
00585 const CommandResult result = object->invokeCommand( command );
00586 switch( result )
00587 {
00588 case COMMAND_DISCARD:
00589 return COMMAND_DISCARD;
00590
00591 case COMMAND_ERROR:
00592 EQERROR << "Error handling command " << objPacket
00593 << " for object of type " << typeid(*object).name()
00594 << endl;
00595 return COMMAND_ERROR;
00596
00597 case COMMAND_HANDLED:
00598 if( isInstance )
00599 return result;
00600 break;
00601
00602 default:
00603 EQUNREACHABLE;
00604 }
00605 }
00606 }
00607 if( objPacket->instanceID == EQ_ID_ANY )
00608 return COMMAND_HANDLED;
00609
00610 EQWARN << "instance not found for " << objPacket << endl;
00611 return COMMAND_ERROR;
00612 }
00613
00614 CommandResult Session::_cmdAckRequest( Command& command )
00615 {
00616 const SessionAckRequestPacket* packet =
00617 command.getPacket<SessionAckRequestPacket>();
00618 EQASSERT( packet->requestID != EQ_ID_INVALID );
00619
00620 _requestHandler.serveRequest( packet->requestID );
00621 return COMMAND_HANDLED;
00622 }
00623
00624
00625 CommandResult Session::_cmdGenIDs( Command& command )
00626 {
00627 CHECK_THREAD( _commandThread );
00628 const SessionGenIDsPacket* packet =command.getPacket<SessionGenIDsPacket>();
00629 EQVERB << "Cmd gen IDs: " << packet << endl;
00630
00631 SessionGenIDsReplyPacket reply( packet );
00632 const uint32_t range = EQ_MAX( packet->range, MIN_ID_RANGE );
00633
00634 reply.id = _idPool.genIDs( range );
00635 reply.allocated = range;
00636 send( command.getNode(), reply );
00637 return COMMAND_HANDLED;
00638 }
00639
00640 CommandResult Session::_cmdGenIDsReply( Command& command )
00641 {
00642 CHECK_THREAD( _commandThread );
00643 const SessionGenIDsReplyPacket* packet =
00644 command.getPacket<SessionGenIDsReplyPacket>();
00645 EQVERB << "Cmd gen IDs reply: " << packet << endl;
00646
00647 _requestHandler.serveRequest( packet->requestID, packet->id );
00648
00649 const size_t additional = packet->allocated - packet->requested;
00650 if( packet->id != EQ_ID_INVALID && additional > 0 )
00651
00652 _idPool.freeIDs( packet->id + packet->requested, additional );
00653
00654 return COMMAND_HANDLED;
00655 }
00656
00657 CommandResult Session::_cmdSetIDMaster( Command& command )
00658 {
00659 CHECK_THREAD( _commandThread );
00660 const SessionSetIDMasterPacket* packet =
00661 command.getPacket<SessionSetIDMasterPacket>();
00662 EQLOG( LOG_OBJECTS ) << "Cmd set ID master: " << packet << endl;
00663
00664 NodeID nodeID = packet->masterID;
00665 nodeID.convertToHost();
00666 EQASSERT( nodeID != NodeID::ZERO );
00667
00668 ScopedMutex mutex( _idMasterMutex );
00669 _idMasters[ packet->id ] = nodeID;
00670
00671 if( packet->requestID != EQ_ID_INVALID )
00672 {
00673 NodePtr node = command.getNode();
00674
00675 if( node == _localNode )
00676 _requestHandler.serveRequest( packet->requestID );
00677 else
00678 {
00679 SessionAckRequestPacket reply( packet->requestID );
00680 send( node, reply );
00681 }
00682 }
00683 return COMMAND_HANDLED;
00684 }
00685
00686 CommandResult Session::_cmdGetIDMaster( Command& command )
00687 {
00688 CHECK_THREAD( _commandThread );
00689 const SessionGetIDMasterPacket* packet =
00690 command.getPacket<SessionGetIDMasterPacket>();
00691 EQLOG( LOG_OBJECTS ) << "handle get idMaster " << packet << endl;
00692
00693 SessionGetIDMasterReplyPacket reply( packet );
00694 reply.masterID = _pollIDMaster( packet->id );
00695
00696 send( command.getNode(), reply );
00697 return COMMAND_HANDLED;
00698 }
00699
00700 CommandResult Session::_cmdGetIDMasterReply( Command& command )
00701 {
00702 CHECK_THREAD( _commandThread );
00703 const SessionGetIDMasterReplyPacket* packet =
00704 command.getPacket<SessionGetIDMasterReplyPacket>();
00705 EQLOG( LOG_OBJECTS ) << "handle get idMaster reply " << packet << endl;
00706
00707 NodeID nodeID = packet->masterID;
00708 nodeID.convertToHost();
00709
00710 if( nodeID != NodeID::ZERO )
00711 {
00712 ScopedMutex mutex( _idMasterMutex );
00713 _idMasters[ packet->id ] = packet->masterID;
00714 }
00715
00716
00717 _requestHandler.serveRequest( packet->requestID );
00718 return COMMAND_HANDLED;
00719 }
00720
00721 CommandResult Session::_cmdAttachObject( Command& command )
00722 {
00723 CHECK_THREAD( _receiverThread );
00724 const SessionAttachObjectPacket* packet =
00725 command.getPacket<SessionAttachObjectPacket>();
00726 EQLOG( LOG_OBJECTS ) << "Cmd attach object " << packet << endl;
00727
00728 Object* object = static_cast<Object*>( _requestHandler.getRequestData(
00729 packet->requestID ));
00730 _attachObject( object, packet->objectID, packet->objectInstanceID );
00731 _requestHandler.serveRequest( packet->requestID );
00732 return COMMAND_HANDLED;
00733 }
00734
00735 CommandResult Session::_cmdDetachObject( Command& command )
00736 {
00737 CHECK_THREAD( _receiverThread );
00738 const SessionDetachObjectPacket* packet =
00739 command.getPacket<SessionDetachObjectPacket>();
00740 EQLOG( LOG_OBJECTS ) << "Cmd detach object " << packet << endl;
00741
00742 const uint32_t id = packet->objectID;
00743 if( _objects.find( id ) != _objects.end( ))
00744 {
00745 ObjectVector& objects = _objects[id];
00746
00747 for( ObjectVector::const_iterator i = objects.begin();
00748 i != objects.end(); ++i )
00749 {
00750 Object* object = *i;
00751 if( object->getInstanceID() == packet->objectInstanceID )
00752 {
00753 _detachObject( object );
00754 break;
00755 }
00756 }
00757 }
00758
00759 if( packet->requestID != EQ_ID_INVALID )
00760 _requestHandler.serveRequest( packet->requestID );
00761
00762 return COMMAND_HANDLED;
00763 }
00764
00765 CommandResult Session::_cmdMapObject( Command& command )
00766 {
00767 CHECK_THREAD( _receiverThread );
00768 const SessionMapObjectPacket* packet =
00769 command.getPacket< SessionMapObjectPacket >();
00770 EQLOG( LOG_OBJECTS ) << "Cmd map object " << packet << endl;
00771
00772 Object* object = static_cast<Object*>( _requestHandler.getRequestData(
00773 packet->requestID ));
00774 EQASSERT( object );
00775 const uint32_t id = packet->objectID;
00776
00777 if( !object->isMaster( ))
00778 {
00779 EQASSERT( packet->masterNodeID != NodeID::ZERO );
00780 NodePtr master = _localNode->getNode( packet->masterNodeID );
00781
00782 EQASSERTINFO( master.isValid(), "Master node for object id " << id
00783 << " not connected" );
00784
00785 _instanceIDs = ( _instanceIDs + 1 ) % IDPool::MAX_CAPACITY;
00786
00787
00788 SessionSubscribeObjectPacket subscribePacket( packet );
00789 subscribePacket.instanceID = _instanceIDs;
00790
00791 send( master, subscribePacket );
00792 return COMMAND_HANDLED;
00793 }
00794
00795 _attachObject( object, id, EQ_ID_INVALID );
00796
00797 EQLOG( LOG_OBJECTS ) << "mapped object id " << object->getID() << " @"
00798 << (void*)object << " is " << typeid(*object).name()
00799 << endl;
00800
00801 _requestHandler.serveRequest( packet->requestID, packet->version );
00802 return COMMAND_HANDLED;
00803 }
00804
00805 CommandResult Session::_cmdSubscribeObject( Command& command )
00806 {
00807 CHECK_THREAD( _commandThread );
00808 SessionSubscribeObjectPacket* packet =
00809 command.getPacket<SessionSubscribeObjectPacket>();
00810 EQLOG( LOG_OBJECTS ) << "Cmd subscribe object " << packet << endl;
00811
00812 NodePtr node = command.getNode();
00813 const uint32_t id = packet->objectID;
00814
00815 Object* master = 0;
00816
00817 _objectsMutex.set();
00818 if( _objects.find( id ) != _objects.end( ))
00819 {
00820 ObjectVector& objects = _objects[id];
00821
00822 for( ObjectVector::const_iterator i = objects.begin();
00823 i != objects.end(); ++i )
00824 {
00825 Object* object = *i;
00826 if( object->isMaster( ))
00827 {
00828 master = object;
00829 break;
00830 }
00831 }
00832 }
00833 _objectsMutex.unset();
00834
00835 SessionSubscribeObjectReplyPacket reply( packet );
00836
00837 if( master )
00838 {
00839
00840 const uint32_t version = packet->version;
00841 if( version == Object::VERSION_OLDEST ||
00842 version >= master->getOldestVersion( ))
00843 {
00844 SessionSubscribeObjectSuccessPacket successPacket( packet );
00845 successPacket.changeType = master->getChangeType();
00846 successPacket.masterInstanceID = master->getInstanceID();
00847 send( node, successPacket );
00848
00849 master->addSlave( node, packet->instanceID, version );
00850 reply.result = true;
00851 }
00852 else
00853 {
00854 EQWARN << "Version " << version << " no longer available" << endl;
00855 reply.result = false;
00856 }
00857 }
00858 else
00859 {
00860 EQWARN << "Can't find master object to subscribe " << id << endl;
00861 reply.result = false;
00862 }
00863
00864 send( node, reply );
00865 return COMMAND_HANDLED;
00866 }
00867
00868 CommandResult Session::_cmdSubscribeObjectSuccess( Command& command )
00869 {
00870 CHECK_THREAD( _receiverThread );
00871 const SessionSubscribeObjectSuccessPacket* packet =
00872 command.getPacket<SessionSubscribeObjectSuccessPacket>();
00873 EQLOG( LOG_OBJECTS ) << "Cmd object subscribe success " << packet << endl;
00874
00875 Object* object = static_cast<Object*>( _requestHandler.getRequestData(
00876 packet->requestID ));
00877 EQASSERT( object );
00878 EQASSERT( !object->isMaster( ));
00879
00880 object->setupChangeManager(
00881 static_cast< Object::ChangeType >( packet->changeType ), false,
00882 packet->masterInstanceID );
00883
00884 _attachObject( object, packet->objectID, packet->instanceID );
00885
00886 EQLOG( LOG_OBJECTS ) << "subscribed object id " << object->getID() << '.'
00887 << object->getInstanceID() << " cm "
00888 << typeid( *(object->_cm)).name() << " @"
00889 << static_cast< void* >( object ) << " is "
00890 << typeid(*object).name() << endl;
00891 return COMMAND_HANDLED;
00892 }
00893
00894 CommandResult Session::_cmdSubscribeObjectReply( Command& command )
00895 {
00896 CHECK_THREAD( _commandThread );
00897 const SessionSubscribeObjectReplyPacket* packet =
00898 command.getPacket<SessionSubscribeObjectReplyPacket>();
00899 EQLOG( LOG_OBJECTS ) << "Cmd object subscribe reply " << packet << endl;
00900
00901 EQASSERT( _requestHandler.getRequestData( packet->requestID ));
00902
00903 if( !packet->requestID )
00904 EQWARN << "Could not subscribe object " << packet->objectID << endl;
00905
00906 _requestHandler.serveRequest( packet->requestID, packet->version );
00907 return COMMAND_HANDLED;
00908 }
00909
00910 CommandResult Session::_cmdUnsubscribeObject( Command& command )
00911 {
00912 CHECK_THREAD( _commandThread );
00913 SessionUnsubscribeObjectPacket* packet =
00914 command.getPacket<SessionUnsubscribeObjectPacket>();
00915 EQLOG( LOG_OBJECTS ) << "Cmd unsubscribe object " << packet << endl;
00916
00917 NodePtr node = command.getNode();
00918 const uint32_t id = packet->objectID;
00919
00920 _objectsMutex.set();
00921 if( _objects.find( id ) != _objects.end( ))
00922 {
00923 ObjectVector& objects = _objects[id];
00924
00925 for( ObjectVector::const_iterator i = objects.begin();
00926 i != objects.end(); ++ i )
00927 {
00928 Object* object = *i;
00929 if( object->isMaster() &&
00930 object->getInstanceID() == packet->masterInstanceID )
00931 {
00932 object->removeSlave( node );
00933 break;
00934 }
00935 }
00936 }
00937 _objectsMutex.unset();
00938
00939 SessionDetachObjectPacket detachPacket( packet );
00940 send( node, detachPacket );
00941 return COMMAND_HANDLED;
00942 }
00943
00944 std::ostream& operator << ( std::ostream& os, Session* session )
00945 {
00946 if( !session )
00947 {
00948 os << "NULL session";
00949 return os;
00950 }
00951
00952 os << "session " << session->getID() << "(" << (void*)session
00953 << ")";
00954
00955 return os;
00956 }
00957 }
00958 }