00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
#include "viewEqualizer.h"

#include "../compound.h"
#include "../compoundVisitor.h"
#include "../config.h"
#include "../constCompoundVisitor.h"
#include "../log.h"
#include "../pipe.h"

#include <vector>
00026
// Smallest pipe share (10%) used as a threshold when assigning resources.
// A typed, file-local constant instead of a preprocessor macro.
static const float MIN_USAGE = .1f; // 10%
00028
00029 namespace eq
00030 {
00031 namespace server
00032 {
00033
00034 ViewEqualizer::ViewEqualizer()
00035 : _nPipes( 0 )
00036 {
00037 EQINFO << "New view equalizer @" << (void*)this << std::endl;
00038 }
00039
00040 ViewEqualizer::ViewEqualizer( const ViewEqualizer& from )
00041 : Equalizer( from )
00042 , _nPipes( 0 )
00043 {}
00044
00045 ViewEqualizer::~ViewEqualizer()
00046 {
00047 attach( 0 );
00048 EQINFO << "Delete view equalizer @" << (void*)this << std::endl;
00049 }
00050
00051 ViewEqualizer::Listener::Listener()
00052 {
00053 }
00054
00055 ViewEqualizer::Listener::~Listener()
00056 {
00057 }
00058
00059 void ViewEqualizer::attach( Compound* compound )
00060 {
00061 for( ListenerVector::iterator i = _listeners.begin();
00062 i != _listeners.end(); ++i )
00063 {
00064 (*i).clear();
00065 }
00066 _listeners.clear();
00067
00068 Equalizer::attach( compound );
00069 }
00070
00071 void ViewEqualizer::notifyUpdatePre( Compound* compound,
00072 const uint32_t frameNumber )
00073 {
00074 EQASSERT( compound == getCompound( ));
00075
00076 _updateListeners();
00077 _updateResources();
00078 _update( frameNumber );
00079 }
00080
00081 namespace
00082 {
00083 class SelfAssigner : public CompoundVisitor
00084 {
00085 public:
00086 SelfAssigner( const Pipe* self, float& nResources,
00087 base::PtrHash< Pipe*, float >& pipeUsage )
00088 : _self( self ), _nResources( nResources ), _pipeUsage( pipeUsage )
00089 , _numChannels( 0 ) {}
00090
00091 virtual VisitorResult visitLeaf( Compound* compound )
00092 {
00093 Pipe* pipe = compound->getPipe();
00094 EQASSERT( pipe );
00095
00096 if( pipe != _self )
00097 return TRAVERSE_CONTINUE;
00098
00099 if( _pipeUsage.find( pipe ) == _pipeUsage.end( ))
00100 _pipeUsage[ pipe ] = 0.0f;
00101
00102 float& pipeUsage = _pipeUsage[ pipe ];
00103 if( pipeUsage >= 1.0f )
00104 {
00105 compound->setUsage( 0.f );
00106 return TRAVERSE_TERMINATE;
00107 }
00108
00109 if( pipeUsage > 0.0f )
00110 {
00111 EQASSERT( pipeUsage < 1.0f );
00112
00113 float use = 1.0f - pipeUsage;
00114 use = EQ_MAX( use, MIN_USAGE );
00115
00116 compound->setUsage( use );
00117 _nResources -= use;
00118 pipeUsage = 1.0f;
00119 EQLOG( LOG_LB1 ) << " Use "
00120 << static_cast< unsigned >( use * 100.f + .5f )
00121 << "% of " << pipe->getName() << " task "
00122 << compound->getTaskID() << ", "
00123 << _nResources * 100.f << "% left" << std::endl;
00124 }
00125 else
00126 {
00127 EQASSERT( pipeUsage == 0.0f );
00128
00129 float use = EQ_MIN( 1.0f, _nResources );
00130
00131 compound->setUsage( use );
00132 _nResources -= use;
00133 pipeUsage = use;
00134 EQLOG( LOG_LB1 ) << " Use "
00135 << static_cast< unsigned >( use * 100.f + .5f )
00136 << "% of " << pipe->getName() << " task "
00137 << compound->getTaskID() << ", "
00138 << _nResources * 100.f << "% left" << std::endl;
00139 }
00140 ++_numChannels;
00141
00142 return TRAVERSE_TERMINATE;
00143 }
00144
00145 uint32_t getNumChannels() const { return _numChannels; }
00146
00147 private:
00148 const Pipe* const _self;
00149 float& _nResources;
00150 base::PtrHash< Pipe*, float >& _pipeUsage;
00151 uint32_t _numChannels;
00152 };
00153
00154 class PreviousAssigner : public CompoundVisitor
00155 {
00156 public:
00157 PreviousAssigner( const Pipe* self, float& nResources,
00158 base::PtrHash< Pipe*, float >& pipeUsage )
00159 : _self( self ), _nResources( nResources ), _pipeUsage( pipeUsage )
00160 , _numChannels( 0 ) {}
00161
00162 virtual VisitorResult visitLeaf( Compound* compound )
00163 {
00164 Pipe* pipe = compound->getPipe();
00165 EQASSERT( pipe );
00166
00167 if( compound->getUsage() == 0.0f ||
00168 pipe == _self)
00169 {
00170 return TRAVERSE_CONTINUE;
00171 }
00172
00173 compound->setUsage( 0.0f );
00174 if( _nResources <= MIN_USAGE )
00175 return TRAVERSE_CONTINUE;
00176
00177 if( _pipeUsage.find( pipe ) == _pipeUsage.end( ))
00178 _pipeUsage[ pipe ] = 0.0f;
00179
00180 float& pipeUsage = _pipeUsage[ pipe ];
00181 if( pipeUsage > 0.0f )
00182 return TRAVERSE_CONTINUE;
00183
00184 float use = EQ_MIN( 1.0f, _nResources );
00185 if( use + MIN_USAGE > 1.0f )
00186 use = 1.0f;
00187
00188 pipeUsage = use;
00189 compound->setUsage( use );
00190 _nResources -= use;
00191 ++_numChannels;
00192
00193 EQLOG( LOG_LB1 ) << " Use "
00194 << static_cast< unsigned >( use * 100.f + .5f )
00195 << "% of " << pipe->getName() << " task "
00196 << compound->getTaskID() << ", "
00197 << _nResources * 100.f << "% left" << std::endl;
00198 return TRAVERSE_CONTINUE;
00199 }
00200
00201 uint32_t getNumChannels() const { return _numChannels; }
00202
00203 private:
00204 const Pipe* const _self;
00205 float& _nResources;
00206 base::PtrHash< Pipe*, float >& _pipeUsage;
00207 uint32_t _numChannels;
00208 };
00209
00210 class NewAssigner : public CompoundVisitor
00211 {
00212 public:
00213 NewAssigner( float& nResources,
00214 base::PtrHash< Pipe*, float >& pipeUsage )
00215 : _nResources( nResources ), _pipeUsage( pipeUsage )
00216 , _numChannels( 0 )
00217 , _fallback( 0 ) {}
00218
00219 virtual VisitorResult visitLeaf( Compound* compound )
00220 {
00221 if( !_fallback )
00222 _fallback = compound;
00223
00224 if( compound->getUsage() != 0.0f )
00225 return TRAVERSE_CONTINUE;
00226
00227 Pipe* pipe = compound->getPipe();
00228 EQASSERT( pipe );
00229
00230 if( _pipeUsage.find( pipe ) == _pipeUsage.end( ))
00231 _pipeUsage[ pipe ] = 0.0f;
00232
00233 float& pipeUsage = _pipeUsage[ pipe ];
00234 if( pipeUsage >= 1.0f )
00235 return TRAVERSE_CONTINUE;
00236
00237 if( pipeUsage > 0.0f )
00238 {
00239 EQASSERT( pipeUsage < 1.0f );
00240
00241 float use = 1.0f - pipeUsage;
00242 use = EQ_MAX( use, MIN_USAGE );
00243
00244 compound->setUsage( use );
00245 _nResources -= use;
00246 pipeUsage = 1.0f;
00247 EQLOG( LOG_LB1 ) << " Use "
00248 << static_cast< unsigned >( use * 100.f + .5f )
00249 << "% of " << pipe->getName() << " task "
00250 << compound->getTaskID() << ", "
00251 << _nResources * 100.f << "% left" << std::endl;
00252 }
00253 else
00254 {
00255 EQASSERT( pipeUsage == 0.0f );
00256
00257 float use = EQ_MIN( 1.0f, _nResources );
00258
00259 compound->setUsage( use );
00260 _nResources -= use;
00261 pipeUsage = use;
00262 EQLOG( LOG_LB1 ) << " Use "
00263 << static_cast< unsigned >( use * 100.f + .5f )
00264 << "% of " << pipe->getName() << " task "
00265 << compound->getTaskID() << ", "
00266 << _nResources * 100.f << "% left" << std::endl;
00267 }
00268 ++_numChannels;
00269
00270 if( _nResources <= MIN_USAGE )
00271 return TRAVERSE_TERMINATE;
00272 return TRAVERSE_CONTINUE;
00273 }
00274
00275 uint32_t getNumChannels() const { return _numChannels; }
00276 Compound* getFallback() { return _fallback; }
00277
00278 private:
00279 float& _nResources;
00280 base::PtrHash< Pipe*, float >& _pipeUsage;
00281 uint32_t _numChannels;
00282 Compound* _fallback;
00283 };
00284
00285 }
00286
00287 void ViewEqualizer::_update( const uint32_t frameNumber )
00288 {
00289 const uint32_t frame = _findInputFrameNumber();
00290 EQLOG( LOG_LB1 ) << "Using data from frame " << frame << std::endl;
00291
00292
00293 LoadVector loads;
00294 int64_t totalTime( 0 );
00295
00296 for( ListenerVector::iterator i = _listeners.begin();
00297 i != _listeners.end(); ++i )
00298 {
00299 Listener& listener = *i;
00300 #if 0
00301 const Listener::Load& load = listener.useLoad( frame );
00302 #else
00303 Listener::Load load = listener.useLoad( frame );
00304 if( load.nResources > 0 )
00305 {
00306 const float time = static_cast< float >( load.time ) /
00307 static_cast< float >( load.nResources );
00308 load.time = static_cast< int64_t >(
00309 time * sqrtf( static_cast< float >( load.nResources )));
00310 }
00311 #endif
00312
00313 totalTime += load.time;
00314 loads.push_back( load );
00315 }
00316
00317 const Compound* compound = getCompound();
00318
00319 if( isFrozen() || !compound->isActive() || _nPipes == 0 )
00320
00321 return;
00322
00323 if( totalTime == 0 )
00324 totalTime = 1;
00325
00326 const float resourceTime( static_cast< float >( totalTime ) /
00327 static_cast< float >( _nPipes ));
00328 EQLOG( LOG_LB1 ) << resourceTime << "ms/resource" << std::endl;
00329
00330
00331 const CompoundVector& children = compound->getChildren();
00332 const size_t size( _listeners.size( ));
00333 EQASSERT( children.size() == size );
00334 base::PtrHash< Pipe*, float > pipeUsage;
00335 float* leftOvers = static_cast< float* >( alloca( size * sizeof( float )));
00336
00337
00338 for( size_t i = 0; i < size; ++i )
00339 {
00340 Listener::Load& load = loads[ i ];
00341 EQASSERT( load.missing == 0 );
00342
00343 Compound* child = children[ i ];
00344 if( !child->isActive( ))
00345 continue;
00346
00347 float segmentResources( load.time / resourceTime );
00348
00349 EQLOG( LOG_LB1 ) << "----- balance step 1 for view " << i << " ("
00350 << child->getChannel()->getName() << ") using "
00351 << segmentResources << " resources" << std::endl;
00352 SelfAssigner assigner( child->getPipe(), segmentResources, pipeUsage );
00353
00354 child->accept( assigner );
00355 load.missing = assigner.getNumChannels();
00356 leftOvers[ i ] = segmentResources;
00357 }
00358
00359
00360 for( size_t i = 0; i < size; ++i )
00361 {
00362 Listener::Load& load = loads[ i ];
00363 Compound* child = children[ i ];
00364 if( !child->isActive( ))
00365 continue;
00366
00367 float& leftOver = leftOvers[i];
00368 EQLOG( LOG_LB1 ) << "----- balance step 2 for view " << i << " ("
00369 << child->getChannel()->getName() << ") using "
00370 << leftOver << " resources" << std::endl;
00371 PreviousAssigner assigner( child->getPipe(), leftOver, pipeUsage );
00372
00373 child->accept( assigner );
00374 load.missing += assigner.getNumChannels();
00375 }
00376
00377
00378 for( size_t i = 0; i < size; ++i )
00379 {
00380 Listener& listener = _listeners[ i ];
00381 EQASSERTINFO( listener.getNLoads() <= getConfig()->getLatency() + 3,
00382 listener );
00383
00384 float& leftOver = leftOvers[i];
00385 Listener::Load& load = loads[ i ];
00386 Compound* child = children[ i ];
00387
00388 if( !child->isActive( ))
00389 continue;
00390
00391 if( leftOver > MIN_USAGE || load.missing == 0 )
00392 {
00393 EQLOG( LOG_LB1 ) << "----- balance step 3 for view " << i << " ("
00394 << child->getChannel()->getName() << ") using "
00395 << leftOver << " resources" << std::endl;
00396
00397 NewAssigner assigner( leftOver, pipeUsage );
00398 child->accept( assigner );
00399 load.missing += assigner.getNumChannels();
00400
00401 if( load.missing == 0 )
00402 {
00403 Compound* fallback = assigner.getFallback();
00404 EQASSERT( fallback );
00405 EQASSERT( leftOver > 0 );
00406
00407 fallback->setUsage( leftOver );
00408 load.missing = 1;
00409 EQLOG( LOG_LB1 ) << " Use "
00410 << static_cast< unsigned >( leftOver*100.f+.5f )
00411 << "% of " << fallback->getPipe()->getName()
00412 << " task " << fallback->getTaskID()
00413 << std::endl;
00414 }
00415 }
00416
00417 listener.newLoad( frameNumber, load.missing );
00418 }
00419 }
00420
00421 uint32_t ViewEqualizer::_findInputFrameNumber() const
00422 {
00423 EQASSERT( !_listeners.empty( ));
00424
00425 uint32_t frame = std::numeric_limits< uint32_t >::max();
00426
00427 for( ListenerVector::const_iterator i = _listeners.begin();
00428 i != _listeners.end(); ++i )
00429 {
00430 const Listener& listener = *i;
00431 const uint32_t youngest = listener.findYoungestLoad();
00432 frame = EQ_MIN( frame, youngest );
00433 }
00434
00435 return frame;
00436 }
00437
00438
00439 void ViewEqualizer::_updateListeners()
00440 {
00441 if( !_listeners.empty( ))
00442 {
00443 EQASSERT( getCompound()->getChildren().size() == _listeners.size( ));
00444 return;
00445 }
00446
00447 Compound* compound = getCompound();
00448 const CompoundVector& children = compound->getChildren();
00449 const size_t nChildren = children.size();
00450
00451 _listeners.resize( nChildren );
00452 for( size_t i = 0; i < nChildren; ++i )
00453 {
00454 EQLOG( LOG_LB1 ) << base::disableFlush << "Tasks for view " << i
00455 << ": ";
00456 Listener& listener = _listeners[ i ];
00457 listener.update( children[i] );
00458 EQLOG( LOG_LB1 ) << std::endl << base::enableFlush;
00459 }
00460 }
00461
00462 namespace
00463 {
00464 class PipeCounter : public ConstCompoundVisitor
00465 {
00466 public:
00467 virtual VisitorResult visitPre( const Compound* compound )
00468 { return compound->isActive() ? TRAVERSE_CONTINUE : TRAVERSE_PRUNE; }
00469
00470 virtual VisitorResult visitLeaf( const Compound* compound )
00471 {
00472 if( !compound->isActive( ))
00473 return TRAVERSE_PRUNE;
00474
00475 const Pipe* pipe = compound->getPipe();
00476 EQASSERT( pipe );
00477 _pipes.insert( pipe );
00478 return TRAVERSE_CONTINUE;
00479 }
00480
00481 size_t getNPipes() const { return _pipes.size(); }
00482
00483 private:
00484 std::set< const Pipe* > _pipes;
00485 };
00486 }
00487
00488 void ViewEqualizer::_updateResources()
00489 {
00490 if( _nPipes > 0 )
00491 return;
00492
00493 PipeCounter counter;
00494 getCompound()->accept( counter );
00495 _nPipes = counter.getNPipes();
00496 }
00497
00498
00499
00500
00501 namespace
00502 {
00503 class LoadSubscriber : public CompoundVisitor
00504 {
00505 public:
00506 LoadSubscriber( ChannelListener* listener,
00507 base::PtrHash< Channel*, uint32_t >& taskIDs )
00508 : _listener( listener )
00509 , _taskIDs( taskIDs ) {}
00510
00511 virtual VisitorResult visitLeaf( Compound* compound )
00512 {
00513 Channel* channel = compound->getChannel();
00514 EQASSERT( channel );
00515
00516 if( _taskIDs.find( channel ) == _taskIDs.end( ))
00517 {
00518 channel->addListener( _listener );
00519 _taskIDs[ channel ] = compound->getTaskID();
00520 EQLOG( LOG_LB1 ) << _taskIDs[ channel ] << ' ';
00521 }
00522 else
00523 {
00524 EQASSERTINFO( 0,
00525 "View equalizer does not support using channel "<<
00526 channel->getName() <<
00527 " multiple times in one branch" );
00528 }
00529 return TRAVERSE_CONTINUE;
00530 }
00531
00532 private:
00533 ChannelListener* const _listener;
00534 base::PtrHash< Channel*, uint32_t >& _taskIDs;
00535 };
00536 }
00537
00538 void ViewEqualizer::Listener::update( Compound* compound )
00539 {
00540 EQASSERT( _taskIDs.empty( ));
00541 LoadSubscriber subscriber( this, _taskIDs );
00542 compound->accept( subscriber );
00543 }
00544
00545 void ViewEqualizer::Listener::clear()
00546 {
00547 for( TaskIDHash::const_iterator i = _taskIDs.begin();
00548 i != _taskIDs.end(); ++i )
00549 {
00550 i->first->removeListener( this );
00551 }
00552 _taskIDs.clear();
00553 }
00554
00555 ViewEqualizer::Listener::Load ViewEqualizer::Listener::Load::NONE( 0, 0, 1 );
00556 ViewEqualizer::Listener::Load::Load( const uint32_t frame_,
00557 const uint32_t missing_,
00558 const int64_t time_ )
00559 : frame( frame_ ), missing( missing_ ), nResources( missing_ )
00560 , time( time_ ) {}
00561
00562 bool ViewEqualizer::Listener::Load::operator == ( const Load& rhs ) const
00563 {
00564 return ( frame == rhs.frame && missing == rhs.missing && time == rhs.time );
00565 }
00566
00567 void ViewEqualizer::Listener::notifyLoadData( Channel* channel,
00568 const uint32_t frameNumber,
00569 const uint32_t nStatistics,
00570 const eq::Statistic* statistics )
00571 {
00572 Load& load = _getLoad( frameNumber );
00573 if( load == Load::NONE )
00574 return;
00575
00576 EQASSERT( _taskIDs.find( channel ) != _taskIDs.end( ));
00577 const uint32_t taskID = _taskIDs[ channel ];
00578
00579
00580 int64_t startTime = std::numeric_limits< int64_t >::max();
00581 int64_t endTime = 0;
00582 bool loadSet = false;
00583
00584 for( uint32_t i = 0; i < nStatistics && !loadSet; ++i )
00585 {
00586 const eq::Statistic& data = statistics[i];
00587 if( data.task != taskID )
00588 continue;
00589
00590 switch( data.type )
00591 {
00592 case eq::Statistic::CHANNEL_CLEAR:
00593 case eq::Statistic::CHANNEL_DRAW:
00594 case eq::Statistic::CHANNEL_READBACK:
00595 startTime = EQ_MIN( startTime, data.startTime );
00596 endTime = EQ_MAX( endTime, data.endTime );
00597 break;
00598
00599
00600 case eq::Statistic::CHANNEL_ASSEMBLE:
00601 loadSet = true;
00602 break;
00603
00604 default:
00605 break;
00606 }
00607 }
00608
00609 if( startTime == std::numeric_limits< int64_t >::max( ))
00610 return;
00611
00612 EQASSERTINFO( load.missing > 0, load );
00613
00614 const int64_t time = endTime - startTime;
00615 load.time += time;
00616 --load.missing;
00617
00618 EQLOG( LOG_LB1 ) << "Task " << taskID << ", added time " << time << " to "
00619 << load << std::endl;
00620 }
00621
00622 uint32_t ViewEqualizer::Listener::findYoungestLoad() const
00623 {
00624 for( LoadDeque::const_iterator i = _loads.begin(); i != _loads.end(); ++i )
00625 {
00626 const Load& load = *i;
00627 if( load.missing == 0 )
00628 return load.frame;
00629 }
00630 return 0;
00631 }
00632
00633 const ViewEqualizer::Listener::Load&
00634 ViewEqualizer::Listener::useLoad( const uint32_t frame )
00635 {
00636 for( LoadDeque::iterator i = _loads.begin(); i != _loads.end(); ++i )
00637 {
00638 Load& load = *i;
00639 if( load.frame == frame )
00640 {
00641 EQASSERT( load.missing == 0 );
00642 if( load.time == 0 )
00643 load.time = 1;
00644
00645 ++i;
00646 _loads.erase( i, _loads.end( ));
00647 return load;
00648 }
00649 }
00650
00651 return Load::NONE;
00652 }
00653
00654 ViewEqualizer::Listener::Load&
00655 ViewEqualizer::Listener::_getLoad( const uint32_t frame )
00656 {
00657 for( LoadDeque::iterator i = _loads.begin(); i != _loads.end(); ++i )
00658 {
00659 const Load& load = *i;
00660 if( load.frame == frame )
00661 return *i;
00662 }
00663
00664 return Load::NONE;
00665 }
00666
00667 void ViewEqualizer::Listener::newLoad( const uint32_t frameNumber,
00668 const uint32_t nChannels )
00669 {
00670 EQASSERT( nChannels > 0 );
00671 _loads.push_front( Load( frameNumber, nChannels, 0 ));
00672 }
00673
00674 std::ostream& operator << ( std::ostream& os, const ViewEqualizer* equalizer )
00675 {
00676 if( equalizer )
00677 os << "view_equalizer {}" << std::endl;
00678 return os;
00679 }
00680
00681 std::ostream& operator << ( std::ostream& os,
00682 const ViewEqualizer::Listener& listener )
00683 {
00684 os << base::disableFlush << "Listener" << std::endl
00685 << base::indent;
00686 for( ViewEqualizer::Listener::LoadDeque::const_iterator i =
00687 listener._loads.begin(); i != listener._loads.end(); ++i )
00688 {
00689 os << *i << std::endl;
00690 }
00691 os << base::exdent << base::enableFlush;
00692 return os;
00693 }
00694
00695 std::ostream& operator << ( std::ostream& os,
00696 const ViewEqualizer::Listener::Load& load )
00697 {
00698 os << "frame " << load.frame << " missing " << load.missing << " t "
00699 << load.time;
00700 return os;
00701 }
00702
00703 }
00704 }