server/node.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <pthread.h>
00019 #include "node.h"
00020 
00021 #include "channel.h"
00022 #include "config.h"
00023 #include "global.h"
00024 #include "log.h"
00025 #include "nodeVisitor.h"
00026 #include "paths.h"
00027 #include "pipe.h"
00028 #include "server.h"
00029 #include "window.h"
00030 
00031 #include <eq/net/barrier.h>
00032 #include <eq/net/command.h>
00033 #include <eq/client/packets.h>
00034 
00035 namespace eq
00036 {
00037 namespace server
00038 {
00039 typedef net::CommandFunc<Node> NodeFunc;
00040 
00041 void Node::_construct()
00042 {
00043     _active         = 0;
00044     _config         = 0;
00045     _tasks          = TASK_NONE;
00046     _lastDrawPipe   = 0;
00047     _flushedFrame   = 0;
00048     _finishedFrame  = 0;
00049     EQINFO << "New node @" << (void*)this << std::endl;
00050 }
00051 
00052 Node::Node()
00053 {
00054     _construct();
00055 
00056     const Global* global = Global::instance();    
00057     for( int i=0; i < eq::Node::IATTR_ALL; ++i )
00058         _iAttributes[i] =global->getNodeIAttribute((eq::Node::IAttribute)i);
00059 }
00060 
00061 Node::Node( const Node& from, Config* config )
00062         : net::Object()
00063 {
00064     _construct();
00065 
00066     _name = from._name;
00067     _node = from._node;
00068 
00069     config->addNode( this );
00070 
00071     memcpy( _iAttributes, from._iAttributes, 
00072             eq::Node::IATTR_ALL * sizeof( int32_t ));
00073 
00074     const ConnectionDescriptionVector& descriptions =
00075         from.getConnectionDescriptions();
00076     for( ConnectionDescriptionVector::const_iterator i = descriptions.begin();
00077          i != descriptions.end(); ++i )
00078     {
00079         const ConnectionDescriptionPtr desc = *i;
00080         addConnectionDescription( new ConnectionDescription( *desc ));
00081     }
00082 
00083     const PipeVector& pipes = from.getPipes();
00084     for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00085         new Pipe( **i, this );
00086 }
00087 
00088 Node::~Node()
00089 {
00090     EQINFO << "Delete node @" << (void*)this << std::endl;
00091 
00092     if( _config )
00093         _config->removeNode( this );
00094     
00095     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00096     {
00097         Pipe* pipe = *i;
00098 
00099         pipe->_node = 0;
00100         delete pipe;
00101     }
00102     _pipes.clear();
00103 }
00104 
00105 void Node::attachToSession( const uint32_t id, const uint32_t instanceID, 
00106                                net::Session* session )
00107 {
00108     net::Object::attachToSession( id, instanceID, session );
00109     
00110     net::CommandQueue* queue = getCommandThreadQueue();
00111 
00112     registerCommand( CMD_NODE_CONFIG_INIT_REPLY, 
00113                      NodeFunc( this, &Node::_cmdConfigInitReply ), queue );
00114     registerCommand( CMD_NODE_CONFIG_EXIT_REPLY, 
00115                      NodeFunc( this, &Node::_cmdConfigExitReply ), queue );
00116     registerCommand( CMD_NODE_FRAME_FINISH_REPLY,
00117                      NodeFunc( this, &Node::_cmdFrameFinishReply ), queue );
00118 }
00119 
00120 void Node::addPipe( Pipe* pipe )
00121 {
00122     EQASSERT( pipe->getWindows().empty( ));
00123 
00124     _pipes.push_back( pipe ); 
00125     pipe->_node = this;
00126 }
00127 
00128 bool Node::removePipe( Pipe* pipe )
00129 {
00130     EQASSERT( pipe->getWindows().empty( ));
00131 
00132     PipeVector::iterator i = find( _pipes.begin(), _pipes.end(), pipe );
00133     if( i == _pipes.end( ))
00134         return false;
00135 
00136     _pipes.erase( i );
00137     pipe->_node = 0; 
00138 
00139     return true;
00140 }
00141 
00142 NodePath Node::getPath() const
00143 {
00144     EQASSERT( _config );
00145     
00146     const NodeVector&      nodes = _config->getNodes();
00147     NodeVector::const_iterator i = std::find( nodes.begin(), nodes.end(),
00148                                               this );
00149     EQASSERT( i != nodes.end( ));
00150 
00151     NodePath path;
00152     path.nodeIndex = std::distance( nodes.begin(), i );
00153     return path;
00154 }
00155 
00156 Channel* Node::getChannel( const ChannelPath& path )
00157 {
00158     EQASSERT( _pipes.size() > path.pipeIndex );
00159 
00160     if( _pipes.size() <= path.pipeIndex )
00161         return 0;
00162 
00163     return _pipes[ path.pipeIndex ]->getChannel( path );
00164 }
00165 
00166 namespace
00167 {
00168 template< class C, class V >
00169 VisitorResult _accept( C* node, V& visitor )
00170 { 
00171     VisitorResult result = visitor.visitPre( node );
00172     if( result != TRAVERSE_CONTINUE )
00173         return result;
00174 
00175     const PipeVector& pipes = node->getPipes();
00176     for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00177     {
00178         switch( (*i)->accept( visitor ))
00179         {
00180             case TRAVERSE_TERMINATE:
00181                 return TRAVERSE_TERMINATE;
00182 
00183             case TRAVERSE_PRUNE:
00184                 result = TRAVERSE_PRUNE;
00185                 break;
00186                 
00187             case TRAVERSE_CONTINUE:
00188             default:
00189                 break;
00190         }
00191     }
00192 
00193     switch( visitor.visitPost( node ))
00194     {
00195         case TRAVERSE_TERMINATE:
00196             return TRAVERSE_TERMINATE;
00197 
00198         case TRAVERSE_PRUNE:
00199             return TRAVERSE_PRUNE;
00200                 
00201         case TRAVERSE_CONTINUE:
00202         default:
00203             break;
00204     }
00205 
00206     return result;
00207 }
00208 }
00209 
00210 VisitorResult Node::accept( NodeVisitor& visitor )
00211 {
00212     return _accept( this, visitor );
00213 }
00214 
00215 VisitorResult Node::accept( ConstNodeVisitor& visitor ) const
00216 {
00217     return _accept( this, visitor );
00218 }
00219 
00220 void Node::activate()
00221 {   
00222     ++_active;
00223     EQLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
00224 }
00225 
00226 void Node::deactivate()
00227 { 
00228     EQASSERT( _active != 0 );
00229     --_active; 
00230     EQLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
00231 };
00232 
00233 //===========================================================================
00234 // Operations
00235 //===========================================================================
00236 
00237 //---------------------------------------------------------------------------
00238 // update running entities (init/exit)
00239 //---------------------------------------------------------------------------
00240 
00241 void Node::updateRunning( const uint32_t initID, const uint32_t frameNumber )
00242 {
00243     if( !isActive() && _state == STATE_STOPPED ) // inactive
00244         return;
00245 
00246     if( isActive() && _state != STATE_RUNNING ) // becoming active
00247     {
00248         EQASSERT( _state == STATE_STOPPED );
00249         _configInit( initID, frameNumber );
00250     }
00251 
00252     // Let all running pipes update their running state (incl. children)
00253     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00254         (*i)->updateRunning( initID, frameNumber );
00255 
00256     if( !isActive( )) // becoming inactive
00257     {
00258         EQASSERT( _state == STATE_RUNNING );
00259         _configExit();
00260     }
00261 
00262     flushSendBuffer();
00263 }
00264 
00265 bool Node::syncRunning()
00266 {
00267     if( !isActive() && _state == STATE_STOPPED ) // inactive
00268         return true;
00269 
00270     // Sync state updates
00271     bool success = true;
00272     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00273     {
00274         Pipe* pipe = *i;
00275         if( !pipe->syncRunning( ))
00276         {
00277             _error += "pipe " + pipe->getName() + ": '" + 
00278                       pipe->getErrorMessage() + '\'';
00279             success = false;
00280         }
00281     }
00282 
00283     flushSendBuffer();
00284 
00285     if( isActive() && _state != STATE_RUNNING && !_syncConfigInit( ))
00286         // becoming active
00287         success = false;
00288 
00289     if( !isActive() && !_syncConfigExit( ))
00290         // becoming inactive
00291         success = false;
00292 
00293     EQASSERT( _state == STATE_RUNNING || _state == STATE_STOPPED );
00294     return success;
00295 }
00296 
00297 //---------------------------------------------------------------------------
00298 // init
00299 //---------------------------------------------------------------------------
00300 void Node::_configInit( const uint32_t initID, const uint32_t frameNumber )
00301 {
00302     EQASSERT( _state == STATE_STOPPED );
00303     _state = STATE_INITIALIZING;
00304 
00305     _flushedFrame  = 0;
00306     _finishedFrame = 0;
00307     _frameIDs.clear();
00308 
00309     _config->registerObject( this );
00310 
00311     EQLOG( LOG_INIT ) << "Create node" << std::endl;
00312     ConfigCreateNodePacket createNodePacket;
00313     createNodePacket.nodeID = getID();
00314     createNodePacket.sessionID = _config->getID();
00315     _node->send( createNodePacket );
00316 
00317     EQLOG( LOG_INIT ) << "Init node" << std::endl;
00318     NodeConfigInitPacket packet;
00319     packet.initID      = initID;
00320     packet.tasks       = _tasks;
00321     packet.frameNumber = frameNumber;
00322 
00323     memcpy( packet.iAttributes, _iAttributes, 
00324             eq::Node::IATTR_ALL * sizeof( int32_t ));
00325 
00326     _send( packet, _name );
00327 }
00328 
00329 bool Node::_syncConfigInit()
00330 {
00331     EQASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
00332               _state == STATE_INIT_FAILED );
00333 
00334     _state.waitNE( STATE_INITIALIZING );
00335 
00336     const bool success = ( _state == STATE_INIT_SUCCESS );
00337     if( success )
00338         _state = STATE_RUNNING;
00339     else
00340         EQWARN << "Node initialization failed: " << _error << std::endl;
00341 
00342     return success;
00343 }
00344 
00345 //---------------------------------------------------------------------------
00346 // exit
00347 //---------------------------------------------------------------------------
00348 void Node::_configExit()
00349 {
00350     EQASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
00351     _state = STATE_EXITING;
00352 
00353     EQLOG( LOG_INIT ) << "Exit node" << std::endl;
00354     NodeConfigExitPacket packet;
00355     _send( packet );
00356     flushSendBuffer();
00357 
00358     EQLOG( LOG_INIT ) << "Destroy node" << std::endl;
00359     ConfigDestroyNodePacket destroyNodePacket;
00360     destroyNodePacket.nodeID = getID();
00361     destroyNodePacket.sessionID = _config->getID();
00362     _node->send( destroyNodePacket );
00363 }
00364 
00365 bool Node::_syncConfigExit()
00366 {
00367     EQASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS || 
00368               _state == STATE_EXIT_FAILED );
00369     
00370     _state.waitNE( STATE_EXITING );
00371     const bool success = ( _state == STATE_EXIT_SUCCESS );
00372     EQASSERT( success || _state == STATE_EXIT_FAILED );
00373 
00374     _config->deregisterObject( this );
00375 
00376     _state = STATE_STOPPED; // EXIT_FAILED -> STOPPED transition
00377     _tasks = TASK_NONE;
00378     _frameIDs.clear();
00379     _flushBarriers();
00380     return success;
00381 }
00382 
00383 //---------------------------------------------------------------------------
00384 // update
00385 //---------------------------------------------------------------------------
00386 void Node::update( const uint32_t frameID, const uint32_t frameNumber )
00387 {
00388     EQVERB << "Start frame " << frameNumber << std::endl;
00389     EQASSERT( _state == STATE_RUNNING );
00390     EQASSERT( _active > 0 );
00391 
00392     _frameIDs[ frameNumber ] = frameID;
00393     
00394     NodeFrameStartPacket startPacket;
00395     startPacket.frameID     = frameID;
00396     startPacket.frameNumber = frameNumber;
00397     _send( startPacket );
00398     EQLOG( LOG_TASKS ) << "TASK node start frame " << &startPacket << std::endl;
00399 
00400     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00401     {
00402         Pipe* pipe = *i;
00403         if( pipe->isActive( ))
00404             pipe->update( frameID, frameNumber );
00405     }
00406 
00407     NodeFrameTasksFinishPacket finishPacket;
00408     finishPacket.frameID     = frameID;
00409     finishPacket.frameNumber = frameNumber;
00410     _send( finishPacket );
00411     EQLOG( LOG_TASKS ) << "TASK node tasks finish " << &finishPacket
00412                            << std::endl;
00413 
00414     _finish( frameNumber );
00415 
00416     flushSendBuffer();
00417     _lastDrawPipe = 0;
00418 }
00419 
00420 uint32_t Node::_getFinishLatency() const
00421 {
00422     switch( getIAttribute( eq::Node::IATTR_THREAD_MODEL ))
00423     {
00424         case DRAW_SYNC:
00425             if( _tasks & TASK_DRAW )
00426             {
00427                 // More than one frame latency doesn't make sense, since the
00428                 // draw sync for frame+1 does not allow for more
00429                 const Config* config = getConfig();
00430                 const uint32_t latency = config->getLatency();
00431 
00432                 return EQ_MIN( latency, 1 );
00433             }
00434             break;
00435 
00436         case LOCAL_SYNC:
00437             if( _tasks != TASK_NONE )
00438                 // local sync enforces no latency
00439                 return 0;
00440             break;
00441 
00442         case ASYNC:
00443             break;
00444         default:
00445             EQUNIMPLEMENTED;
00446     }
00447 
00448     const Config* config = getConfig();
00449     return config->getLatency();
00450 }
00451 
00452 void Node::_finish( const uint32_t currentFrame )
00453 {
00454     for( PipeVector::const_iterator i = _pipes.begin(); i != _pipes.end(); ++i )
00455     {
00456         const Pipe* pipe = *i;
00457         if( pipe->getIAttribute( Pipe::IATTR_HINT_THREAD ))
00458         {
00459             const uint32_t latency = _getFinishLatency();
00460             if( currentFrame > latency )
00461                 flushFrames( currentFrame - latency );
00462             return;
00463         }
00464     }
00465 
00466     // else only non-threaded pipes, all local tasks are done, send finish now.
00467     flushFrames( currentFrame );
00468 }
00469 
00470 void Node::flushFrames( const uint32_t frameNumber )
00471 {
00472     EQVERB << "Flush frames including " << frameNumber << std::endl;
00473 
00474     while( _flushedFrame < frameNumber )
00475     {
00476         ++_flushedFrame;
00477         _sendFrameFinish( _flushedFrame );
00478     }
00479 
00480     flushSendBuffer();
00481 }
00482 
00483 void Node::_sendFrameFinish( const uint32_t frameNumber )
00484 {
00485     if( _frameIDs.find( frameNumber ) == _frameIDs.end( ))
00486         return; // finish already send
00487 
00488     NodeFrameFinishPacket packet;
00489     packet.frameID     = _frameIDs[ frameNumber ];
00490     packet.frameNumber = frameNumber;
00491 
00492     _send( packet );
00493     _frameIDs.erase( frameNumber );
00494     EQLOG( LOG_TASKS ) << "TASK node finish frame  " << &packet << std::endl;
00495 }
00496 
00497 //---------------------------------------------------------------------------
00498 // Barrier cache
00499 //---------------------------------------------------------------------------
00500 net::Barrier* Node::getBarrier()
00501 {
00502     if( _barriers.empty() )
00503     {
00504         net::Barrier* barrier = new net::Barrier( _node );
00505         _config->registerObject( barrier );
00506         barrier->setAutoObsolete( getConfig()->getLatency()+1, 
00507                                   Object::AUTO_OBSOLETE_COUNT_VERSIONS );
00508         
00509         return barrier;
00510     }
00511 
00512     net::Barrier* barrier = _barriers.back();
00513     _barriers.pop_back();
00514     barrier->setHeight(0);
00515     return barrier;
00516 }
00517 
00518 void Node::releaseBarrier( net::Barrier* barrier )
00519 {
00520     _barriers.push_back( barrier );
00521 }
00522 
00523 void Node::_flushBarriers()
00524 {
00525     for( std::vector< net::Barrier* >::const_iterator i =_barriers.begin(); 
00526          i != _barriers.end(); ++ i )
00527     {
00528         net::Barrier* barrier = *i;
00529         _config->deregisterObject( barrier );
00530         delete barrier;
00531     }
00532     _barriers.clear();
00533 }
00534 
00535 void Node::flushSendBuffer()
00536 {
00537     _bufferedTasks.sendBuffer( _node->getConnection( ));
00538 }
00539 
00540 //===========================================================================
00541 // command handling
00542 //===========================================================================
00543 net::CommandResult Node::_cmdConfigInitReply( net::Command& command )
00544 {
00545     const NodeConfigInitReplyPacket* packet = 
00546         command.getPacket<NodeConfigInitReplyPacket>();
00547     EQVERB << "handle configInit reply " << packet << std::endl;
00548     EQASSERT( _state == STATE_INITIALIZING );
00549 
00550     _error = packet->error;
00551     _state = packet->result ? STATE_INIT_SUCCESS : STATE_INIT_FAILED;
00552     memcpy( _iAttributes, packet->iAttributes, 
00553             eq::Node::IATTR_ALL * sizeof( int32_t ));
00554 
00555     return net::COMMAND_HANDLED;
00556 }
00557 
00558 net::CommandResult Node::_cmdConfigExitReply( net::Command& command )
00559 {
00560     const NodeConfigExitReplyPacket* packet =
00561         command.getPacket<NodeConfigExitReplyPacket>();
00562     EQVERB << "handle configExit reply " << packet << std::endl;
00563     EQASSERT( _state == STATE_EXITING );
00564 
00565     _state = packet->result ? STATE_EXIT_SUCCESS : STATE_EXIT_FAILED;
00566     return net::COMMAND_HANDLED;
00567 }
00568 
00569 net::CommandResult Node::_cmdFrameFinishReply( net::Command& command )
00570 {
00571     const NodeFrameFinishReplyPacket* packet = 
00572         command.getPacket<NodeFrameFinishReplyPacket>();
00573     EQVERB << "handle frame finish reply " << packet << std::endl;
00574     
00575     _finishedFrame = packet->frameNumber;
00576     _config->notifyNodeFrameFinished( packet->frameNumber );
00577 
00578     return net::COMMAND_HANDLED;
00579 }
00580 
00581 std::ostream& operator << ( std::ostream& os, const Node* node )
00582 {
00583     if( !node )
00584         return os;
00585     
00586     os << base::disableFlush << base::disableHeader;
00587     const Config* config = node->getConfig();
00588     if( config && config->isApplicationNode( node ))
00589         os << "appNode" << std::endl;
00590     else
00591         os << "node" << std::endl;
00592 
00593     os << "{" << std::endl << base::indent;
00594 
00595     const std::string& name = node->getName();
00596     if( !name.empty( ))
00597         os << "name     \"" << name << "\"" << std::endl;
00598 
00599     const ConnectionDescriptionVector& descriptions = 
00600         node->getConnectionDescriptions();
00601     for( ConnectionDescriptionVector::const_iterator i = descriptions.begin();
00602          i != descriptions.end(); ++i )
00603 
00604         os << (*i).get();
00605 
00606     bool attrPrinted   = false;
00607     
00608     for( eq::Node::IAttribute i = static_cast<eq::Node::IAttribute>( 0 );
00609          i<eq::Node::IATTR_ALL; 
00610          i = static_cast<eq::Node::IAttribute>( static_cast<uint32_t>( i )+1))
00611     {
00612         const int value = node->getIAttribute( i );
00613         if( value == Global::instance()->getNodeIAttribute( i ))
00614             continue;
00615 
00616         if( !attrPrinted )
00617         {
00618             os << std::endl << "attributes" << std::endl;
00619             os << "{" << std::endl << base::indent;
00620             attrPrinted = true;
00621         }
00622         
00623         os << ( i==eq::Node::IATTR_THREAD_MODEL ?
00624                     "thread_model       " :
00625                 "ERROR" )
00626            << static_cast<IAttrValue>( value ) << std::endl;
00627     }
00628     
00629     if( attrPrinted )
00630         os << base::exdent << "}" << std::endl << std::endl;
00631 
00632     const PipeVector& pipes = node->getPipes();
00633     for( PipeVector::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
00634         os << *i;
00635 
00636     os << base::exdent << "}" << std::endl
00637        << base::enableHeader << base::enableFlush;
00638     return os;
00639 }
00640 
00641 }
00642 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8