server/pipe.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <pthread.h>
00019 #include "pipe.h"
00020
00021 #include "channel.h"
00022 #include "config.h"
00023 #include "global.h"
00024 #include "log.h"
00025 #include "node.h"
00026 #include "paths.h"
00027 #include "pipeVisitor.h"
00028 #include "window.h"
00029
00030 #include <eq/client/commands.h>
00031 #include <eq/client/packets.h>
00032 #include <eq/net/command.h>
00033
00034 using namespace eq::base;
00035 using namespace std;
00036
00037 namespace eq
00038 {
00039 namespace server
00040 {
00041 typedef net::CommandFunc<Pipe> PipeFunc;
00042
00043 #define MAKE_ATTR_STRING( attr ) ( string("EQ_PIPE_") + #attr )
00044 std::string Pipe::_iAttributeStrings[IATTR_ALL] =
00045 {
00046 MAKE_ATTR_STRING( IATTR_HINT_THREAD ),
00047 MAKE_ATTR_STRING( IATTR_FILL1 ),
00048 MAKE_ATTR_STRING( IATTR_FILL2 )
00049 };
00050
00051 void Pipe::_construct()
00052 {
00053 _active = 0;
00054 _node = 0;
00055 _tasks = eq::TASK_NONE;
00056 _port = EQ_UNDEFINED_UINT32;
00057 _device = EQ_UNDEFINED_UINT32;
00058 _lastDrawWindow = 0;
00059
00060 EQINFO << "New pipe @" << (void*)this << endl;
00061 }
00062
00063 Pipe::Pipe()
00064 {
00065 _construct();
00066
00067 const Global* global = Global::instance();
00068 for( int i=0; i<IATTR_ALL; ++i )
00069 _iAttributes[i] = global->getPipeIAttribute(
00070 static_cast<IAttribute>( i ));
00071 }
00072
00073 Pipe::Pipe( const Pipe& from, Node* node )
00074 : eq::net::Object()
00075 {
00076 _construct();
00077
00078 _name = from._name;
00079 _port = from._port;
00080 _device = from._device;
00081 _pvp = from._pvp;
00082
00083 node->addPipe( this );
00084
00085 for( int i=0; i<IATTR_ALL; ++i )
00086 _iAttributes[i] = from._iAttributes[i];
00087
00088 const WindowVector& windows = from.getWindows();
00089 for( WindowVector::const_iterator i = windows.begin();
00090 i != windows.end(); ++i )
00091 {
00092 new Window( **i, this );
00093 }
00094 }
00095
00096 Pipe::~Pipe()
00097 {
00098 EQINFO << "Delete pipe @" << (void*)this << endl;
00099
00100 if( _node )
00101 _node->removePipe( this );
00102
00103 for( vector<Window*>::const_iterator i = _windows.begin();
00104 i != _windows.end(); ++i )
00105 {
00106 Window* window = *i;
00107
00108 window->_pipe = 0;
00109 delete window;
00110 }
00111 _windows.clear();
00112 }
00113
00114 void Pipe::attachToSession( const uint32_t id, const uint32_t instanceID,
00115 eq::net::Session* session )
00116 {
00117 eq::net::Object::attachToSession( id, instanceID, session );
00118
00119 eq::net::CommandQueue* queue = getCommandThreadQueue();
00120
00121 registerCommand( eq::CMD_PIPE_CONFIG_INIT_REPLY,
00122 PipeFunc( this, &Pipe::_cmdConfigInitReply ), queue );
00123 registerCommand( eq::CMD_PIPE_CONFIG_EXIT_REPLY,
00124 PipeFunc( this, &Pipe::_cmdConfigExitReply ), queue );
00125 }
00126
00127 void Pipe::addWindow( Window* window )
00128 {
00129 EQASSERT( window->getChannels().empty( ));
00130
00131 _windows.push_back( window );
00132 window->_pipe = this;
00133 window->notifyViewportChanged();
00134 }
00135
00136 bool Pipe::removeWindow( Window* window )
00137 {
00138 EQASSERT( window->getChannels().empty( ));
00139
00140 vector<Window*>::iterator i = find( _windows.begin(), _windows.end(),
00141 window );
00142 if( i == _windows.end( ))
00143 return false;
00144
00145 _windows.erase( i );
00146 window->_pipe = 0;
00147 return true;
00148 }
00149
00150 Server* Pipe::getServer()
00151 {
00152 EQASSERT( _node );
00153 return _node ? _node->getServer() : 0;
00154 }
00155 const Server* Pipe::getServer() const
00156 {
00157 EQASSERT( _node );
00158 return _node ? _node->getServer() : 0;
00159 }
00160
00161 Config* Pipe::getConfig()
00162 {
00163 EQASSERT( _node );
00164 return (_node ? _node->getConfig() : 0);
00165 }
00166 const Config* Pipe::getConfig() const
00167 {
00168 EQASSERT( _node );
00169 return (_node ? _node->getConfig() : 0);
00170 }
00171
00172 net::CommandQueue* Pipe::getServerThreadQueue()
00173 {
00174 EQASSERT( _node );
00175 return _node->getServerThreadQueue();
00176 }
00177
00178 net::CommandQueue* Pipe::getCommandThreadQueue()
00179 {
00180 EQASSERT( _node );
00181 return _node->getCommandThreadQueue();
00182 }
00183
00184 PipePath Pipe::getPath() const
00185 {
00186 EQASSERT( _node );
00187 PipePath path( _node->getPath( ));
00188
00189 const PipeVector& pipes = _node->getPipes();
00190 PipeVector::const_iterator i = std::find( pipes.begin(), pipes.end(),
00191 this );
00192 EQASSERT( i != pipes.end( ));
00193 path.pipeIndex = std::distance( pipes.begin(), i );
00194 return path;
00195 }
00196
00197 Channel* Pipe::getChannel( const ChannelPath& path )
00198 {
00199 EQASSERT( _windows.size() > path.windowIndex );
00200
00201 if( _windows.size() <= path.windowIndex )
00202 return 0;
00203
00204 return _windows[ path.windowIndex ]->getChannel( path );
00205 }
00206
00207 namespace
00208 {
00209 template< class C, class V >
00210 VisitorResult _accept( C* pipe, V& visitor )
00211 {
00212 VisitorResult result = visitor.visitPre( pipe );
00213 if( result != TRAVERSE_CONTINUE )
00214 return result;
00215
00216 const WindowVector& windows = pipe->getWindows();
00217 for( WindowVector::const_iterator i = windows.begin();
00218 i != windows.end(); ++i )
00219 {
00220 switch( (*i)->accept( visitor ))
00221 {
00222 case TRAVERSE_TERMINATE:
00223 return TRAVERSE_TERMINATE;
00224
00225 case TRAVERSE_PRUNE:
00226 result = TRAVERSE_PRUNE;
00227 break;
00228
00229 case TRAVERSE_CONTINUE:
00230 default:
00231 break;
00232 }
00233 }
00234
00235 switch( visitor.visitPost( pipe ))
00236 {
00237 case TRAVERSE_TERMINATE:
00238 return TRAVERSE_TERMINATE;
00239
00240 case TRAVERSE_PRUNE:
00241 return TRAVERSE_PRUNE;
00242
00243 case TRAVERSE_CONTINUE:
00244 default:
00245 break;
00246 }
00247
00248 return result;
00249 }
00250 }
00251
00252 VisitorResult Pipe::accept( PipeVisitor& visitor )
00253 {
00254 return _accept( this, visitor );
00255 }
00256
00257 VisitorResult Pipe::accept( ConstPipeVisitor& visitor ) const
00258 {
00259 return _accept( this, visitor );
00260 }
00261
00262 void Pipe::activate()
00263 {
00264 EQASSERT( _node );
00265
00266 ++_active;
00267 if( _node )
00268 _node->activate();
00269
00270 EQLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
00271 }
00272
00273 void Pipe::deactivate()
00274 {
00275 EQASSERT( _active != 0 );
00276 EQASSERT( _node );
00277
00278 --_active;
00279 if( _node )
00280 _node->deactivate();
00281
00282 EQLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
00283 };
00284
00285 void Pipe::addTasks( const uint32_t tasks )
00286 {
00287 EQASSERT( _node );
00288 _tasks |= tasks;
00289 _node->addTasks( tasks );
00290 }
00291
00292 void Pipe::send( net::ObjectPacket& packet )
00293 {
00294 EQASSERT( _node );
00295 packet.objectID = getID();
00296 _node->send( packet );
00297 }
00298
00299 void Pipe::_send( net::ObjectPacket& packet, const std::string& string )
00300 {
00301 EQASSERT( _node );
00302 packet.objectID = getID();
00303 _node->send( packet, string );
00304 }
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314 void Pipe::updateRunning( const uint32_t initID, const uint32_t frameNumber )
00315 {
00316 if( !isActive() && _state == STATE_STOPPED )
00317 return;
00318
00319 _error.clear();
00320
00321 if( isActive() && _state != STATE_RUNNING )
00322 _configInit( initID, frameNumber );
00323
00324
00325 for( WindowVector::const_iterator i = _windows.begin();
00326 i != _windows.end(); ++i )
00327 {
00328 (*i)->updateRunning( initID );
00329 }
00330
00331 if( !isActive( ))
00332 _configExit();
00333 }
00334
00335 bool Pipe::syncRunning()
00336 {
00337 if( !isActive() && _state == STATE_STOPPED )
00338 return true;
00339
00340
00341 bool success = true;
00342 for( WindowVector::const_iterator i = _windows.begin();
00343 i != _windows.end(); ++i )
00344 {
00345 Window* window = *i;
00346 if( !window->syncRunning( ))
00347 {
00348 _error += " window " + window->getName() + ": '" +
00349 window->getErrorMessage() + '\'';
00350 success = false;
00351 }
00352 }
00353
00354 if( isActive() && _state != STATE_RUNNING && !_syncConfigInit( ))
00355
00356 success = false;
00357
00358 if( !isActive() && !_syncConfigExit( ))
00359
00360 success = false;
00361
00362 EQASSERT( _state == STATE_RUNNING || _state == STATE_STOPPED ||
00363 _state == STATE_INIT_FAILED );
00364 return success;
00365 }
00366
00367
00368
00369
00370 void Pipe::_configInit( const uint32_t initID, const uint32_t frameNumber )
00371 {
00372 EQASSERT( _state == STATE_STOPPED );
00373 _state = STATE_INITIALIZING;
00374
00375 getConfig()->registerObject( this );
00376
00377 EQLOG( LOG_INIT ) << "Create pipe" << std::endl;
00378 eq::NodeCreatePipePacket createPipePacket;
00379 createPipePacket.objectID = _node->getID();
00380 createPipePacket.pipeID = getID();
00381 createPipePacket.threaded = getIAttribute( IATTR_HINT_THREAD );
00382 _node->send( createPipePacket );
00383
00384 EQLOG( LOG_INIT ) << "Init pipe" << std::endl;
00385 eq::PipeConfigInitPacket packet;
00386 packet.initID = initID;
00387 packet.port = _port;
00388 packet.device = _device;
00389 packet.tasks = _tasks;
00390 packet.pvp = _pvp;
00391 packet.frameNumber = frameNumber;
00392 _send( packet, _name );
00393 }
00394
00395 bool Pipe::_syncConfigInit()
00396 {
00397 EQASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
00398 _state == STATE_INIT_FAILED );
00399
00400 _state.waitNE( STATE_INITIALIZING );
00401
00402 const bool success = ( _state == STATE_INIT_SUCCESS );
00403 if( success )
00404 _state = STATE_RUNNING;
00405 else
00406 EQWARN << "Pipe initialization failed: " << _error << endl;
00407
00408 return success;
00409 }
00410
00411
00412
00413
00414 void Pipe::_configExit()
00415 {
00416 EQASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
00417 _state = STATE_EXITING;
00418
00419 EQLOG( LOG_INIT ) << "Exit pipe" << std::endl;
00420 eq::PipeConfigExitPacket packet;
00421 packet.exitThread = ( getIAttribute( IATTR_HINT_THREAD ) != eq::OFF );
00422 send( packet );
00423
00424 EQLOG( LOG_INIT ) << "Destroy pipe" << std::endl;
00425 eq::NodeDestroyPipePacket destroyPipePacket;
00426 destroyPipePacket.objectID = _node->getID();
00427 destroyPipePacket.pipeID = getID();
00428 _node->send( destroyPipePacket );
00429 }
00430
00431 bool Pipe::_syncConfigExit()
00432 {
00433 EQASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS ||
00434 _state == STATE_EXIT_FAILED );
00435
00436 _state.waitNE( STATE_EXITING );
00437 const bool success = ( _state == STATE_EXIT_SUCCESS );
00438 EQASSERT( success || _state == STATE_EXIT_FAILED );
00439
00440 getConfig()->deregisterObject( this );
00441
00442 _state = STATE_STOPPED;
00443 _tasks = eq::TASK_NONE;
00444 return success;
00445 }
00446
00447
00448
00449
00450 void Pipe::update( const uint32_t frameID, const uint32_t frameNumber )
00451 {
00452 EQASSERT( _state == STATE_RUNNING );
00453 EQASSERT( _active > 0 );
00454
00455 eq::PipeFrameStartClockPacket startClockPacket;
00456 send( startClockPacket );
00457
00458 eq::PipeFrameStartPacket startPacket;
00459 startPacket.frameID = frameID;
00460 startPacket.frameNumber = frameNumber;
00461 send( startPacket );
00462 EQLOG( eq::LOG_TASKS ) << "TASK pipe start frame " << &startPacket << endl;
00463
00464 for( vector< Window* >::const_iterator i = _windows.begin();
00465 i != _windows.end(); ++i )
00466 {
00467 Window* window = *i;
00468 if( window->isActive( ))
00469 window->updateDraw( frameID, frameNumber );
00470 }
00471
00472 for( vector< Window* >::const_iterator i = _windows.begin();
00473 i != _windows.end(); ++i )
00474 {
00475 Window* window = *i;
00476 if( window->isActive( ))
00477 window->updatePost( frameID, frameNumber );
00478 }
00479
00480 eq::PipeFrameFinishPacket finishPacket;
00481 finishPacket.frameID = frameID;
00482 finishPacket.frameNumber = frameNumber;
00483
00484 send( finishPacket );
00485 EQLOG( eq::LOG_TASKS ) << "TASK pipe finish frame " << &finishPacket
00486 << endl;
00487 _lastDrawWindow = 0;
00488 }
00489
00490
00491
00492
00493 void Pipe::setPixelViewport( const eq::PixelViewport& pvp )
00494 {
00495 if( pvp == _pvp || !pvp.hasArea( ))
00496 return;
00497
00498 _pvp = pvp;
00499 EQINFO << "Pipe pvp set: " << _pvp << endl;
00500 }
00501
00502
00503
00504
00505 eq::net::CommandResult Pipe::_cmdConfigInitReply( eq::net::Command& command )
00506 {
00507 const eq::PipeConfigInitReplyPacket* packet =
00508 command.getPacket<eq::PipeConfigInitReplyPacket>();
00509 EQVERB << "handle pipe configInit reply " << packet << endl;
00510
00511 _error += packet->error;
00512 setPixelViewport( packet->pvp );
00513
00514 if( packet->result )
00515 _state = STATE_RUNNING;
00516 else
00517 _state = STATE_INIT_FAILED;
00518
00519 return eq::net::COMMAND_HANDLED;
00520 }
00521
00522 eq::net::CommandResult Pipe::_cmdConfigExitReply( eq::net::Command& command )
00523 {
00524 const eq::PipeConfigExitReplyPacket* packet =
00525 command.getPacket<eq::PipeConfigExitReplyPacket>();
00526 EQVERB << "handle pipe configExit reply " << packet << endl;
00527
00528 if( packet->result )
00529 _state = STATE_EXIT_SUCCESS;
00530 else
00531 _state = STATE_EXIT_FAILED;
00532
00533 return eq::net::COMMAND_HANDLED;
00534 }
00535
00536
00537 std::ostream& operator << ( std::ostream& os, const Pipe* pipe )
00538 {
00539 if( !pipe )
00540 return os;
00541
00542 os << disableFlush << disableHeader << "pipe" << endl;
00543 os << "{" << endl << indent;
00544
00545 const std::string& name = pipe->getName();
00546 if( !name.empty( ))
00547 os << "name \"" << name << "\"" << endl;
00548
00549 if( pipe->getPort() != EQ_UNDEFINED_UINT32 )
00550 os << "port " << pipe->getPort() << endl;
00551
00552 if( pipe->getDevice() != EQ_UNDEFINED_UINT32 )
00553 os << "device " << pipe->getDevice() << endl;
00554
00555 const eq::PixelViewport& pvp = pipe->getPixelViewport();
00556 if( pvp.isValid( ))
00557 os << "viewport " << pvp << endl;
00558
00559 bool attrPrinted = false;
00560 for( Pipe::IAttribute i = static_cast<Pipe::IAttribute>( 0 );
00561 i < Pipe::IATTR_ALL;
00562 i = static_cast<Pipe::IAttribute>( static_cast<uint32_t>( i )+1))
00563 {
00564 const int value = pipe->getIAttribute( i );
00565 if( value == Global::instance()->getPipeIAttribute( i ))
00566 continue;
00567
00568 if( !attrPrinted )
00569 {
00570 os << endl << "attributes" << endl;
00571 os << "{" << endl << indent;
00572 attrPrinted = true;
00573 }
00574
00575 os << ( i==Pipe::IATTR_HINT_THREAD ? "hint_thread " : "ERROR" )
00576 << static_cast<eq::IAttrValue>( value ) << endl;
00577 }
00578
00579 if( attrPrinted )
00580 os << exdent << "}" << endl << endl;
00581
00582 os << endl;
00583
00584 const WindowVector& windows = pipe->getWindows();
00585 for( WindowVector::const_iterator i = windows.begin();
00586 i != windows.end(); ++i )
00587
00588 os << *i;
00589
00590 os << exdent << "}" << endl << enableHeader << enableFlush;
00591 return os;
00592 }
00593
00594 }
00595 }