barrier.cpp

00001 
00002 /* Copyright (c) 2006-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "barrier.h"
00019 
00020 #include "command.h"
00021 #include "connection.h"
00022 #include "dataIStream.h"
00023 #include "dataOStream.h"
00024 #include "log.h"
00025 #include "packets.h"
00026 #include "session.h"
00027 
00028 using namespace eq::base;
00029 using namespace std;
00030 
00031 namespace eq
00032 {
00033 namespace net
00034 {
00035 Barrier::Barrier( NodePtr master, const uint32_t height )
00036         : _masterID( master->getNodeID( ))
00037         , _height( height )
00038         , _master( master )
00039 {
00040     EQASSERT( _masterID != NodeID::ZERO );
00041     EQINFO << "New barrier of height " << _height << endl;
00042 }
00043 
00044 Barrier::Barrier()
00045 {
00046     EQINFO << "Barrier instantiated" << endl;
00047 }
00048 
00049 Barrier::~Barrier()
00050 {
00051 }
00052 
00053 //---------------------------------------------------------------------------
00054 // Serialization
00055 //---------------------------------------------------------------------------
00056 void Barrier::getInstanceData( DataOStream& os )
00057 {
00058     os << _height << _masterID;
00059 }
00060 
00061 void Barrier::applyInstanceData( DataIStream& is )
00062 {
00063     is >> _height >> _masterID;
00064 }
00065 
00066 void Barrier::pack( DataOStream& os )
00067 {
00068     os << _height;
00069 }
00070 
00071 void Barrier::unpack( DataIStream& is )
00072 {
00073     is >> _height;
00074 }
00075 //---------------------------------------------------------------------------
00076 
00077 void Barrier::attachToSession( const uint32_t id, const uint32_t instanceID, 
00078                                Session* session )
00079 {
00080     Object::attachToSession( id, instanceID, session );
00081 
00082     CommandQueue* queue = session->getCommandThreadQueue();
00083 
00084     registerCommand( CMD_BARRIER_ENTER, 
00085                      CommandFunc<Barrier>( this, &Barrier::_cmdEnter ),
00086                      queue );
00087     registerCommand( CMD_BARRIER_ENTER_REPLY, 
00088                      CommandFunc<Barrier>( this, &Barrier::_cmdEnterReply ),
00089                      queue );
00090 }
00091 
00092 void Barrier::enter()
00093 {
00094     EQASSERT( _height > 0 );
00095     EQASSERT( _masterID != NodeID::ZERO );
00096 
00097     if( _height == 1 ) // trivial ;)
00098         return;
00099 
00100     if( !_master )
00101     {
00102         Session* session   = getSession();
00103         NodePtr  localNode = session->getLocalNode();
00104         _master = localNode->connect( _masterID );
00105     }
00106 
00107     EQASSERT( _master.isValid( ));
00108     EQASSERT( _master->isConnected( ));
00109     EQLOG( LOG_BARRIER ) << "enter barrier " << getID() << " v" << getVersion()
00110                          << ", height " << _height << endl;
00111     EQASSERT( getSession( ));
00112 
00113     const uint32_t leaveVal = _leaveNotify.get() + 1;
00114 
00115     BarrierEnterPacket packet;
00116     packet.version = getVersion();
00117     send( _master, packet );
00118     
00119     _leaveNotify.waitEQ( leaveVal );
00120     EQLOG( LOG_BARRIER ) << "left barrier " << getID() << " v" << getVersion()
00121                          << ", height " << _height << endl;
00122 }
00123 
00124 CommandResult Barrier::_cmdEnter( Command& command )
00125 {
00126     CHECK_THREAD( _thread );
00127     EQASSERTINFO( !_master || _master == getSession()->getLocalNode( ),
00128                   _master );
00129 
00130     const BarrierEnterPacket* packet = command.getPacket<BarrierEnterPacket>();
00131 
00132     EQLOG( LOG_BARRIER ) << "handle barrier enter " << packet << " barrier v"
00133                          << getVersion() << endl;
00134 
00135     const uint32_t version = packet->version;
00136     NodeVector&    nodes   = _enteredNodes[ packet->version ];
00137 
00138     EQLOG( LOG_BARRIER ) << "enter barrier v" << version 
00139                          << ", has " << nodes.size() << " of " << _height
00140                          << endl;
00141 
00142     nodes.push_back( command.getNode( ));
00143 
00144     // If we got early entry requests for this barrier, just note their
00145     // appearance. This requires that another request for the later version
00146     // arrives once the barrier reaches this version. The only case when this is
00147     // not the case is when no contributor to the current version contributes to
00148     // the later version, in which case deadlocks might happen because the later
00149     // version never leaves the barrier. We simply assume this is not the case.
00150     if( version > getVersion( ))
00151         return COMMAND_DISCARD;
00152     
00153     EQASSERT( version == getVersion( ));
00154 
00155     if( nodes.size() < _height )
00156         return COMMAND_DISCARD;
00157 
00158     EQASSERT( nodes.size() == _height );
00159     EQLOG( LOG_BARRIER ) << "Barrier reached" << endl;
00160 
00161     BarrierEnterReplyPacket reply;
00162     reply.sessionID  = getSession()->getID();
00163     reply.objectID   = getID();
00164 
00165     stde::usort( nodes );
00166 
00167     for( NodeVector::iterator i = nodes.begin(); i != nodes.end(); ++i )
00168     {
00169         RefPtr< Node >& node = *i;
00170         if( node->isLocal( )) // OPT
00171         {
00172             EQLOG( LOG_BARRIER ) << "Unlock local user(s)" << endl;
00173             ++_leaveNotify;
00174         }
00175         else
00176         {
00177             EQLOG( LOG_BARRIER ) << "Unlock " << node << endl;
00178             node->send( reply );
00179         }
00180     }
00181 
00182     // delete node vector for version
00183     map< uint32_t, NodeVector >::iterator it = _enteredNodes.find( version );
00184     EQASSERT( it != _enteredNodes.end( ));
00185     _enteredNodes.erase( it );
00186 
00187     return COMMAND_DISCARD;
00188 }
00189 
00190 CommandResult Barrier::_cmdEnterReply( Command& command )
00191 {
00192     CHECK_THREAD( _thread );
00193     EQLOG( LOG_BARRIER ) << "Got ok, unlock local user(s)" << endl;
00194     ++_leaveNotify;
00195     return COMMAND_HANDLED;
00196 }
00197 }
00198 }
Generated on Sat Feb 6 12:59:41 2010 for Equalizer 0.9.1 by  doxygen 1.6.1