00001
00002
00003
00004
00005 #include "barrier.h"
00006
00007 #include "command.h"
00008 #include "connection.h"
00009 #include "dataIStream.h"
00010 #include "dataOStream.h"
00011 #include "log.h"
00012 #include "packets.h"
00013 #include "session.h"
00014
00015 using namespace eq::base;
00016 using namespace std;
00017
00018 namespace eq
00019 {
00020 namespace net
00021 {
00022 Barrier::Barrier( NodePtr master, const uint32_t height )
00023 : _masterID( master->getNodeID( ))
00024 , _height( height )
00025 , _master( master )
00026 {
00027 EQASSERT( _masterID != NodeID::ZERO );
00028 EQINFO << "New barrier of height " << _height << endl;
00029 }
00030
00031 Barrier::Barrier()
00032 {
00033 EQINFO << "Barrier instantiated" << endl;
00034 }
00035
00036 Barrier::~Barrier()
00037 {
00038 }
00039
00040
00041
00042
00043 void Barrier::getInstanceData( DataOStream& os )
00044 {
00045 os << _height << _masterID;
00046 }
00047
00048 void Barrier::applyInstanceData( DataIStream& is )
00049 {
00050 is >> _height >> _masterID;
00051 }
00052
00053 void Barrier::pack( DataOStream& os )
00054 {
00055 os << _height;
00056 }
00057
00058 void Barrier::unpack( DataIStream& is )
00059 {
00060 is >> _height;
00061 }
00062
00063
00064 void Barrier::attachToSession( const uint32_t id, const uint32_t instanceID,
00065 Session* session )
00066 {
00067 Object::attachToSession( id, instanceID, session );
00068
00069 CommandQueue* queue = session->getCommandThreadQueue();
00070
00071 registerCommand( CMD_BARRIER_ENTER,
00072 CommandFunc<Barrier>( this, &Barrier::_cmdEnter ),
00073 queue );
00074 registerCommand( CMD_BARRIER_ENTER_REPLY,
00075 CommandFunc<Barrier>( this, &Barrier::_cmdEnterReply ),
00076 queue );
00077 }
00078
00079 void Barrier::enter()
00080 {
00081 EQASSERT( _height > 0 );
00082 EQASSERT( _masterID != NodeID::ZERO );
00083
00084 if( _height == 1 )
00085 return;
00086
00087 if( !_master )
00088 {
00089 Session* session = getSession();
00090 NodePtr localNode = session->getLocalNode();
00091 _master = localNode->connect( _masterID, session->getServer( ));
00092 }
00093
00094 EQASSERT( _master.isValid( ));
00095 EQASSERT( _master->isConnected( ));
00096 EQLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
00097 << ", height " << _height << endl;
00098 EQASSERT( getSession( ));
00099
00100 const uint32_t leaveVal = _leaveNotify.get() + 1;
00101
00102 BarrierEnterPacket packet;
00103 packet.version = getVersion();
00104 send( _master, packet );
00105
00106 _leaveNotify.waitEQ( leaveVal );
00107 EQLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
00108 << ", height " << _height << endl;
00109 }
00110
00111 CommandResult Barrier::_cmdEnter( Command& command )
00112 {
00113 CHECK_THREAD( _thread );
00114 EQASSERTINFO( !_master || _master == getSession()->getLocalNode( ),
00115 _master );
00116
00117 const BarrierEnterPacket* packet = command.getPacket<BarrierEnterPacket>();
00118
00119 EQLOG( LOG_BARRIER ) << "handle barrier enter " << packet << " barrier v"
00120 << getVersion() << endl;
00121
00122 const uint32_t version = packet->version;
00123 NodeVector& nodes = _enteredNodes[ packet->version ];
00124
00125 EQLOG( LOG_BARRIER ) << "enter barrier v" << version
00126 << ", has " << nodes.size() << " of " << _height
00127 << endl;
00128
00129 nodes.push_back( command.getNode( ));
00130
00131
00132
00133
00134
00135
00136
00137 if( version > getVersion( ))
00138 return COMMAND_DISCARD;
00139
00140 EQASSERT( version == getVersion( ));
00141
00142 if( nodes.size() < _height )
00143 return COMMAND_DISCARD;
00144
00145 EQASSERT( nodes.size() == _height );
00146 EQLOG( LOG_BARRIER ) << "Barrier reached" << endl;
00147
00148 BarrierEnterReplyPacket reply;
00149 reply.sessionID = getSession()->getID();
00150 reply.objectID = getID();
00151
00152 stde::usort( nodes );
00153
00154 for( NodeVector::iterator i = nodes.begin(); i != nodes.end(); ++i )
00155 {
00156 RefPtr< Node >& node = *i;
00157 if( node->isLocal( ))
00158 {
00159 EQLOG( LOG_BARRIER ) << "Unlock local user(s)" << endl;
00160 ++_leaveNotify;
00161 }
00162 else
00163 {
00164 EQLOG( LOG_BARRIER ) << "Unlock " << node << endl;
00165 node->send( reply );
00166 }
00167 }
00168
00169
00170 map< uint32_t, NodeVector >::iterator it = _enteredNodes.find( version );
00171 EQASSERT( it != _enteredNodes.end( ));
00172 _enteredNodes.erase( it );
00173
00174 return COMMAND_DISCARD;
00175 }
00176
00177 CommandResult Barrier::_cmdEnterReply( Command& command )
00178 {
00179 CHECK_THREAD( _thread );
00180 EQLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << endl;
00181 ++_leaveNotify;
00182 return COMMAND_HANDLED;
00183 }
00184 }
00185 }