barrier.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "barrier.h"
00019
00020 #include "command.h"
00021 #include "connection.h"
00022 #include "dataIStream.h"
00023 #include "dataOStream.h"
00024 #include "log.h"
00025 #include "packets.h"
00026 #include "session.h"
00027
00028 using namespace eq::base;
00029 using namespace std;
00030
00031 namespace eq
00032 {
00033 namespace net
00034 {
00035 Barrier::Barrier( NodePtr master, const uint32_t height )
00036 : _masterID( master->getNodeID( ))
00037 , _height( height )
00038 , _master( master )
00039 {
00040 EQASSERT( _masterID != NodeID::ZERO );
00041 EQINFO << "New barrier of height " << _height << endl;
00042 }
00043
00044 Barrier::Barrier()
00045 {
00046 EQINFO << "Barrier instantiated" << endl;
00047 }
00048
00049 Barrier::~Barrier()
00050 {
00051 }
00052
00053
00054
00055
00056 void Barrier::getInstanceData( DataOStream& os )
00057 {
00058 os << _height << _masterID;
00059 }
00060
00061 void Barrier::applyInstanceData( DataIStream& is )
00062 {
00063 is >> _height >> _masterID;
00064 }
00065
00066 void Barrier::pack( DataOStream& os )
00067 {
00068 os << _height;
00069 }
00070
00071 void Barrier::unpack( DataIStream& is )
00072 {
00073 is >> _height;
00074 }
00075
00076
00077 void Barrier::attachToSession( const uint32_t id, const uint32_t instanceID,
00078 Session* session )
00079 {
00080 Object::attachToSession( id, instanceID, session );
00081
00082 CommandQueue* queue = session->getCommandThreadQueue();
00083
00084 registerCommand( CMD_BARRIER_ENTER,
00085 CommandFunc<Barrier>( this, &Barrier::_cmdEnter ),
00086 queue );
00087 registerCommand( CMD_BARRIER_ENTER_REPLY,
00088 CommandFunc<Barrier>( this, &Barrier::_cmdEnterReply ),
00089 queue );
00090 }
00091
00092 void Barrier::enter()
00093 {
00094 EQASSERT( _height > 0 );
00095 EQASSERT( _masterID != NodeID::ZERO );
00096
00097 if( _height == 1 )
00098 return;
00099
00100 if( !_master )
00101 {
00102 Session* session = getSession();
00103 NodePtr localNode = session->getLocalNode();
00104 _master = localNode->connect( _masterID );
00105 }
00106
00107 EQASSERT( _master.isValid( ));
00108 EQASSERT( _master->isConnected( ));
00109 EQLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
00110 << ", height " << _height << endl;
00111 EQASSERT( getSession( ));
00112
00113 const uint32_t leaveVal = _leaveNotify.get() + 1;
00114
00115 BarrierEnterPacket packet;
00116 packet.version = getVersion();
00117 send( _master, packet );
00118
00119 _leaveNotify.waitEQ( leaveVal );
00120 EQLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
00121 << ", height " << _height << endl;
00122 }
00123
00124 CommandResult Barrier::_cmdEnter( Command& command )
00125 {
00126 CHECK_THREAD( _thread );
00127 EQASSERTINFO( !_master || _master == getSession()->getLocalNode( ),
00128 _master );
00129
00130 const BarrierEnterPacket* packet = command.getPacket<BarrierEnterPacket>();
00131
00132 EQLOG( LOG_BARRIER ) << "handle barrier enter " << packet << " barrier v"
00133 << getVersion() << endl;
00134
00135 const uint32_t version = packet->version;
00136 NodeVector& nodes = _enteredNodes[ packet->version ];
00137
00138 EQLOG( LOG_BARRIER ) << "enter barrier v" << version
00139 << ", has " << nodes.size() << " of " << _height
00140 << endl;
00141
00142 nodes.push_back( command.getNode( ));
00143
00144
00145
00146
00147
00148
00149
00150 if( version > getVersion( ))
00151 return COMMAND_DISCARD;
00152
00153 EQASSERT( version == getVersion( ));
00154
00155 if( nodes.size() < _height )
00156 return COMMAND_DISCARD;
00157
00158 EQASSERT( nodes.size() == _height );
00159 EQLOG( LOG_BARRIER ) << "Barrier reached" << endl;
00160
00161 BarrierEnterReplyPacket reply;
00162 reply.sessionID = getSession()->getID();
00163 reply.objectID = getID();
00164
00165 stde::usort( nodes );
00166
00167 for( NodeVector::iterator i = nodes.begin(); i != nodes.end(); ++i )
00168 {
00169 RefPtr< Node >& node = *i;
00170 if( node->isLocal( ))
00171 {
00172 EQLOG( LOG_BARRIER ) << "Unlock local user(s)" << endl;
00173 ++_leaveNotify;
00174 }
00175 else
00176 {
00177 EQLOG( LOG_BARRIER ) << "Unlock " << node << endl;
00178 node->send( reply );
00179 }
00180 }
00181
00182
00183 map< uint32_t, NodeVector >::iterator it = _enteredNodes.find( version );
00184 EQASSERT( it != _enteredNodes.end( ));
00185 _enteredNodes.erase( it );
00186
00187 return COMMAND_DISCARD;
00188 }
00189
00190 CommandResult Barrier::_cmdEnterReply( Command& command )
00191 {
00192 CHECK_THREAD( _thread );
00193 EQLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << endl;
00194 ++_leaveNotify;
00195 return COMMAND_HANDLED;
00196 }
00197 }
00198 }