00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef EQNET_RSPCONNECTION_H
00020 #define EQNET_RSPCONNECTION_H
00021
00022 #include <eq/net/connection.h>
00023
00024 #ifdef EQ_USE_BOOST
00025 #include <eq/net/connectionSet.h>
00026 #include <eq/net/types.h>
00027 #include "eventConnection.h"
00028
00029 #include <eq/base/base.h>
00030 #include <eq/base/buffer.h>
00031 #include <eq/base/lfQueue.h>
00032 #include <eq/base/mtQueue.h>
00033
00034 #include <boost/asio.hpp>
00035
00036 namespace eq
00037 {
00038 namespace net
00039 {
00040 class ConnectionDescription;
00041
/**
 * A Connection implementation built on boost::asio UDP sockets and deadline
 * timers (see the _read, _write, _timeout and _wakeup members), serviced by
 * a helper Thread.
 *
 * The private datagram structs declared below describe the messages this
 * class exchanges: data, ack requests, acks, nacks and ID negotiation.
 * The class is only declared when EQ_USE_BOOST is defined.
 *
 * NOTE(review): the protocol details live in the implementation file, which
 * is not visible from this header; comments marked "presumably" are to be
 * confirmed there.
 */
00049 class RSPConnection : public Connection
00050 {
00051 public:
/** Construct a new connection object. */
00053 RSPConnection();
00054
/** Put the connection into listening mode. @return true on success
 *  (presumably -- the return convention is set by the implementation). */
00055 virtual bool listen();
/** Close the connection. */
00056 void close();
00057
/** Connecting is implemented by listening: forwards to listen(). */
00059 bool connect(){ return listen(); }
00060
/** Does no work beyond asserting that the state is STATE_LISTENING. */
00061 virtual void acceptNB(){ EQASSERT( _state == STATE_LISTENING ); }
00062
/** Complete an accept operation and return the resulting connection. */
00063 virtual ConnectionPtr acceptSync();
/** No-op in this implementation; the body is empty. */
00064 virtual void readNB( void*, const uint64_t ) {}
/**
 * Read data into a caller-provided buffer.
 *
 * @param buffer the destination buffer.
 * @param bytes the size of the request in bytes.
 * @param ignored named 'ignored'; presumably unused by the implementation.
 * @return a signed byte count; the meaning of negative values is defined
 *         by the implementation / base class -- TODO confirm.
 */
00065 virtual int64_t readSync( void* buffer, const uint64_t bytes,
00066 const bool ignored );
/**
 * Write data from a caller-provided buffer.
 *
 * @param buffer the source buffer.
 * @param bytes the size of the request in bytes.
 * @return a signed byte count -- TODO confirm error convention.
 */
00067 virtual int64_t write( const void* buffer, const uint64_t bytes );
00068
/** @return the current value of _sendRate (unit not visible here). */
00070 int64_t getSendRate() const { return _sendRate; }
00071
/** @return the 16-bit identifier stored in _id. */
00077 uint16_t getID() const { return _id; }
00078
/** @return the notifier of the internal event connection (_event). */
00079 virtual Notifier getNotifier() const { return _event->getNotifier(); }
00080
00081 protected:
/** Protected: instances cannot be destroyed directly by outside code. */
00082 virtual ~RSPConnection();
00083
00084 private:
00085 typedef base::RefPtr< RSPConnection > RSPConnectionPtr;
00086
/**
 * Helper thread bound to one RSPConnection. init() forwards to the
 * connection's _initThread(); run() is defined in the implementation file
 * (presumably forwarding to _runThread()).
 */
00088 class Thread : public base::Thread
00089 {
00090 public:
/** Store a reference-counted pointer to the owning connection. */
00091 Thread( RSPConnectionPtr connection )
00092 : _connection( connection ){}
/** Drop the reference to the connection. */
00093 virtual ~Thread(){ _connection = 0; }
00094 protected:
00095 virtual void run();
00096 virtual bool init() { return _connection->_initThread(); }
00097
00098 private:
00099 RSPConnectionPtr _connection;
00100 };
00101
/**
 * Discriminator stored in the 'type' field of each datagram struct below.
 * Enumerators take implicit values starting at 0 (DATA == 0).
 * NOTE(review): per-value meanings are inferred from the names and the
 * struct/handler names in this header; confirm against the implementation.
 */
00103 enum DatagramType
00104 {
00105 DATA,
00106 ACKREQ,
00107 NACK,
00108 ACK,
00109 ID_HELLO,
00110 ID_DENY,
00111 ID_CONFIRM,
00112 ID_EXIT,
00113 COUNTNODE
00114 };
00115
/** Datagram carrying only a type and a connection identifier; presumably
 *  the format sent by _sendSimpleDatagram(). */
00117 struct DatagramNode
00118 {
00119 uint16_t type;
00120 uint16_t connectionID;
00121 };
00122
/** Datagram carrying a client identifier and a connection count;
 *  presumably sent by _sendDatagramCountNode(). */
00124 struct DatagramCount
00125 {
00126 uint16_t type;
00127 uint16_t clientID;
00128 uint16_t numConnections;
00129 };
00130
/** Datagram identifying a writer and a sequence number; handled by
 *  _handleAckRequest(). */
00132 struct DatagramAckRequest
00133 {
00134 uint16_t type;
00135 uint16_t writerID;
00136 uint16_t sequence;
00137 };
00138
/** One [start, end] pair of 16-bit values; presumably a range of sequence
 *  numbers (inclusiveness of 'end' is not visible here). */
00140 struct Nack
00141 {
00142 uint16_t start;
00143 uint16_t end;
00144 };
00145
// Capacity of DatagramNack::nacks. Each Nack holds two uint16_t values.
00146 # define EQ_RSP_MAX_NACKS 300 // fits in a single IP frame
00147
/** Datagram with a four-field header followed by a fixed-capacity array of
 *  Nack entries; 'count' presumably gives the number of entries in use. */
00148 struct DatagramNack
00149 {
/** Fill in the header: type is set to NACK, the remaining fields are
 *  copied from the arguments. The nacks array is not touched. */
00150 void set( uint16_t rID, uint16_t wID, uint16_t n )
00151 {
00152 type = NACK;
00153 readerID = rID;
00154 writerID = wID;
00155 count = n;
00156 }
00157
00158 uint16_t type;
00159 uint16_t readerID;
00160 uint16_t writerID;
00161 uint16_t count;
00162 Nack nacks[ EQ_RSP_MAX_NACKS ];
00163 };
00164
/** Datagram identifying a reader, a writer and a sequence number; handled
 *  by _handleAck(). */
00166 struct DatagramAck
00167 {
00168 uint16_t type;
00169 uint16_t readerID;
00170 uint16_t writerID;
00171 uint16_t sequence;
00172 };
00173
/** Header with size, writer and sequence fields; presumably followed on the
 *  wire by 'size' bytes of payload -- TODO confirm. */
00175 struct DatagramData
00176 {
00177 uint16_t type;
00178 uint16_t size;
00179 uint16_t writerID;
00180 uint16_t sequence;
00181 };
00182
00183 typedef std::vector< RSPConnectionPtr > RSPConnections;
00184
// Related connection objects. NOTE(review): the parent/child relationship
// is established in the implementation file; not visible here.
00185 RSPConnectionPtr _parent;
00186 RSPConnections _children;
00187
00188
00189 RSPConnections _childrenConnecting;
00190
// Identifier returned by getID(), plus protocol parameters. Their values
// and units are set in the implementation file.
00191 uint16_t _id;
00192 bool _idAccepted;
00193 int32_t _mtu;
00194 int32_t _ackFreq;
00195 uint32_t _payloadSize;
00196 int32_t _timeouts;
00197
// Event connection whose notifier is exposed through getNotifier().
00198 typedef base::RefPtr< EventConnection > EventConnectionPtr;
00199 EventConnectionPtr _event;
00200
// boost::asio I/O objects. Members are constructed in declaration order, so
// _ioService exists before the timers declared after it; keep this order.
// The sockets are raw pointers; their ownership is not visible here.
00201 boost::asio::io_service _ioService;
00202 boost::asio::ip::udp::socket* _read;
00203 boost::asio::ip::udp::socket* _write;
00204 boost::asio::ip::udp::endpoint _readAddr;
00205 boost::asio::deadline_timer _timeout;
00206 boost::asio::deadline_timer _wakeup;
00207
// Clock and counters; _sendRate is exposed through getSendRate().
// NOTE(review): presumably send-rate limiting state -- confirm.
00208 eq::base::Clock _clock;
00209 uint64_t _maxBucketSize;
00210 size_t _bucketSize;
00211 int64_t _sendRate;
00212
// Helper thread (raw pointer; lifetime managed in the implementation file)
// and locks.
00213 Thread* _thread;
00214 base::Lock _mutexConnection;
00215 base::Lock _mutexEvent;
00216 uint16_t _acked;
00217
00218 typedef base::Bufferb Buffer;
00219 typedef std::vector< Buffer* > Buffers;
00220
// Buffer bookkeeping. All containers hold raw Buffer pointers; which
// container owns the buffers is not visible here.
00221 Buffers _buffers;
00222
00223 base::LFQueue< Buffer* > _threadBuffers;
00225 base::MTQueue< Buffer* > _appBuffers;
00226
00227 Buffer _recvBuffer;
00228 std::deque< Buffer* > _recvBuffers;
00229
00230 Buffer* _readBuffer;
00231 uint64_t _readBufferPos;
00232
00233 uint16_t _sequence;
00234 std::deque< Buffer* > _writeBuffers;
00235
// Queue of Nack entries; filled by _addRepeat() (presumably consumed by
// _repeatData()).
00236 typedef std::deque< Nack > RepeatQueue;
00237 RepeatQueue _repeatQueue;
00238
// The private methods below are declared here and defined in the
// implementation file; their behaviour is not visible from this header.
00239 void _close();
00240 uint16_t _buildNewID();
00241
00242 void _processOutgoing();
00243 void _writeData();
00244 void _repeatData();
00245 void _finishWriteQueue( const uint16_t sequence );
00246
// Handlers for received datagrams, one per datagram struct.
00247 bool _handleData( Buffer& buffer );
00248 bool _handleAck( const DatagramAck* ack );
00249 bool _handleNack( const DatagramNack* nack );
00250 bool _handleAckRequest( const DatagramAckRequest* ackRequest );
00251
00253 bool _handleCountNode();
00254
00255 Buffer* _newDataBuffer( Buffer& inBuffer );
00256 void _pushDataBuffer( Buffer* buffer );
00257
00258
00259 void _runThread();
00260
00261
// Called from Thread::init().
00262 bool _initThread();
00263
// NOTE(review): unlike the other private methods, this name has no leading
// underscore.
00264 void initBuffers();
00265
// Signature matches a boost::asio receive completion handler (error code
// plus byte count).
00266 void _handlePacket( const boost::system::error_code& error,
00267 const size_t bytes );
00268 void _handleConnectedData( const void* data );
00269 void _handleInitData( const void* data );
00270 void _handleAcceptIDData( const void* data );
00271
00272
// Signature matches a boost::asio timer completion handler.
00273 void _handleTimeout( const boost::system::error_code& error );
00274 void _handleConnectedTimeout( );
00275 void _handleInitTimeout( );
00276 void _handleAcceptIDTimeout( );
00277
00279 RSPConnectionPtr _findConnection( const uint16_t id );
00280
00282 void _waitWritable( const uint64_t bytes );
00283
00285 void _sendDatagramCountNode();
00286
00287 void _addRepeat( const Nack* nacks, const uint16_t num );
00288
00290 void _sendSimpleDatagram( DatagramType type, uint16_t id );
00291
00293 void _sendAckRequest();
00294
00296 void _sendAck( const uint16_t writerID, const uint16_t sequence );
00297
00299 void _sendNack( const uint16_t toWriterID, const Nack* nacks,
00300 const uint16_t num );
00301
00302 void _checkNewID( const uint16_t id );
00303
00304
00305 bool _addNewConnection( const uint16_t id );
00306 void _removeConnection( const uint16_t id );
00307
00308 void _setTimeout( const int32_t timeOut );
00309 void _postWakeup();
00310 void _asyncReceiveFrom();
/** @return true while _threadBuffers or _writeBuffers still holds at least
 *  one buffer. */
00311 bool _isWriting()
00312 { return !_threadBuffers.isEmpty() || !_writeBuffers.empty( ); }
00313 };
00314
/** Stream-insertion operator for an RSPConnection; declared here and
 *  defined elsewhere, so the output format is not visible from this header. */
00315 std::ostream& operator << ( std::ostream&, const RSPConnection& );
00316 }
00317 }
00318
00319 #endif //EQ_USE_BOOST
00320 #endif //EQNET_RSPCONNECTION_H