00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "frameData.h"
00019
00020 #include "commands.h"
00021 #include "compressor.h"
00022 #include "frameDataStatistics.h"
00023 #include "image.h"
00024 #include "log.h"
00025 #include "packets.h"
00026 #include "roiFinder.h"
00027
00028 #include <eq/net/command.h>
00029 #include <eq/net/commandFunc.h>
00030 #include <eq/net/dataIStream.h>
00031 #include <eq/net/dataOStream.h>
00032 #include <eq/net/session.h>
00033 #include <eq/base/monitor.h>
00034 #include <algorithm>
00035
00036 using namespace eq::base;
00037 using namespace std;
00038 using eq::net::CommandFunc;
00039
00040 namespace eq
00041 {
00042 FrameData::FrameData()
00043 : _colorType( GL_RGBA )
00044 , _useAlpha( true )
00045 , _useSendToken( false )
00046 {
00047 _roiFinder = new ROIFinder();
00048 EQINFO << "New FrameData @" << (void*)this << endl;
00049 }
00050
00051 FrameData::~FrameData()
00052 {
00053 clear();
00054
00055 for( vector<Image*>::const_iterator i = _imageCache.begin();
00056 i != _imageCache.end(); ++i )
00057 {
00058 Image* image = *i;
00059 EQWARN << "Unflushed image in FrameData destructor" << endl;
00060 delete image;
00061 }
00062 _imageCache.clear();
00063
00064 delete _roiFinder;
00065 _roiFinder = 0;
00066 }
00067
00068 void FrameData::getInstanceData( net::DataOStream& os )
00069 {
00070 EQUNREACHABLE;
00071 os << _data;
00072 }
00073
00074 void FrameData::applyInstanceData( net::DataIStream& is )
00075 {
00076 clear();
00077 is >> _data;
00078 EQLOG( LOG_ASSEMBLY ) << "applied " << this << endl;
00079 }
00080
00081 void FrameData::update( const uint32_t version )
00082 {
00083
00084 FrameDataUpdatePacket packet;
00085 packet.instanceID = getInstanceID();
00086 packet.version = version;
00087 send( getLocalNode(), packet );
00088 }
00089
00090 void FrameData::attachToSession( const uint32_t id, const uint32_t instanceID,
00091 net::Session* session )
00092 {
00093 net::Object::attachToSession( id, instanceID, session );
00094
00095 net::CommandQueue* queue = session->getCommandThreadQueue();
00096
00097 registerCommand( CMD_FRAMEDATA_TRANSMIT,
00098 CommandFunc<FrameData>( this, &FrameData::_cmdTransmit ),
00099 queue );
00100 registerCommand( CMD_FRAMEDATA_READY,
00101 CommandFunc<FrameData>( this, &FrameData::_cmdReady ),
00102 queue );
00103 registerCommand( CMD_FRAMEDATA_UPDATE,
00104 CommandFunc<FrameData>( this, &FrameData::_cmdUpdate ),
00105 queue );
00106 }
00107
00108 void FrameData::clear()
00109 {
00110 EQASSERT( _listeners.empty( ));
00111
00112 _imageCacheLock.set();
00113 _imageCache.insert( _imageCache.end(), _images.begin(), _images.end( ));
00114 _imageCacheLock.unset();
00115
00116 _images.clear();
00117 }
00118
00119 void FrameData::flush()
00120 {
00121 clear();
00122
00123 for( vector<Image*>::const_iterator i = _imageCache.begin();
00124 i != _imageCache.end(); ++i )
00125 {
00126 Image* image = *i;
00127 image->flush();
00128 delete image;
00129 }
00130
00131 _imageCache.clear();
00132 }
00133
00134 Image* FrameData::newImage( const eq::Frame::Type type )
00135 {
00136 Image* image = _allocImage( type );
00137 _images.push_back( image );
00138 return image;
00139 }
00140
00141 Image* FrameData::_allocImage( const eq::Frame::Type type )
00142 {
00143 Image* image;
00144 _imageCacheLock.set();
00145
00146 if( _imageCache.empty( ))
00147 {
00148 _imageCacheLock.unset();
00149 image = new Image;
00150
00151 image->setFormat( Frame::BUFFER_DEPTH, GL_DEPTH_COMPONENT );
00152 image->setType( Frame::BUFFER_DEPTH, GL_UNSIGNED_INT );
00153 }
00154 else
00155 {
00156 image = _imageCache.back();
00157 _imageCache.pop_back();
00158 _imageCacheLock.unset();
00159
00160 image->reset();
00161 }
00162
00163 image->setStorageType( type );
00164 _useAlpha ? image->enableAlphaUsage() : image->disableAlphaUsage();
00165
00166 if( type == Frame::TYPE_TEXTURE )
00167 image->setFormat( Frame::BUFFER_COLOR, GL_RGBA );
00168 else
00169 image->setFormat( Frame::BUFFER_COLOR, GL_BGRA );
00170
00171 switch( _colorType )
00172 {
00173 case GL_RGBA16F:
00174 image->setType( Frame::BUFFER_COLOR, GL_HALF_FLOAT );
00175 break;
00176 case GL_RGBA32F:
00177 image->setType( Frame::BUFFER_COLOR, GL_FLOAT );
00178 break;
00179 default:
00180 image->setType( Frame::BUFFER_COLOR, GL_UNSIGNED_BYTE );
00181 }
00182
00183 return image;
00184 }
00185
00186
00187 void FrameData::startReadback( const Frame& frame,
00188 Window::ObjectManager* glObjects )
00189 {
00190 if( _data.buffers == Frame::BUFFER_NONE )
00191 return;
00192
00193 PixelViewport absPVP = _data.pvp + frame.getOffset();
00194 if( !absPVP.isValid( ))
00195 return;
00196
00197 const Zoom& zoom = frame.getZoom();
00198
00199 if( !zoom.isValid( ))
00200 {
00201 EQWARN << "Invalid zoom factor, skipping frame." << endl;
00202 return;
00203 }
00204
00205 PixelViewportVector pvps;
00206
00207 if( _data.buffers & Frame::BUFFER_DEPTH && zoom == Zoom::NONE )
00208 pvps = _roiFinder->findRegions( _data.buffers, absPVP, zoom,
00209
00210 0, 0, glObjects );
00211 else
00212 pvps.push_back( absPVP );
00213
00214 for( uint32_t i = 0; i < pvps.size(); i++ )
00215 {
00216 PixelViewport pvp = pvps[ i ];
00217 pvp.intersect( absPVP );
00218
00219 Image* image = newImage( _data.frameType );
00220 image->startReadback( _data.buffers, pvp, zoom, glObjects );
00221 image->setOffset( pvp.x - absPVP.x, pvp.y - absPVP.y );
00222 }
00223 }
00224
00225 void FrameData::syncReadback()
00226 {
00227 for( vector<Image*>::const_iterator iter = _images.begin();
00228 iter != _images.end(); ++iter )
00229 {
00230 Image* image = *iter;
00231 image->syncReadback();
00232
00233 #ifndef NDEBUG
00234 if( getenv( "EQ_DUMP_IMAGES" ))
00235 {
00236 static uint32_t counter = 0;
00237 ostringstream stringstream;
00238
00239 stringstream << "Image_" << setfill( '0' ) << setw(5) << ++counter;
00240 image->writeImages( stringstream.str( ));
00241 }
00242 #endif
00243 }
00244 setReady();
00245 }
00246
00247 void FrameData::setReady()
00248 {
00249 _setReady( getVersion( ));
00250 }
00251
00252 void FrameData::_setReady( const uint32_t version )
00253 {
00254 EQASSERTINFO( getVersion() == net::Object::VERSION_NONE ||
00255 _readyVersion <= version,
00256 "v" << getVersion() << " ready " << _readyVersion << " new "
00257 << version );
00258
00259 base::ScopedMutex mutex( _listenersMutex );
00260 #ifndef NDEBUG
00261 for( list<ImageVersion>::iterator i = _pendingImages.begin();
00262 i != _pendingImages.end(); ++i )
00263 {
00264 const ImageVersion& imageVersion = *i;
00265 EQASSERTINFO( imageVersion.version > version,
00266 "Frame is ready, but not all images have been set" );
00267 }
00268 #endif
00269
00270 if( _readyVersion >= version )
00271 return;
00272
00273 _readyVersion = version;
00274 EQLOG( LOG_ASSEMBLY ) << "set ready " << this << ", " << _listeners.size()
00275 << " monitoring" << endl;
00276
00277 for( vector< Monitor<uint32_t>* >::iterator i = _listeners.begin();
00278 i != _listeners.end(); ++i )
00279 {
00280 Monitor<uint32_t>* monitor = *i;
00281 ++(*monitor);
00282 }
00283 }
00284
00285
00286 void FrameData::transmit( net::NodePtr toNode, const uint32_t frameNumber )
00287 {
00288 FrameDataStatistics event( Statistic::FRAME_TRANSMIT, this, frameNumber );
00289
00290 if( _data.buffers == 0 )
00291 {
00292 EQWARN << "No buffers for frame data" << endl;
00293 return;
00294 }
00295
00296 if ( _data.frameType == Frame::TYPE_TEXTURE )
00297 {
00298 EQWARN << "Can't transmit image of type TEXTURE" << endl;
00299 EQUNIMPLEMENTED;
00300 return;
00301 }
00302
00303 net::ConnectionPtr connection = toNode->getConnection();
00304 net::ConnectionDescriptionPtr description = connection->getDescription();
00305
00306
00307 const bool useCompression = ( description->bandwidth <= 262144 );
00308
00309 FrameDataTransmitPacket packet;
00310 const uint64_t packetSize = sizeof( packet ) - 8*sizeof( uint8_t );
00311 const net::Session* session = getSession();
00312 EQASSERT( session );
00313
00314 packet.sessionID = session->getID();
00315 packet.objectID = getID();
00316 packet.version = getVersion();
00317 packet.frameNumber = frameNumber;
00318
00319
00320 for( vector<Image*>::const_iterator i = _images.begin();
00321 i != _images.end(); ++i )
00322 {
00323 Image* image = *i;
00324 vector< const Image::PixelData* > pixelDatas;
00325
00326 packet.size = packetSize;
00327 packet.buffers = Frame::BUFFER_NONE;
00328 packet.pvp = image->getPixelViewport();
00329 packet.ignoreAlpha = image->ignoreAlpha();
00330
00331 EQASSERT( packet.pvp.isValid( ));
00332
00333 {
00334 uint64_t rawSize( 0 );
00335 FrameDataStatistics compressEvent( Statistic::FRAME_COMPRESS, this,
00336 frameNumber );
00337 compressEvent.event.data.statistic.ratio = 1.0f;
00338 if( !useCompression )
00339 compressEvent.event.data.statistic.frameNumber = 0;
00340
00341
00342 Frame::Buffer buffers[] = {Frame::BUFFER_COLOR,Frame::BUFFER_DEPTH};
00343
00344
00345 for( unsigned j = 0; j < 2; ++j )
00346 {
00347 Frame::Buffer buffer = buffers[j];
00348 if( image->hasPixelData( buffer ))
00349 {
00350
00351 packet.size += 4 * sizeof( uint32_t );
00352
00353 const Image::PixelData& data = useCompression ?
00354 image->compressPixelData( buffer ) :
00355 image->getPixelData( buffer );
00356 pixelDatas.push_back( &data );
00357
00358 if( data.isCompressed )
00359 {
00360 const uint32_t nElements = data.compressedSize.size();
00361 for( uint32_t k = 0 ; k < nElements; ++k )
00362 {
00363 packet.size += sizeof( uint64_t );
00364 packet.size += data.compressedSize[ k ];
00365 }
00366 }
00367 else
00368 {
00369 packet.size += sizeof( uint64_t );
00370 packet.size += data.pixels.getSize();
00371 }
00372
00373 packet.buffers |= buffer;
00374 rawSize += image->getPixelDataSize( buffer );
00375 }
00376 }
00377
00378 if( rawSize > 0 )
00379 compressEvent.event.data.statistic.ratio =
00380 static_cast< float >( packet.size ) /
00381 static_cast< float >( rawSize );
00382 }
00383
00384 if( pixelDatas.empty( ))
00385 continue;
00386
00387
00388 if( _useSendToken )
00389 getLocalNode()->acquireSendToken( toNode );
00390
00391 connection->lockSend();
00392 connection->send( &packet, packetSize, true );
00393 #ifndef NDEBUG
00394 size_t sentBytes = packetSize;
00395 #endif
00396
00397 for( uint32_t j=0; j < pixelDatas.size(); ++j )
00398 {
00399 #ifndef NDEBUG
00400 sentBytes += 4 * sizeof( uint32_t );
00401 #endif
00402 const Image::PixelData* data = pixelDatas[j];
00403 const uint32_t imageHeader[4] =
00404 { data->format,
00405 data->type,
00406 data->compressorName,
00407 data->isCompressed ? data->compressedSize.size() : 1 };
00408
00409 connection->send( imageHeader, 4 * sizeof( uint32_t ), true );
00410
00411 if( data->isCompressed )
00412 {
00413 for( uint32_t k = 0 ; k < data->compressedSize.size(); ++k )
00414 {
00415 const uint64_t dataSize = data->compressedSize[k];
00416 connection->send( &dataSize, sizeof( dataSize ), true );
00417 connection->send( data->compressedData[k], dataSize, true );
00418 #ifndef NDEBUG
00419 sentBytes += sizeof( dataSize ) + dataSize;
00420 #endif
00421 }
00422 }
00423 else
00424 {
00425 const uint64_t dataSize = data->pixels.getSize();
00426 connection->send( &dataSize, sizeof( dataSize ), true );
00427 connection->send( data->pixels.getData(), dataSize, true );
00428 #ifndef NDEBUG
00429 sentBytes += sizeof( dataSize ) + dataSize;
00430 #endif
00431 }
00432 }
00433 #ifndef NDEBUG
00434 EQASSERTINFO( sentBytes == packet.size,
00435 sentBytes << " != " << packet.size );
00436 #endif
00437
00438 connection->unlockSend();
00439 if( _useSendToken )
00440 getLocalNode()->releaseSendToken( toNode );
00441 }
00442
00443 FrameDataReadyPacket readyPacket;
00444 readyPacket.sessionID = session->getID();
00445 readyPacket.objectID = getID();
00446 readyPacket.version = getVersion();
00447 toNode->send( readyPacket );
00448 }
00449
00450 void FrameData::addListener( base::Monitor<uint32_t>& listener )
00451 {
00452 _listenersMutex.set();
00453
00454 _listeners.push_back( &listener );
00455 if( _readyVersion >= getVersion( ))
00456 ++listener;
00457
00458 _listenersMutex.unset();
00459 }
00460
00461 void FrameData::removeListener( base::Monitor<uint32_t>& listener )
00462 {
00463 _listenersMutex.set();
00464
00465 vector< Monitor<uint32_t>* >::iterator i = find( _listeners.begin(),
00466 _listeners.end(),
00467 &listener );
00468 EQASSERT( i != _listeners.end( ));
00469 _listeners.erase( i );
00470
00471 _listenersMutex.unset();
00472 }
00473
00474
00475
00476 net::CommandResult FrameData::_cmdTransmit( net::Command& command )
00477 {
00478 CHECK_THREAD( _commandThread );
00479 const FrameDataTransmitPacket* packet =
00480 command.getPacket<FrameDataTransmitPacket>();
00481
00482 EQLOG( LOG_ASSEMBLY )
00483 << this << " received image, buffers " << packet->buffers << " pvp "
00484 << packet->pvp << " v" << packet->version << endl;
00485
00486 EQASSERT( packet->pvp.isValid( ));
00487
00488 FrameDataStatistics event( Statistic::FRAME_RECEIVE, this,
00489 packet->frameNumber );
00490
00491 Image* image = _allocImage( Frame::TYPE_MEMORY );
00492
00493
00494
00495 uint8_t* data = const_cast< uint8_t* >( packet->data );
00496
00497 image->setPixelViewport( packet->pvp );
00498 packet->ignoreAlpha ? image->disableAlphaUsage() :image->enableAlphaUsage();
00499
00500 Frame::Buffer buffers[] = { Frame::BUFFER_COLOR, Frame::BUFFER_DEPTH };
00501 for( unsigned i = 0; i < 2; ++i )
00502 {
00503 Frame::Buffer buffer = buffers[i];
00504
00505 if( packet->buffers & buffer )
00506 {
00507 Image::PixelData pixelData;
00508 const uint32_t* u32Data = reinterpret_cast< uint32_t* >( data );
00509
00510 pixelData.format = u32Data[0];
00511 pixelData.type = u32Data[1];
00512 pixelData.compressorName = u32Data[2];
00513 const uint32_t nChunks = u32Data[3];
00514
00515 data += 4 * sizeof( uint32_t );
00516
00517 if( pixelData.compressorName > EQ_COMPRESSOR_NONE )
00518 {
00519 pixelData.compressedSize.resize( nChunks );
00520 pixelData.compressedData.resize( nChunks );
00521
00522 for( uint32_t j = 0; j < nChunks; ++j )
00523 {
00524 const uint64_t size = *reinterpret_cast< uint64_t*>( data );
00525 data += sizeof( uint64_t );
00526
00527 pixelData.compressedSize[j] = size;
00528 pixelData.compressedData[j] = data;
00529 data += size;
00530 }
00531
00532 image->setPixelData( buffer, pixelData );
00533 }
00534 else
00535 {
00536 const uint64_t size = *reinterpret_cast< uint64_t* >( data );
00537 data += sizeof( uint64_t );
00538
00539 image->setFormat( buffer, pixelData.format );
00540 image->setType( buffer, pixelData.type );
00541 EQASSERT( size == image->getPixelDataSize( buffer ));
00542
00543 image->setPixelData( buffer, data );
00544 data += size;
00545 }
00546
00547
00548 pixelData.compressedSize.clear();
00549 pixelData.compressedData.clear();
00550 }
00551 }
00552
00553 const uint32_t version = getVersion();
00554
00555 if( version == packet->version )
00556 {
00557 EQASSERT( _readyVersion < getVersion( ));
00558 _images.push_back( image );
00559 }
00560 else
00561 {
00562 EQASSERT( version < packet->version );
00563 _pendingImages.push_back( ImageVersion( image, packet->version ));
00564 }
00565
00566 return net::COMMAND_HANDLED;
00567 }
00568
00569 net::CommandResult FrameData::_cmdReady( net::Command& command )
00570 {
00571 CHECK_THREAD( _commandThread );
00572 const FrameDataReadyPacket* packet =
00573 command.getPacket<FrameDataReadyPacket>();
00574
00575 if( getVersion() == packet->version )
00576 {
00577 _applyVersion( packet->version );
00578 _setReady( packet->version );
00579 }
00580 else
00581 _readyVersions.insert( packet->version );
00582
00583 EQLOG( LOG_ASSEMBLY ) << this << " received v" << packet->version << endl;
00584
00585 return net::COMMAND_HANDLED;
00586 }
00587
00588 net::CommandResult FrameData::_cmdUpdate( net::Command& command )
00589 {
00590 CHECK_THREAD( _commandThread );
00591 const FrameDataUpdatePacket* packet =
00592 command.getPacket<FrameDataUpdatePacket>();
00593
00594 _applyVersion( packet->version );
00595
00596 std::set< uint32_t >::iterator i = _readyVersions.find( packet->version );
00597 if( i != _readyVersions.end( ))
00598 {
00599 _readyVersions.erase( i );
00600 _setReady( packet->version );
00601 }
00602
00603 return net::COMMAND_HANDLED;
00604 }
00605
00606 void FrameData::_applyVersion( const uint32_t version )
00607 {
00608 CHECK_THREAD( _commandThread );
00609 EQLOG( LOG_ASSEMBLY ) << this << " apply v" << version << endl;
00610
00611
00612
00613
00614
00615 if( _readyVersion == version )
00616 {
00617 #ifndef NDEBUG
00618 for( list<ImageVersion>::iterator i = _pendingImages.begin();
00619 i != _pendingImages.end(); ++i )
00620 {
00621 const ImageVersion& imageVersion = *i;
00622 EQASSERTINFO( imageVersion.version > version,
00623 "Frame is ready, but not all images have been set" );
00624 }
00625 #endif
00626
00627
00628 return;
00629 }
00630
00631
00632
00633 for( list<ImageVersion>::iterator i = _pendingImages.begin();
00634 i != _pendingImages.end(); )
00635 {
00636 const ImageVersion& imageVersion = *i;
00637 EQASSERT( imageVersion.version >= version );
00638
00639 if( imageVersion.version == version )
00640 {
00641 _images.push_back( imageVersion.image );
00642 list<ImageVersion>::iterator eraseIter = i;
00643 ++i;
00644 _pendingImages.erase( eraseIter );
00645
00646 EQASSERT( _readyVersion < version );
00647 }
00648 else
00649 ++i;
00650 }
00651
00652 EQLOG( LOG_ASSEMBLY ) << this << " applied v" << version << endl;
00653 }
00654
00655 std::ostream& operator << ( std::ostream& os, const FrameData* data )
00656 {
00657 os << "frame data id " << data->getID() << "." << data->getInstanceID()
00658 << " v" << data->getVersion() << ' ' << data->getImages().size()
00659 << " images, ready " << ( data->isReady() ? 'y' :'n' );
00660 return os;
00661 }
00662
00663 }