lib/client/frameData.cpp

00001 
00002 /* Copyright (c) 2006-2009, Stefan Eilemann <eile@equalizergraphics.com>
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "frameData.h"
00019 
00020 #include "commands.h"
00021 #include "compressor.h"
00022 #include "frameDataStatistics.h"
00023 #include "image.h"
00024 #include "log.h"
00025 #include "packets.h"
00026 #include "roiFinder.h"
00027 
#include <eq/net/command.h>
#include <eq/net/commandFunc.h>
#include <eq/net/dataIStream.h>
#include <eq/net/dataOStream.h>
#include <eq/net/session.h>
#include <eq/base/monitor.h>

#include <algorithm>
#include <cstring>
00035 
00036 using namespace eq::base;
00037 using namespace std;
00038 using eq::net::CommandFunc;
00039 
00040 namespace eq
00041 {
00042 FrameData::FrameData() 
00043         : _colorType( GL_RGBA )
00044         , _useAlpha( true )
00045         , _useSendToken( false )
00046 {
00047     _roiFinder = new ROIFinder();
00048     EQINFO << "New FrameData @" << (void*)this << endl;
00049 }
00050 
00051 FrameData::~FrameData()
00052 {
00053     clear();
00054 
00055     for( vector<Image*>::const_iterator i = _imageCache.begin();
00056          i != _imageCache.end(); ++i )
00057     {
00058         Image* image = *i;
00059         EQWARN << "Unflushed image in FrameData destructor" << endl;
00060         delete image;
00061     }
00062     _imageCache.clear();
00063 
00064     delete _roiFinder;
00065     _roiFinder = 0;
00066 }
00067 
00068 void FrameData::getInstanceData( net::DataOStream& os )
00069 {
00070     EQUNREACHABLE;
00071     os << _data;
00072 }
00073 
00074 void FrameData::applyInstanceData( net::DataIStream& is )
00075 {
00076     clear();
00077     is >> _data;
00078     EQLOG( LOG_ASSEMBLY ) << "applied " << this << endl;
00079 }
00080 
00081 void FrameData::update( const uint32_t version )
00082 {
00083     // trigger process of received ready packets
00084     FrameDataUpdatePacket packet;
00085     packet.instanceID = getInstanceID();
00086     packet.version    = version;
00087     send( getLocalNode(), packet );
00088 }
00089 
00090 void FrameData::attachToSession( const uint32_t id, const uint32_t instanceID,
00091                                  net::Session* session )
00092 {
00093     net::Object::attachToSession( id, instanceID, session );
00094 
00095     net::CommandQueue* queue = session->getCommandThreadQueue();
00096 
00097     registerCommand( CMD_FRAMEDATA_TRANSMIT,
00098                      CommandFunc<FrameData>( this, &FrameData::_cmdTransmit ),
00099                      queue );
00100     registerCommand( CMD_FRAMEDATA_READY,
00101                      CommandFunc<FrameData>( this, &FrameData::_cmdReady ),
00102                      queue );
00103     registerCommand( CMD_FRAMEDATA_UPDATE,
00104                      CommandFunc<FrameData>( this, &FrameData::_cmdUpdate ),
00105                      queue );
00106 }
00107 
00108 void FrameData::clear()
00109 {
00110     EQASSERT( _listeners.empty( ));
00111 
00112     _imageCacheLock.set();
00113     _imageCache.insert( _imageCache.end(), _images.begin(), _images.end( ));
00114     _imageCacheLock.unset();
00115 
00116     _images.clear();
00117 }
00118 
00119 void FrameData::flush()
00120 {
00121     clear();
00122 
00123     for( vector<Image*>::const_iterator i = _imageCache.begin();
00124          i != _imageCache.end(); ++i )
00125     {
00126         Image* image = *i;
00127         image->flush();
00128         delete image;
00129     }
00130 
00131     _imageCache.clear();
00132 }
00133 
00134 Image* FrameData::newImage( const eq::Frame::Type type )
00135 {
00136     Image* image = _allocImage( type );
00137     _images.push_back( image );
00138     return image;
00139 }
00140 
00141 Image* FrameData::_allocImage( const eq::Frame::Type type )
00142 {
00143     Image* image;
00144     _imageCacheLock.set();
00145 
00146     if( _imageCache.empty( ))
00147     {
00148         _imageCacheLock.unset();
00149         image = new Image;
00150 
00151         image->setFormat( Frame::BUFFER_DEPTH, GL_DEPTH_COMPONENT );
00152         image->setType( Frame::BUFFER_DEPTH, GL_UNSIGNED_INT );
00153     }
00154     else
00155     {
00156         image = _imageCache.back();
00157         _imageCache.pop_back();
00158         _imageCacheLock.unset();
00159 
00160         image->reset();
00161     }
00162 
00163     image->setStorageType( type );
00164     _useAlpha ? image->enableAlphaUsage() : image->disableAlphaUsage();
00165 
00166     if( type == Frame::TYPE_TEXTURE )
00167         image->setFormat( Frame::BUFFER_COLOR, GL_RGBA );
00168     else
00169         image->setFormat( Frame::BUFFER_COLOR, GL_BGRA );
00170 
00171     switch( _colorType )
00172     {
00173         case GL_RGBA16F:  
00174             image->setType( Frame::BUFFER_COLOR, GL_HALF_FLOAT );
00175             break;
00176         case GL_RGBA32F:  
00177             image->setType( Frame::BUFFER_COLOR, GL_FLOAT );
00178             break;
00179         default:
00180             image->setType( Frame::BUFFER_COLOR, GL_UNSIGNED_BYTE );
00181     }
00182 
00183     return image;
00184 }
00185 
00186 
00187 void FrameData::startReadback( const Frame& frame,
00188                                Window::ObjectManager* glObjects )
00189 {
00190     if( _data.buffers == Frame::BUFFER_NONE )
00191         return;
00192 
00193     PixelViewport absPVP = _data.pvp + frame.getOffset();
00194     if( !absPVP.isValid( ))
00195         return;
00196 
00197     const Zoom& zoom = frame.getZoom();
00198 
00199     if( !zoom.isValid( ))
00200     {
00201         EQWARN << "Invalid zoom factor, skipping frame." << endl;
00202         return;
00203     }
00204 
00205     PixelViewportVector pvps;
00206 
00207     if( _data.buffers & Frame::BUFFER_DEPTH && zoom == Zoom::NONE )
00208         pvps = _roiFinder->findRegions( _data.buffers, absPVP, zoom,
00209 //                    frame.getAssemblyStage(), frame.getFrameID(), glObjects );
00210                     0, 0, glObjects );
00211     else
00212         pvps.push_back( absPVP );
00213 
00214     for( uint32_t i = 0; i < pvps.size(); i++ )
00215     {
00216         PixelViewport pvp = pvps[ i ];
00217         pvp.intersect( absPVP );
00218                 
00219         Image* image = newImage( _data.frameType );
00220         image->startReadback( _data.buffers, pvp, zoom, glObjects );
00221         image->setOffset( pvp.x - absPVP.x, pvp.y - absPVP.y );
00222     }
00223 }
00224 
00225 void FrameData::syncReadback()
00226 {
00227     for( vector<Image*>::const_iterator iter = _images.begin();
00228          iter != _images.end(); ++iter )
00229     {
00230         Image* image = *iter;
00231         image->syncReadback();
00232 
00233 #ifndef NDEBUG
00234         if( getenv( "EQ_DUMP_IMAGES" ))
00235         {
00236             static uint32_t counter = 0;
00237             ostringstream stringstream;
00238 
00239             stringstream << "Image_" << setfill( '0' ) << setw(5) << ++counter;
00240             image->writeImages( stringstream.str( ));
00241         }
00242 #endif
00243     }
00244     setReady();
00245 }
00246 
00247 void FrameData::setReady()
00248 {
00249     _setReady( getVersion( ));
00250 }
00251 
00252 void FrameData::_setReady( const uint32_t version )
00253 {
00254     EQASSERTINFO( getVersion() == net::Object::VERSION_NONE || 
00255                   _readyVersion <= version,
00256                   "v" << getVersion() << " ready " << _readyVersion << " new "
00257                       << version );
00258 
00259     base::ScopedMutex mutex( _listenersMutex );
00260 #ifndef NDEBUG
00261     for( list<ImageVersion>::iterator i = _pendingImages.begin();
00262          i != _pendingImages.end(); ++i )
00263     {
00264         const ImageVersion& imageVersion = *i;
00265         EQASSERTINFO( imageVersion.version > version,
00266                       "Frame is ready, but not all images have been set" );
00267     }
00268 #endif
00269 
00270     if( _readyVersion >= version )
00271         return;
00272 
00273     _readyVersion = version;
00274     EQLOG( LOG_ASSEMBLY ) << "set ready " << this << ", " << _listeners.size()
00275                           << " monitoring" << endl;
00276 
00277     for( vector< Monitor<uint32_t>* >::iterator i = _listeners.begin();
00278          i != _listeners.end(); ++i )
00279     {
00280         Monitor<uint32_t>* monitor = *i;
00281         ++(*monitor);
00282     }
00283 }
00284 
00285 
00286 void FrameData::transmit( net::NodePtr toNode, const uint32_t frameNumber )
00287 {
00288     FrameDataStatistics event( Statistic::FRAME_TRANSMIT, this, frameNumber );
00289 
00290     if( _data.buffers == 0 )
00291     {
00292         EQWARN << "No buffers for frame data" << endl;
00293         return;
00294     }
00295 
00296     if ( _data.frameType == Frame::TYPE_TEXTURE )
00297     {
00298         EQWARN << "Can't transmit image of type TEXTURE" << endl;
00299         EQUNIMPLEMENTED;
00300         return;
00301     }
00302 
00303     net::ConnectionPtr             connection = toNode->getConnection();
00304     net::ConnectionDescriptionPtr description = connection->getDescription();
00305 
00306     // use compression on links up to 2 GBit/s
00307     const bool useCompression = ( description->bandwidth <= 262144 );
00308 
00309     FrameDataTransmitPacket packet;
00310     const uint64_t          packetSize = sizeof( packet ) - 8*sizeof( uint8_t );
00311     const net::Session*     session    = getSession();
00312     EQASSERT( session );
00313 
00314     packet.sessionID    = session->getID();
00315     packet.objectID     = getID();
00316     packet.version      = getVersion();
00317     packet.frameNumber  = frameNumber;
00318 
00319     // send all images
00320     for( vector<Image*>::const_iterator i = _images.begin(); 
00321          i != _images.end(); ++i )
00322     {
00323         Image* image = *i;
00324         vector< const Image::PixelData* > pixelDatas;
00325 
00326         packet.size    = packetSize;
00327         packet.buffers = Frame::BUFFER_NONE;
00328         packet.pvp     = image->getPixelViewport();
00329         packet.ignoreAlpha = image->ignoreAlpha();
00330 
00331         EQASSERT( packet.pvp.isValid( ));
00332 
00333         {
00334             uint64_t rawSize( 0 );
00335             FrameDataStatistics compressEvent( Statistic::FRAME_COMPRESS, this, 
00336                                                frameNumber );
00337             compressEvent.event.data.statistic.ratio = 1.0f;
00338             if( !useCompression )
00339                 compressEvent.event.data.statistic.frameNumber = 0;
00340 
00341             // Prepare image pixel data
00342             Frame::Buffer buffers[] = {Frame::BUFFER_COLOR,Frame::BUFFER_DEPTH};
00343 
00344             // for each image attachment
00345             for( unsigned j = 0; j < 2; ++j )
00346             {
00347                 Frame::Buffer buffer = buffers[j];
00348                 if( image->hasPixelData( buffer ))
00349                 {
00350                     // format, type, nChunks, compressor name
00351                     packet.size += 4 * sizeof( uint32_t ); 
00352 
00353                     const Image::PixelData& data = useCompression ?
00354                         image->compressPixelData( buffer ) : 
00355                         image->getPixelData( buffer );
00356                     pixelDatas.push_back( &data );
00357                     
00358                     if( data.isCompressed )
00359                     {
00360                         const uint32_t nElements = data.compressedSize.size();
00361                         for( uint32_t k = 0 ; k < nElements; ++k )
00362                         {
00363                             packet.size += sizeof( uint64_t );
00364                             packet.size += data.compressedSize[ k ];
00365                         }
00366                     }
00367                     else
00368                     {
00369                         packet.size += sizeof( uint64_t );
00370                         packet.size += data.pixels.getSize();
00371                     }
00372 
00373                     packet.buffers |= buffer;
00374                     rawSize += image->getPixelDataSize( buffer );
00375                 }
00376             }
00377 
00378             if( rawSize > 0 )
00379                 compressEvent.event.data.statistic.ratio =
00380                     static_cast< float >( packet.size ) /
00381                     static_cast< float >( rawSize );
00382         }
00383 
00384         if( pixelDatas.empty( ))
00385             continue;
00386 
00387         // send image pixel data packet
00388         if( _useSendToken )
00389             getLocalNode()->acquireSendToken( toNode );
00390 
00391         connection->lockSend();
00392         connection->send( &packet, packetSize, true );
00393 #ifndef NDEBUG
00394         size_t sentBytes = packetSize;
00395 #endif
00396 
00397         for( uint32_t j=0; j < pixelDatas.size(); ++j )
00398         {
00399 #ifndef NDEBUG
00400             sentBytes += 4 * sizeof( uint32_t );
00401 #endif
00402             const Image::PixelData* data = pixelDatas[j];
00403             const uint32_t imageHeader[4] =
00404                   { data->format,
00405                     data->type, 
00406                     data->compressorName,
00407                     data->isCompressed ? data->compressedSize.size() : 1 };
00408 
00409             connection->send( imageHeader, 4 * sizeof( uint32_t ), true );
00410             
00411             if( data->isCompressed )
00412             {
00413                 for( uint32_t k = 0 ; k < data->compressedSize.size(); ++k )
00414                 {
00415                     const uint64_t dataSize = data->compressedSize[k];
00416                     connection->send( &dataSize, sizeof( dataSize ), true );
00417                     connection->send( data->compressedData[k], dataSize, true );
00418 #ifndef NDEBUG
00419                     sentBytes += sizeof( dataSize ) + dataSize;
00420 #endif
00421                 }
00422             }
00423             else
00424             {
00425                 const uint64_t dataSize = data->pixels.getSize();
00426                 connection->send( &dataSize, sizeof( dataSize ), true );
00427                 connection->send( data->pixels.getData(), dataSize, true );
00428 #ifndef NDEBUG
00429                 sentBytes += sizeof( dataSize ) + dataSize;
00430 #endif
00431             }
00432         }
00433 #ifndef NDEBUG
00434         EQASSERTINFO( sentBytes == packet.size,
00435                       sentBytes << " != " << packet.size );
00436 #endif
00437 
00438         connection->unlockSend();
00439         if( _useSendToken )
00440             getLocalNode()->releaseSendToken( toNode );
00441     }
00442 
00443     FrameDataReadyPacket readyPacket;
00444     readyPacket.sessionID = session->getID();
00445     readyPacket.objectID  = getID();
00446     readyPacket.version   = getVersion();
00447     toNode->send( readyPacket );
00448 }
00449 
00450 void FrameData::addListener( base::Monitor<uint32_t>& listener )
00451 {
00452     _listenersMutex.set();
00453 
00454     _listeners.push_back( &listener );
00455     if( _readyVersion >= getVersion( ))
00456         ++listener;
00457 
00458     _listenersMutex.unset();
00459 }
00460 
00461 void FrameData::removeListener( base::Monitor<uint32_t>& listener )
00462 {
00463     _listenersMutex.set();
00464 
00465     vector< Monitor<uint32_t>* >::iterator i = find( _listeners.begin(),
00466                                                      _listeners.end(),
00467                                                      &listener );
00468     EQASSERT( i != _listeners.end( ));
00469     _listeners.erase( i );
00470 
00471     _listenersMutex.unset();
00472 }
00473 
00474 //----- Command handlers
00475 
00476 net::CommandResult FrameData::_cmdTransmit( net::Command& command )
00477 {
00478     CHECK_THREAD( _commandThread );
00479     const FrameDataTransmitPacket* packet =
00480         command.getPacket<FrameDataTransmitPacket>();
00481 
00482     EQLOG( LOG_ASSEMBLY )
00483         << this << " received image, buffers " << packet->buffers << " pvp "
00484         << packet->pvp << " v" << packet->version << endl;
00485 
00486     EQASSERT( packet->pvp.isValid( ));
00487 
00488     FrameDataStatistics event( Statistic::FRAME_RECEIVE, this, 
00489                                packet->frameNumber );
00490 
00491     Image*   image = _allocImage( Frame::TYPE_MEMORY );
00492     // Note on the const_cast: since the PixelData structure stores non-const
00493     // pointers, we have to go non-const at some point, even though we do not
00494     // modify the data.
00495     uint8_t* data  = const_cast< uint8_t* >( packet->data );
00496 
00497     image->setPixelViewport( packet->pvp );
00498     packet->ignoreAlpha ? image->disableAlphaUsage() :image->enableAlphaUsage();
00499 
00500     Frame::Buffer buffers[] = { Frame::BUFFER_COLOR, Frame::BUFFER_DEPTH };
00501     for( unsigned i = 0; i < 2; ++i )
00502     {
00503         Frame::Buffer buffer = buffers[i];
00504         
00505         if( packet->buffers & buffer )
00506         {
00507             Image::PixelData pixelData;
00508             const uint32_t*  u32Data   = reinterpret_cast< uint32_t* >( data );
00509             
00510             pixelData.format         = u32Data[0];
00511             pixelData.type           = u32Data[1];
00512             pixelData.compressorName = u32Data[2];
00513             const uint32_t nChunks   = u32Data[3];
00514             
00515             data += 4 * sizeof( uint32_t );
00516             
00517             if( pixelData.compressorName > EQ_COMPRESSOR_NONE )
00518             {
00519                 pixelData.compressedSize.resize( nChunks );
00520                 pixelData.compressedData.resize( nChunks );
00521 
00522                 for( uint32_t j = 0; j < nChunks; ++j )
00523                 {
00524                     const uint64_t size = *reinterpret_cast< uint64_t*>( data );
00525                     data += sizeof( uint64_t );
00526                     
00527                     pixelData.compressedSize[j] = size; 
00528                     pixelData.compressedData[j] = data;
00529                     data += size;
00530                 }
00531 
00532                 image->setPixelData( buffer, pixelData );
00533             }
00534             else
00535             {
00536                 const uint64_t size = *reinterpret_cast< uint64_t* >( data );
00537                 data += sizeof( uint64_t );
00538 
00539                 image->setFormat( buffer, pixelData.format );
00540                 image->setType( buffer, pixelData.type );
00541                 EQASSERT( size == image->getPixelDataSize( buffer ));
00542 
00543                 image->setPixelData( buffer, data );
00544                 data += size;
00545             }
00546 
00547             // Prevent ~PixelData from freeing pointers
00548             pixelData.compressedSize.clear();
00549             pixelData.compressedData.clear();
00550         }
00551     }
00552 
00553     const uint32_t version = getVersion();
00554 
00555     if( version == packet->version )
00556     {
00557         EQASSERT( _readyVersion < getVersion( ));
00558         _images.push_back( image );
00559     }
00560     else
00561     {
00562         EQASSERT( version < packet->version );
00563         _pendingImages.push_back( ImageVersion( image, packet->version ));
00564     }
00565 
00566     return net::COMMAND_HANDLED;
00567 }
00568 
00569 net::CommandResult FrameData::_cmdReady( net::Command& command )
00570 {
00571     CHECK_THREAD( _commandThread );
00572     const FrameDataReadyPacket* packet =
00573         command.getPacket<FrameDataReadyPacket>();
00574 
00575     if( getVersion() == packet->version )
00576     {
00577         _applyVersion( packet->version );
00578         _setReady( packet->version );
00579     }
00580     else
00581         _readyVersions.insert( packet->version );
00582 
00583     EQLOG( LOG_ASSEMBLY ) << this << " received v" << packet->version << endl;
00584 
00585     return net::COMMAND_HANDLED;
00586 }
00587 
00588 net::CommandResult FrameData::_cmdUpdate( net::Command& command )
00589 {
00590     CHECK_THREAD( _commandThread );
00591     const FrameDataUpdatePacket* packet =
00592         command.getPacket<FrameDataUpdatePacket>();
00593 
00594     _applyVersion( packet->version );
00595 
00596     std::set< uint32_t >::iterator i = _readyVersions.find( packet->version );
00597     if( i != _readyVersions.end( ))
00598     {
00599         _readyVersions.erase( i );
00600         _setReady( packet->version );
00601     }
00602 
00603     return net::COMMAND_HANDLED;
00604 }
00605 
00606 void FrameData::_applyVersion( const uint32_t version )
00607 {
00608     CHECK_THREAD( _commandThread );
00609     EQLOG( LOG_ASSEMBLY ) << this << " apply v" << version << endl;
00610 
00611     // Input images sync() to the new version, then send an update packet and
00612     // immediately continue. If they read back and setReady faster than we
00613     // process this update packet, the readyVersion changes at any given point
00614     // in this code.
00615     if( _readyVersion == version )
00616     {
00617 #ifndef NDEBUG
00618         for( list<ImageVersion>::iterator i = _pendingImages.begin();
00619              i != _pendingImages.end(); ++i )
00620         {
00621             const ImageVersion& imageVersion = *i;
00622             EQASSERTINFO( imageVersion.version > version,
00623                           "Frame is ready, but not all images have been set" );
00624         }
00625 #endif
00626 
00627         // already applied
00628         return;
00629     }
00630 
00631     // Even if _readyVersion jumped to version in between, there are no pending
00632     // images for it, so this loop doesn't do anything.
00633     for( list<ImageVersion>::iterator i = _pendingImages.begin();
00634          i != _pendingImages.end(); )
00635     {
00636         const ImageVersion& imageVersion = *i;
00637         EQASSERT( imageVersion.version >= version );
00638 
00639         if( imageVersion.version == version )
00640         {
00641             _images.push_back( imageVersion.image );
00642             list<ImageVersion>::iterator eraseIter = i;
00643             ++i;
00644             _pendingImages.erase( eraseIter );
00645 
00646             EQASSERT( _readyVersion < version );
00647         }
00648         else
00649             ++i;
00650     }
00651 
00652     EQLOG( LOG_ASSEMBLY ) << this << " applied v" << version << endl;
00653 }
00654 
00655 std::ostream& operator << ( std::ostream& os, const FrameData* data )
00656 {
00657     os << "frame data id " << data->getID() << "." << data->getInstanceID()
00658        << " v" << data->getVersion() << ' ' << data->getImages().size()
00659        << " images, ready " << ( data->isReady() ? 'y' :'n' );
00660     return os;
00661 }
00662 
00663 }
Generated on Mon Aug 10 18:58:33 2009 for Equalizer 0.9 by  doxygen 1.5.8