connection.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "connection.h"
00019 
00020 #include "connectionDescription.h"
00021 #include "connectionListener.h"
00022 #include "log.h"
00023 #include "node.h"
00024 #include "pipeConnection.h"
00025 #include "socketConnection.h"
00026 #include "namedPipeConnection.h"
00027 #include <errno.h>
00028 
00029 #ifdef WIN32
00030 #  define EQ_SOCKET_ERROR getErrorString( WSAGetLastError( ))
00031 #  include <malloc.h>
00032 #else
00033 #  define EQ_SOCKET_ERROR strerror( errno )
00034 #  include <alloca.h>
00035 #endif
00036 
00037 using namespace eq::base;
00038 using namespace std;
00039 
00040 namespace eq
00041 {
00042 namespace net
00043 {
00044 
00045 Connection::Connection()
00046         : _state( STATE_CLOSED )
00047         , _aioBuffer( 0 )
00048         , _aioBytes( 0 )
00049 {
00050     EQVERB << "New Connection @" << (void*)this << endl;
00051 }
00052 
00053 Connection::~Connection()
00054 {
00055     _state = STATE_CLOSED;
00056     EQVERB << "Delete Connection @" << (void*)this << endl;
00057 }
00058 
00059 ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
00060 {
00061     ConnectionPtr connection;
00062     switch( description->type )
00063     {
00064         case CONNECTIONTYPE_TCPIP:
00065         case CONNECTIONTYPE_SDP:
00066             connection = new SocketConnection( description->type );
00067             break;
00068 
00069         case CONNECTIONTYPE_PIPE:
00070             connection = new PipeConnection();
00071             break;
00072             
00073         case CONNECTIONTYPE_NAMEDPIPE:
00074             connection = new NamedPipeConnection();
00075             break;
00076         default:
00077             EQWARN << "Connection type not implemented" << endl;
00078             return connection;
00079     }
00080 
00081     connection->setDescription( description );
00082     return connection;
00083 }
00084 
00085 void Connection::addListener( ConnectionListener* listener )
00086 {
00087     _listeners.push_back( listener );
00088 }
00089 
00090 void Connection::removeListener( ConnectionListener* listener )
00091 {
00092     vector< ConnectionListener* >::iterator i = find( _listeners.begin(),
00093                                                       _listeners.end(),
00094                                                       listener );
00095     if( i != _listeners.end( ))
00096         _listeners.erase( i );
00097 }
00098 
00099 void Connection::_fireStateChanged()
00100 {
00101     for( vector<ConnectionListener*>::const_iterator i= _listeners.begin();
00102          i != _listeners.end(); ++i )
00103 
00104         (*i)->notifyStateChanged( this );
00105 }
00106 
00107 
00108 //----------------------------------------------------------------------
00109 // read
00110 //----------------------------------------------------------------------
00111 void Connection::recvNB( void* buffer, const uint64_t bytes )
00112 {
00113     EQASSERT( !_aioBuffer );
00114     EQASSERT( !_aioBytes );
00115     EQASSERT( buffer );
00116     EQASSERT( bytes );
00117 
00118     _aioBuffer = buffer;
00119     _aioBytes  = bytes;
00120     readNB( buffer, bytes );
00121 }
00122 
00123 bool Connection::recvSync( void** outBuffer, uint64_t* outBytes )
00124 {
00125     EQASSERT( _aioBuffer );
00126     EQASSERT( _aioBytes );
00127 
00128     if( outBuffer )
00129         *outBuffer = _aioBuffer;
00130     if( outBytes )
00131         *outBytes = _aioBytes;
00132 
00133     void* buffer( _aioBuffer );
00134     const uint64_t bytes( _aioBytes );
00135     _aioBuffer = 0;
00136     _aioBytes  = 0;
00137 
00138     if( _state != STATE_CONNECTED || !buffer || !bytes )
00139         return false;
00140 
00141     uint8_t* ptr = static_cast< uint8_t* >( buffer );
00142     uint64_t bytesLeft = bytes;
00143 
00144     while( bytesLeft )
00145     {
00146         const int64_t got = readSync( ptr, bytesLeft );
00147 
00148         if( got < 0 ) // error
00149         {
00150             if( outBytes )
00151                 *outBytes -= bytesLeft;
00152             if( bytes == bytesLeft )
00153                 EQINFO << "Read on dead connection" << endl;
00154             else
00155                 EQERROR << "Error during read after " << bytes - bytesLeft
00156                         << " bytes on " << typeid(*this).name() << endl;
00157             return false;
00158         }
00159         else if( got == 0 )
00160         {
00161             // ConnectionSet::select may report data on an 'empty' connection.
00162             // If we have nothing read so far, we have hit this case.
00163             if( bytes == bytesLeft )
00164             {
00165                 if( outBytes )
00166                     *outBytes = 0;
00167                 return false;
00168             }
00169             EQVERB << "Zero bytes read" << endl;
00170         }
00171 
00172         if( bytesLeft > static_cast< uint64_t >( got )) // partial read
00173         {
00174             ptr += got;
00175             bytesLeft -= got;
00176 
00177             readNB( ptr, bytesLeft );
00178         }
00179         else
00180         {
00181             EQASSERT( static_cast< uint64_t >( got ) == bytesLeft );
00182             bytesLeft = 0;
00183         }
00184     }
00185 
00186     return true;
00187 }
00188 
00189 //----------------------------------------------------------------------
00190 // write
00191 //----------------------------------------------------------------------
00192 bool Connection::send( const void* buffer, const uint64_t bytes, 
00193                        const bool isLocked )
00194 {
00195     if( _state != STATE_CONNECTED )
00196         return false;
00197 
00198     // possible OPT: We need to lock here to guarantee an atomic transmission of
00199     // the buffer. Possible improvements are:
00200     // 1) Disassemble buffer into 'small enough' pieces and use a header to
00201     //    reassemble correctly on the other side (aka reliable UDP)
00202     // 2) Introduce a send thread with a thread-safe task queue
00203     ScopedMutex mutex( isLocked ? 0 : &_sendLock );
00204 
00205     const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
00206     uint64_t bytesLeft = bytes;
00207     while( bytesLeft )
00208     {
00209         const int64_t wrote = this->write( ptr, bytesLeft );
00210 
00211         if( wrote == -1 ) // error
00212         {
00213             EQERROR << "Error during write after " << bytes - bytesLeft 
00214                     << " bytes" << endl;
00215             return false;
00216         }
00217         else if( wrote == 0 )
00218             EQWARN << "Zero bytes write" << endl;
00219 
00220         bytesLeft -= wrote;
00221         ptr += wrote;
00222     }
00223     return true;
00224 }
00225 
00226 bool Connection::send( Packet& packet, const void* data,
00227                        const uint64_t dataSize )
00228 {
00229     if( dataSize == 0 )
00230         return send( packet );
00231 
00232     if( dataSize <= 8 ) // fits in existing packet
00233     {
00234         memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00235         return send( packet );
00236     }
00237     // else
00238 
00239     const uint64_t headerSize  = packet.size - 8;
00240     const uint64_t size        = headerSize + dataSize;
00241     if( size > ASSEMBLE_THRESHOLD )
00242     {
00243         // OPT: lock the connection and use two send() to avoid big memcpy
00244         packet.size = size;
00245 
00246         lockSend();
00247         const bool ret = ( send( &packet, headerSize, true ) &&
00248                            send( data,    dataSize,   true ));
00249         unlockSend();
00250         return ret;
00251     }
00252     // else
00253 
00254     char*          buffer = (char*)alloca( size );
00255 
00256     memcpy( buffer,              &packet, headerSize );
00257     memcpy( buffer + headerSize, data,    dataSize );
00258 
00259     ((Packet*)buffer)->size = size;
00260     return send( buffer, size );
00261 }
00262 
00263 bool Connection::send( const ConnectionVector& connections,
00264                        const Packet& packet, const bool isLocked )
00265 {
00266     if( connections.empty( ))
00267         return true;
00268 
00269     for( ConnectionVector::const_iterator i= connections.begin(); 
00270          i<connections.end(); ++i )
00271     {        
00272         ConnectionPtr connection = *i;
00273         if( !connection->send( &packet, packet.size, isLocked ))
00274             return false;
00275     }
00276     return true;
00277 }
00278 
00279 bool Connection::send( const ConnectionVector& connections, Packet& packet,
00280                        const void* data, const uint64_t dataSize,
00281                        const bool isLocked )
00282 {
00283     if( connections.empty( ))
00284         return true;
00285 
00286     if( dataSize <= 8 ) // fits in existing packet
00287     {
00288         if( dataSize != 0 )
00289             memcpy( (char*)(&packet) + packet.size-8, data, dataSize );
00290         return send( connections, packet, isLocked );
00291     }
00292 
00293     const uint64_t headerSize  = packet.size - 8;
00294     const uint64_t size        = headerSize + dataSize;
00295 
00296     if( size > ASSEMBLE_THRESHOLD )
00297     {
00298         // OPT: lock the connection and use two send() to avoid big memcpy
00299         packet.size = size;
00300 
00301         for( ConnectionVector::const_iterator i= connections.begin(); 
00302              i<connections.end(); ++i )
00303         {        
00304             ConnectionPtr connection = *i;
00305 
00306             if( !isLocked )
00307                 connection->lockSend();
00308             const bool ok = (connection->send( &packet, headerSize, true ) &&
00309                              connection->send( data, dataSize, true ));
00310             if( !isLocked )
00311                 connection->unlockSend();
00312             if( !ok )
00313                 return false;
00314         }
00315         return true;
00316     }
00317 
00318     char*          buffer = (char*)alloca( size );
00319     memcpy( buffer, &packet, packet.size-8 );
00320     memcpy( buffer + packet.size-8, data, dataSize );
00321 
00322     ((Packet*)buffer)->size = size;
00323 
00324     for( ConnectionVector::const_iterator i= connections.begin(); 
00325          i<connections.end(); ++i )
00326     {        
00327         ConnectionPtr connection = *i;
00328         if( !connection->send( buffer, size, isLocked ))
00329             return false;
00330     }
00331 
00332     return true;
00333 }
00334 
00335 
00336 ConnectionDescriptionPtr Connection::getDescription() const
00337 {
00338     return _description;
00339 }
00340 
00341 void Connection::setDescription( ConnectionDescriptionPtr description )
00342 {
00343     EQASSERT( description.isValid( ));
00344     EQASSERTINFO( _description->type == description->type,
00345                   "Wrong connection type in description" );
00346     _description = description;
00347 
00348     if( _description->bandwidth > 0 )
00349         return;
00350 
00351     switch( description->type )
00352     {
00353         case CONNECTIONTYPE_NAMEDPIPE:
00354             _description->bandwidth = 768000;
00355             break;
00356         case CONNECTIONTYPE_SDP:
00357             _description->bandwidth = 819200;
00358             break;
00359         case CONNECTIONTYPE_TCPIP:
00360         default:
00361             _description->bandwidth = 102400;
00362             break;
00363     }
00364 }
00365 
00366 std::ostream& operator << ( std::ostream& os, const Connection* connection )
00367 {
00368     if( !connection )
00369     {
00370         os << "NULL connection";
00371         return os;
00372     }
00373     
00374     Connection::State        state = connection->getState();
00375     ConnectionDescriptionPtr desc  = connection->getDescription();
00376 
00377     os << "Connection " << (void*)connection << " type "
00378        << typeid(*connection).name() << " state "
00379        << ( state == Connection::STATE_CLOSED     ? "closed" :
00380             state == Connection::STATE_CONNECTING ? "connecting" :
00381             state == Connection::STATE_CONNECTED  ? "connected" :
00382             state == Connection::STATE_LISTENING  ? "listening" :
00383             "unknown state" );
00384     if( desc.isValid( ))
00385         os << " description " << desc->toString();
00386     
00387     return os;
00388 }
00389 }
00390 }
Generated on Mon Aug 10 18:58:32 2009 for Equalizer 0.9 by  doxygen 1.5.8