lib/client/node.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <pthread.h>
00019 
00020 #include "node.h"
00021 
00022 #include "client.h"
00023 #include "commands.h"
00024 #include "config.h"
00025 #include "frameData.h"
00026 #include "global.h"
00027 #include "log.h"
00028 #include "nodeFactory.h"
00029 #include "nodeVisitor.h"
00030 #include "packets.h"
00031 #include "pipe.h"
00032 #include "server.h"
00033 #include "task.h"
00034 
00035 #ifdef AGL
00036 #  include "aglEventHandler.h"
00037 #endif
00038 
00039 #include <eq/base/scopedMutex.h>
00040 #include <eq/net/command.h>
00041 #include <eq/net/connection.h>
00042 
00043 using namespace eq::base;
00044 using namespace std;
00045 
00046 namespace eq
00047 {
00049 typedef net::CommandFunc<Node> NodeFunc;
00052 #define MAKE_ATTR_STRING( attr ) ( string("EQ_NODE_") + #attr )
00053 std::string Node::_iAttributeStrings[IATTR_ALL] = {
00054     MAKE_ATTR_STRING( IATTR_THREAD_MODEL ),
00055     MAKE_ATTR_STRING( IATTR_FILL1 ),
00056     MAKE_ATTR_STRING( IATTR_FILL2 )
00057 };
00058 
00059 Node::Node( Config* parent )
00060         : transmitter( this )
00061         , _config( parent )
00062         , _tasks( TASK_NONE )
00063         , _state( STATE_STOPPED )
00064         , _unlockedFrame( 0 )
00065         , _finishedFrame( 0 )
00066 {
00067     parent->_addNode( this );
00068     EQINFO << " New eq::Node @" << (void*)this << endl;
00069 }
00070 
00071 Node::~Node()
00072 {
00073     _config->_removeNode( this );
00074 
00075     EQINFO << " Delete eq::Node @" << (void*)this << endl;
00076 }
00077 
00078 void Node::attachToSession( const uint32_t id, 
00079                             const uint32_t instanceID, 
00080                             net::Session* session )
00081 {
00082     net::Object::attachToSession( id, instanceID, session );
00083 
00084     EQASSERT( _config );
00085     net::CommandQueue* queue = _config->getNodeThreadQueue();
00086 
00087     registerCommand( CMD_NODE_CREATE_PIPE, 
00088                      NodeFunc( this, &Node::_cmdCreatePipe ), queue );
00089     registerCommand( CMD_NODE_DESTROY_PIPE,
00090                      NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
00091     registerCommand( CMD_NODE_CONFIG_INIT, 
00092                      NodeFunc( this, &Node::_cmdConfigInit ), queue );
00093     registerCommand( CMD_NODE_CONFIG_EXIT,
00094                      NodeFunc( this, &Node::_cmdConfigExit ), queue );
00095     registerCommand( CMD_NODE_FRAME_START,
00096                      NodeFunc( this, &Node::_cmdFrameStart ), queue );
00097     registerCommand( CMD_NODE_FRAME_FINISH,
00098                      NodeFunc( this, &Node::_cmdFrameFinish ), queue );
00099     registerCommand( CMD_NODE_FRAME_DRAW_FINISH, 
00100                      NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
00101     registerCommand( CMD_NODE_FRAME_TASKS_FINISH, 
00102                      NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
00103 }
00104 
00105 ClientPtr Node::getClient()
00106 {
00107     EQASSERT( _config );
00108     return (_config ? _config->getClient() : 0);
00109 }
00110 
00111 ServerPtr Node::getServer()
00112 {
00113     EQASSERT( _config );
00114     return (_config ? _config->getServer() : 0);
00115 }
00116 
00117 CommandQueue* Node::getNodeThreadQueue()
00118 {
00119     return getClient()->getNodeThreadQueue();
00120 }
00121 
00122 VisitorResult Node::accept( NodeVisitor& visitor )
00123 { 
00124     VisitorResult result = visitor.visitPre( this );
00125     if( result != TRAVERSE_CONTINUE )
00126         return result;
00127 
00128     for( PipeVector::const_iterator i = _pipes.begin(); 
00129          i != _pipes.end(); ++i )
00130     {
00131         Pipe* pipe = *i;
00132         switch( pipe->accept( visitor ))
00133         {
00134             case TRAVERSE_TERMINATE:
00135                 return TRAVERSE_TERMINATE;
00136 
00137             case TRAVERSE_PRUNE:
00138                 result = TRAVERSE_PRUNE;
00139                 break;
00140                 
00141             case TRAVERSE_CONTINUE:
00142             default:
00143                 break;
00144         }
00145     }
00146 
00147     switch( visitor.visitPost( this ))
00148     {
00149         case TRAVERSE_TERMINATE:
00150             return TRAVERSE_TERMINATE;
00151 
00152         case TRAVERSE_PRUNE:
00153             return TRAVERSE_PRUNE;
00154                 
00155         case TRAVERSE_CONTINUE:
00156         default:
00157             break;
00158     }
00159 
00160     return result;
00161 }
00162 
00163 void Node::_addPipe( Pipe* pipe )
00164 {
00165     EQASSERT( pipe->getNode() == this );
00166     _pipes.push_back( pipe );
00167 }
00168 
00169 void Node::_removePipe( Pipe* pipe )
00170 {
00171     PipeVector::iterator iter = find( _pipes.begin(), _pipes.end(), pipe );
00172     EQASSERT( iter != _pipes.end( ))
00173     
00174     _pipes.erase( iter );
00175 }
00176 
00177 Pipe* Node::_findPipe( const uint32_t id )
00178 {
00179     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); 
00180          ++i )
00181     {
00182         Pipe* pipe = *i;
00183         if( pipe->getID() == id )
00184             return pipe;
00185     }
00186     return 0;
00187 }
00188 
00189 net::Barrier* Node::getBarrier( const net::ObjectVersion barrier )
00190 {
00191     _barriersMutex.set();
00192     net::Barrier* netBarrier = _barriers[ barrier.id ];
00193 
00194     if( !netBarrier )
00195     {
00196         net::Session* session = getSession();
00197 
00198         netBarrier = new net::Barrier;
00199         netBarrier->makeThreadSafe();
00200         EQCHECK( session->mapObject( netBarrier, barrier.id ));
00201 
00202         _barriers[ barrier.id ] = netBarrier;
00203     }
00204     _barriersMutex.unset();
00205 
00206     netBarrier->sync( barrier.version );
00207     return netBarrier;
00208 }
00209 
00210 FrameData* Node::getFrameData( const net::ObjectVersion& dataVersion )
00211 {
00212     _frameDatasMutex.set();
00213     FrameData* frameData = _frameDatas[ dataVersion.id ];
00214 
00215     if( !frameData )
00216     {
00217         net::Session* session = getSession();
00218         
00219         frameData = new FrameData;
00220         frameData->makeThreadSafe();
00221         EQCHECK( session->mapObject( frameData, dataVersion.id ));
00222 
00223         _frameDatas[ dataVersion.id ] = frameData;
00224     }
00225     _frameDatasMutex.unset();
00226 
00227     if( frameData->getVersion() < dataVersion.version )
00228     {
00229         frameData->sync( dataVersion.version );
00230         frameData->update( dataVersion.version );
00231     }
00232     EQASSERT( frameData->getVersion() == dataVersion.version );
00233 
00234     return frameData;
00235 }
00236 
00237 void Node::waitInitialized() const
00238 {
00239     _state.waitGE( STATE_INIT_FAILED );
00240 }
00241 
00242 bool Node::isRunning() const
00243 {
00244     return (_state == STATE_RUNNING);
00245 }
00246 
00247 bool Node::configInit( const uint32_t initID )
00248 {
00249 #ifdef EQ_USE_MAGELLAN
00250 #  ifdef AGL
00251     AGLEventHandler::initMagellan( this );
00252 #  else
00253     EQUNIMPLEMENTED;
00254 #  endif
00255 #endif
00256     return true;
00257 }
00258 
00259 bool Node::configExit()
00260 {
00261 #ifdef EQ_USE_MAGELLAN
00262 #  ifdef AGL
00263     AGLEventHandler::exitMagellan( this );
00264 #  else
00265     EQUNIMPLEMENTED;
00266 #  endif
00267 #endif
00268     return true;
00269 }
00270 
00271 void Node::waitFrameStarted( const uint32_t frameNumber ) const
00272 {
00273     _currentFrame.waitGE( frameNumber );
00274 }
00275 
00276 void Node::startFrame( const uint32_t frameNumber ) 
00277 {
00278     _currentFrame = frameNumber;
00279 }
00280 
00281 void Node::_finishFrame( const uint32_t frameNumber ) const
00282 {
00283     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00284     {
00285         const Pipe* pipe = *i;
00286         EQASSERT( pipe->isThreaded() || 
00287                   pipe->getFinishedFrame() >= frameNumber );
00288 
00289         pipe->waitFrameLocal( frameNumber );
00290         pipe->waitFrameFinished( frameNumber );
00291     }
00292 }
00293 
00294 void Node::_frameFinish( const uint32_t frameID, const uint32_t frameNumber )
00295 {
00296     frameFinish( frameID, frameNumber );
00297     EQLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber << endl;
00298 
00299     if( _unlockedFrame < frameNumber )
00300     {
00301         EQWARN << "Finished frame was not locally unlocked, enforcing unlock" 
00302                << endl;
00303         releaseFrameLocal( frameNumber );
00304     }
00305 
00306     if( _finishedFrame < frameNumber )
00307     {
00308         EQWARN << "Finished frame was not released, enforcing unlock" << endl;
00309         releaseFrame( frameNumber );
00310     }
00311 }
00312 
00313 void Node::releaseFrame( const uint32_t frameNumber )
00314 {
00315     EQASSERTINFO( _currentFrame >= frameNumber, 
00316                   "current " << _currentFrame << " release " << frameNumber );
00317 
00318     if( _finishedFrame >= frameNumber )
00319         return;
00320     _finishedFrame = frameNumber;
00321 
00322     NodeFrameFinishReplyPacket packet;
00323     packet.frameNumber = frameNumber;
00324 
00325     ServerPtr server = _config->getServer();
00326     net::NodePtr node = server.get();
00327     send( node, packet );
00328 }
00329 
00330 void Node::releaseFrameLocal( const uint32_t frameNumber )
00331 {
00332     EQASSERT( _unlockedFrame <= frameNumber );
00333     _unlockedFrame = frameNumber;
00334     
00335     Config* config = getConfig();
00336     EQASSERT( config->getNodes().size() == 1 );
00337     EQASSERT( config->getNodes()[0] == this );
00338     config->releaseFrameLocal( frameNumber );
00339 
00340     EQLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _unlockedFrame << endl;
00341 }
00342 
00343 void Node::frameStart( const uint32_t frameID, const uint32_t frameNumber )
00344 {
00345     startFrame( frameNumber ); // unlock pipe threads
00346     
00347     switch( getIAttribute( IATTR_THREAD_MODEL ))
00348     {
00349         case ASYNC:
00350             // Don't wait for pipes to release frame locally, sync not needed
00351             releaseFrameLocal( frameNumber );
00352             break;
00353 
00354         case DRAW_SYNC:  // Sync and release in frameDrawFinish
00355         case LOCAL_SYNC: // Sync and release in frameTasksFinish
00356             break;
00357 
00358         default:
00359             EQUNIMPLEMENTED;
00360     }
00361 }
00362 
00363 void Node::frameDrawFinish( const uint32_t frameID, const uint32_t frameNumber )
00364 {
00365     switch( getIAttribute( IATTR_THREAD_MODEL ))
00366     {
00367         case ASYNC:      // No sync, release in frameStart
00368         case LOCAL_SYNC: // Sync and release in frameTasksFinish
00369             break;
00370 
00371         case DRAW_SYNC:
00372             for( PipeVector::const_iterator i = _pipes.begin();
00373                  i != _pipes.end(); ++i )
00374             {
00375                 const Pipe* pipe = *i;
00376                 if( pipe->getTasks() & TASK_DRAW )
00377                     pipe->waitFrameLocal( frameNumber );
00378             }
00379             
00380             releaseFrameLocal( frameNumber );
00381             break;
00382 
00383         default:
00384             EQUNIMPLEMENTED;
00385     }
00386 }
00387 
00388 void Node::frameTasksFinish( const uint32_t frameID, const uint32_t frameNumber)
00389 {
00390     switch( getIAttribute( IATTR_THREAD_MODEL ))
00391     {
00392         case ASYNC:      // No sync, release in frameStart
00393         case DRAW_SYNC:  // Sync and release in frameDrawFinish
00394             break;
00395 
00396         case LOCAL_SYNC:
00397             for( PipeVector::const_iterator i = _pipes.begin();
00398                  i != _pipes.end(); ++i )
00399             {
00400                 const Pipe* pipe = *i;
00401                 if( pipe->getTasks() != TASK_NONE )
00402                     pipe->waitFrameLocal( frameNumber );
00403             }
00404             
00405             releaseFrameLocal( frameNumber );
00406             break;
00407 
00408         default:
00409             EQUNIMPLEMENTED;
00410     }
00411 }
00412 
00413 void Node::setErrorMessage( const std::string& message )
00414 {
00415     _error = message;
00416 }
00417 
00418 void Node::_flushObjects()
00419 {
00420     net::Session* session = getSession();
00421 
00422     _barriersMutex.set();
00423     for( BarrierHash::const_iterator i =_barriers.begin();
00424          i != _barriers.end(); ++ i )
00425     {
00426         net::Barrier* barrier = i->second;
00427         session->unmapObject( barrier );
00428         delete barrier;
00429     }
00430     _barriers.clear();
00431     _barriersMutex.unset();
00432 
00433     _frameDatasMutex.set();
00434     for( FrameDataHash::const_iterator i = _frameDatas.begin(); 
00435          i != _frameDatas.end(); ++ i )
00436     {
00437         FrameData* frameData = i->second;
00438         session->unmapObject( frameData );
00439         delete frameData;
00440     }
00441     _frameDatas.clear();
00442     _frameDatasMutex.unset();
00443 }
00444 
00445 void Node::setIAttribute( const IAttribute attr, const int32_t value )
00446 {
00447     _iAttributes[attr] = value;
00448 }
00449 
00450 int32_t Node::getIAttribute( const IAttribute attr ) const
00451 {
00452     return _iAttributes[attr];
00453 }
00454 
00455 const std::string& Node::getIAttributeString( const IAttribute attr )
00456 {
00457     return _iAttributeStrings[attr];
00458 }
00459 
00460 void Node::TransmitThread::send( FrameData* data, net::NodePtr node, 
00461                                  const uint32_t frameNumber )
00462 {
00463     _tasks.push( Task( data, node, frameNumber ));
00464 }
00465 
00466 void* Node::TransmitThread::run()
00467 {
00468     while( true )
00469     {
00470         const Task task = _tasks.pop();
00471         if( _tasks.isEmpty() && !task.node )
00472             return 0; // exit thread
00473         
00474         EQLOG( LOG_ASSEMBLY ) << "node transmit " << task.data->getID()
00475                               << " to " << task.node->getNodeID() << endl;
00476         task.data->transmit( task.node, task.frameNumber );
00477     }
00478     return 0;
00479 }
00480 
00481 //---------------------------------------------------------------------------
00482 // command handlers
00483 //---------------------------------------------------------------------------
00484 net::CommandResult Node::_cmdCreatePipe( net::Command& command )
00485 {
00486     const NodeCreatePipePacket* packet = 
00487         command.getPacket<NodeCreatePipePacket>();
00488     EQLOG( LOG_INIT ) << "Create pipe " << packet << endl;
00489 
00490     CHECK_THREAD( _nodeThread );
00491     EQASSERT( packet->pipeID != EQ_ID_INVALID );
00492 
00493     Pipe* pipe = Global::getNodeFactory()->createPipe( this );
00494 
00495     if( packet->threaded )
00496         pipe->startThread();
00497 
00498     _config->attachObject( pipe, packet->pipeID, EQ_ID_INVALID );
00499 
00500     return net::COMMAND_HANDLED;
00501 }
00502 
00503 net::CommandResult Node::_cmdDestroyPipe( net::Command& command )
00504 {
00505     const NodeDestroyPipePacket* packet = 
00506         command.getPacket<NodeDestroyPipePacket>();
00507     EQLOG( LOG_INIT ) << "Destroy pipe " << packet << endl;
00508 
00509     CHECK_THREAD( _nodeThread );
00510     Pipe* pipe = _findPipe( packet->pipeID );
00511     pipe->joinThread();
00512 
00513     _config->detachObject( pipe );
00514     Global::getNodeFactory()->releasePipe( pipe );
00515 
00516     return net::COMMAND_HANDLED;
00517 }
00518 
00519 net::CommandResult Node::_cmdConfigInit( net::Command& command )
00520 {
00521     CHECK_THREAD( _nodeThread );
00522 
00523     const NodeConfigInitPacket* packet = 
00524         command.getPacket<NodeConfigInitPacket>();
00525     EQLOG( LOG_INIT ) << "Init node " << packet << endl;
00526 
00527     _state = STATE_INITIALIZING;
00528     _name  = packet->name;
00529     _tasks = packet->tasks;
00530 
00531     memcpy( _iAttributes, packet->iAttributes, IATTR_ALL * sizeof( int32_t ));
00532 
00533     _currentFrame  = packet->frameNumber;
00534     _unlockedFrame = packet->frameNumber;
00535     _finishedFrame = packet->frameNumber;
00536 
00537     transmitter.start();
00538     _error.clear();
00539     NodeConfigInitReplyPacket reply;
00540     reply.result = configInit( packet->initID );
00541 
00542     if( _iAttributes[ IATTR_THREAD_MODEL ] == eq::UNDEFINED )
00543         _iAttributes[ IATTR_THREAD_MODEL ] = eq::DRAW_SYNC;
00544 
00545     _state = reply.result ? STATE_RUNNING : STATE_INIT_FAILED;
00546 
00547     memcpy( reply.iAttributes, _iAttributes, IATTR_ALL * sizeof( int32_t ));
00548     send( command.getNode(), reply, _error );
00549     return net::COMMAND_HANDLED;
00550 }
00551 
00552 net::CommandResult Node::_cmdConfigExit( net::Command& command )
00553 {
00554     const NodeConfigExitPacket* packet = 
00555         command.getPacket<NodeConfigExitPacket>();
00556     EQLOG( LOG_INIT ) << "Node exit " << packet << endl;
00557 
00558     CHECK_THREAD( _nodeThread );
00559     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); 
00560          ++i )
00561     {
00562         Pipe* pipe = *i;
00563         pipe->waitExited();
00564     }
00565     
00566     NodeConfigExitReplyPacket reply;
00567     reply.result = configExit();
00568 
00569     transmitter.send( 0, 0, 0 );
00570     transmitter.join();
00571 
00572     _state = STATE_STOPPED;
00573     _flushObjects();
00574 
00575     send( command.getNode(), reply );
00576     return net::COMMAND_HANDLED;
00577 }
00578 
00579 net::CommandResult Node::_cmdFrameStart( net::Command& command )
00580 {
00581     CHECK_THREAD( _nodeThread );
00582     const NodeFrameStartPacket* packet = 
00583         command.getPacket<NodeFrameStartPacket>();
00584     EQVERB << "handle node frame start " << packet << endl;
00585 
00586     const uint32_t frameNumber = packet->frameNumber;
00587     EQASSERT( _currentFrame == frameNumber-1 );
00588 
00589     EQLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber << endl;
00590 
00591     frameStart( packet->frameID, frameNumber );
00592     EQASSERTINFO( _currentFrame >= frameNumber, 
00593                   "Node::frameStart() did not start frame " << frameNumber );
00594 
00595     return net::COMMAND_HANDLED;
00596 }
00597 
00598 net::CommandResult Node::_cmdFrameFinish( net::Command& command )
00599 {
00600     CHECK_THREAD( _nodeThread );
00601     const NodeFrameFinishPacket* packet = 
00602         command.getPacket<NodeFrameFinishPacket>();
00603     EQLOG( LOG_TASKS ) << "TASK frame finish " << getName() <<  " " << packet
00604                        << endl;
00605 
00606     const uint32_t frameNumber = packet->frameNumber;
00607 
00608     _finishFrame( frameNumber );
00609     _frameFinish( packet->frameID, frameNumber );
00610     return net::COMMAND_HANDLED;
00611 }
00612 
00613 net::CommandResult Node::_cmdFrameDrawFinish( net::Command& command )
00614 {
00615     NodeFrameDrawFinishPacket* packet = 
00616         command.getPacket< NodeFrameDrawFinishPacket >();
00617     EQLOG( LOG_TASKS ) << "TASK draw finish " << getName() <<  " " << packet
00618                        << endl;
00619 
00620     frameDrawFinish( packet->frameID, packet->frameNumber );
00621     return net::COMMAND_HANDLED;
00622 }
00623 
00624 net::CommandResult Node::_cmdFrameTasksFinish( net::Command& command )
00625 {
00626     NodeFrameTasksFinishPacket* packet = 
00627         command.getPacket< NodeFrameTasksFinishPacket >();
00628     EQLOG( LOG_TASKS ) << "TASK tasks finish " << getName() <<  " " << packet
00629                        << endl;
00630 
00631     frameTasksFinish( packet->frameID, packet->frameNumber );
00632     return net::COMMAND_HANDLED;
00633 }
00634 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8