socketConnectionWin32.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include <Mswsock.h>
00019 
00020 using namespace eq::base;
00021 
00022 #ifdef WIN32
00023 namespace eq
00024 {
00025 namespace net
00026 {
00027 SocketConnection::SocketConnection( const ConnectionType type )
00028         : _overlappedAcceptData( 0 )
00029         , _overlappedSocket( INVALID_SOCKET )
00030 {
00031     memset( &_overlapped, 0, sizeof( _overlapped ));
00032 
00033     EQASSERT( type == CONNECTIONTYPE_TCPIP || type == CONNECTIONTYPE_SDP );
00034     _description =  new ConnectionDescription;
00035     _description->type = type;
00036     _description->bandwidth = (type == CONNECTIONTYPE_TCPIP) ?
00037                                   102400 : 819200;
00038 }
00039 
00040 SocketConnection::~SocketConnection()
00041 {
00042     close();
00043 }
00044 
00045 //----------------------------------------------------------------------
00046 // connect
00047 //----------------------------------------------------------------------
00048 bool SocketConnection::connect()
00049 {
00050     EQASSERT( _description->type == CONNECTIONTYPE_TCPIP ||
00051               _description->type == CONNECTIONTYPE_SDP );
00052     if( _state != STATE_CLOSED )
00053         return false;
00054 
00055     if( _description->TCPIP.port == 0 )
00056         return false;
00057 
00058     _state = STATE_CONNECTING;
00059     _fireStateChanged();
00060 
00061     if( !_createSocket( ))
00062         return false;
00063 
00064     sockaddr_in socketAddress;
00065     if( !_parseAddress( socketAddress ))
00066     {
00067         EQWARN << "Can't parse connection parameters" << std::endl;
00068         close();
00069         return false;
00070     }
00071 
00072     if( socketAddress.sin_addr.s_addr == 0 )
00073     {
00074         EQWARN << "Refuse to connect to 0.0.0.0" << std::endl;
00075         close();
00076         return false;
00077     }
00078 
00079     const bool connected = WSAConnect( _readFD, (sockaddr*)&socketAddress, 
00080                                        sizeof(socketAddress), 0, 0, 0, 0 ) == 0;
00081     if( !connected )
00082     {
00083         EQWARN << "Could not connect to '" << _description->getHostname() << ":"
00084                << _description->TCPIP.port << "': " << EQ_SOCKET_ERROR
00085                << std::endl;
00086         close();
00087         return false;
00088     }
00089 
00090     _initAIORead();
00091     _state = STATE_CONNECTED;
00092     _fireStateChanged();
00093     return true;
00094 }
00095 
00096 void SocketConnection::close()
00097 {
00098     if( !(_state == STATE_CONNECTED || _state == STATE_LISTENING ))
00099         return;
00100 
00101     if( isListening( ))
00102         _exitAIOAccept();
00103     else
00104         _exitAIORead();
00105 
00106     _state = STATE_CLOSED;
00107     EQASSERT( _readFD > 0 ); 
00108 
00109     const bool closed = ( ::closesocket(_readFD) == 0 );
00110     if( !closed )
00111         EQWARN << "Could not close socket: " << EQ_SOCKET_ERROR << std::endl;
00112 
00113     _readFD  = INVALID_SOCKET;
00114     _writeFD = INVALID_SOCKET;
00115     _fireStateChanged();
00116 }
00117 
00118 //----------------------------------------------------------------------
00119 // Async IO handle
00120 //----------------------------------------------------------------------
00121 void SocketConnection::_initAIORead()
00122 {
00123     _overlapped.hEvent = CreateEvent( 0, FALSE, FALSE, 0 );
00124     EQASSERT( _overlapped.hEvent );
00125 
00126     if( !_overlapped.hEvent )
00127         EQERROR << "Can't create event for AIO notification: " 
00128                 << EQ_SOCKET_ERROR << std::endl;
00129 }
00130 
00131 void SocketConnection::_initAIOAccept()
00132 {
00133     _initAIORead();
00134     _overlappedAcceptData = malloc( 2*( sizeof( sockaddr_in ) + 16 ));
00135 }
00136 
00137 void SocketConnection::_exitAIOAccept()
00138 {
00139     if( _overlappedAcceptData )
00140     {
00141         free( _overlappedAcceptData );
00142         _overlappedAcceptData = 0;
00143     }
00144     
00145     _exitAIORead();
00146 }
00147 void SocketConnection::_exitAIORead()
00148 {
00149     if( _overlapped.hEvent )
00150     {
00151         CloseHandle( _overlapped.hEvent );
00152         _overlapped.hEvent = 0;
00153     }
00154 }
00155 
00156 //----------------------------------------------------------------------
00157 // accept
00158 //----------------------------------------------------------------------
00159 void SocketConnection::acceptNB()
00160 {
00161     EQASSERT( _state == STATE_LISTENING );
00162 
00163     // Create new accept socket
00164     const DWORD flags = _description->type == CONNECTIONTYPE_SDP ?
00165                             WSA_FLAG_OVERLAPPED | WSA_FLAG_SDP :
00166                             WSA_FLAG_OVERLAPPED;
00167 
00168     EQASSERT( _overlappedAcceptData );
00169     EQASSERT( _overlappedSocket == INVALID_SOCKET );
00170     _overlappedSocket = WSASocket( AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, 0,
00171                                    flags );
00172 
00173     if( _overlappedSocket == INVALID_SOCKET )
00174     {
00175         EQERROR << "Could not create accept socket: " << EQ_SOCKET_ERROR
00176                 << ", closing listening socket" << std::endl;
00177         close();
00178         return;
00179     }
00180 
00181     const int on = 1;
00182     setsockopt( _overlappedSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
00183         reinterpret_cast<const char*>( &on ), sizeof( on ));
00184 
00185     // Start accept
00186     ResetEvent( _overlapped.hEvent );
00187     DWORD got;
00188     if( !AcceptEx( _readFD, _overlappedSocket, _overlappedAcceptData, 0,
00189                    sizeof( sockaddr_in ) + 16, sizeof( sockaddr_in ) + 16,
00190                    &got, &_overlapped ) &&
00191         GetLastError() != WSA_IO_PENDING )
00192     {
00193         EQERROR << "Could not start accept operation: " << EQ_SOCKET_ERROR 
00194                 << ", closing connection" << std::endl;
00195         close();
00196     }
00197 }
00198     
00199 ConnectionPtr SocketConnection::acceptSync()
00200 {
00201     CHECK_THREAD( _recvThread );
00202     if( _state != STATE_LISTENING )
00203         return 0;
00204 
00205     EQASSERT( _overlappedAcceptData );
00206     EQASSERT( _overlappedSocket != INVALID_SOCKET );
00207     if( _overlappedSocket == INVALID_SOCKET )
00208         return 0;
00209 
00210     // complete accept
00211     DWORD got   = 0;
00212     DWORD flags = 0;
00213     if( !WSAGetOverlappedResult( _readFD, &_overlapped, &got, TRUE, &flags ))
00214     {
00215         EQWARN << "Accept completion failed: " << EQ_SOCKET_ERROR 
00216                << ", closing socket" << std::endl;
00217         close();
00218         return 0;
00219     }
00220 
00221     sockaddr_in* local     = 0;
00222     sockaddr_in* remote    = 0;
00223     int          localLen  = 0;
00224     int          remoteLen = 0;
00225     GetAcceptExSockaddrs( _overlappedAcceptData, 0, sizeof( sockaddr_in ) + 16,
00226                           sizeof( sockaddr_in ) + 16, (sockaddr**)&local, 
00227                           &localLen, (sockaddr**)&remote, &remoteLen );
00228     _tuneSocket( _overlappedSocket );
00229 
00230     SocketConnection* newConnection = new SocketConnection(_description->type );
00231 
00232     newConnection->_readFD  = _overlappedSocket;
00233     newConnection->_writeFD = _overlappedSocket;
00234     newConnection->_initAIORead();
00235     _overlappedSocket       = INVALID_SOCKET;
00236 
00237     newConnection->_state                   = STATE_CONNECTED;
00238     newConnection->_description->bandwidth  = _description->bandwidth;
00239     newConnection->_description->TCPIP.port = ntohs( remote->sin_port );
00240     newConnection->_description->setHostname( inet_ntoa( remote->sin_addr ));
00241 
00242     EQINFO << "accepted connection from " << inet_ntoa( remote->sin_addr ) 
00243            << ":" << ntohs( remote->sin_port ) << std::endl;
00244     return newConnection;
00245 }
00246 
00247 //----------------------------------------------------------------------
00248 // read
00249 //----------------------------------------------------------------------
00250 void SocketConnection::readNB( void* buffer, const uint64_t bytes )
00251 {
00252     if( _state == STATE_CLOSED )
00253         return;
00254 
00255     WSABUF wsaBuffer = { EQ_MIN( bytes, 1048576 ),
00256                          reinterpret_cast< char* >( buffer ) };
00257     DWORD  got   = 0;
00258     DWORD  flags = 0;
00259 
00260     ResetEvent( _overlapped.hEvent );
00261     if( WSARecv( _readFD, &wsaBuffer, 1, &got, &flags, &_overlapped, 0 ) != 0 &&
00262         GetLastError() != WSA_IO_PENDING )
00263     {
00264         EQWARN << "Could not start overlapped receive: " << EQ_SOCKET_ERROR
00265                << ", closing connection" << std::endl;
00266         close();
00267     }
00268 }
00269 
00270 int64_t SocketConnection::readSync( void* buffer, const uint64_t bytes )
00271 {
00272     CHECK_THREAD( _recvThread );
00273 
00274     if( _readFD == INVALID_SOCKET )
00275     {
00276         EQERROR << "Invalid read handle" << std::endl;
00277         return -1;
00278     }
00279 
00280     DWORD got   = 0;
00281     DWORD flags = 0;
00282     if( !WSAGetOverlappedResult( _readFD, &_overlapped, &got, TRUE, &flags ))
00283     {
00284         if( GetLastError() == WSASYSCALLFAILURE ) // happens sometimes!?
00285             return 0;
00286 
00287         EQWARN << "Read complete failed: " << EQ_SOCKET_ERROR 
00288                << ", closing connection" << std::endl;
00289         close();
00290         return -1;
00291     }
00292 
00293     if( got == 0 )
00294     {
00295         EQWARN << "Read operation returned with nothing read"
00296                << ", closing connection." << std::endl;
00297         close();
00298         return -1;
00299     }
00300 
00301     return got;
00302 }
00303 
00304 int64_t SocketConnection::write( const void* buffer, const uint64_t bytes)
00305 {
00306     if( _writeFD == INVALID_SOCKET )
00307         return -1;
00308 
00309     DWORD  wrote;
00310     WSABUF wsaBuffer = 
00311         { 
00312             EQ_MIN( bytes, 1048576 ),
00313             const_cast<char*>( static_cast< const char* >( buffer )) 
00314         };
00315 
00316     while( true )
00317     {
00318         if( WSASend( _writeFD, &wsaBuffer, 1, &wrote, 0, 0, 0 ) ==  0 ) // ok
00319             return wrote;
00320 
00321         // error
00322         if( GetLastError( ) != WSAEWOULDBLOCK )
00323         {
00324             EQWARN << "Error during write: " << EQ_SOCKET_ERROR << std::endl;
00325             return -1;
00326         }
00327 
00328         // Buffer full - try again
00329 #if 1
00330         // Wait for writable socket
00331         fd_set set;
00332         FD_ZERO( &set );
00333         FD_SET( _writeFD, &set );
00334 
00335         const int result = select( _writeFD+1, 0, &set, 0, 0 );
00336         if( result <= 0 )
00337         {
00338             EQWARN << "Error during select: " << EQ_SOCKET_ERROR << std::endl;
00339             return -1;
00340         }
00341 #endif
00342     }
00343 
00344     EQUNREACHABLE;
00345     return -1;
00346 }
00347 }
00348 }
00349 #else
00350 #  error "File is only for WIN32 builds"
00351 #endif
Generated on Mon Aug 10 18:58:41 2009 for Equalizer 0.9 by  doxygen 1.5.8