server/pipe.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <pthread.h>
00019 #include "pipe.h"
00020 
00021 #include "channel.h"
00022 #include "config.h"
00023 #include "global.h"
00024 #include "log.h"
00025 #include "node.h"
00026 #include "paths.h"
00027 #include "pipeVisitor.h"
00028 #include "window.h"
00029 
00030 #include <eq/client/commands.h>
00031 #include <eq/client/packets.h>
00032 #include <eq/net/command.h>
00033 
00034 using namespace eq::base;
00035 using namespace std;
00036 
00037 namespace eq
00038 {
00039 namespace server
00040 {
00041 typedef net::CommandFunc<Pipe> PipeFunc;
00042 
00043 #define MAKE_ATTR_STRING( attr ) ( string("EQ_PIPE_") + #attr )
00044 std::string Pipe::_iAttributeStrings[IATTR_ALL] = 
00045 {
00046     MAKE_ATTR_STRING( IATTR_HINT_THREAD ),
00047     MAKE_ATTR_STRING( IATTR_FILL1 ),
00048     MAKE_ATTR_STRING( IATTR_FILL2 )
00049 };
00050 
00051 void Pipe::_construct()
00052 {
00053     _active         = 0;
00054     _node           = 0;
00055     _tasks          = eq::TASK_NONE;
00056     _port           = EQ_UNDEFINED_UINT32;
00057     _device         = EQ_UNDEFINED_UINT32;
00058     _lastDrawWindow = 0;
00059 
00060     EQINFO << "New pipe @" << (void*)this << endl;
00061 }
00062 
00063 Pipe::Pipe()
00064 {
00065     _construct();
00066 
00067     const Global* global = Global::instance();
00068     for( int i=0; i<IATTR_ALL; ++i )
00069         _iAttributes[i] = global->getPipeIAttribute(
00070             static_cast<IAttribute>( i ));
00071 }
00072 
00073 Pipe::Pipe( const Pipe& from, Node* node )
00074         : eq::net::Object()
00075 {
00076     _construct();
00077 
00078     _name   = from._name;
00079     _port   = from._port;
00080     _device = from._device;
00081     _pvp    = from._pvp;
00082 
00083     node->addPipe( this );
00084 
00085     for( int i=0; i<IATTR_ALL; ++i )
00086         _iAttributes[i] = from._iAttributes[i];
00087 
00088     const WindowVector& windows = from.getWindows();
00089     for( WindowVector::const_iterator i = windows.begin(); 
00090          i != windows.end(); ++i )
00091     {
00092         new Window( **i, this );
00093     }
00094 }
00095 
00096 Pipe::~Pipe()
00097 {
00098     EQINFO << "Delete pipe @" << (void*)this << endl;
00099 
00100     if( _node )
00101         _node->removePipe( this );
00102     
00103     for( vector<Window*>::const_iterator i = _windows.begin(); 
00104          i != _windows.end(); ++i )
00105     {
00106         Window* window = *i;
00107 
00108         window->_pipe = 0;
00109         delete window;
00110     }
00111     _windows.clear();
00112 }
00113 
00114 void Pipe::attachToSession( const uint32_t id, const uint32_t instanceID, 
00115                                eq::net::Session* session )
00116 {
00117     eq::net::Object::attachToSession( id, instanceID, session );
00118     
00119     eq::net::CommandQueue* queue = getCommandThreadQueue();
00120 
00121     registerCommand( eq::CMD_PIPE_CONFIG_INIT_REPLY,
00122                      PipeFunc( this, &Pipe::_cmdConfigInitReply ), queue );
00123     registerCommand( eq::CMD_PIPE_CONFIG_EXIT_REPLY, 
00124                      PipeFunc( this, &Pipe::_cmdConfigExitReply ), queue );
00125 }
00126 
00127 void Pipe::addWindow( Window* window )
00128 {
00129     EQASSERT( window->getChannels().empty( ));
00130 
00131     _windows.push_back( window ); 
00132     window->_pipe = this;
00133     window->notifyViewportChanged();
00134 }
00135 
00136 bool Pipe::removeWindow( Window* window )
00137 {
00138     EQASSERT( window->getChannels().empty( ));
00139 
00140     vector<Window*>::iterator i = find( _windows.begin(), _windows.end(),
00141                                         window );
00142     if( i == _windows.end( ))
00143         return false;
00144 
00145     _windows.erase( i );
00146     window->_pipe = 0;
00147     return true;
00148 }
00149 
00150 Server* Pipe::getServer()
00151 { 
00152     EQASSERT( _node );
00153     return _node ? _node->getServer() : 0; 
00154 }
00155 const Server* Pipe::getServer() const
00156 { 
00157     EQASSERT( _node );
00158     return _node ? _node->getServer() : 0; 
00159 }
00160 
00161 Config* Pipe::getConfig()
00162 {
00163     EQASSERT( _node );
00164     return (_node ? _node->getConfig() : 0);
00165 }
00166 const Config* Pipe::getConfig() const
00167 {
00168     EQASSERT( _node );
00169     return (_node ? _node->getConfig() : 0);
00170 }
00171 
00172 net::CommandQueue* Pipe::getServerThreadQueue()
00173 { 
00174     EQASSERT( _node );
00175     return _node->getServerThreadQueue(); 
00176 }
00177 
00178 net::CommandQueue* Pipe::getCommandThreadQueue()
00179 { 
00180     EQASSERT( _node );
00181     return _node->getCommandThreadQueue(); 
00182 }
00183 
00184 PipePath Pipe::getPath() const
00185 {
00186     EQASSERT( _node );
00187     PipePath path( _node->getPath( ));
00188     
00189     const PipeVector&      pipes = _node->getPipes();
00190     PipeVector::const_iterator i = std::find( pipes.begin(), pipes.end(),
00191                                               this );
00192     EQASSERT( i != pipes.end( ));
00193     path.pipeIndex = std::distance( pipes.begin(), i );
00194     return path;
00195 }
00196 
00197 Channel* Pipe::getChannel( const ChannelPath& path )
00198 {
00199     EQASSERT( _windows.size() > path.windowIndex );
00200 
00201     if( _windows.size() <= path.windowIndex )
00202         return 0;
00203 
00204     return _windows[ path.windowIndex ]->getChannel( path );
00205 }
00206 
00207 namespace
00208 {
00209 template< class C, class V >
00210 VisitorResult _accept( C* pipe, V& visitor )
00211 { 
00212     VisitorResult result = visitor.visitPre( pipe );
00213     if( result != TRAVERSE_CONTINUE )
00214         return result;
00215 
00216     const WindowVector& windows = pipe->getWindows();
00217     for( WindowVector::const_iterator i = windows.begin(); 
00218          i != windows.end(); ++i )
00219     {
00220         switch( (*i)->accept( visitor ))
00221         {
00222             case TRAVERSE_TERMINATE:
00223                 return TRAVERSE_TERMINATE;
00224 
00225             case TRAVERSE_PRUNE:
00226                 result = TRAVERSE_PRUNE;
00227                 break;
00228                 
00229             case TRAVERSE_CONTINUE:
00230             default:
00231                 break;
00232         }
00233     }
00234 
00235     switch( visitor.visitPost( pipe ))
00236     {
00237         case TRAVERSE_TERMINATE:
00238             return TRAVERSE_TERMINATE;
00239 
00240         case TRAVERSE_PRUNE:
00241             return TRAVERSE_PRUNE;
00242                 
00243         case TRAVERSE_CONTINUE:
00244         default:
00245             break;
00246     }
00247 
00248     return result;
00249 }
00250 }
00251 
00252 VisitorResult Pipe::accept( PipeVisitor& visitor )
00253 {
00254     return _accept( this, visitor );
00255 }
00256 
00257 VisitorResult Pipe::accept( ConstPipeVisitor& visitor ) const
00258 {
00259     return _accept( this, visitor );
00260 }
00261 
00262 void Pipe::activate()
00263 {   
00264     EQASSERT( _node );
00265 
00266     ++_active;
00267     if( _node ) 
00268         _node->activate();
00269 
00270     EQLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
00271 }
00272 
00273 void Pipe::deactivate()
00274 { 
00275     EQASSERT( _active != 0 );
00276     EQASSERT( _node );
00277 
00278     --_active; 
00279     if( _node ) 
00280         _node->deactivate(); 
00281 
00282     EQLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
00283 };
00284 
00285 void Pipe::addTasks( const uint32_t tasks )
00286 {
00287     EQASSERT( _node );
00288     _tasks |= tasks;
00289     _node->addTasks( tasks );
00290 }
00291 
00292 void Pipe::send( net::ObjectPacket& packet )
00293 { 
00294     EQASSERT( _node );
00295     packet.objectID = getID();
00296     _node->send( packet ); 
00297 }
00298 
00299 void Pipe::_send( net::ObjectPacket& packet, const std::string& string ) 
00300 {
00301     EQASSERT( _node );
00302     packet.objectID = getID(); 
00303     _node->send( packet, string ); 
00304 }
00305 
00306 //===========================================================================
00307 // Operations
00308 //===========================================================================
00309 
00310 //---------------------------------------------------------------------------
00311 // update running entities (init/exit)
00312 //---------------------------------------------------------------------------
00313 
00314 void Pipe::updateRunning( const uint32_t initID, const uint32_t frameNumber )
00315 {
00316     if( !isActive() && _state == STATE_STOPPED ) // inactive
00317         return;
00318 
00319     _error.clear();
00320 
00321     if( isActive() && _state != STATE_RUNNING ) // becoming active
00322         _configInit( initID, frameNumber );
00323 
00324     // Let all running windows update their running state (incl. children)
00325     for( WindowVector::const_iterator i = _windows.begin(); 
00326          i != _windows.end(); ++i )
00327     {
00328         (*i)->updateRunning( initID );
00329     }
00330 
00331     if( !isActive( )) // becoming inactive
00332         _configExit();
00333 }
00334 
00335 bool Pipe::syncRunning()
00336 {
00337     if( !isActive() && _state == STATE_STOPPED ) // inactive
00338         return true;
00339 
00340     // Sync state updates
00341     bool success = true;
00342     for( WindowVector::const_iterator i = _windows.begin();
00343          i != _windows.end(); ++i )
00344     {
00345         Window* window = *i;
00346         if( !window->syncRunning( ))
00347         {
00348             _error += " window " + window->getName() + ": '" + 
00349                       window->getErrorMessage() + '\'';
00350             success = false;
00351         }
00352     }
00353 
00354     if( isActive() && _state != STATE_RUNNING && !_syncConfigInit( ))
00355         // becoming active
00356         success = false;
00357 
00358     if( !isActive() && !_syncConfigExit( ))
00359         // becoming inactive
00360         success = false;
00361 
00362     EQASSERT( _state == STATE_RUNNING || _state == STATE_STOPPED ||
00363               _state == STATE_INIT_FAILED );
00364     return success;
00365 }
00366 
00367 //---------------------------------------------------------------------------
00368 // init
00369 //---------------------------------------------------------------------------
00370 void Pipe::_configInit( const uint32_t initID, const uint32_t frameNumber )
00371 {
00372     EQASSERT( _state == STATE_STOPPED );
00373     _state         = STATE_INITIALIZING;
00374 
00375     getConfig()->registerObject( this );
00376 
00377     EQLOG( LOG_INIT ) << "Create pipe" << std::endl;
00378     eq::NodeCreatePipePacket createPipePacket;
00379     createPipePacket.objectID = _node->getID();
00380     createPipePacket.pipeID   = getID();
00381     createPipePacket.threaded = getIAttribute( IATTR_HINT_THREAD );
00382     _node->send( createPipePacket );
00383 
00384     EQLOG( LOG_INIT ) << "Init pipe" << std::endl;
00385     eq::PipeConfigInitPacket packet;
00386     packet.initID = initID;
00387     packet.port   = _port;
00388     packet.device = _device;
00389     packet.tasks  = _tasks;
00390     packet.pvp    = _pvp;
00391     packet.frameNumber = frameNumber;
00392     _send( packet, _name );
00393 }
00394 
00395 bool Pipe::_syncConfigInit()
00396 {
00397     EQASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
00398               _state == STATE_INIT_FAILED );
00399 
00400     _state.waitNE( STATE_INITIALIZING );
00401 
00402     const bool success = ( _state == STATE_INIT_SUCCESS );
00403     if( success )
00404         _state = STATE_RUNNING;
00405     else
00406         EQWARN << "Pipe initialization failed: " << _error << endl;
00407 
00408     return success;
00409 }
00410 
00411 //---------------------------------------------------------------------------
00412 // exit
00413 //---------------------------------------------------------------------------
00414 void Pipe::_configExit()
00415 {
00416     EQASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
00417     _state = STATE_EXITING;
00418 
00419     EQLOG( LOG_INIT ) << "Exit pipe" << std::endl;
00420     eq::PipeConfigExitPacket packet;
00421     packet.exitThread = ( getIAttribute( IATTR_HINT_THREAD ) != eq::OFF );
00422     send( packet );
00423 
00424     EQLOG( LOG_INIT ) << "Destroy pipe" << std::endl;
00425     eq::NodeDestroyPipePacket destroyPipePacket;
00426     destroyPipePacket.objectID = _node->getID();
00427     destroyPipePacket.pipeID   = getID();
00428     _node->send( destroyPipePacket );
00429 }
00430 
00431 bool Pipe::_syncConfigExit()
00432 {
00433     EQASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS || 
00434               _state == STATE_EXIT_FAILED );
00435     
00436     _state.waitNE( STATE_EXITING );
00437     const bool success = ( _state == STATE_EXIT_SUCCESS );
00438     EQASSERT( success || _state == STATE_EXIT_FAILED );
00439 
00440     getConfig()->deregisterObject( this );
00441 
00442     _state = STATE_STOPPED; // EXIT_FAILED -> STOPPED transition
00443     _tasks = eq::TASK_NONE;
00444     return success;
00445 }
00446 
00447 //---------------------------------------------------------------------------
00448 // update
00449 //---------------------------------------------------------------------------
00450 void Pipe::update( const uint32_t frameID, const uint32_t frameNumber )
00451 {
00452     EQASSERT( _state == STATE_RUNNING );
00453     EQASSERT( _active > 0 );
00454 
00455     eq::PipeFrameStartClockPacket startClockPacket;
00456     send( startClockPacket );
00457 
00458     eq::PipeFrameStartPacket startPacket;
00459     startPacket.frameID     = frameID;
00460     startPacket.frameNumber = frameNumber;
00461     send( startPacket );
00462     EQLOG( eq::LOG_TASKS ) << "TASK pipe start frame " << &startPacket << endl;
00463 
00464     for( vector< Window* >::const_iterator i = _windows.begin(); 
00465          i != _windows.end(); ++i )
00466     {
00467         Window* window = *i;
00468         if( window->isActive( ))
00469             window->updateDraw( frameID, frameNumber );
00470     }
00471 
00472     for( vector< Window* >::const_iterator i = _windows.begin(); 
00473          i != _windows.end(); ++i )
00474     {
00475         Window* window = *i;
00476         if( window->isActive( ))
00477             window->updatePost( frameID, frameNumber );
00478     }
00479 
00480     eq::PipeFrameFinishPacket finishPacket;
00481     finishPacket.frameID      = frameID;
00482     finishPacket.frameNumber  = frameNumber;
00483 
00484     send( finishPacket );
00485     EQLOG( eq::LOG_TASKS ) << "TASK pipe finish frame  " << &finishPacket
00486                            << endl;
00487     _lastDrawWindow = 0;
00488 }
00489 
00490 //----------------------------------------------------------------------
00491 // viewport
00492 //----------------------------------------------------------------------
00493 void Pipe::setPixelViewport( const eq::PixelViewport& pvp )
00494 {
00495     if( pvp == _pvp || !pvp.hasArea( ))
00496         return;
00497 
00498     _pvp = pvp;
00499     EQINFO << "Pipe pvp set: " << _pvp << endl;
00500 }
00501 
00502 //===========================================================================
00503 // command handling
00504 //===========================================================================
00505 eq::net::CommandResult Pipe::_cmdConfigInitReply( eq::net::Command& command ) 
00506 {
00507     const eq::PipeConfigInitReplyPacket* packet = 
00508         command.getPacket<eq::PipeConfigInitReplyPacket>();
00509     EQVERB << "handle pipe configInit reply " << packet << endl;
00510 
00511     _error += packet->error;
00512     setPixelViewport( packet->pvp );
00513 
00514     if( packet->result )
00515         _state = STATE_RUNNING;
00516     else
00517         _state = STATE_INIT_FAILED;
00518 
00519     return eq::net::COMMAND_HANDLED;
00520 }
00521 
00522 eq::net::CommandResult Pipe::_cmdConfigExitReply( eq::net::Command& command ) 
00523 {
00524     const eq::PipeConfigExitReplyPacket* packet = 
00525         command.getPacket<eq::PipeConfigExitReplyPacket>();
00526     EQVERB << "handle pipe configExit reply " << packet << endl;
00527 
00528     if( packet->result )
00529         _state = STATE_EXIT_SUCCESS;
00530     else
00531         _state = STATE_EXIT_FAILED;
00532 
00533     return eq::net::COMMAND_HANDLED;
00534 }
00535 
00536 
00537 std::ostream& operator << ( std::ostream& os, const Pipe* pipe )
00538 {
00539     if( !pipe )
00540         return os;
00541     
00542     os << disableFlush << disableHeader << "pipe" << endl;
00543     os << "{" << endl << indent;
00544 
00545     const std::string& name = pipe->getName();
00546     if( !name.empty( ))
00547         os << "name     \"" << name << "\"" << endl;
00548 
00549     if( pipe->getPort() != EQ_UNDEFINED_UINT32 )
00550         os << "port     " << pipe->getPort() << endl;
00551         
00552     if( pipe->getDevice() != EQ_UNDEFINED_UINT32 )
00553         os << "device   " << pipe->getDevice() << endl;
00554     
00555     const eq::PixelViewport& pvp = pipe->getPixelViewport();
00556     if( pvp.isValid( ))
00557         os << "viewport " << pvp << endl;
00558 
00559     bool attrPrinted   = false;
00560     for( Pipe::IAttribute i = static_cast<Pipe::IAttribute>( 0 ); 
00561          i < Pipe::IATTR_ALL; 
00562          i = static_cast<Pipe::IAttribute>( static_cast<uint32_t>( i )+1))
00563     {
00564         const int value = pipe->getIAttribute( i );
00565         if( value == Global::instance()->getPipeIAttribute( i ))
00566             continue;
00567 
00568         if( !attrPrinted )
00569         {
00570             os << endl << "attributes" << endl;
00571             os << "{" << endl << indent;
00572             attrPrinted = true;
00573         }
00574         
00575         os << ( i==Pipe::IATTR_HINT_THREAD ? "hint_thread       " : "ERROR" )
00576            << static_cast<eq::IAttrValue>( value ) << endl;
00577     }
00578     
00579     if( attrPrinted )
00580         os << exdent << "}" << endl << endl;
00581 
00582     os << endl;
00583 
00584     const WindowVector& windows = pipe->getWindows();
00585     for( WindowVector::const_iterator i = windows.begin();
00586          i != windows.end(); ++i )
00587 
00588         os << *i;
00589 
00590     os << exdent << "}" << endl << enableHeader << enableFlush;
00591     return os;
00592 }
00593 
00594 }
00595 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8