lib/client/pipe.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "pipe.h"
00019 
00020 #include "client.h"
00021 #include "commands.h"
00022 #include "config.h"
00023 #include "frame.h"
00024 #include "global.h"
00025 #include "log.h"
00026 #include "nodeFactory.h"
00027 #include "pipeStatistics.h"
00028 #include "pipeVisitor.h"
00029 #include "packets.h"
00030 #include "server.h"
00031 #include "task.h"
00032 #include "view.h"
00033 #include "X11Connection.h"
00034 #include "window.h"
00035 
00036 #ifdef GLX
00037 #  include "glXPipe.h"
00038 #endif
00039 #ifdef WGL
00040 #  include "wglPipe.h"
00041 #endif
00042 #ifdef AGL
00043 #  include "aglPipe.h"
00044 #endif
00045 
00046 #include <eq/net/command.h>
00047 #include <sstream>
00048 
00049 namespace eq
00050 {
00051 namespace
00052 {
00053 static const Window* _ntCurrentWindow = 0;
00054 }
00055 
00057 typedef net::CommandFunc<Pipe> PipeFunc;
00060 Pipe::Pipe( Node* parent )
00061         : _osPipe( 0 )
00062         , _node( parent )
00063         , _windowSystem( WINDOW_SYSTEM_NONE )
00064         , _tasks( TASK_NONE )
00065         , _port( EQ_UNDEFINED_UINT32 )
00066         , _device( EQ_UNDEFINED_UINT32 )
00067         , _state( STATE_STOPPED )
00068         , _currentFrame( 0 )
00069         , _frameTime( 0 )
00070         , _waitTime( 0 )
00071         , _thread( 0 )
00072         , _pipeThreadQueue( 0 )
00073         , _currentWindow( 0 )
00074 {
00075     parent->_addPipe( this );
00076     EQINFO << " New eq::Pipe @" << (void*)this << std::endl;
00077 }
00078 
00079 Pipe::~Pipe()
00080 {
00081     _node->_removePipe( this );
00082     delete _thread;
00083     _thread = 0;
00084 }
00085 
00086 Config* Pipe::getConfig()
00087 {
00088     EQASSERT( _node );
00089     return (_node ? _node->getConfig() : 0);
00090 }
00091 const Config* Pipe::getConfig() const
00092 {
00093     EQASSERT( _node );
00094     return (_node ? _node->getConfig() : 0);
00095 }
00096 
00097 ClientPtr Pipe::getClient()
00098 {
00099     EQASSERT( _node );
00100     return (_node ? _node->getClient() : 0);
00101 }
00102 
00103 ServerPtr Pipe::getServer()
00104 {
00105     EQASSERT( _node );
00106     return (_node ? _node->getServer() : 0);
00107 }
00108 
00109 int64_t Pipe::getFrameTime() const
00110 {
00111     return getConfig()->getTime() - _frameTime;
00112 }
00113 
00114 VisitorResult Pipe::accept( PipeVisitor& visitor )
00115 { 
00116     VisitorResult result = visitor.visitPre( this );
00117     if( result != TRAVERSE_CONTINUE )
00118         return result;
00119 
00120     for( WindowVector::const_iterator i = _windows.begin(); 
00121          i != _windows.end(); ++i )
00122     {
00123         Window* window = *i;
00124         switch( window->accept( visitor ))
00125         {
00126             case TRAVERSE_TERMINATE:
00127                 return TRAVERSE_TERMINATE;
00128 
00129             case TRAVERSE_PRUNE:
00130                 result = TRAVERSE_PRUNE;
00131                 break;
00132                 
00133             case TRAVERSE_CONTINUE:
00134             default:
00135                 break;
00136         }
00137     }
00138 
00139     switch( visitor.visitPost( this ))
00140     {
00141         case TRAVERSE_TERMINATE:
00142             return TRAVERSE_TERMINATE;
00143 
00144         case TRAVERSE_PRUNE:
00145             return TRAVERSE_PRUNE;
00146                 
00147         case TRAVERSE_CONTINUE:
00148         default:
00149             break;
00150     }
00151 
00152     return result;
00153 }
00154 
00155 void Pipe::attachToSession( const uint32_t id, const uint32_t instanceID, 
00156                             net::Session* session )
00157 {
00158     net::Object::attachToSession( id, instanceID, session );
00159     
00160     net::CommandQueue* queue = getPipeThreadQueue();
00161 
00162     registerCommand( CMD_PIPE_CONFIG_INIT, 
00163                      PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
00164     registerCommand( CMD_PIPE_CONFIG_EXIT, 
00165                      PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
00166     registerCommand( CMD_PIPE_CREATE_WINDOW,
00167                      PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
00168     registerCommand( CMD_PIPE_DESTROY_WINDOW, 
00169                      PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
00170     registerCommand( CMD_PIPE_FRAME_START,
00171                      PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
00172     registerCommand( CMD_PIPE_FRAME_FINISH,
00173                      PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
00174     registerCommand( CMD_PIPE_FRAME_DRAW_FINISH, 
00175                      PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
00176     registerCommand( CMD_PIPE_FRAME_START_CLOCK,
00177                      PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
00178 }
00179 
00180 void Pipe::_addWindow( Window* window )
00181 {
00182     EQASSERT( window->getPipe() == this );
00183     _windows.push_back( window );
00184 }
00185 
00186 void Pipe::_removeWindow( Window* window )
00187 {
00188     WindowVector::iterator iter = find( _windows.begin(), _windows.end(),
00189                                         window );
00190     EQASSERT( iter != _windows.end( ))
00191     
00192     _windows.erase( iter );
00193 }
00194 
00195 eq::Window* Pipe::_findWindow( const uint32_t id )
00196 {
00197     for( WindowVector::const_iterator i = _windows.begin(); 
00198          i != _windows.end(); ++i )
00199     {
00200         Window* window = *i;
00201         if( window->getID() == id )
00202             return window;
00203     }
00204     return 0;
00205 }
00206 
00207 bool Pipe::supportsWindowSystem( const WindowSystem windowSystem ) const
00208 {
00209     switch( windowSystem )
00210     {
00211 #ifdef GLX
00212         case WINDOW_SYSTEM_GLX: return true;
00213 #endif
00214 #ifdef AGL
00215         case WINDOW_SYSTEM_AGL: return true;
00216 #endif
00217 #ifdef WGL
00218         case WINDOW_SYSTEM_WGL: return true;
00219 #endif
00220         default:                return false;
00221     }
00222 }
00223 
00224 WindowSystem Pipe::selectWindowSystem() const
00225 {
00226     for( WindowSystem i=WINDOW_SYSTEM_NONE; i<WINDOW_SYSTEM_ALL; 
00227          i = (WindowSystem)((int)i+1) )
00228     {
00229         if( supportsWindowSystem( i ))
00230             return i;
00231     }
00232     EQABORT( "No supported window system found" );
00233     return WINDOW_SYSTEM_NONE;
00234 }
00235 
00236 void Pipe::_setupCommandQueue()
00237 {
00238     EQASSERT( _windowSystem != WINDOW_SYSTEM_NONE );
00239     if( !useMessagePump( ))
00240         return;
00241 
00242     EQINFO << "Pipe message pump set up for " << _windowSystem << std::endl;
00243 
00244     // Switch the node thread message pumps for non-threaded pipes
00245     if( !_thread )
00246     {
00247         Config* config = getConfig();
00248         config->setWindowSystem( _windowSystem );
00249         return;
00250     }
00251 
00252     if( _windowSystem == WINDOW_SYSTEM_AGL ) //AGL needs dispatch from node
00253     {
00254         Config* config = getConfig();
00255         config->setWindowSystem( _windowSystem );
00256     }
00257     
00258     EQASSERT( _pipeThreadQueue );
00259     _pipeThreadQueue->setWindowSystem( _windowSystem );
00260 }
00261 
00262 
00263 void* Pipe::_runThread()
00264 {
00265     EQINFO << "Entered pipe thread" << std::endl;
00266     CHECK_THREAD( _pipeThread );
00267 
00268     Config* config = getConfig();
00269     EQASSERT( config );
00270     EQASSERT( _pipeThreadQueue );
00271 
00272     while( _thread->isRunning( ))
00273     {
00274         const int64_t startWait = config->getTime();
00275         net::Command* command = _pipeThreadQueue->pop();
00276         _waitTime += ( config->getTime() - startWait );
00277 
00278         switch( config->invokeCommand( *command ))
00279         {
00280             case net::COMMAND_HANDLED:
00281             case net::COMMAND_DISCARD:
00282                 break;
00283 
00284             case net::COMMAND_ERROR:
00285                 EQABORT( "Error handling command packet" );
00286                 break;
00287 
00288             default:
00289                 EQABORT( "Unknown command result" );
00290                 break;
00291         }
00292         command->release();
00293     }
00294 
00295     EQUNREACHABLE; // since we are exited from _cmdConfigExit
00296     return EXIT_SUCCESS;
00297 }
00298 
00299 net::CommandQueue* Pipe::getPipeThreadQueue()
00300 {
00301     if( !_thread )
00302         return _node->getNodeThreadQueue();
00303 
00304     return _pipeThreadQueue;
00305 }
00306 
00307 Frame* Pipe::getFrame( const net::ObjectVersion& frameVersion, const Eye eye )
00308 {
00309     Frame* frame = _frames[ frameVersion.id ];
00310 
00311     if( !frame )
00312     {
00313         net::Session* session = getSession();
00314         frame = new Frame();
00315         
00316         EQCHECK( session->mapObject( frame, frameVersion.id ));
00317         _frames[ frameVersion.id ] = frame;
00318     }
00319     
00320     frame->sync( frameVersion.version );
00321 
00322     const net::ObjectVersion& data = frame->getDataVersion( eye );
00323     EQASSERT( data.id != EQ_ID_INVALID );
00324     FrameData* frameData = getNode()->getFrameData( data ); 
00325     EQASSERT( frameData );
00326 
00327     frame->setData( frameData );
00328     return frame;
00329 }
00330 
00331 void Pipe::flushFrames()
00332 {
00333     net::Session* session = getSession();
00334 
00335     for( FrameHash::const_iterator i = _frames.begin(); i != _frames.end(); ++i)
00336     {
00337         Frame* frame = i->second;
00338 
00339         frame->flush();
00340         session->unmapObject( frame );
00341         delete frame;
00342     }
00343     _frames.clear();
00344 }
00345 
00346 View* Pipe::getView( const net::ObjectVersion& viewVersion )
00347 {
00348     CHECK_THREAD( _pipeThread );
00349     if( viewVersion.id == EQ_ID_INVALID )
00350         return 0;
00351 
00352     View* view = _views[ viewVersion.id ];
00353 
00354     if( !view )
00355     {
00356         NodeFactory* nodeFactory = Global::getNodeFactory();
00357         view = nodeFactory->createView();
00358         view->_pipe = this;
00359 
00360         net::Session* session = getSession();
00361         EQCHECK( session->mapObject( view, viewVersion.id ));
00362 
00363         _views[ viewVersion.id ] = view;
00364     }
00365     
00366     view->sync( viewVersion.version );
00367     return view;
00368 }
00369 
00370 void Pipe::_releaseViews()
00371 {
00372     CHECK_THREAD( _pipeThread );
00373     for( bool changed = true; changed; )
00374     {
00375         changed = false;
00376         for( ViewHash::iterator i = _views.begin(); 
00377              i != _views.end(); ++i )
00378         {
00379             View* view = i->second;
00380             if( view->getVersion() + 20 > view->getHeadVersion( ))
00381                 continue;
00382 
00383             // release view to avoid memory leaks due to deltas piling up.
00384             net::Session* session = getSession();
00385             session->unmapObject( view );
00386             
00387             _views.erase( i );
00388 
00389             NodeFactory* nodeFactory = Global::getNodeFactory();
00390             nodeFactory->releaseView( view );
00391 
00392             changed = true;
00393             break;
00394         }
00395     }
00396 }
00397 
00398 void Pipe::_flushViews()
00399 {
00400     CHECK_THREAD( _pipeThread );
00401     NodeFactory*  nodeFactory = Global::getNodeFactory();
00402     net::Session* session     = getSession();
00403 
00404     for( ViewHash::const_iterator i = _views.begin(); i != _views.end(); ++i)
00405     {
00406         View* view = i->second;
00407 
00408         session->unmapObject( view );
00409         nodeFactory->releaseView( view );
00410     }
00411     _views.clear();
00412 }
00413 
00414 bool Pipe::isCurrent( const Window* window ) const
00415 {
00416     if( isThreaded( ))
00417         return ( window == _currentWindow );
00418     return ( window == _ntCurrentWindow );
00419 }
00420 
00421 void Pipe::setCurrent( const Window* window ) const
00422 {
00423     if( isThreaded( ))
00424         _currentWindow = window;
00425     else
00426         _ntCurrentWindow = window;
00427 }
00428 
00429 void Pipe::startThread()
00430 {
00431     _pipeThreadQueue = new CommandQueue;
00432     _thread          = new PipeThread( this );
00433 
00434     _thread->start();
00435 }
00436 
00437 void Pipe::joinThread()
00438 {
00439     if( !_thread )
00440         return;
00441 
00442     _thread->join();
00443     delete _thread;
00444     _thread = 0;
00445 
00446     delete _pipeThreadQueue;
00447     _pipeThreadQueue = 0;
00448 }
00449 
00450 WGLEWContext* Pipe::wglewGetContext()
00451 { 
00452     return _osPipe->wglewGetContext();
00453 }
00454 
00455 void Pipe::waitExited() const
00456 {
00457     _state.waitEQ( STATE_STOPPED );
00458 }
00459 
00460 bool Pipe::isRunning() const
00461 {
00462     return (_state == STATE_RUNNING);
00463 }
00464 
00465 void Pipe::waitFrameFinished( const uint32_t frameNumber ) const
00466 {
00467     _finishedFrame.waitGE( frameNumber );
00468 }
00469 
00470 void Pipe::waitFrameLocal( const uint32_t frameNumber ) const
00471 {
00472     _unlockedFrame.waitGE( frameNumber );
00473 }
00474 
00475 uint32_t Pipe::getFinishedFrame() const
00476 {
00477     return _finishedFrame.get();
00478 }
00479 
00480 
00481 //---------------------------------------------------------------------------
00482 // pipe-thread methods
00483 //---------------------------------------------------------------------------
00484 bool Pipe::configInit( const uint32_t initID )
00485 {
00486     CHECK_THREAD( _pipeThread );
00487 
00488     EQASSERT( !_osPipe );
00489 
00490     OSPipe* osPipe = 0;
00491 
00492     switch( _windowSystem )
00493     {
00494 #ifdef GLX
00495         case WINDOW_SYSTEM_GLX:
00496             EQINFO << "Pipe: using GLXWindow" << std::endl;
00497             osPipe = new GLXPipe( this );
00498             break;
00499 #endif
00500 
00501 #ifdef AGL
00502         case WINDOW_SYSTEM_AGL:
00503             EQINFO << "Pipe: using AGLWindow" << std::endl;
00504             osPipe = new AGLPipe( this );
00505             break;
00506 #endif
00507 
00508 #ifdef WGL
00509         case WINDOW_SYSTEM_WGL:
00510             EQINFO << "Pipe: using WGLWindow" << std::endl;
00511             osPipe = new WGLPipe( this );
00512             break;
00513 #endif
00514 
00515         default:
00516             EQERROR << "Unknown window system: " << _windowSystem << std::endl;
00517             setErrorMessage( "Unknown window system" );
00518             return false;
00519     }
00520 
00521     EQASSERT( osPipe );
00522     if( !osPipe->configInit( ))
00523     {
00524         setErrorMessage( "OS Pipe initialization failed: " + 
00525                          osPipe->getErrorMessage( ));
00526         EQERROR << _error << std::endl;
00527         delete osPipe;
00528         return false;
00529     }
00530 
00531     setOSPipe( osPipe );
00532     return true;
00533 }
00534 
00535 bool Pipe::configExit()
00536 {
00537     CHECK_THREAD( _pipeThread );
00538 
00539     if( _osPipe )
00540     {
00541         _osPipe->configExit( );
00542 
00543         delete _osPipe;
00544         _osPipe = 0;
00545         return true;
00546     }
00547     //else
00548 
00549     EQWARN << "Window system "<< _windowSystem <<" not initialized" << std::endl;
00550     return false;
00551 }
00552 
00553 
00554 void Pipe::frameStart( const uint32_t frameID, const uint32_t frameNumber ) 
00555 { 
00556     CHECK_THREAD( _pipeThread );
00557 
00558     const Node* node = getNode();
00559     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
00560     {
00561         case ASYNC:      // No sync, release immediately
00562             releaseFrameLocal( frameNumber ); 
00563             break;
00564 
00565         case DRAW_SYNC:  // Sync, release in frameDrawFinish
00566         case LOCAL_SYNC: // Sync, release in frameFinish
00567             node->waitFrameStarted( frameNumber );
00568             break;
00569 
00570         default:
00571             EQUNIMPLEMENTED;
00572     }
00573 
00574     startFrame( frameNumber );
00575 }
00576 
00577 void Pipe::frameDrawFinish( const uint32_t frameID, const uint32_t frameNumber )
00578 {
00579     const Node* node = getNode();
00580     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
00581     {
00582         case ASYNC:      // released in frameStart
00583             break;
00584 
00585         case DRAW_SYNC:  // release
00586             releaseFrameLocal( frameNumber ); 
00587             break;
00588 
00589         case LOCAL_SYNC: // release in frameFinish
00590             break;
00591 
00592         default:
00593             EQUNIMPLEMENTED;
00594     }
00595 }
00596 
00597 void Pipe::frameFinish( const uint32_t frameID, const uint32_t frameNumber )
00598 {
00599     const Node* node = getNode();
00600     switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
00601     {
00602         case ASYNC:      // released in frameStart
00603             break;
00604 
00605         case DRAW_SYNC:  // released in frameDrawFinish
00606             break;
00607 
00608         case LOCAL_SYNC: // release
00609             releaseFrameLocal( frameNumber ); 
00610             break;
00611 
00612         default:
00613             EQUNIMPLEMENTED;
00614     }
00615 
00616     // Global release
00617     releaseFrame( frameNumber );
00618 }
00619 
00620 void Pipe::startFrame( const uint32_t frameNumber )
00621 { 
00622     CHECK_THREAD( _pipeThread );
00623     _currentFrame = frameNumber; 
00624     EQLOG( LOG_TASKS ) << "---- Started Frame ---- " << frameNumber << std::endl;
00625 }
00626 
00627 void Pipe::releaseFrame( const uint32_t frameNumber )
00628 { 
00629     CHECK_THREAD( _pipeThread );
00630     _finishedFrame = frameNumber; 
00631     EQLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber << std::endl;
00632 }
00633 
00634 void Pipe::releaseFrameLocal( const uint32_t frameNumber )
00635 { 
00636     CHECK_THREAD( _pipeThread );
00637     EQASSERT( _unlockedFrame + 1 == frameNumber );
00638 
00639     _unlockedFrame = frameNumber;
00640     EQLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _unlockedFrame.get()
00641                        << std::endl;
00642 }
00643 
00644 //---------------------------------------------------------------------------
00645 // command handlers
00646 //---------------------------------------------------------------------------
00647 net::CommandResult Pipe::_cmdCreateWindow(  net::Command& command  )
00648 {
00649     const PipeCreateWindowPacket* packet = 
00650         command.getPacket<PipeCreateWindowPacket>();
00651     EQLOG( LOG_INIT ) << "Create window " << packet << std::endl;
00652 
00653     Window* window = Global::getNodeFactory()->createWindow( this );
00654     getConfig()->attachObject( window, packet->windowID, EQ_ID_INVALID );
00655     
00656     EQASSERT( !_windows.empty( ));
00657     if( window != _windows[0] )
00658         window->setSharedContextWindow( _windows[0] );
00659 
00660     return net::COMMAND_HANDLED;
00661 }
00662 
00663 net::CommandResult Pipe::_cmdDestroyWindow(  net::Command& command  )
00664 {
00665     const PipeDestroyWindowPacket* packet =
00666         command.getPacket<PipeDestroyWindowPacket>();
00667     EQLOG( LOG_INIT ) << "Destroy window " << packet << std::endl;
00668 
00669     Window* window = _findWindow( packet->windowID );
00670     EQASSERT( window );
00671 
00672     // re-set shared windows accordingly
00673     Window* newSharedWindow = 0;
00674     for( WindowVector::const_iterator i = _windows.begin(); 
00675          i != _windows.end(); ++i )
00676     {
00677         Window* candidate = *i;
00678         
00679         if( candidate == window )
00680             continue; // ignore
00681         if( !newSharedWindow && candidate->getSharedContextWindow() == window )
00682         {
00683             newSharedWindow = candidate;
00684             newSharedWindow->setSharedContextWindow( 0 );
00685         }
00686         else if( newSharedWindow && 
00687                  candidate->getSharedContextWindow() == window )
00688         {
00689             candidate->setSharedContextWindow( newSharedWindow );
00690         }
00691 
00692         EQASSERT( candidate->getSharedContextWindow() != window );
00693     }
00694 
00695     Config* config = getConfig();
00696     config->detachObject( window );
00697     Global::getNodeFactory()->releaseWindow( window );
00698 
00699     return net::COMMAND_HANDLED;
00700 }
00701 
00702 net::CommandResult Pipe::_cmdConfigInit( net::Command& command )
00703 {
00704     CHECK_THREAD( _pipeThread );
00705     const PipeConfigInitPacket* packet = 
00706         command.getPacket<PipeConfigInitPacket>();
00707     EQLOG( LOG_INIT ) << "Init pipe " << packet << std::endl;
00708 
00709     PipeConfigInitReplyPacket reply;
00710     _error.clear();
00711 
00712     _node->waitInitialized();
00713 
00714     if( _node->isRunning( ))
00715     {
00716         _name   = packet->name;
00717         _port   = packet->port;
00718         _device = packet->device;
00719         _tasks  = packet->tasks;
00720         _pvp    = packet->pvp;
00721         
00722         _currentFrame  = packet->frameNumber;
00723         _finishedFrame = packet->frameNumber;
00724         _unlockedFrame = packet->frameNumber;
00725         
00726         _state = STATE_INITIALIZING;
00727 
00728         _windowSystem = selectWindowSystem();
00729         _setupCommandQueue();
00730 
00731         reply.result  = configInit( packet->initID );
00732     }
00733     else
00734         reply.result = false;
00735 
00736     EQLOG( LOG_INIT ) << "TASK pipe config init reply " << &reply << std::endl;
00737 
00738     net::NodePtr node = command.getNode();
00739 
00740     if( !_osPipe || !reply.result )
00741     {
00742         send( node, reply, _error );
00743         return net::COMMAND_HANDLED;
00744     }
00745 
00746     _state = STATE_RUNNING;
00747 
00748     reply.pvp = _pvp;
00749     send( node, reply );
00750     return net::COMMAND_HANDLED;
00751 }
00752 
00753 net::CommandResult Pipe::_cmdConfigExit( net::Command& command )
00754 {
00755     CHECK_THREAD( _pipeThread );
00756     const PipeConfigExitPacket* packet = 
00757         command.getPacket<PipeConfigExitPacket>();
00758     EQLOG( LOG_INIT ) << "TASK pipe config exit " << packet << std::endl;
00759 
00760     PipeConfigExitReplyPacket reply;
00761     reply.result = configExit();
00762     _flushViews();
00763 
00764     _state = STATE_STOPPED;
00765 
00766     send( command.getNode(), reply );
00767 
00768     if( packet->exitThread )
00769     {
00770         EQASSERT( _thread );
00771 
00772         // cleanup
00773         command.release();
00774         _pipeThreadQueue->flush();
00775         
00776         EQINFO << "Leaving pipe thread" << std::endl;
00777         _thread->exit( EXIT_SUCCESS );
00778         EQUNREACHABLE;
00779     }
00780 
00781     return net::COMMAND_HANDLED;
00782 }
00783 
00784 net::CommandResult Pipe::_cmdFrameStartClock( net::Command& command )
00785 {
00786     EQVERB << "start frame clock" << std::endl;
00787     _frameTimeMutex.set();
00788     _frameTimes.push_back( getConfig()->getTime( ));
00789     _frameTimeMutex.unset();
00790     return net::COMMAND_HANDLED;
00791 }
00792 
00793 net::CommandResult Pipe::_cmdFrameStart( net::Command& command )
00794 {
00795     CHECK_THREAD( _pipeThread );
00796     const PipeFrameStartPacket* packet = 
00797         command.getPacket<PipeFrameStartPacket>();
00798     EQVERB << "handle pipe frame start " << packet << std::endl;
00799     EQLOG( LOG_TASKS ) << "---- TASK start frame ---- " << packet << std::endl;
00800 
00801     const int64_t lastFrameTime = _frameTime;
00802 
00803     _frameTimeMutex.set();
00804     EQASSERT( !_frameTimes.empty( ));
00805 
00806     _frameTime = _frameTimes.front();
00807     _frameTimes.pop_front();
00808     _frameTimeMutex.unset();
00809 
00810     if( lastFrameTime > 0 )
00811     {
00812         PipeStatistics waitEvent( Statistic::PIPE_IDLE, this );
00813         waitEvent.event.data.statistic.idleTime  = _waitTime;
00814         waitEvent.event.data.statistic.totalTime = _frameTime - lastFrameTime;
00815     }
00816     _waitTime = 0;
00817 
00818     const uint32_t frameNumber = packet->frameNumber;
00819     EQASSERTINFO( _currentFrame + 1 == frameNumber,
00820                   "current " << _currentFrame << " start " << frameNumber );
00821 
00822     frameStart( packet->frameID, frameNumber );
00823     return net::COMMAND_HANDLED;
00824 }
00825 
00826 net::CommandResult Pipe::_cmdFrameFinish( net::Command& command )
00827 {
00828     CHECK_THREAD( _pipeThread );
00829     const PipeFrameFinishPacket* packet =
00830         command.getPacket<PipeFrameFinishPacket>();
00831     EQLOG( LOG_TASKS ) << "---- TASK finish frame --- " << packet << std::endl;
00832 
00833     const uint32_t frameNumber = packet->frameNumber;
00834     EQASSERTINFO( _currentFrame >= frameNumber, 
00835                   "current " << _currentFrame << " finish " << frameNumber );
00836 
00837     frameFinish( packet->frameID, frameNumber );
00838     
00839     EQASSERTINFO( _finishedFrame >= frameNumber, 
00840                   "Pipe::frameFinish() did not release frame " << frameNumber );
00841 
00842     if( _unlockedFrame < frameNumber )
00843     {
00844         EQWARN << "Finished frame was not locally unlocked, enforcing unlock" 
00845                << std::endl << "    unlocked " << _unlockedFrame.get()
00846                << " done " << frameNumber << std::endl;
00847         releaseFrameLocal( frameNumber );
00848     }
00849 
00850     if( _finishedFrame < frameNumber )
00851     {
00852         EQWARN << "Finished frame was not released, enforcing unlock" << std::endl;
00853         releaseFrame( frameNumber );
00854     }
00855 
00856     _releaseViews();
00857     return net::COMMAND_HANDLED;
00858 }
00859 
00860 net::CommandResult Pipe::_cmdFrameDrawFinish( net::Command& command )
00861 {
00862     CHECK_THREAD( _pipeThread );
00863     PipeFrameDrawFinishPacket* packet = 
00864         command.getPacket< PipeFrameDrawFinishPacket >();
00865     EQLOG( LOG_TASKS ) << "TASK draw finish " << getName() <<  " " << packet
00866                        << std::endl;
00867 
00868     frameDrawFinish( packet->frameID, packet->frameNumber );
00869     return net::COMMAND_HANDLED;
00870 }
00871 
00872 }
Generated on Mon Aug 10 18:58:40 2009 for Equalizer 0.9 by  doxygen 1.5.8