00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <pthread.h>
00019
00020 #include "node.h"
00021
00022 #include "client.h"
00023 #include "commands.h"
00024 #include "config.h"
00025 #include "frameData.h"
00026 #include "global.h"
00027 #include "log.h"
00028 #include "nodeFactory.h"
00029 #include "nodeVisitor.h"
00030 #include "packets.h"
00031 #include "pipe.h"
00032 #include "server.h"
00033 #include "task.h"
00034
00035 #ifdef AGL
00036 # include "aglEventHandler.h"
00037 #endif
00038
00039 #include <eq/base/scopedMutex.h>
00040 #include <eq/net/command.h>
00041 #include <eq/net/connection.h>
00042
00043 using namespace eq::base;
00044 using namespace std;
00045
00046 namespace eq
00047 {
00049 typedef net::CommandFunc<Node> NodeFunc;
00052 #define MAKE_ATTR_STRING( attr ) ( string("EQ_NODE_") + #attr )
00053 std::string Node::_iAttributeStrings[IATTR_ALL] = {
00054 MAKE_ATTR_STRING( IATTR_THREAD_MODEL ),
00055 MAKE_ATTR_STRING( IATTR_FILL1 ),
00056 MAKE_ATTR_STRING( IATTR_FILL2 )
00057 };
00058
00059 Node::Node( Config* parent )
00060 : transmitter( this )
00061 , _config( parent )
00062 , _tasks( TASK_NONE )
00063 , _state( STATE_STOPPED )
00064 , _unlockedFrame( 0 )
00065 , _finishedFrame( 0 )
00066 {
00067 parent->_addNode( this );
00068 EQINFO << " New eq::Node @" << (void*)this << endl;
00069 }
00070
00071 Node::~Node()
00072 {
00073 _config->_removeNode( this );
00074
00075 EQINFO << " Delete eq::Node @" << (void*)this << endl;
00076 }
00077
00078 void Node::attachToSession( const uint32_t id,
00079 const uint32_t instanceID,
00080 net::Session* session )
00081 {
00082 net::Object::attachToSession( id, instanceID, session );
00083
00084 EQASSERT( _config );
00085 net::CommandQueue* queue = _config->getNodeThreadQueue();
00086
00087 registerCommand( CMD_NODE_CREATE_PIPE,
00088 NodeFunc( this, &Node::_cmdCreatePipe ), queue );
00089 registerCommand( CMD_NODE_DESTROY_PIPE,
00090 NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
00091 registerCommand( CMD_NODE_CONFIG_INIT,
00092 NodeFunc( this, &Node::_cmdConfigInit ), queue );
00093 registerCommand( CMD_NODE_CONFIG_EXIT,
00094 NodeFunc( this, &Node::_cmdConfigExit ), queue );
00095 registerCommand( CMD_NODE_FRAME_START,
00096 NodeFunc( this, &Node::_cmdFrameStart ), queue );
00097 registerCommand( CMD_NODE_FRAME_FINISH,
00098 NodeFunc( this, &Node::_cmdFrameFinish ), queue );
00099 registerCommand( CMD_NODE_FRAME_DRAW_FINISH,
00100 NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
00101 registerCommand( CMD_NODE_FRAME_TASKS_FINISH,
00102 NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
00103 }
00104
00105 ClientPtr Node::getClient()
00106 {
00107 EQASSERT( _config );
00108 return (_config ? _config->getClient() : 0);
00109 }
00110
00111 ServerPtr Node::getServer()
00112 {
00113 EQASSERT( _config );
00114 return (_config ? _config->getServer() : 0);
00115 }
00116
00117 CommandQueue* Node::getNodeThreadQueue()
00118 {
00119 return getClient()->getNodeThreadQueue();
00120 }
00121
00122 VisitorResult Node::accept( NodeVisitor& visitor )
00123 {
00124 VisitorResult result = visitor.visitPre( this );
00125 if( result != TRAVERSE_CONTINUE )
00126 return result;
00127
00128 for( PipeVector::const_iterator i = _pipes.begin();
00129 i != _pipes.end(); ++i )
00130 {
00131 Pipe* pipe = *i;
00132 switch( pipe->accept( visitor ))
00133 {
00134 case TRAVERSE_TERMINATE:
00135 return TRAVERSE_TERMINATE;
00136
00137 case TRAVERSE_PRUNE:
00138 result = TRAVERSE_PRUNE;
00139 break;
00140
00141 case TRAVERSE_CONTINUE:
00142 default:
00143 break;
00144 }
00145 }
00146
00147 switch( visitor.visitPost( this ))
00148 {
00149 case TRAVERSE_TERMINATE:
00150 return TRAVERSE_TERMINATE;
00151
00152 case TRAVERSE_PRUNE:
00153 return TRAVERSE_PRUNE;
00154
00155 case TRAVERSE_CONTINUE:
00156 default:
00157 break;
00158 }
00159
00160 return result;
00161 }
00162
00163 void Node::_addPipe( Pipe* pipe )
00164 {
00165 EQASSERT( pipe->getNode() == this );
00166 _pipes.push_back( pipe );
00167 }
00168
00169 void Node::_removePipe( Pipe* pipe )
00170 {
00171 PipeVector::iterator iter = find( _pipes.begin(), _pipes.end(), pipe );
00172 EQASSERT( iter != _pipes.end( ))
00173
00174 _pipes.erase( iter );
00175 }
00176
00177 Pipe* Node::_findPipe( const uint32_t id )
00178 {
00179 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end();
00180 ++i )
00181 {
00182 Pipe* pipe = *i;
00183 if( pipe->getID() == id )
00184 return pipe;
00185 }
00186 return 0;
00187 }
00188
00189 net::Barrier* Node::getBarrier( const net::ObjectVersion barrier )
00190 {
00191 _barriersMutex.set();
00192 net::Barrier* netBarrier = _barriers[ barrier.id ];
00193
00194 if( !netBarrier )
00195 {
00196 net::Session* session = getSession();
00197
00198 netBarrier = new net::Barrier;
00199 netBarrier->makeThreadSafe();
00200 EQCHECK( session->mapObject( netBarrier, barrier.id ));
00201
00202 _barriers[ barrier.id ] = netBarrier;
00203 }
00204 _barriersMutex.unset();
00205
00206 netBarrier->sync( barrier.version );
00207 return netBarrier;
00208 }
00209
00210 FrameData* Node::getFrameData( const net::ObjectVersion& dataVersion )
00211 {
00212 _frameDatasMutex.set();
00213 FrameData* frameData = _frameDatas[ dataVersion.id ];
00214
00215 if( !frameData )
00216 {
00217 net::Session* session = getSession();
00218
00219 frameData = new FrameData;
00220 frameData->makeThreadSafe();
00221 EQCHECK( session->mapObject( frameData, dataVersion.id ));
00222
00223 _frameDatas[ dataVersion.id ] = frameData;
00224 }
00225 _frameDatasMutex.unset();
00226
00227 if( frameData->getVersion() < dataVersion.version )
00228 {
00229 frameData->sync( dataVersion.version );
00230 frameData->update( dataVersion.version );
00231 }
00232 EQASSERT( frameData->getVersion() == dataVersion.version );
00233
00234 return frameData;
00235 }
00236
00237 void Node::waitInitialized() const
00238 {
00239 _state.waitGE( STATE_INIT_FAILED );
00240 }
00241
00242 bool Node::isRunning() const
00243 {
00244 return (_state == STATE_RUNNING);
00245 }
00246
00247 bool Node::configInit( const uint32_t initID )
00248 {
00249 #ifdef EQ_USE_MAGELLAN
00250 # ifdef AGL
00251 AGLEventHandler::initMagellan( this );
00252 # else
00253 EQUNIMPLEMENTED;
00254 # endif
00255 #endif
00256 return true;
00257 }
00258
00259 bool Node::configExit()
00260 {
00261 #ifdef EQ_USE_MAGELLAN
00262 # ifdef AGL
00263 AGLEventHandler::exitMagellan( this );
00264 # else
00265 EQUNIMPLEMENTED;
00266 # endif
00267 #endif
00268 return true;
00269 }
00270
00271 void Node::waitFrameStarted( const uint32_t frameNumber ) const
00272 {
00273 _currentFrame.waitGE( frameNumber );
00274 }
00275
00276 void Node::startFrame( const uint32_t frameNumber )
00277 {
00278 _currentFrame = frameNumber;
00279 }
00280
00281 void Node::_finishFrame( const uint32_t frameNumber ) const
00282 {
00283 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00284 {
00285 const Pipe* pipe = *i;
00286 EQASSERT( pipe->isThreaded() ||
00287 pipe->getFinishedFrame() >= frameNumber );
00288
00289 pipe->waitFrameLocal( frameNumber );
00290 pipe->waitFrameFinished( frameNumber );
00291 }
00292 }
00293
00294 void Node::_frameFinish( const uint32_t frameID, const uint32_t frameNumber )
00295 {
00296 frameFinish( frameID, frameNumber );
00297 EQLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber << endl;
00298
00299 if( _unlockedFrame < frameNumber )
00300 {
00301 EQWARN << "Finished frame was not locally unlocked, enforcing unlock"
00302 << endl;
00303 releaseFrameLocal( frameNumber );
00304 }
00305
00306 if( _finishedFrame < frameNumber )
00307 {
00308 EQWARN << "Finished frame was not released, enforcing unlock" << endl;
00309 releaseFrame( frameNumber );
00310 }
00311 }
00312
00313 void Node::releaseFrame( const uint32_t frameNumber )
00314 {
00315 EQASSERTINFO( _currentFrame >= frameNumber,
00316 "current " << _currentFrame << " release " << frameNumber );
00317
00318 if( _finishedFrame >= frameNumber )
00319 return;
00320 _finishedFrame = frameNumber;
00321
00322 NodeFrameFinishReplyPacket packet;
00323 packet.frameNumber = frameNumber;
00324
00325 ServerPtr server = _config->getServer();
00326 net::NodePtr node = server.get();
00327 send( node, packet );
00328 }
00329
00330 void Node::releaseFrameLocal( const uint32_t frameNumber )
00331 {
00332 EQASSERT( _unlockedFrame <= frameNumber );
00333 _unlockedFrame = frameNumber;
00334
00335 Config* config = getConfig();
00336 EQASSERT( config->getNodes().size() == 1 );
00337 EQASSERT( config->getNodes()[0] == this );
00338 config->releaseFrameLocal( frameNumber );
00339
00340 EQLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _unlockedFrame << endl;
00341 }
00342
00343 void Node::frameStart( const uint32_t frameID, const uint32_t frameNumber )
00344 {
00345 startFrame( frameNumber );
00346
00347 switch( getIAttribute( IATTR_THREAD_MODEL ))
00348 {
00349 case ASYNC:
00350
00351 releaseFrameLocal( frameNumber );
00352 break;
00353
00354 case DRAW_SYNC:
00355 case LOCAL_SYNC:
00356 break;
00357
00358 default:
00359 EQUNIMPLEMENTED;
00360 }
00361 }
00362
00363 void Node::frameDrawFinish( const uint32_t frameID, const uint32_t frameNumber )
00364 {
00365 switch( getIAttribute( IATTR_THREAD_MODEL ))
00366 {
00367 case ASYNC:
00368 case LOCAL_SYNC:
00369 break;
00370
00371 case DRAW_SYNC:
00372 for( PipeVector::const_iterator i = _pipes.begin();
00373 i != _pipes.end(); ++i )
00374 {
00375 const Pipe* pipe = *i;
00376 if( pipe->getTasks() & TASK_DRAW )
00377 pipe->waitFrameLocal( frameNumber );
00378 }
00379
00380 releaseFrameLocal( frameNumber );
00381 break;
00382
00383 default:
00384 EQUNIMPLEMENTED;
00385 }
00386 }
00387
00388 void Node::frameTasksFinish( const uint32_t frameID, const uint32_t frameNumber)
00389 {
00390 switch( getIAttribute( IATTR_THREAD_MODEL ))
00391 {
00392 case ASYNC:
00393 case DRAW_SYNC:
00394 break;
00395
00396 case LOCAL_SYNC:
00397 for( PipeVector::const_iterator i = _pipes.begin();
00398 i != _pipes.end(); ++i )
00399 {
00400 const Pipe* pipe = *i;
00401 if( pipe->getTasks() != TASK_NONE )
00402 pipe->waitFrameLocal( frameNumber );
00403 }
00404
00405 releaseFrameLocal( frameNumber );
00406 break;
00407
00408 default:
00409 EQUNIMPLEMENTED;
00410 }
00411 }
00412
00413 void Node::setErrorMessage( const std::string& message )
00414 {
00415 _error = message;
00416 }
00417
00418 void Node::_flushObjects()
00419 {
00420 net::Session* session = getSession();
00421
00422 _barriersMutex.set();
00423 for( BarrierHash::const_iterator i =_barriers.begin();
00424 i != _barriers.end(); ++ i )
00425 {
00426 net::Barrier* barrier = i->second;
00427 session->unmapObject( barrier );
00428 delete barrier;
00429 }
00430 _barriers.clear();
00431 _barriersMutex.unset();
00432
00433 _frameDatasMutex.set();
00434 for( FrameDataHash::const_iterator i = _frameDatas.begin();
00435 i != _frameDatas.end(); ++ i )
00436 {
00437 FrameData* frameData = i->second;
00438 session->unmapObject( frameData );
00439 delete frameData;
00440 }
00441 _frameDatas.clear();
00442 _frameDatasMutex.unset();
00443 }
00444
00445 void Node::setIAttribute( const IAttribute attr, const int32_t value )
00446 {
00447 _iAttributes[attr] = value;
00448 }
00449
00450 int32_t Node::getIAttribute( const IAttribute attr ) const
00451 {
00452 return _iAttributes[attr];
00453 }
00454
00455 const std::string& Node::getIAttributeString( const IAttribute attr )
00456 {
00457 return _iAttributeStrings[attr];
00458 }
00459
00460 void Node::TransmitThread::send( FrameData* data, net::NodePtr node,
00461 const uint32_t frameNumber )
00462 {
00463 _tasks.push( Task( data, node, frameNumber ));
00464 }
00465
00466 void* Node::TransmitThread::run()
00467 {
00468 while( true )
00469 {
00470 const Task task = _tasks.pop();
00471 if( _tasks.isEmpty() && !task.node )
00472 return 0;
00473
00474 EQLOG( LOG_ASSEMBLY ) << "node transmit " << task.data->getID()
00475 << " to " << task.node->getNodeID() << endl;
00476 task.data->transmit( task.node, task.frameNumber );
00477 }
00478 return 0;
00479 }
00480
00481
00482
00483
00484 net::CommandResult Node::_cmdCreatePipe( net::Command& command )
00485 {
00486 const NodeCreatePipePacket* packet =
00487 command.getPacket<NodeCreatePipePacket>();
00488 EQLOG( LOG_INIT ) << "Create pipe " << packet << endl;
00489
00490 CHECK_THREAD( _nodeThread );
00491 EQASSERT( packet->pipeID != EQ_ID_INVALID );
00492
00493 Pipe* pipe = Global::getNodeFactory()->createPipe( this );
00494
00495 if( packet->threaded )
00496 pipe->startThread();
00497
00498 _config->attachObject( pipe, packet->pipeID, EQ_ID_INVALID );
00499
00500 return net::COMMAND_HANDLED;
00501 }
00502
00503 net::CommandResult Node::_cmdDestroyPipe( net::Command& command )
00504 {
00505 const NodeDestroyPipePacket* packet =
00506 command.getPacket<NodeDestroyPipePacket>();
00507 EQLOG( LOG_INIT ) << "Destroy pipe " << packet << endl;
00508
00509 CHECK_THREAD( _nodeThread );
00510 Pipe* pipe = _findPipe( packet->pipeID );
00511 pipe->joinThread();
00512
00513 _config->detachObject( pipe );
00514 Global::getNodeFactory()->releasePipe( pipe );
00515
00516 return net::COMMAND_HANDLED;
00517 }
00518
00519 net::CommandResult Node::_cmdConfigInit( net::Command& command )
00520 {
00521 CHECK_THREAD( _nodeThread );
00522
00523 const NodeConfigInitPacket* packet =
00524 command.getPacket<NodeConfigInitPacket>();
00525 EQLOG( LOG_INIT ) << "Init node " << packet << endl;
00526
00527 _state = STATE_INITIALIZING;
00528 _name = packet->name;
00529 _tasks = packet->tasks;
00530
00531 memcpy( _iAttributes, packet->iAttributes, IATTR_ALL * sizeof( int32_t ));
00532
00533 _currentFrame = packet->frameNumber;
00534 _unlockedFrame = packet->frameNumber;
00535 _finishedFrame = packet->frameNumber;
00536
00537 transmitter.start();
00538 _error.clear();
00539 NodeConfigInitReplyPacket reply;
00540 reply.result = configInit( packet->initID );
00541
00542 if( _iAttributes[ IATTR_THREAD_MODEL ] == eq::UNDEFINED )
00543 _iAttributes[ IATTR_THREAD_MODEL ] = eq::DRAW_SYNC;
00544
00545 _state = reply.result ? STATE_RUNNING : STATE_INIT_FAILED;
00546
00547 memcpy( reply.iAttributes, _iAttributes, IATTR_ALL * sizeof( int32_t ));
00548 send( command.getNode(), reply, _error );
00549 return net::COMMAND_HANDLED;
00550 }
00551
00552 net::CommandResult Node::_cmdConfigExit( net::Command& command )
00553 {
00554 const NodeConfigExitPacket* packet =
00555 command.getPacket<NodeConfigExitPacket>();
00556 EQLOG( LOG_INIT ) << "Node exit " << packet << endl;
00557
00558 CHECK_THREAD( _nodeThread );
00559 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end();
00560 ++i )
00561 {
00562 Pipe* pipe = *i;
00563 pipe->waitExited();
00564 }
00565
00566 NodeConfigExitReplyPacket reply;
00567 reply.result = configExit();
00568
00569 transmitter.send( 0, 0, 0 );
00570 transmitter.join();
00571
00572 _state = STATE_STOPPED;
00573 _flushObjects();
00574
00575 send( command.getNode(), reply );
00576 return net::COMMAND_HANDLED;
00577 }
00578
00579 net::CommandResult Node::_cmdFrameStart( net::Command& command )
00580 {
00581 CHECK_THREAD( _nodeThread );
00582 const NodeFrameStartPacket* packet =
00583 command.getPacket<NodeFrameStartPacket>();
00584 EQVERB << "handle node frame start " << packet << endl;
00585
00586 const uint32_t frameNumber = packet->frameNumber;
00587 EQASSERT( _currentFrame == frameNumber-1 );
00588
00589 EQLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber << endl;
00590
00591 frameStart( packet->frameID, frameNumber );
00592 EQASSERTINFO( _currentFrame >= frameNumber,
00593 "Node::frameStart() did not start frame " << frameNumber );
00594
00595 return net::COMMAND_HANDLED;
00596 }
00597
00598 net::CommandResult Node::_cmdFrameFinish( net::Command& command )
00599 {
00600 CHECK_THREAD( _nodeThread );
00601 const NodeFrameFinishPacket* packet =
00602 command.getPacket<NodeFrameFinishPacket>();
00603 EQLOG( LOG_TASKS ) << "TASK frame finish " << getName() << " " << packet
00604 << endl;
00605
00606 const uint32_t frameNumber = packet->frameNumber;
00607
00608 _finishFrame( frameNumber );
00609 _frameFinish( packet->frameID, frameNumber );
00610 return net::COMMAND_HANDLED;
00611 }
00612
00613 net::CommandResult Node::_cmdFrameDrawFinish( net::Command& command )
00614 {
00615 NodeFrameDrawFinishPacket* packet =
00616 command.getPacket< NodeFrameDrawFinishPacket >();
00617 EQLOG( LOG_TASKS ) << "TASK draw finish " << getName() << " " << packet
00618 << endl;
00619
00620 frameDrawFinish( packet->frameID, packet->frameNumber );
00621 return net::COMMAND_HANDLED;
00622 }
00623
00624 net::CommandResult Node::_cmdFrameTasksFinish( net::Command& command )
00625 {
00626 NodeFrameTasksFinishPacket* packet =
00627 command.getPacket< NodeFrameTasksFinishPacket >();
00628 EQLOG( LOG_TASKS ) << "TASK tasks finish " << getName() << " " << packet
00629 << endl;
00630
00631 frameTasksFinish( packet->frameID, packet->frameNumber );
00632 return net::COMMAND_HANDLED;
00633 }
00634 }