00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <Mswsock.h>
00019
00020 using namespace eq::base;
00021
00022 #ifdef WIN32
00023 namespace eq
00024 {
00025 namespace net
00026 {
00027 SocketConnection::SocketConnection( const ConnectionType type )
00028 : _overlappedAcceptData( 0 )
00029 , _overlappedSocket( INVALID_SOCKET )
00030 {
00031 memset( &_overlapped, 0, sizeof( _overlapped ));
00032
00033 EQASSERT( type == CONNECTIONTYPE_TCPIP || type == CONNECTIONTYPE_SDP );
00034 _description = new ConnectionDescription;
00035 _description->type = type;
00036 _description->bandwidth = (type == CONNECTIONTYPE_TCPIP) ?
00037 102400 : 819200;
00038 }
00039
00040 SocketConnection::~SocketConnection()
00041 {
00042 close();
00043 }
00044
00045
00046
00047
00048 bool SocketConnection::connect()
00049 {
00050 EQASSERT( _description->type == CONNECTIONTYPE_TCPIP ||
00051 _description->type == CONNECTIONTYPE_SDP );
00052 if( _state != STATE_CLOSED )
00053 return false;
00054
00055 if( _description->TCPIP.port == 0 )
00056 return false;
00057
00058 _state = STATE_CONNECTING;
00059 _fireStateChanged();
00060
00061 if( !_createSocket( ))
00062 return false;
00063
00064 sockaddr_in socketAddress;
00065 if( !_parseAddress( socketAddress ))
00066 {
00067 EQWARN << "Can't parse connection parameters" << std::endl;
00068 close();
00069 return false;
00070 }
00071
00072 if( socketAddress.sin_addr.s_addr == 0 )
00073 {
00074 EQWARN << "Refuse to connect to 0.0.0.0" << std::endl;
00075 close();
00076 return false;
00077 }
00078
00079 const bool connected = WSAConnect( _readFD, (sockaddr*)&socketAddress,
00080 sizeof(socketAddress), 0, 0, 0, 0 ) == 0;
00081 if( !connected )
00082 {
00083 EQWARN << "Could not connect to '" << _description->getHostname() << ":"
00084 << _description->TCPIP.port << "': " << EQ_SOCKET_ERROR
00085 << std::endl;
00086 close();
00087 return false;
00088 }
00089
00090 _initAIORead();
00091 _state = STATE_CONNECTED;
00092 _fireStateChanged();
00093 return true;
00094 }
00095
00096 void SocketConnection::close()
00097 {
00098 if( !(_state == STATE_CONNECTED || _state == STATE_LISTENING ))
00099 return;
00100
00101 if( isListening( ))
00102 _exitAIOAccept();
00103 else
00104 _exitAIORead();
00105
00106 _state = STATE_CLOSED;
00107 EQASSERT( _readFD > 0 );
00108
00109 const bool closed = ( ::closesocket(_readFD) == 0 );
00110 if( !closed )
00111 EQWARN << "Could not close socket: " << EQ_SOCKET_ERROR << std::endl;
00112
00113 _readFD = INVALID_SOCKET;
00114 _writeFD = INVALID_SOCKET;
00115 _fireStateChanged();
00116 }
00117
00118
00119
00120
00121 void SocketConnection::_initAIORead()
00122 {
00123 _overlapped.hEvent = CreateEvent( 0, FALSE, FALSE, 0 );
00124 EQASSERT( _overlapped.hEvent );
00125
00126 if( !_overlapped.hEvent )
00127 EQERROR << "Can't create event for AIO notification: "
00128 << EQ_SOCKET_ERROR << std::endl;
00129 }
00130
00131 void SocketConnection::_initAIOAccept()
00132 {
00133 _initAIORead();
00134 _overlappedAcceptData = malloc( 2*( sizeof( sockaddr_in ) + 16 ));
00135 }
00136
00137 void SocketConnection::_exitAIOAccept()
00138 {
00139 if( _overlappedAcceptData )
00140 {
00141 free( _overlappedAcceptData );
00142 _overlappedAcceptData = 0;
00143 }
00144
00145 _exitAIORead();
00146 }
00147 void SocketConnection::_exitAIORead()
00148 {
00149 if( _overlapped.hEvent )
00150 {
00151 CloseHandle( _overlapped.hEvent );
00152 _overlapped.hEvent = 0;
00153 }
00154 }
00155
00156
00157
00158
00159 void SocketConnection::acceptNB()
00160 {
00161 EQASSERT( _state == STATE_LISTENING );
00162
00163
00164 const DWORD flags = _description->type == CONNECTIONTYPE_SDP ?
00165 WSA_FLAG_OVERLAPPED | WSA_FLAG_SDP :
00166 WSA_FLAG_OVERLAPPED;
00167
00168 EQASSERT( _overlappedAcceptData );
00169 EQASSERT( _overlappedSocket == INVALID_SOCKET );
00170 _overlappedSocket = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0,
00171 flags );
00172
00173 if( _overlappedSocket == INVALID_SOCKET )
00174 {
00175 EQERROR << "Could not create accept socket: " << EQ_SOCKET_ERROR
00176 << ", closing listening socket" << std::endl;
00177 close();
00178 return;
00179 }
00180
00181 const int on = 1;
00182 setsockopt( _overlappedSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
00183 reinterpret_cast<const char*>( &on ), sizeof( on ));
00184
00185
00186 ResetEvent( _overlapped.hEvent );
00187 DWORD got;
00188 if( !AcceptEx( _readFD, _overlappedSocket, _overlappedAcceptData, 0,
00189 sizeof( sockaddr_in ) + 16, sizeof( sockaddr_in ) + 16,
00190 &got, &_overlapped ) &&
00191 GetLastError() != WSA_IO_PENDING )
00192 {
00193 EQERROR << "Could not start accept operation: " << EQ_SOCKET_ERROR
00194 << ", closing connection" << std::endl;
00195 close();
00196 }
00197 }
00198
00199 ConnectionPtr SocketConnection::acceptSync()
00200 {
00201 CHECK_THREAD( _recvThread );
00202 if( _state != STATE_LISTENING )
00203 return 0;
00204
00205 EQASSERT( _overlappedAcceptData );
00206 EQASSERT( _overlappedSocket != INVALID_SOCKET );
00207 if( _overlappedSocket == INVALID_SOCKET )
00208 return 0;
00209
00210
00211 DWORD got = 0;
00212 DWORD flags = 0;
00213 if( !WSAGetOverlappedResult( _readFD, &_overlapped, &got, TRUE, &flags ))
00214 {
00215 EQWARN << "Accept completion failed: " << EQ_SOCKET_ERROR
00216 << ", closing socket" << std::endl;
00217 close();
00218 return 0;
00219 }
00220
00221 sockaddr_in* local = 0;
00222 sockaddr_in* remote = 0;
00223 int localLen = 0;
00224 int remoteLen = 0;
00225 GetAcceptExSockaddrs( _overlappedAcceptData, 0, sizeof( sockaddr_in ) + 16,
00226 sizeof( sockaddr_in ) + 16, (sockaddr**)&local,
00227 &localLen, (sockaddr**)&remote, &remoteLen );
00228 _tuneSocket( _overlappedSocket );
00229
00230 SocketConnection* newConnection = new SocketConnection(_description->type );
00231
00232 newConnection->_readFD = _overlappedSocket;
00233 newConnection->_writeFD = _overlappedSocket;
00234 newConnection->_initAIORead();
00235 _overlappedSocket = INVALID_SOCKET;
00236
00237 newConnection->_state = STATE_CONNECTED;
00238 newConnection->_description->bandwidth = _description->bandwidth;
00239 newConnection->_description->TCPIP.port = ntohs( remote->sin_port );
00240 newConnection->_description->setHostname( inet_ntoa( remote->sin_addr ));
00241
00242 EQINFO << "accepted connection from " << inet_ntoa( remote->sin_addr )
00243 << ":" << ntohs( remote->sin_port ) << std::endl;
00244 return newConnection;
00245 }
00246
00247
00248
00249
00250 void SocketConnection::readNB( void* buffer, const uint64_t bytes )
00251 {
00252 if( _state == STATE_CLOSED )
00253 return;
00254
00255 WSABUF wsaBuffer = { EQ_MIN( bytes, 1048576 ),
00256 reinterpret_cast< char* >( buffer ) };
00257 DWORD got = 0;
00258 DWORD flags = 0;
00259
00260 ResetEvent( _overlapped.hEvent );
00261 if( WSARecv( _readFD, &wsaBuffer, 1, &got, &flags, &_overlapped, 0 ) != 0 &&
00262 GetLastError() != WSA_IO_PENDING )
00263 {
00264 EQWARN << "Could not start overlapped receive: " << EQ_SOCKET_ERROR
00265 << ", closing connection" << std::endl;
00266 close();
00267 }
00268 }
00269
00270 int64_t SocketConnection::readSync( void* buffer, const uint64_t bytes )
00271 {
00272 CHECK_THREAD( _recvThread );
00273
00274 if( _readFD == INVALID_SOCKET )
00275 {
00276 EQERROR << "Invalid read handle" << std::endl;
00277 return -1;
00278 }
00279
00280 DWORD got = 0;
00281 DWORD flags = 0;
00282 if( !WSAGetOverlappedResult( _readFD, &_overlapped, &got, TRUE, &flags ))
00283 {
00284 if( GetLastError() == WSASYSCALLFAILURE )
00285 return 0;
00286
00287 EQWARN << "Read complete failed: " << EQ_SOCKET_ERROR
00288 << ", closing connection" << std::endl;
00289 close();
00290 return -1;
00291 }
00292
00293 if( got == 0 )
00294 {
00295 EQWARN << "Read operation returned with nothing read"
00296 << ", closing connection." << std::endl;
00297 close();
00298 return -1;
00299 }
00300
00301 return got;
00302 }
00303
00304 int64_t SocketConnection::write( const void* buffer, const uint64_t bytes)
00305 {
00306 if( _writeFD == INVALID_SOCKET )
00307 return -1;
00308
00309 DWORD wrote;
00310 WSABUF wsaBuffer =
00311 {
00312 EQ_MIN( bytes, 1048576 ),
00313 const_cast<char*>( static_cast< const char* >( buffer ))
00314 };
00315
00316 while( true )
00317 {
00318 if( WSASend( _writeFD, &wsaBuffer, 1, &wrote, 0, 0, 0 ) == 0 )
00319 return wrote;
00320
00321
00322 if( GetLastError( ) != WSAEWOULDBLOCK )
00323 {
00324 EQWARN << "Error during write: " << EQ_SOCKET_ERROR << std::endl;
00325 return -1;
00326 }
00327
00328
00329 #if 1
00330
00331 fd_set set;
00332 FD_ZERO( &set );
00333 FD_SET( _writeFD, &set );
00334
00335 const int result = select( _writeFD+1, 0, &set, 0, 0 );
00336 if( result <= 0 )
00337 {
00338 EQWARN << "Error during select: " << EQ_SOCKET_ERROR << std::endl;
00339 return -1;
00340 }
00341 #endif
00342 }
00343
00344 EQUNREACHABLE;
00345 return -1;
00346 }
00347 }
00348 }
00349 #else
00350 # error "File is only for WIN32 builds"
00351 #endif