server/node.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <pthread.h>
00019 #include "node.h"
00020
00021 #include "channel.h"
00022 #include "config.h"
00023 #include "global.h"
00024 #include "log.h"
00025 #include "nodeVisitor.h"
00026 #include "paths.h"
00027 #include "pipe.h"
00028 #include "server.h"
00029 #include "window.h"
00030
00031 #include <eq/net/barrier.h>
00032 #include <eq/net/command.h>
00033 #include <eq/client/packets.h>
00034
00035 namespace eq
00036 {
00037 namespace server
00038 {
00039 typedef net::CommandFunc<Node> NodeFunc;
00040
00041 void Node::_construct()
00042 {
00043 _active = 0;
00044 _config = 0;
00045 _tasks = TASK_NONE;
00046 _lastDrawPipe = 0;
00047 _flushedFrame = 0;
00048 _finishedFrame = 0;
00049 EQINFO << "New node @" << (void*)this << std::endl;
00050 }
00051
00052 Node::Node()
00053 {
00054 _construct();
00055
00056 const Global* global = Global::instance();
00057 for( int i=0; i < eq::Node::IATTR_ALL; ++i )
00058 _iAttributes[i] =global->getNodeIAttribute((eq::Node::IAttribute)i);
00059 }
00060
00061 Node::Node( const Node& from, Config* config )
00062 : net::Object()
00063 {
00064 _construct();
00065
00066 _name = from._name;
00067 _node = from._node;
00068
00069 config->addNode( this );
00070
00071 memcpy( _iAttributes, from._iAttributes,
00072 eq::Node::IATTR_ALL * sizeof( int32_t ));
00073
00074 const ConnectionDescriptionVector& descriptions =
00075 from.getConnectionDescriptions();
00076 for( ConnectionDescriptionVector::const_iterator i = descriptions.begin();
00077 i != descriptions.end(); ++i )
00078 {
00079 const ConnectionDescriptionPtr desc = *i;
00080 addConnectionDescription( new ConnectionDescription( *desc ));
00081 }
00082
00083 const PipeVector& pipes = from.getPipes();
00084 for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00085 new Pipe( **i, this );
00086 }
00087
00088 Node::~Node()
00089 {
00090 EQINFO << "Delete node @" << (void*)this << std::endl;
00091
00092 if( _config )
00093 _config->removeNode( this );
00094
00095 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00096 {
00097 Pipe* pipe = *i;
00098
00099 pipe->_node = 0;
00100 delete pipe;
00101 }
00102 _pipes.clear();
00103 }
00104
00105 void Node::attachToSession( const uint32_t id, const uint32_t instanceID,
00106 net::Session* session )
00107 {
00108 net::Object::attachToSession( id, instanceID, session );
00109
00110 net::CommandQueue* queue = getCommandThreadQueue();
00111
00112 registerCommand( CMD_NODE_CONFIG_INIT_REPLY,
00113 NodeFunc( this, &Node::_cmdConfigInitReply ), queue );
00114 registerCommand( CMD_NODE_CONFIG_EXIT_REPLY,
00115 NodeFunc( this, &Node::_cmdConfigExitReply ), queue );
00116 registerCommand( CMD_NODE_FRAME_FINISH_REPLY,
00117 NodeFunc( this, &Node::_cmdFrameFinishReply ), queue );
00118 }
00119
00120 void Node::addPipe( Pipe* pipe )
00121 {
00122 EQASSERT( pipe->getWindows().empty( ));
00123
00124 _pipes.push_back( pipe );
00125 pipe->_node = this;
00126 }
00127
00128 bool Node::removePipe( Pipe* pipe )
00129 {
00130 EQASSERT( pipe->getWindows().empty( ));
00131
00132 PipeVector::iterator i = find( _pipes.begin(), _pipes.end(), pipe );
00133 if( i == _pipes.end( ))
00134 return false;
00135
00136 _pipes.erase( i );
00137 pipe->_node = 0;
00138
00139 return true;
00140 }
00141
00142 NodePath Node::getPath() const
00143 {
00144 EQASSERT( _config );
00145
00146 const NodeVector& nodes = _config->getNodes();
00147 NodeVector::const_iterator i = std::find( nodes.begin(), nodes.end(),
00148 this );
00149 EQASSERT( i != nodes.end( ));
00150
00151 NodePath path;
00152 path.nodeIndex = std::distance( nodes.begin(), i );
00153 return path;
00154 }
00155
00156 Channel* Node::getChannel( const ChannelPath& path )
00157 {
00158 EQASSERT( _pipes.size() > path.pipeIndex );
00159
00160 if( _pipes.size() <= path.pipeIndex )
00161 return 0;
00162
00163 return _pipes[ path.pipeIndex ]->getChannel( path );
00164 }
00165
00166 namespace
00167 {
00168 template< class C, class V >
00169 VisitorResult _accept( C* node, V& visitor )
00170 {
00171 VisitorResult result = visitor.visitPre( node );
00172 if( result != TRAVERSE_CONTINUE )
00173 return result;
00174
00175 const PipeVector& pipes = node->getPipes();
00176 for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00177 {
00178 switch( (*i)->accept( visitor ))
00179 {
00180 case TRAVERSE_TERMINATE:
00181 return TRAVERSE_TERMINATE;
00182
00183 case TRAVERSE_PRUNE:
00184 result = TRAVERSE_PRUNE;
00185 break;
00186
00187 case TRAVERSE_CONTINUE:
00188 default:
00189 break;
00190 }
00191 }
00192
00193 switch( visitor.visitPost( node ))
00194 {
00195 case TRAVERSE_TERMINATE:
00196 return TRAVERSE_TERMINATE;
00197
00198 case TRAVERSE_PRUNE:
00199 return TRAVERSE_PRUNE;
00200
00201 case TRAVERSE_CONTINUE:
00202 default:
00203 break;
00204 }
00205
00206 return result;
00207 }
00208 }
00209
00210 VisitorResult Node::accept( NodeVisitor& visitor )
00211 {
00212 return _accept( this, visitor );
00213 }
00214
00215 VisitorResult Node::accept( ConstNodeVisitor& visitor ) const
00216 {
00217 return _accept( this, visitor );
00218 }
00219
00220 void Node::activate()
00221 {
00222 ++_active;
00223 EQLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
00224 }
00225
00226 void Node::deactivate()
00227 {
00228 EQASSERT( _active != 0 );
00229 --_active;
00230 EQLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
00231 };
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241 void Node::updateRunning( const uint32_t initID, const uint32_t frameNumber )
00242 {
00243 if( !isActive() && _state == STATE_STOPPED )
00244 return;
00245
00246 if( isActive() && _state != STATE_RUNNING )
00247 {
00248 EQASSERT( _state == STATE_STOPPED );
00249 _configInit( initID, frameNumber );
00250 }
00251
00252
00253 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00254 (*i)->updateRunning( initID, frameNumber );
00255
00256 if( !isActive( ))
00257 {
00258 EQASSERT( _state == STATE_RUNNING );
00259 _configExit();
00260 }
00261
00262 flushSendBuffer();
00263 }
00264
00265 bool Node::syncRunning()
00266 {
00267 if( !isActive() && _state == STATE_STOPPED )
00268 return true;
00269
00270
00271 bool success = true;
00272 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00273 {
00274 Pipe* pipe = *i;
00275 if( !pipe->syncRunning( ))
00276 {
00277 _error += "pipe " + pipe->getName() + ": '" +
00278 pipe->getErrorMessage() + '\'';
00279 success = false;
00280 }
00281 }
00282
00283 flushSendBuffer();
00284
00285 if( isActive() && _state != STATE_RUNNING && !_syncConfigInit( ))
00286
00287 success = false;
00288
00289 if( !isActive() && !_syncConfigExit( ))
00290
00291 success = false;
00292
00293 EQASSERT( _state == STATE_RUNNING || _state == STATE_STOPPED );
00294 return success;
00295 }
00296
00297
00298
00299
00300 void Node::_configInit( const uint32_t initID, const uint32_t frameNumber )
00301 {
00302 EQASSERT( _state == STATE_STOPPED );
00303 _state = STATE_INITIALIZING;
00304
00305 _flushedFrame = 0;
00306 _finishedFrame = 0;
00307 _frameIDs.clear();
00308
00309 _config->registerObject( this );
00310
00311 EQLOG( LOG_INIT ) << "Create node" << std::endl;
00312 ConfigCreateNodePacket createNodePacket;
00313 createNodePacket.nodeID = getID();
00314 createNodePacket.sessionID = _config->getID();
00315 _node->send( createNodePacket );
00316
00317 EQLOG( LOG_INIT ) << "Init node" << std::endl;
00318 NodeConfigInitPacket packet;
00319 packet.initID = initID;
00320 packet.tasks = _tasks;
00321 packet.frameNumber = frameNumber;
00322
00323 memcpy( packet.iAttributes, _iAttributes,
00324 eq::Node::IATTR_ALL * sizeof( int32_t ));
00325
00326 _send( packet, _name );
00327 }
00328
00329 bool Node::_syncConfigInit()
00330 {
00331 EQASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
00332 _state == STATE_INIT_FAILED );
00333
00334 _state.waitNE( STATE_INITIALIZING );
00335
00336 const bool success = ( _state == STATE_INIT_SUCCESS );
00337 if( success )
00338 _state = STATE_RUNNING;
00339 else
00340 EQWARN << "Node initialization failed: " << _error << std::endl;
00341
00342 return success;
00343 }
00344
00345
00346
00347
00348 void Node::_configExit()
00349 {
00350 EQASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
00351 _state = STATE_EXITING;
00352
00353 EQLOG( LOG_INIT ) << "Exit node" << std::endl;
00354 NodeConfigExitPacket packet;
00355 _send( packet );
00356 flushSendBuffer();
00357
00358 EQLOG( LOG_INIT ) << "Destroy node" << std::endl;
00359 ConfigDestroyNodePacket destroyNodePacket;
00360 destroyNodePacket.nodeID = getID();
00361 destroyNodePacket.sessionID = _config->getID();
00362 _node->send( destroyNodePacket );
00363 }
00364
00365 bool Node::_syncConfigExit()
00366 {
00367 EQASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS ||
00368 _state == STATE_EXIT_FAILED );
00369
00370 _state.waitNE( STATE_EXITING );
00371 const bool success = ( _state == STATE_EXIT_SUCCESS );
00372 EQASSERT( success || _state == STATE_EXIT_FAILED );
00373
00374 _config->deregisterObject( this );
00375
00376 _state = STATE_STOPPED;
00377 _tasks = TASK_NONE;
00378 _frameIDs.clear();
00379 _flushBarriers();
00380 return success;
00381 }
00382
00383
00384
00385
00386 void Node::update( const uint32_t frameID, const uint32_t frameNumber )
00387 {
00388 EQVERB << "Start frame " << frameNumber << std::endl;
00389 EQASSERT( _state == STATE_RUNNING );
00390 EQASSERT( _active > 0 );
00391
00392 _frameIDs[ frameNumber ] = frameID;
00393
00394 NodeFrameStartPacket startPacket;
00395 startPacket.frameID = frameID;
00396 startPacket.frameNumber = frameNumber;
00397 _send( startPacket );
00398 EQLOG( LOG_TASKS ) << "TASK node start frame " << &startPacket << std::endl;
00399
00400 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00401 {
00402 Pipe* pipe = *i;
00403 if( pipe->isActive( ))
00404 pipe->update( frameID, frameNumber );
00405 }
00406
00407 NodeFrameTasksFinishPacket finishPacket;
00408 finishPacket.frameID = frameID;
00409 finishPacket.frameNumber = frameNumber;
00410 _send( finishPacket );
00411 EQLOG( LOG_TASKS ) << "TASK node tasks finish " << &finishPacket
00412 << std::endl;
00413
00414 _finish( frameNumber );
00415
00416 flushSendBuffer();
00417 _lastDrawPipe = 0;
00418 }
00419
00420 uint32_t Node::_getFinishLatency() const
00421 {
00422 switch( getIAttribute( eq::Node::IATTR_THREAD_MODEL ))
00423 {
00424 case DRAW_SYNC:
00425 if( _tasks & TASK_DRAW )
00426 {
00427
00428
00429 const Config* config = getConfig();
00430 const uint32_t latency = config->getLatency();
00431
00432 return EQ_MIN( latency, 1 );
00433 }
00434 break;
00435
00436 case LOCAL_SYNC:
00437 if( _tasks != TASK_NONE )
00438
00439 return 0;
00440 break;
00441
00442 case ASYNC:
00443 break;
00444 default:
00445 EQUNIMPLEMENTED;
00446 }
00447
00448 const Config* config = getConfig();
00449 return config->getLatency();
00450 }
00451
00452 void Node::_finish( const uint32_t currentFrame )
00453 {
00454 for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00455 {
00456 const Pipe* pipe = *i;
00457 if( pipe->getIAttribute( Pipe::IATTR_HINT_THREAD ))
00458 {
00459 const uint32_t latency = _getFinishLatency();
00460 if( currentFrame > latency )
00461 flushFrames( currentFrame - latency );
00462 return;
00463 }
00464 }
00465
00466
00467 flushFrames( currentFrame );
00468 }
00469
00470 void Node::flushFrames( const uint32_t frameNumber )
00471 {
00472 EQVERB << "Flush frames including " << frameNumber << std::endl;
00473
00474 while( _flushedFrame < frameNumber )
00475 {
00476 ++_flushedFrame;
00477 _sendFrameFinish( _flushedFrame );
00478 }
00479
00480 flushSendBuffer();
00481 }
00482
00483 void Node::_sendFrameFinish( const uint32_t frameNumber )
00484 {
00485 if( _frameIDs.find( frameNumber ) == _frameIDs.end( ))
00486 return;
00487
00488 NodeFrameFinishPacket packet;
00489 packet.frameID = _frameIDs[ frameNumber ];
00490 packet.frameNumber = frameNumber;
00491
00492 _send( packet );
00493 _frameIDs.erase( frameNumber );
00494 EQLOG( LOG_TASKS ) << "TASK node finish frame " << &packet << std::endl;
00495 }
00496
00497
00498
00499
00500 net::Barrier* Node::getBarrier()
00501 {
00502 if( _barriers.empty() )
00503 {
00504 net::Barrier* barrier = new net::Barrier( _node );
00505 _config->registerObject( barrier );
00506 barrier->setAutoObsolete( getConfig()->getLatency()+1,
00507 Object::AUTO_OBSOLETE_COUNT_VERSIONS );
00508
00509 return barrier;
00510 }
00511
00512 net::Barrier* barrier = _barriers.back();
00513 _barriers.pop_back();
00514 barrier->setHeight(0);
00515 return barrier;
00516 }
00517
00518 void Node::releaseBarrier( net::Barrier* barrier )
00519 {
00520 _barriers.push_back( barrier );
00521 }
00522
00523 void Node::_flushBarriers()
00524 {
00525 for( std::vector< net::Barrier* >::const_iterator i =_barriers.begin();
00526 i != _barriers.end(); ++ i )
00527 {
00528 net::Barrier* barrier = *i;
00529 _config->deregisterObject( barrier );
00530 delete barrier;
00531 }
00532 _barriers.clear();
00533 }
00534
00535 void Node::flushSendBuffer()
00536 {
00537 _bufferedTasks.sendBuffer( _node->getConnection( ));
00538 }
00539
00540
00541
00542
00543 net::CommandResult Node::_cmdConfigInitReply( net::Command& command )
00544 {
00545 const NodeConfigInitReplyPacket* packet =
00546 command.getPacket<NodeConfigInitReplyPacket>();
00547 EQVERB << "handle configInit reply " << packet << std::endl;
00548 EQASSERT( _state == STATE_INITIALIZING );
00549
00550 _error = packet->error;
00551 _state = packet->result ? STATE_INIT_SUCCESS : STATE_INIT_FAILED;
00552 memcpy( _iAttributes, packet->iAttributes,
00553 eq::Node::IATTR_ALL * sizeof( int32_t ));
00554
00555 return net::COMMAND_HANDLED;
00556 }
00557
00558 net::CommandResult Node::_cmdConfigExitReply( net::Command& command )
00559 {
00560 const NodeConfigExitReplyPacket* packet =
00561 command.getPacket<NodeConfigExitReplyPacket>();
00562 EQVERB << "handle configExit reply " << packet << std::endl;
00563 EQASSERT( _state == STATE_EXITING );
00564
00565 _state = packet->result ? STATE_EXIT_SUCCESS : STATE_EXIT_FAILED;
00566 return net::COMMAND_HANDLED;
00567 }
00568
00569 net::CommandResult Node::_cmdFrameFinishReply( net::Command& command )
00570 {
00571 const NodeFrameFinishReplyPacket* packet =
00572 command.getPacket<NodeFrameFinishReplyPacket>();
00573 EQVERB << "handle frame finish reply " << packet << std::endl;
00574
00575 _finishedFrame = packet->frameNumber;
00576 _config->notifyNodeFrameFinished( packet->frameNumber );
00577
00578 return net::COMMAND_HANDLED;
00579 }
00580
00581 std::ostream& operator << ( std::ostream& os, const Node* node )
00582 {
00583 if( !node )
00584 return os;
00585
00586 os << base::disableFlush << base::disableHeader;
00587 const Config* config = node->getConfig();
00588 if( config && config->isApplicationNode( node ))
00589 os << "appNode" << std::endl;
00590 else
00591 os << "node" << std::endl;
00592
00593 os << "{" << std::endl << base::indent;
00594
00595 const std::string& name = node->getName();
00596 if( !name.empty( ))
00597 os << "name \"" << name << "\"" << std::endl;
00598
00599 const ConnectionDescriptionVector& descriptions =
00600 node->getConnectionDescriptions();
00601 for( ConnectionDescriptionVector::const_iterator i = descriptions.begin();
00602 i != descriptions.end(); ++i )
00603
00604 os << (*i).get();
00605
00606 bool attrPrinted = false;
00607
00608 for( eq::Node::IAttribute i = static_cast<eq::Node::IAttribute>( 0 );
00609 i<eq::Node::IATTR_ALL;
00610 i = static_cast<eq::Node::IAttribute>( static_cast<uint32_t>( i )+1))
00611 {
00612 const int value = node->getIAttribute( i );
00613 if( value == Global::instance()->getNodeIAttribute( i ))
00614 continue;
00615
00616 if( !attrPrinted )
00617 {
00618 os << std::endl << "attributes" << std::endl;
00619 os << "{" << std::endl << base::indent;
00620 attrPrinted = true;
00621 }
00622
00623 os << ( i==eq::Node::IATTR_THREAD_MODEL ?
00624 "thread_model " :
00625 "ERROR" )
00626 << static_cast<IAttrValue>( value ) << std::endl;
00627 }
00628
00629 if( attrPrinted )
00630 os << base::exdent << "}" << std::endl << std::endl;
00631
00632 const PipeVector& pipes = node->getPipes();
00633 for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00634 os << *i;
00635
00636 os << base::exdent << "}" << std::endl
00637 << base::enableHeader << base::enableFlush;
00638 return os;
00639 }
00640
00641 }
00642 }