rspConnection.h

00001 
00002 /* Copyright (c) 2009, Cedric Stalder <[email protected]> 
00003  *               2009-2010, Stefan Eilemann <[email protected]>
00004  *
00005  * This library is free software; you can redistribute it and/or modify it under
00006  * the terms of the GNU Lesser General Public License version 2.1 as published
00007  * by the Free Software Foundation.
00008  *
00009  * This library is distributed in the hope that it will be useful, but WITHOUT
00010  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00012  * details.
00013  *
00014  * You should have received a copy of the GNU Lesser General Public License
00015  * along with this library; if not, write to the Free Software Foundation, Inc.,
00016  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00017  */
00018 
00019 #ifndef EQNET_RSPCONNECTION_H
00020 #define EQNET_RSPCONNECTION_H
00021 
00022 #include <eq/net/connection.h>
00023 
00024 #ifdef EQ_USE_BOOST
00025 #include <eq/net/connectionSet.h> // member
00026 #include <eq/net/types.h>
00027 #include "eventConnection.h" // member
00028 
00029 #include <eq/base/base.h>
00030 #include <eq/base/buffer.h>  // member
00031 #include <eq/base/lfQueue.h> // member
00032 #include <eq/base/mtQueue.h> // member
00033 
00034 #include <boost/asio.hpp>
00035 
00036 namespace eq
00037 {
00038 namespace net
00039 {
00040     class ConnectionDescription;
00041 
00049     class RSPConnection : public Connection
00050     {
          /*
           * Connection subclass built on boost.asio: it holds an io_service, two
           * UDP socket pointers (_read, _write) and two deadline timers
           * (_timeout, _wakeup), plus a helper Thread (_thread) whose init()
           * calls _initThread().
           *
           * The private section declares the datagram headers exchanged between
           * peers, the buffer queues used for reading and writing, and the
           * packet and timeout handlers.
           *
           * NOTE(review): the acronym "RSP" is not expanded in this header and
           * almost every method is only declared here. Remarks below that go
           * beyond the visible signatures are inferred from names and are marked
           * as such; confirm them against the implementation file.
           */
00051     public:
              /* Construct the connection; defined out of line. */
00053         RSPConnection();
00054         
              /* Start listening. Only declared here; connect() below forwards to
               * this method and returns its result. */
00055         virtual bool listen();
              /* Close the connection. Only declared here.
               * NOTE(review): a private _close() also exists; how the two divide
               * the work is not visible in this header. */
00056         void close();
00057 
              /* Connecting is the same operation as listening for this
               * connection type: the call is forwarded to listen() unchanged. */
00059         bool connect(){ return listen(); }
00060 
              /* Starts no work; only asserts that the connection is in
               * STATE_LISTENING. */
00061         virtual void acceptNB(){ EQASSERT( _state == STATE_LISTENING ); }
00062 
              /* Returns a ConnectionPtr; presumably completes the accept announced
               * by acceptNB() -- confirm. Only declared here. */
00063         virtual ConnectionPtr acceptSync();
              /* Intentionally empty: both arguments are unnamed and unused. */
00064         virtual void readNB( void*, const uint64_t ) {/* NOP */}
              /* Read into 'buffer', given a byte count; returns an int64_t.
               * The third parameter is named 'ignored' -- presumably unused by
               * this implementation (TODO confirm). Only declared here. */
00065         virtual int64_t readSync( void* buffer, const uint64_t bytes,
00066                                   const bool ignored );
              /* Write from 'buffer', given a byte count; returns an int64_t
               * (presumably the number of bytes written -- confirm). */
00067         virtual int64_t write( const void* buffer, const uint64_t bytes );
00068 
              /* @return the current value of _sendRate (unit not visible here). */
00070         int64_t getSendRate() const { return _sendRate; }
00071 
              /* @return the 16-bit identifier stored in _id. */
00077         uint16_t getID() const { return _id; }
00078         
              /* @return the notifier of the internal event connection (_event). */
00079         virtual Notifier getNotifier() const { return _event->getNotifier(); }
00080 
00081     protected:
              /* Protected destructor: outside code cannot delete an instance
               * directly. NOTE(review): presumably lifetime is managed through
               * base::RefPtr (see RSPConnectionPtr) -- confirm. */
00082         virtual ~RSPConnection();
00083     
00084     private:
00085         typedef base::RefPtr< RSPConnection > RSPConnectionPtr;
00086 
              /*
               * Helper thread bound to one RSPConnection. It stores an
               * RSPConnectionPtr to the connection and resets it to 0 in its
               * destructor; init() delegates to RSPConnection::_initThread() and
               * run() is defined out of line.
               */
00088         class Thread : public base::Thread
00089         {
00090         public: 
00091             Thread( RSPConnectionPtr connection )
00092                 : _connection( connection ){}
00093             virtual ~Thread(){ _connection = 0; }
00094         protected:
00095             virtual void run();
00096             virtual bool init() { return _connection->_initThread(); }
00097             
00098         private:
00099             RSPConnectionPtr _connection;
00100         };
00101 
              /*
               * Datagram type tags. DatagramNack::set() stores NACK in the
               * uint16_t 'type' field of its header; the other datagram structs
               * below carry the same 'type' field.
               *
               * The enumerators have no explicit values, so their numeric values
               * follow declaration order (DATA == 0). NOTE(review): presumably
               * these values go on the wire, so do not reorder -- confirm.
               */
00103         enum DatagramType
00104         { 
00105             DATA,      
00106             ACKREQ,    
00107             NACK,      
00108             ACK,       
00109             ID_HELLO,  
00110             ID_DENY,   
00111             ID_CONFIRM,
00112             ID_EXIT,   
00113             COUNTNODE  
00114         };
00115         
              /* Header made of a type tag and a connection ID.
               * NOTE(review): presumably what _sendSimpleDatagram( type, id )
               * sends -- confirm. */
00117         struct DatagramNode
00118         {
00119             uint16_t type;
00120             uint16_t connectionID;
00121         };
00122 
              /* Header carrying a client ID and a connection count.
               * NOTE(review): presumably sent by _sendDatagramCountNode() with
               * type COUNTNODE -- confirm. */
00124         struct DatagramCount
00125         {
00126             uint16_t type;
00127             uint16_t clientID;
00128             uint16_t numConnections;
00129         };
00130 
              /* Header carrying a writer ID and a sequence number; this is the
               * argument type of _handleAckRequest(). */
00132         struct DatagramAckRequest
00133         {
00134             uint16_t type;
00135             uint16_t writerID;
00136             uint16_t sequence;
00137         };
00138         
              /* One start/end pair of 16-bit values.
               * NOTE(review): presumably a range of datagram sequence numbers to
               * be repeated; whether 'end' is inclusive is not visible here. */
00140         struct Nack
00141         {
00142             uint16_t start;
00143             uint16_t end;
00144         };
00145 
              // Capacity of DatagramNack::nacks (entries of two uint16_t each).
              // NOTE(review): this macro is defined inside the class body but, like
              // every macro, is visible to all code that follows the definition.
00146 #       define EQ_RSP_MAX_NACKS 300 // fits in a single IP frame
00147 
              /* Header with a reader ID, a writer ID and a count, followed by a
               * fixed-size array of EQ_RSP_MAX_NACKS Nack entries. This is the
               * argument type of _handleNack(). */
00148         struct DatagramNack
00149         {
                  /* Fill the four header fields; 'type' is always set to NACK.
                   * The nacks array is not touched by this function. */
00150             void set( uint16_t rID, uint16_t wID, uint16_t n )
00151             {
00152                 type       = NACK;
00153                 readerID   = rID; 
00154                 writerID   = wID;   
00155                 count      = n;
00156             }
00157 
00158             uint16_t       type;
00159             uint16_t       readerID;
00160             uint16_t       writerID;
00161             uint16_t       count;       
00162             Nack           nacks[ EQ_RSP_MAX_NACKS ];
00163         };
00164 
              /* Header with a reader ID, a writer ID and a sequence number; this
               * is the argument type of _handleAck(). */
00166         struct DatagramAck
00167         {
00168             uint16_t        type;
00169             uint16_t        readerID;
00170             uint16_t        writerID;
00171             uint16_t        sequence;
00172         };
00173 
              /* Header with a size, a writer ID and a sequence number.
               * NOTE(review): presumably precedes the payload of a DATA datagram,
               * with 'size' being the payload size -- confirm. */
00175         struct DatagramData
00176         {
00177             uint16_t    type;
00178             uint16_t    size;
00179             uint16_t    writerID;
00180             uint16_t    sequence;
00181         };
00182 
00183         typedef std::vector< RSPConnectionPtr > RSPConnections;
00184 
              // Parent/child links between RSPConnection instances.
              // NOTE(review): which side creates the children is not visible here.
00185         RSPConnectionPtr _parent;
00186         RSPConnections _children;
00187 
00188         // links to all connections which are in the connecting state
00189         RSPConnections _childrenConnecting;
00190 
              // Identifier returned by getID(), and further connection settings.
              // Their initial values and units are set in the implementation, not
              // in this header.
00191         uint16_t _id; 
00192         bool     _idAccepted;
00193         int32_t  _mtu;
00194         int32_t  _ackFreq;
00195         uint32_t _payloadSize;
00196         int32_t  _timeouts;
00197 
00198         typedef base::RefPtr< EventConnection > EventConnectionPtr;
              // Source of the notifier returned by getNotifier().
00199         EventConnectionPtr _event;
00200 
              // boost.asio state. Members are constructed in declaration order.
              // NOTE(review): presumably _timeout and _wakeup are constructed from
              // _ioService, so keep it declared first -- confirm in the
              // constructor. The two sockets are raw pointers; who allocates and
              // frees them is not visible here.
00201         boost::asio::io_service        _ioService;
00202         boost::asio::ip::udp::socket*  _read;
00203         boost::asio::ip::udp::socket*  _write;
00204         boost::asio::ip::udp::endpoint _readAddr;
00205         boost::asio::deadline_timer    _timeout;
00206         boost::asio::deadline_timer    _wakeup;
00207         
              // NOTE(review): presumably send-rate limiting state; only _sendRate
              // is exposed, through getSendRate().
00208         eq::base::Clock _clock;
00209         uint64_t        _maxBucketSize;
00210         size_t          _bucketSize;
00211         int64_t         _sendRate;
00212 
              // Raw pointer to the helper thread (see class Thread above) and two
              // locks; what each lock protects is not visible in this header.
00213         Thread*          _thread;
00214         base::Lock       _mutexConnection;
00215         base::Lock       _mutexEvent;
00216         uint16_t         _acked;        // sequence ID of last confirmed ack
00217 
00218         typedef base::Bufferb Buffer;
00219         typedef std::vector< Buffer* > Buffers;
00220 
00221         Buffers _buffers;                   
00222 
              // Buffer pointers are passed through two queues of different kinds:
              // a base::LFQueue and a base::MTQueue.
              // NOTE(review): the names suggest one queue per direction between
              // the application and the thread -- confirm.
00223         base::LFQueue< Buffer* > _threadBuffers;
00225         base::MTQueue< Buffer* > _appBuffers;
00226 
00227         Buffer _recvBuffer;                      
00228         std::deque< Buffer* > _recvBuffers;      
00229 
00230         Buffer* _readBuffer;                     
00231         uint64_t _readBufferPos;                 
00232 
00233         uint16_t _sequence; 
              // Checked by _isWriting() together with _threadBuffers.
00234         std::deque< Buffer* > _writeBuffers;    
00235 
00236         typedef std::deque< Nack > RepeatQueue;
00237         RepeatQueue _repeatQueue; 
00238 
              // Internal counterpart of close(); only declared here.
00239         void _close();
              // Returns a 16-bit value; presumably a candidate connection ID.
00240         uint16_t _buildNewID();
00241         
              // Outgoing path; all four are only declared here.
00242         void _processOutgoing();
00243         void _writeData();
00244         void _repeatData();
00245         void _finishWriteQueue( const uint16_t sequence );
00246 
              // Handlers for received datagrams, one per datagram kind. Each
              // returns a bool whose meaning is not visible in this header.
00247         bool _handleData( Buffer& buffer );
00248         bool _handleAck( const DatagramAck* ack );
00249         bool _handleNack( const DatagramNack* nack );
00250         bool _handleAckRequest( const DatagramAckRequest* ackRequest );
00251 
00253         bool _handleCountNode();
00254 
00255         Buffer* _newDataBuffer( Buffer& inBuffer );
00256         void _pushDataBuffer( Buffer* buffer );
00257 
00258         /* Run the reader thread */
00259         void _runThread();
00260 
00261         /* Initialize the reader thread; called from Thread::init() */
00262         bool _initThread();
00263         /* Make all buffers available for reading.
               * NOTE(review): unlike the other private methods this one has no
               * leading underscore; renaming it would also require changing the
               * implementation file. */
00264         void initBuffers();
00265         /* handle received data depending on the communication state */
00266         void _handlePacket( const boost::system::error_code& error,
00267                             const size_t bytes );
00268         void _handleConnectedData( const void* data );
00269         void _handleInitData( const void* data );
00270         void _handleAcceptIDData( const void* data );
00271 
00272         /* handle a timeout depending on the communication state */
00273         void _handleTimeout( const boost::system::error_code& error );
00274         void _handleConnectedTimeout( );
00275         void _handleInitTimeout( );
00276         void _handleAcceptIDTimeout( );
00277 
              // Look up a connection by its 16-bit ID; presumably searches
              // _children -- confirm.
00279         RSPConnectionPtr _findConnection( const uint16_t id );
00280         
00282         void _waitWritable( const uint64_t bytes );
00283 
00285         void _sendDatagramCountNode();
00286 
              // Takes an array of 'num' Nack entries; presumably appends them to
              // _repeatQueue -- confirm.
00287         void _addRepeat( const Nack* nacks, const uint16_t num );
00288 
              // Send a datagram described only by a type tag and an ID.
00290         void _sendSimpleDatagram( DatagramType type, uint16_t id );
00291         
00293         void _sendAckRequest();
00294 
00296         void _sendAck( const uint16_t writerID, const uint16_t sequence );
00297         
              // Takes the target writer ID and an array of 'num' Nack entries.
00299         void _sendNack( const uint16_t toWriterID, const Nack* nacks,
00300                         const uint16_t num );
00301         
00302         void _checkNewID( const uint16_t id );
00303 
00304         /* add a new connection detected in the multicast network */
00305         bool _addNewConnection( const uint16_t id );
00306         void _removeConnection( const uint16_t id );
00307 
00308         void _setTimeout( const int32_t timeOut );
00309         void _postWakeup();
00310         void _asyncReceiveFrom();
              // True while at least one of _threadBuffers and _writeBuffers is
              // non-empty. NOTE(review): does not modify the object as far as this
              // header shows, so it could be const if LFQueue::isEmpty() is const.
00311         bool _isWriting()
00312             { return !_threadBuffers.isEmpty() || !_writeBuffers.empty( ); }
00313     };
00314 
00315     std::ostream& operator << ( std::ostream&, const RSPConnection& );
          // ^ Stream-output operator for an RSPConnection. Only declared here, so
          //   the text it produces is defined by the implementation file.
00316 }
00317 }
00318 
00319 #endif //EQ_USE_BOOST
00320 #endif //EQNET_RSPCONNECTION_H
Generated on Wed Dec 1 2010 15:34:21 for EqualizerInternal 1.0-alpha by Doxygen 1.7.2