server/config.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <pthread.h>
00019 #include "config.h"
00020 
00021 #include "canvas.h"
00022 #include "compound.h"
00023 #include "compoundVisitor.h"
00024 #include "configUpdateDataVisitor.h"
00025 #include "configSerializer.h"
00026 #include "constCompoundVisitor.h"
00027 #include "equalizers/equalizer.h"
00028 #include "global.h"
00029 #include "idFinder.h"
00030 #include "layout.h"
00031 #include "log.h"
00032 #include "nameFinder.h"
00033 #include "node.h"
00034 #include "observer.h"
00035 #include "paths.h"
00036 #include "segment.h"
00037 #include "server.h"
00038 #include "view.h"
00039 #include "window.h"
00040 
00041 #include <eq/client/configEvent.h>
00042 #include <eq/net/command.h>
00043 #include <eq/net/global.h>
00044 #include <eq/base/sleep.h>
00045 
00046 #include "configSyncVisitor.h"
00047 #include "configUnmapVisitor.h"
00048 
00049 using namespace eq::base;
00050 using namespace std;
00051 using eq::net::ConnectionDescriptionVector;
00052 
00053 namespace eq
00054 {
00055 namespace server
00056 {
00057 typedef net::CommandFunc<Config> ConfigFunc;
00058 
00059 #define MAKE_ATTR_STRING( attr ) ( string("EQ_CONFIG_") + #attr )
00060 std::string Config::_fAttributeStrings[FATTR_ALL] = 
00061 {
00062     MAKE_ATTR_STRING( FATTR_EYE_BASE ),
00063     MAKE_ATTR_STRING( FATTR_VERSION ),
00064     MAKE_ATTR_STRING( FATTR_FILL1 ),
00065     MAKE_ATTR_STRING( FATTR_FILL2 )
00066 };
00067 
00068 void Config::_construct()
00069 {
00070     _latency       = 1;
00071     _currentFrame  = 0;
00072     _finishedFrame = 0;
00073     _state         = STATE_STOPPED;
00074     _appNode       = 0;
00075     _serializer    = 0;
00076     base::Log::setClock( &_clock );
00077 
00078     EQINFO << "New config @" << (void*)this << endl;
00079 }
00080 
00081 Config::Config()
00082 {
00083     _construct();
00084     const Global* global = Global::instance();    
00085     for( int i=0; i<FATTR_ALL; ++i )
00086         _fAttributes[i] = global->getConfigFAttribute( (FAttribute)i );
00087 }
00088 
00089 Config::Config( const Config& from )
00090         : net::Session()
00091         , _name( from._name )
00092         , _server( from._server )
00093 {
00094     _construct();
00095     _appNetNode = from._appNetNode;
00096     _latency    = from._latency;
00097     for( int i=0; i<FATTR_ALL; ++i )
00098         _fAttributes[i] = from.getFAttribute( (FAttribute)i );
00099 
00100     const NodeVector& nodes = from.getNodes();
00101     for( NodeVector::const_iterator i = nodes.begin(); i != nodes.end(); ++i )
00102     {
00103         const Node* node      = *i;
00104         Node*       nodeClone = new Node( *node, this );
00105         
00106         if( node == from._appNode )
00107             _appNode = nodeClone;
00108     }
00109 
00110     const ObserverVector& observers = from.getObservers();
00111     for( ObserverVector::const_iterator i = observers.begin(); 
00112          i != observers.end(); ++i )
00113     {
00114         new Observer( **i, this );
00115     }
00116 
00117     const LayoutVector& layouts = from.getLayouts();
00118     for( LayoutVector::const_iterator i = layouts.begin(); 
00119          i != layouts.end(); ++i )
00120     {
00121         new Layout( **i, this );
00122     }
00123 
00124     const CanvasVector& canvases = from.getCanvases();
00125     for( CanvasVector::const_iterator i = canvases.begin(); 
00126          i != canvases.end(); ++i )
00127     {
00128         new Canvas( **i, this );
00129     }
00130 
00131     const CompoundVector& compounds = from.getCompounds();
00132     for( CompoundVector::const_iterator i = compounds.begin(); 
00133          i != compounds.end(); ++i )
00134     {
00135         new Compound( **i, this, 0 );
00136     }
00137 }
00138 
00139 Config::~Config()
00140 {
00141     EQINFO << "Delete config @" << (void*)this << endl;
00142     _server     = 0;
00143     _appNode    = 0;
00144     _appNetNode = 0;
00145 
00146     for( CompoundVector::const_iterator i = _compounds.begin(); 
00147          i != _compounds.end(); ++i )
00148     {
00149         Compound* compound = *i;
00150         compound->_config = 0;
00151         delete compound;
00152     }
00153     _compounds.clear();
00154 
00155     for( CanvasVector::const_iterator i = _canvases.begin(); 
00156          i != _canvases.end(); ++i )
00157     {
00158         Canvas* canvas = *i;
00159 
00160         canvas->_config = 0;
00161         delete canvas;
00162     }
00163     _canvases.clear();
00164 
00165     for( LayoutVector::const_iterator i = _layouts.begin(); 
00166          i != _layouts.end(); ++i )
00167     {
00168         Layout* layout = *i;
00169 
00170         layout->_config = 0;
00171         delete layout;
00172     }
00173     _layouts.clear();
00174 
00175     for( ObserverVector::const_iterator i = _observers.begin(); 
00176          i != _observers.end(); ++i )
00177     {
00178         Observer* observer = *i;
00179 
00180         observer->_config = 0;
00181         delete observer;
00182     }
00183     _observers.clear();
00184 
00185     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00186     {
00187         Node* node = *i;
00188 
00189         node->_config = 0;
00190         delete node;
00191     }
00192     _nodes.clear();
00193 
00194     base::Log::setClock( 0 );
00195 }
00196 
00197 void Config::notifyMapped( net::NodePtr node )
00198 {
00199     net::Session::notifyMapped( node );
00200 
00201     net::CommandQueue* serverQueue  = getServerThreadQueue();
00202     net::CommandQueue* commandQueue = getCommandThreadQueue();
00203 
00204     registerCommand( eq::CMD_CONFIG_INIT,
00205                      ConfigFunc( this, &Config::_cmdInit), serverQueue );
00206     registerCommand( eq::CMD_CONFIG_EXIT,
00207                      ConfigFunc( this, &Config::_cmdExit ), serverQueue );
00208     registerCommand( eq::CMD_CONFIG_CREATE_REPLY,
00209                      ConfigFunc( this, &Config::_cmdCreateReply ),
00210                      commandQueue );
00211     registerCommand( eq::CMD_CONFIG_START_FRAME, 
00212                      ConfigFunc( this, &Config::_cmdStartFrame ), serverQueue );
00213     registerCommand( eq::CMD_CONFIG_FINISH_ALL_FRAMES, 
00214                      ConfigFunc( this, &Config::_cmdFinishAllFrames ),
00215                      serverQueue );
00216     registerCommand( eq::CMD_CONFIG_FREEZE_LOAD_BALANCING, 
00217                      ConfigFunc( this, &Config::_cmdFreezeLoadBalancing ), 
00218                      serverQueue );
00219     registerCommand( eq::CMD_CONFIG_UNMAP_REPLY,
00220                      ConfigFunc( this, &Config::_cmdUnmapReply ), 
00221                      commandQueue );
00222 }
00223 
00224 void Config::addNode( Node* node )
00225 {
00226     _nodes.push_back( node ); 
00227     
00228     node->_config = this; 
00229 }
00230 
00231 bool Config::removeNode( Node* node )
00232 {
00233     NodeVector::iterator i = find( _nodes.begin(), _nodes.end(), node );
00234     if( i == _nodes.end( ))
00235         return false;
00236 
00237     _nodes.erase( i );
00238     node->_config = 0; 
00239 
00240     return true;
00241 }
00242 
00243 void Config::addObserver( Observer* observer )
00244 {
00245     observer->_config = this;
00246     _observers.push_back( observer );
00247 }
00248 
00249 bool Config::removeObserver( Observer* observer )
00250 {
00251     vector<Observer*>::iterator i = find( _observers.begin(), _observers.end(),
00252                                           observer );
00253     if( i == _observers.end( ))
00254         return false;
00255 
00256     _observers.erase( i );
00257     observer->_config = 0;
00258     return true;
00259 }
00260 
00261 Observer* Config::findObserver( const std::string& name )
00262 {
00263     ObserverFinder finder( name );
00264     accept( finder );
00265     return finder.getResult();
00266 }
00267 const Observer* Config::findObserver( const std::string& name ) const
00268 {
00269     ConstObserverFinder finder( name );
00270     accept( finder );
00271     return finder.getResult();
00272 }
00273 Observer* Config::findObserver( const uint32_t id )
00274 {
00275     ObserverIDFinder finder( id );
00276     accept( finder );
00277     return finder.getResult();
00278 }
00279 
00280 
00281 void Config::addLayout( Layout* layout )
00282 {
00283     layout->_config = this;
00284     _layouts.push_back( layout );
00285 }
00286 
00287 bool Config::removeLayout( Layout* layout )
00288 {
00289     vector<Layout*>::iterator i = find( _layouts.begin(), _layouts.end(),
00290                                           layout );
00291     if( i == _layouts.end( ))
00292         return false;
00293 
00294     _layouts.erase( i );
00295     layout->_config = 0;
00296     return true;
00297 }
00298 
00299 
00300 Layout* Config::findLayout( const std::string& name )
00301 {
00302     LayoutFinder finder( name );
00303     accept( finder );
00304     return finder.getResult();
00305 }
00306 const Layout* Config::findLayout( const std::string& name ) const
00307 {
00308     ConstLayoutFinder finder( name );
00309     accept( finder );
00310     return finder.getResult();
00311 }
00312 Layout* Config::findLayout( const uint32_t id )
00313 {
00314     LayoutIDFinder finder( id );
00315     accept( finder );
00316     return finder.getResult();
00317 }
00318 
00319 View* Config::findView( const std::string& name )
00320 {
00321     ViewFinder finder( name );
00322     accept( finder );
00323     return finder.getResult();
00324 }
00325 
00326 const View* Config::findView( const std::string& name ) const
00327 {
00328     ConstViewFinder finder( name );
00329     accept( finder );
00330     return finder.getResult();
00331 }
00332 
00333 namespace
00334 {
00335 class ChannelViewFinder : public ConfigVisitor
00336 {
00337 public:
00338     ChannelViewFinder( const Segment* const segment, const View* const view ) 
00339             : _segment( segment ), _view( view ), _result( 0 ) {}
00340 
00341     virtual ~ChannelViewFinder(){}
00342 
00343     virtual VisitorResult visit( Channel* channel )
00344         {
00345             if( channel->getView() != _view )
00346                 return TRAVERSE_CONTINUE;
00347 
00348             if( channel->getSegment() != _segment )
00349                 return TRAVERSE_CONTINUE;
00350 
00351             _result = channel;
00352             return TRAVERSE_TERMINATE;
00353         }
00354 
00355     Channel* getResult() { return _result; }
00356 
00357 private:
00358     const Segment* const _segment;
00359     const View* const    _view;
00360     Channel*             _result;
00361 };
00362 }
00363 
00364 Channel* Config::findChannel( const Segment* segment, const View* view )
00365 {
00366     ChannelViewFinder finder( segment, view );
00367     accept( finder );
00368     return finder.getResult();
00369 }
00370 
00371 void Config::addCanvas( Canvas* canvas )
00372 {
00373     const LayoutVector& layouts = canvas->getLayouts();
00374     const SegmentVector& segments = canvas->getSegments();
00375 
00376     for( LayoutVector::const_iterator i = layouts.begin();
00377          i != layouts.end(); ++i )
00378     {
00379         const Layout* layout = *i;
00380         if( !layout )
00381             continue;
00382 
00383         const ViewVector& views = layout->getViews();
00384         for( ViewVector::const_iterator j = views.begin(); 
00385              j != views.end(); ++j )
00386         {
00387             View* view = *j;
00388 
00389             for( SegmentVector::const_iterator k = segments.begin();
00390                  k != segments.end(); ++k )
00391             {
00392                 Segment* segment = *k;
00393                 Viewport viewport = segment->getViewport();
00394                 viewport.intersect( view->getViewport( ));
00395 
00396                 if( !viewport.hasArea())
00397                 {
00398                     EQLOG( LOG_VIEW )
00399                         << "View " << view->getName() << view->getViewport()
00400                         << " doesn't intersect " << segment->getName()
00401                         << segment->getViewport() << std::endl;
00402                 
00403                     continue;
00404                 }
00405                       
00406                 Channel* segmentChannel = segment->getChannel( );
00407                 if (!segmentChannel)
00408                 {
00409                     EQWARN << "Segment " << segment->getName()
00410                            << " has no output channel" << std::endl;
00411                     continue;
00412                 }
00413 
00414                 // try to reuse channel
00415                 Channel* channel = findChannel( segment, view );
00416 
00417                 if( !channel ) // create and add new channel
00418                 {
00419                     Window* window = segmentChannel->getWindow();
00420                     channel = new Channel( *segmentChannel, window );
00421 
00422                     channel->setOutput( view, segment );
00423                 }
00424 
00425                 //----- compute channel viewport:
00426                 // segment/view intersection in canvas space...
00427                 Viewport contribution = viewport;
00428                 // ... in segment space...
00429                 contribution.transform( segment->getViewport( ));
00430             
00431                 // segment output area
00432                 Viewport subViewport = segmentChannel->getViewport();
00433                 // ...our part of it    
00434                 subViewport.apply( contribution );
00435             
00436                 channel->setViewport( subViewport );
00437             
00438                 EQLOG( LOG_VIEW ) 
00439                     << "View @" << (void*)view << ' ' << view->getViewport()
00440                     << " intersects " << segment->getName()
00441                     << segment->getViewport() << " at " << subViewport
00442                     << " using channel @" << (void*)channel << std::endl;
00443             }
00444         }
00445     }
00446 
00447     canvas->_config = this;
00448     _canvases.push_back( canvas );
00449 }
00450 
00451 bool Config::removeCanvas( Canvas* canvas )
00452 {
00453     vector<Canvas*>::iterator i = find( _canvases.begin(), _canvases.end(),
00454                                         canvas );
00455     if( i == _canvases.end( ))
00456         return false;
00457 
00458     _canvases.erase( i );
00459     canvas->_config = 0;
00460     return true;
00461 }
00462 
00463 Canvas* Config::findCanvas( const std::string& name )
00464 {
00465     CanvasFinder finder( name );
00466     accept( finder );
00467     return finder.getResult();
00468 }
00469 
00470 const Canvas* Config::findCanvas( const std::string& name ) const
00471 {
00472     ConstCanvasFinder finder( name );
00473     accept( finder );
00474     return finder.getResult();
00475 }
00476 
00477 Segment* Config::findSegment( const std::string& name )
00478 {
00479     SegmentFinder finder( name );
00480     accept( finder );
00481     return finder.getResult();
00482 }
00483 const Segment* Config::findSegment( const std::string& name ) const
00484 {
00485     ConstSegmentFinder finder( name );
00486     accept( finder );
00487     return finder.getResult();
00488 }
00489 
00490 
00491 void Config::addCompound( Compound* compound )
00492 {
00493     compound->_config = this;
00494     _compounds.push_back( compound );
00495 }
00496 
00497 bool Config::removeCompound( Compound* compound )
00498 {
00499     vector<Compound*>::iterator i = find( _compounds.begin(), _compounds.end(),
00500                                           compound );
00501     if( i == _compounds.end( ))
00502         return false;
00503 
00504     _compounds.erase( i );
00505     compound->_config = 0;
00506     return true;
00507 }
00508 
00509 Channel* Config::findChannel( const std::string& name )
00510 {
00511     ChannelFinder finder( name );
00512     accept( finder );
00513     return finder.getResult();
00514 }
00515 
00516 const Channel* Config::findChannel( const std::string& name ) const
00517 {
00518     ConstChannelFinder finder( name );
00519     accept( finder );
00520     return finder.getResult();
00521 }
00522 
00523 void Config::addApplicationNode( Node* node )
00524 {
00525     EQASSERT( _state == STATE_STOPPED );
00526     EQASSERTINFO( !_appNode, "Only one application node per config possible" );
00527 
00528     _appNode = node;
00529     addNode( node );
00530 }
00531 
00532 void Config::setApplicationNetNode( net::NodePtr node )
00533 {
00534     EQASSERT( _state == STATE_STOPPED );
00535     EQASSERT( !_appNetNode );
00536 
00537     _appNetNode = node;
00538 }
00539 
00540 Channel* Config::getChannel( const ChannelPath& path )
00541 {
00542     EQASSERTINFO( _nodes.size() > path.nodeIndex,
00543                   _nodes.size() << " <= " << path.nodeIndex );
00544 
00545     if( _nodes.size() <= path.nodeIndex )
00546         return 0;
00547 
00548     return _nodes[ path.nodeIndex ]->getChannel( path );
00549 }
00550 
00551 Canvas* Config::getCanvas( const CanvasPath& path )
00552 {
00553     EQASSERTINFO( _canvases.size() > path.canvasIndex,
00554                   _canvases.size() << " <= " << path.canvasIndex );
00555 
00556     if( _canvases.size() <= path.canvasIndex )
00557         return 0;
00558 
00559     return _canvases[ path.canvasIndex ];
00560 }
00561 
00562 Segment* Config::getSegment( const SegmentPath& path )
00563 {
00564     Canvas* canvas = getCanvas( path );
00565     EQASSERT( canvas );
00566 
00567     if( canvas )
00568         return canvas->getSegment( path );
00569 
00570     return 0;
00571 }
00572 
00573 Observer* Config::getObserver( const ObserverPath& path )
00574 {
00575     EQASSERTINFO( _observers.size() > path.observerIndex,
00576                   _observers.size() << " <= " << path.observerIndex );
00577 
00578     if( _observers.size() <= path.observerIndex )
00579         return 0;
00580 
00581     return _observers[ path.observerIndex ];
00582 }
00583 
00584 Layout* Config::getLayout( const LayoutPath& path )
00585 {
00586     EQASSERTINFO( _layouts.size() > path.layoutIndex,
00587                   _layouts.size() << " <= " << path.layoutIndex );
00588 
00589     if( _layouts.size() <= path.layoutIndex )
00590         return 0;
00591 
00592     return _layouts[ path.layoutIndex ];
00593 }
00594 
00595 View* Config::getView( const ViewPath& path )
00596 {
00597     Layout* layout = getLayout( path );
00598     EQASSERT( layout );
00599 
00600     if( layout )
00601         return layout->getView( path );
00602 
00603     return 0;
00604 }
00605 
00606 namespace
00607 {
00608 template< class C, class V >
00609 VisitorResult _accept( C* config, V& visitor )
00610 { 
00611     VisitorResult result = visitor.visitPre( config );
00612     if( result != TRAVERSE_CONTINUE )
00613         return result;
00614 
00615     const NodeVector& nodes = config->getNodes();
00616     for( NodeVector::const_iterator i = nodes.begin(); i != nodes.end(); ++i )
00617     {
00618         switch( (*i)->accept( visitor ))
00619         {
00620             case TRAVERSE_TERMINATE:
00621                 return TRAVERSE_TERMINATE;
00622 
00623             case TRAVERSE_PRUNE:
00624                 result = TRAVERSE_PRUNE;
00625                 break;
00626                 
00627             case TRAVERSE_CONTINUE:
00628             default:
00629                 break;
00630         }
00631     }
00632 
00633     const ObserverVector& observers = config->getObservers();
00634     for( ObserverVector::const_iterator i = observers.begin(); 
00635          i != observers.end(); ++i )
00636     {
00637         switch( (*i)->accept( visitor ))
00638         {
00639             case TRAVERSE_TERMINATE:
00640                 return TRAVERSE_TERMINATE;
00641 
00642             case TRAVERSE_PRUNE:
00643                 result = TRAVERSE_PRUNE;
00644                 break;
00645                 
00646             case TRAVERSE_CONTINUE:
00647             default:
00648                 break;
00649         }
00650     }
00651 
00652     const LayoutVector& layouts = config->getLayouts();
00653     for( LayoutVector::const_iterator i = layouts.begin(); 
00654          i != layouts.end(); ++i )
00655     {
00656         switch( (*i)->accept( visitor ))
00657         {
00658             case TRAVERSE_TERMINATE:
00659                 return TRAVERSE_TERMINATE;
00660 
00661             case TRAVERSE_PRUNE:
00662                 result = TRAVERSE_PRUNE;
00663                 break;
00664                 
00665             case TRAVERSE_CONTINUE:
00666             default:
00667                 break;
00668         }
00669     }
00670 
00671     const CanvasVector& canvases = config->getCanvases();
00672     for( CanvasVector::const_iterator i = canvases.begin();
00673          i != canvases.end(); ++i )
00674     {
00675         switch( (*i)->accept( visitor ))
00676         {
00677             case TRAVERSE_TERMINATE:
00678                 return TRAVERSE_TERMINATE;
00679 
00680             case TRAVERSE_PRUNE:
00681                 result = TRAVERSE_PRUNE;
00682                 break;
00683                 
00684             case TRAVERSE_CONTINUE:
00685             default:
00686                 break;
00687         }
00688     }
00689 
00690     const CompoundVector& compounds = config->getCompounds();
00691     for( CompoundVector::const_iterator i = compounds.begin();
00692          i != compounds.end(); ++i )
00693     {
00694         switch( (*i)->accept( visitor ))
00695         {
00696             case TRAVERSE_TERMINATE:
00697                 return TRAVERSE_TERMINATE;
00698 
00699             case TRAVERSE_PRUNE:
00700                 result = TRAVERSE_PRUNE;
00701                 break;
00702                 
00703             case TRAVERSE_CONTINUE:
00704             default:
00705                 break;
00706         }
00707     }                                                           
00708 
00709     switch( visitor.visitPost( config ))
00710     {
00711         case TRAVERSE_TERMINATE:
00712             return TRAVERSE_TERMINATE;
00713 
00714         case TRAVERSE_PRUNE:
00715             result = TRAVERSE_PRUNE;
00716             break;
00717                 
00718         case TRAVERSE_CONTINUE:
00719         default:
00720             break;
00721     }
00722 
00723     return result;
00724 }
00725 }
00726 
00727 VisitorResult Config::accept( ConfigVisitor& visitor )
00728 {
00729     return _accept( this, visitor );
00730 }
00731 
00732 VisitorResult Config::accept( ConstConfigVisitor& visitor ) const
00733 {
00734     return _accept( this, visitor );
00735 }
00736 
00737 
00738 //===========================================================================
00739 // operations
00740 //===========================================================================
00741 
00742 uint32_t Config::getDistributorID()
00743 {
00744     EQASSERT( !_serializer );
00745 
00746     _serializer = new ConfigSerializer( this );
00747     registerObject( _serializer );
00748     return _serializer->getID();
00749 }
00750 
00751 //---------------------------------------------------------------------------
00752 // update running entities (init/exit)
00753 //---------------------------------------------------------------------------
00754 
00755 bool Config::_updateRunning()
00756 {
00757     if( _state == STATE_STOPPED )
00758         return true;
00759 
00760     EQASSERT( _state == STATE_RUNNING || _state == STATE_INITIALIZING ||
00761               _state == STATE_EXITING );
00762 
00763     _error.clear();
00764 
00765     if( !_connectNodes( ))
00766         return false;
00767 
00768     _startNodes();
00769 
00770     // Let all running nodes update their running state (incl. children)
00771     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00772         (*i)->updateRunning( _initID, _currentFrame );
00773 
00774     // Sync state updates
00775     bool success = true;
00776     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00777     {
00778         Node* node = *i;
00779         if( !node->syncRunning( ))
00780         {
00781             _error += "node " + node->getName() + ": '" +
00782                           node->getErrorMessage() + '\'';
00783             success = false;
00784         }
00785     }
00786 
00787     _stopNodes();
00788     _syncClock();
00789     return success;
00790 }
00791 
00792 //----- connect new nodes
00793 bool Config::_connectNodes()
00794 {
00795     bool success = true;
00796     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00797     {
00798         Node* node = *i;
00799         if( node->isActive() && !_connectNode( node ))
00800         {
00801             success = false;
00802             break;
00803         }
00804     }
00805 
00806     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00807     {
00808         Node* node = *i;
00809         if( node->isActive() && !_syncConnectNode( node ))
00810             success = false;
00811     }
00812 
00813     return success;
00814 }
00815 
00816 
00817 namespace
00818 {
00819 static net::NodePtr _createNetNode( Node* node )
00820 {
00821     net::NodePtr netNode = new net::Node;
00822 
00823     const ConnectionDescriptionVector& descriptions = 
00824         node->getConnectionDescriptions();
00825     for( ConnectionDescriptionVector::const_iterator i = descriptions.begin();
00826          i != descriptions.end(); ++i )
00827     {
00828         const net::ConnectionDescription* desc = (*i).get();
00829         
00830         netNode->addConnectionDescription( 
00831             new net::ConnectionDescription( *desc ));
00832     }
00833 
00834     netNode->setAutoLaunch( true );
00835     return netNode;
00836 }
00837 }
00838 
00839 bool Config::_connectNode( Node* node )
00840 {
00841     EQASSERT( node->isActive( ));
00842 
00843     net::NodePtr netNode = node->getNode();
00844     if( netNode.isValid( ))
00845         return ( netNode->getState() == net::Node::STATE_CONNECTED );
00846 
00847     net::NodePtr localNode = getLocalNode();
00848     EQASSERT( localNode.isValid( ));
00849     
00850     if( node == _appNode )
00851         netNode = _appNetNode;
00852     else
00853     {
00854         netNode = _createNetNode( node );
00855         netNode->setProgramName( _renderClient );
00856         netNode->setWorkDir( _workDir );
00857     }
00858 
00859     EQLOG( LOG_INIT ) << "Connecting node" << std::endl;
00860     if( !localNode->initConnect( netNode ))
00861     {
00862         stringstream nodeString;
00863         nodeString << node;
00864         
00865         _error += string( "Connection to node failed, node does not run " ) +
00866             string( "and launch command failed:\n " ) + 
00867             nodeString.str();
00868         EQERROR << "Connection to " << netNode->getNodeID() << " failed." 
00869                 << endl;
00870         return false;
00871     }
00872 
00873     node->setNode( netNode );
00874     return true;
00875 }
00876 
00877 bool Config::_syncConnectNode( Node* node )
00878 {
00879     EQASSERT( node->isActive( ));
00880 
00881     net::NodePtr netNode = node->getNode();
00882     if( !netNode )
00883         return false;
00884 
00885     net::NodePtr localNode = getLocalNode();
00886     EQASSERT( localNode.isValid( ));
00887 
00888     if( !localNode->syncConnect( netNode ))
00889     {
00890         stringstream nodeString;
00891         nodeString << netNode->serialize();
00892 
00893         _error += "Connection of node failed, node did not start (" +
00894             nodeString.str() + ") ";
00895         EQERROR << _error << std::endl;
00896 
00897         node->setNode( 0 );
00898         EQASSERT( netNode->getRefCount() == 1 );
00899         return false;
00900     }
00901     return true;
00902 }
00903 
00904 void Config::_startNodes()
00905 {
00906     // start up newly running nodes
00907     std::vector< uint32_t > requests;
00908     NodeVector startingNodes;
00909     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00910     {
00911         Node* node = *i;
00912         const Node::State state = node->getState();
00913 
00914         if( node->isActive() && state != Node::STATE_RUNNING )
00915         {
00916             EQASSERT( state == Node::STATE_STOPPED );
00917             startingNodes.push_back( node );
00918             if( node != _appNode )
00919                 requests.push_back( _createConfig( node ));
00920         }
00921     }
00922 
00923     // sync create config requests on starting nodes
00924     for( std::vector< uint32_t >::const_iterator i = requests.begin();
00925          i != requests.end(); ++i )
00926     {
00927         _requestHandler.waitRequest( *i );
00928     }
00929 }
00930 
00931 void Config::_stopNodes()
00932 {
00933     // wait for the nodes to stop, destroy entities, disconnect
00934     NodeVector stoppingNodes;
00935     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
00936     {
00937         Node* node = *i;
00938         if( node->getState() != Node::STATE_STOPPED || node == _appNode )
00939             continue;
00940 
00941         net::NodePtr netNode = node->getNode();
00942         if( !netNode ) // already disconnected
00943             continue;
00944 
00945         EQLOG( LOG_INIT ) << "Exiting node" << std::endl;
00946 
00947         stoppingNodes.push_back( node );
00948         EQASSERT( !node->isActive( ));
00949         EQASSERT( netNode.isValid( ));
00950 
00951         eq::ServerDestroyConfigPacket destroyConfigPacket;
00952         destroyConfigPacket.configID = getID();
00953         netNode->send( destroyConfigPacket );
00954 
00955         eq::ClientExitPacket clientExitPacket;
00956         netNode->send( clientExitPacket );
00957     }
00958 
00959     // now wait that the render clients disconnect
00960     uint32_t nSleeps = 50; // max 5 seconds for all clients
00961     for( NodeVector::const_iterator i = stoppingNodes.begin();
00962          i != stoppingNodes.end(); ++i )
00963     {
00964         Node*        node    = *i;
00965         net::NodePtr netNode = node->getNode();
00966 
00967         node->setNode( 0 );
00968 
00969         while( netNode->getState() == net::Node::STATE_CONNECTED && --nSleeps )
00970             base::sleep( 100 ); // ms
00971 
00972         if( netNode->getState() == net::Node::STATE_CONNECTED )
00973         {
00974             net::NodePtr localNode = getLocalNode();
00975             EQASSERT( localNode.isValid( ));
00976 
00977             EQWARN << "Forcefully disconnecting exited render client node"
00978                    << std::endl;
00979             localNode->disconnect( netNode );
00980         }
00981 
00982         EQLOG( LOG_INIT ) << "Disconnected node" << std::endl;
00983     }
00984 }
00985 
00986 uint32_t Config::_createConfig( Node* node )
00987 {
00988     EQASSERT( node != _appNode );
00989     EQASSERT( node->isActive( ));
00990 
00991     // create config (session) on each non-app node
00992     //   app-node already has config from chooseConfig
00993     eq::ServerCreateConfigPacket createConfigPacket;
00994     createConfigPacket.configID  = getID();
00995     createConfigPacket.appNodeID = _appNetNode->getNodeID();
00996     createConfigPacket.appNodeID.convertToNetwork();
00997     createConfigPacket.requestID = _requestHandler.registerRequest();
00998 
00999     const string&   name = getName();
01000     net::NodePtr netNode = node->getNode();
01001     netNode->send( createConfigPacket, name );
01002 
01003     return createConfigPacket.requestID;
01004 }
01005 
01006 void Config::_syncClock()
01007 {
01008     eq::ConfigSyncClockPacket packet;
01009     packet.time = _clock.getTime64();
01010 
01011     send( _appNetNode, packet );
01012 
01013     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
01014     {
01015         Node* node = *i;
01016         if( node->isActive( ))
01017         {
01018             net::NodePtr netNode = node->getNode();
01019             EQASSERT( netNode->isConnected( ));
01020 
01021             send( netNode, packet );
01022         }
01023     }
01024 }
01025 
01026 //---------------------------------------------------------------------------
01027 // init
01028 //---------------------------------------------------------------------------
01029 bool Config::_init( const uint32_t initID )
01030 {
01031     EQASSERT( _state == STATE_STOPPED );
01032     _state = STATE_INITIALIZING;
01033     _currentFrame  = 0;
01034     _finishedFrame = 0;
01035     _initID = initID;
01036 
01037     for( ObserverVector::const_iterator i = _observers.begin();
01038          i != _observers.end(); ++i )
01039     {
01040         Observer* observer = *i;
01041         observer->init();
01042     }
01043 
01044     for( CanvasVector::const_iterator i = _canvases.begin();
01045          i != _canvases.end(); ++i )
01046     {
01047         Canvas* canvas = *i;
01048         canvas->init();
01049     }
01050 
01051     for( CompoundVector::const_iterator i = _compounds.begin();
01052          i != _compounds.end(); ++i )
01053     {
01054         Compound* compound = *i;
01055         compound->init();
01056     }
01057 
01058     if( !_updateRunning( ))
01059         return false;
01060 
01061     _state = STATE_RUNNING;
01062     return true;
01063 }
01064 
01065 //---------------------------------------------------------------------------
01066 // exit
01067 //---------------------------------------------------------------------------
01068 
01069 bool Config::exit()
01070 {
01071     if( _state != STATE_RUNNING )
01072         EQWARN << "Exiting non-initialized config" << endl;
01073 
01074     EQASSERT( _state == STATE_RUNNING || _state == STATE_INITIALIZING );
01075     _state = STATE_EXITING;
01076 
01077     for( vector< Compound* >::const_iterator i = _compounds.begin();
01078          i != _compounds.end(); ++i )
01079     {
01080         Compound* compound = *i;
01081         compound->exit();
01082     }
01083 
01084     for( vector< Canvas* >::const_iterator i = _canvases.begin();
01085          i != _canvases.end(); ++i )
01086     {
01087         Canvas* canvas = *i;
01088         canvas->exit();
01089     }
01090 
01091     const bool success = _updateRunning();
01092 
01093     eq::ConfigEvent exitEvent;
01094     exitEvent.data.type = eq::Event::EXIT;
01095     send( _appNetNode, exitEvent );
01096     
01097     _state         = STATE_STOPPED;
01098     return success;
01099 }
01100 
01101 void Config::unmap()
01102 {
01103     eq::ConfigUnmapPacket packet;
01104     packet.requestID = _requestHandler.registerRequest();
01105     send( _appNetNode, packet );
01106 
01107     if( _serializer ) // Config::init never happened
01108     {
01109         deregisterObject( _serializer );
01110         delete _serializer;
01111         _serializer = 0;
01112     }
01113 
01114     UnmapVisitor unmapper;
01115     accept( unmapper );
01116 
01117     _requestHandler.waitRequest( packet.requestID );
01118 }
01119 
01120 bool Config::_startFrame( const uint32_t frameID )
01121 {
01122     if( !_updateRunning( ))
01123     {
01124         ++_currentFrame;
01125         return false;
01126     }
01127 
01128     ++_currentFrame;
01129     EQLOG( LOG_ANY ) << "----- Start Frame ----- " << _currentFrame << endl;
01130 
01131     if( _state == STATE_STOPPED )
01132         return true;
01133     EQASSERT( _state == STATE_RUNNING );
01134 
01135     if( !_appNode || !_appNode->isActive( )) // release appNode local sync
01136     {
01137         ConfigReleaseFrameLocalPacket packet;
01138         packet.frameNumber = _currentFrame;
01139         send( _appNetNode, packet );
01140     }
01141 
01142     for( vector< Compound* >::const_iterator i = _compounds.begin(); 
01143          i != _compounds.end(); ++i )
01144     {
01145         Compound* compound = *i;
01146         compound->update( _currentFrame );
01147     }
01148     
01149     ConfigUpdateDataVisitor configDataVisitor;
01150     accept( configDataVisitor );
01151 
01152     for( vector< Node* >::const_iterator i = _nodes.begin(); 
01153          i != _nodes.end(); ++i )
01154     {
01155         Node* node = *i;
01156         if( node->isActive( ))
01157             node->update( frameID, _currentFrame );
01158     }
01159     return true;
01160 }
01161 
01162 void Config::notifyNodeFrameFinished( const uint32_t frameNumber )
01163 {
01164     if( _finishedFrame >= frameNumber ) // node finish already done
01165         return;
01166 
01167     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
01168     {
01169         const Node* node = *i;
01170         if( node->isActive() && node->getFinishedFrame() < frameNumber )
01171             return;
01172     }
01173 
01174     _finishedFrame = frameNumber;
01175 
01176     // All nodes have finished the frame. Notify the application's config that
01177     // the frame is finished
01178     eq::ConfigFrameFinishPacket packet;
01179     packet.frameNumber = frameNumber;
01180     packet.sessionID   = getID();
01181 
01182     // do not use send/_bufferedTasks, not thread-safe!
01183     _appNetNode->send( packet );
01184     EQLOG( eq::LOG_TASKS ) << "TASK config frame finished  " << &packet
01185                            << std::endl;
01186 }
01187 
01188 void Config::_flushAllFrames()
01189 {
01190     if( _currentFrame == 0 )
01191         return;
01192 
01193     for( NodeVector::const_iterator i = _nodes.begin(); i != _nodes.end(); ++i )
01194     {
01195         Node* node = *i;
01196         if( node->isActive( ))
01197             node->flushFrames( _currentFrame );
01198     }
01199 
01200     EQLOG( LOG_ANY ) << "--- Flush All Frames -- " << endl;
01201 }
01202 
01203 //---------------------------------------------------------------------------
01204 // command handlers
01205 //---------------------------------------------------------------------------
01206 net::CommandResult Config::_cmdInit( net::Command& command )
01207 {
01208     // clients have retrieved distributed data
01209     EQASSERT( _serializer );
01210     deregisterObject( _serializer );
01211     delete _serializer;
01212     _serializer = 0;
01213 
01214     const eq::ConfigInitPacket* packet =
01215         command.getPacket<eq::ConfigInitPacket>();
01216     EQVERB << "handle config start init " << packet << endl;
01217 
01218     eq::ConfigInitReplyPacket reply( packet );
01219     reply.result = _init( packet->initID );
01220     std::string error = _error;
01221 
01222     if( !reply.result )
01223         exit();
01224 
01225     EQINFO << "Config init " << (reply.result ? "successful": "failed: ") 
01226            << error << endl;
01227 
01228     send( command.getNode(), reply, error );
01229     return net::COMMAND_HANDLED;
01230 }
01231 
01232 net::CommandResult Config::_cmdExit( net::Command& command ) 
01233 {
01234     const eq::ConfigExitPacket* packet = 
01235         command.getPacket<eq::ConfigExitPacket>();
01236     eq::ConfigExitReplyPacket   reply( packet );
01237     EQVERB << "handle config exit " << packet << endl;
01238 
01239     if( _state == STATE_RUNNING )
01240         reply.result = exit();
01241     else
01242         reply.result = false;
01243 
01244     EQINFO << "config exit result: " << reply.result << endl;
01245     send( command.getNode(), reply );
01246     return net::COMMAND_HANDLED;
01247 }
01248 
01249 net::CommandResult Config::_cmdStartFrame( net::Command& command ) 
01250 {
01251     const eq::ConfigStartFramePacket* packet = 
01252         command.getPacket<eq::ConfigStartFramePacket>();
01253     EQVERB << "handle config frame start " << packet << endl;
01254 
01255     if( packet->nChanges > 0 )
01256     {
01257         ConfigSyncVisitor syncer( packet->nChanges, packet->changes );
01258         accept( syncer );
01259     }
01260 
01261     if( !_startFrame( packet->frameID ))
01262     {
01263         EQWARN << "Start frame failed, exiting config: " << _error << std::endl;
01264         exit();
01265     }
01266 
01267     if( _state == STATE_STOPPED )
01268     {
01269         // unlock app
01270         eq::ConfigFrameFinishPacket frameFinishPacket;
01271         frameFinishPacket.frameNumber = _currentFrame;
01272         send( command.getNode(), frameFinishPacket );        
01273     }
01274 
01275     return net::COMMAND_HANDLED;
01276 }
01277 
01278 net::CommandResult Config::_cmdFinishAllFrames( net::Command& command ) 
01279 {
01280     const eq::ConfigFinishAllFramesPacket* packet = 
01281         command.getPacket<eq::ConfigFinishAllFramesPacket>();
01282     EQVERB << "handle config all frames finish " << packet << endl;
01283 
01284     _flushAllFrames();
01285     return net::COMMAND_HANDLED;
01286 }
01287 
01288 net::CommandResult Config::_cmdCreateReply( net::Command& command ) 
01289 {
01290     const eq::ConfigCreateReplyPacket* packet = 
01291         command.getPacket<eq::ConfigCreateReplyPacket>();
01292 
01293     _requestHandler.serveRequest( packet->requestID );
01294     return net::COMMAND_HANDLED;
01295 }
01296 
01297 namespace
01298 {
01299 class FreezeVisitor : public ConfigVisitor
01300 {
01301 public:
01302     // No need to go down on nodes.
01303     virtual VisitorResult visitPre( Node* node ) { return TRAVERSE_PRUNE; }
01304 
01305     FreezeVisitor( const bool freeze ) : _freeze( freeze ) {}
01306 
01307     virtual VisitorResult visit( Compound* compound )
01308         { 
01309             const EqualizerVector& equalizers = compound->getEqualizers();
01310             for( EqualizerVector::const_iterator i = equalizers.begin();
01311                  i != equalizers.end(); ++i )
01312             {
01313                 (*i)->setFrozen( _freeze );
01314             }
01315             return TRAVERSE_CONTINUE; 
01316         }
01317 
01318 private:
01319     const bool _freeze;
01320 };
01321 }
01322 
01323 net::CommandResult Config::_cmdFreezeLoadBalancing( net::Command& command ) 
01324 {
01325     const eq::ConfigFreezeLoadBalancingPacket* packet = 
01326         command.getPacket<eq::ConfigFreezeLoadBalancingPacket>();
01327 
01328     FreezeVisitor visitor( packet->freeze );
01329     accept( visitor );
01330 
01331     return net::COMMAND_HANDLED;
01332 }
01333 
01334 net::CommandResult Config::_cmdUnmapReply( net::Command& command ) 
01335 {
01336     const eq::ConfigUnmapReplyPacket* packet = 
01337         command.getPacket< eq::ConfigUnmapReplyPacket >();
01338     EQVERB << "Handle unmap reply " << packet << endl;
01339 
01340     _requestHandler.serveRequest( packet->requestID );
01341     return net::COMMAND_HANDLED;
01342 }
01343 
01344 
01345 ostream& operator << ( ostream& os, const Config* config )
01346 {
01347     if( !config )
01348         return os;
01349 
01350     os << disableFlush << disableHeader << "config " << endl;
01351     os << "{" << endl << indent;
01352 
01353     if( !config->getName().empty( ))
01354         os << "name    \"" << config->getName() << '"' << endl;
01355 
01356     if( config->getLatency() != 1 )
01357         os << "latency " << config->getLatency() << endl;
01358     os << endl;
01359 
01360     EQASSERTINFO( config->getFAttribute( Config::FATTR_VERSION ) ==
01361                   Global::instance()->getConfigFAttribute(Config::FATTR_VERSION)
01362                   , "Per-config versioning not implemented" );
01363 
01364     const float value = config->getFAttribute( Config::FATTR_EYE_BASE );
01365     if( value != 
01366         Global::instance()->getConfigFAttribute( Config::FATTR_EYE_BASE ))
01367     {
01368         os << "attributes" << endl << "{" << endl << indent
01369            << "eye_base     " << value << endl
01370            << exdent << "}" << endl;
01371     }
01372 
01373     const NodeVector& nodes = config->getNodes();
01374     for( NodeVector::const_iterator i = nodes.begin(); i != nodes.end(); ++i )
01375         os << *i;
01376 
01377     const ObserverVector& observers = config->getObservers();
01378     for( ObserverVector::const_iterator i = observers.begin(); 
01379          i !=observers.end(); ++i )
01380     {
01381         os << *i;
01382     }
01383     const LayoutVector& layouts = config->getLayouts();
01384     for( LayoutVector::const_iterator i = layouts.begin(); 
01385          i !=layouts.end(); ++i )
01386     {
01387         os << *i;
01388     }
01389     const CanvasVector& canvases = config->getCanvases();
01390     for( CanvasVector::const_iterator i = canvases.begin(); 
01391          i != canvases.end(); ++i )
01392     {
01393         os << *i;
01394     }
01395 
01396     const CompoundVector& compounds = config->getCompounds();
01397     for( CompoundVector::const_iterator i = compounds.begin(); 
01398          i != compounds.end(); ++i )
01399     {
01400         os << *i;
01401     }
01402     os << exdent << "}" << endl << enableHeader << enableFlush;
01403 
01404     return os;
01405 }
01406 
01407 }
01408 }
Generated on Mon Aug 10 18:58:32 2009 for Equalizer 0.9 by  doxygen 1.5.8