00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 using namespace eq::base;
// Size passed as both the out-buffer and in-buffer size of every pipe
// instance created by acceptNB().
00021 #define EQ_PIPE_BUFFER_SIZE 515072
// Upper bound on the number of bytes requested by one readNB() call.
00022 #define EQ_READ_BUFFER_SIZE 257536
// Upper bound on the number of bytes submitted by one write() call.
00023 #define EQ_WRITE_BUFFER_SIZE 257536
00024
00025 #ifdef WIN32
00026 namespace eq
00027 {
00028 namespace net
00029 {
00030
00031 NamedPipeConnection::NamedPipeConnection()
00032 {
00033 memset( &_read, 0, sizeof( _read ));
00034 memset( &_write, 0, sizeof( _write ));
00035
00036 _description = new ConnectionDescription;
00037 _description->type = CONNECTIONTYPE_NAMEDPIPE;
00038 _description->bandwidth = 768000;
00039 }
00040
00041 NamedPipeConnection::~NamedPipeConnection()
00042 {
00043 close();
00044 }
00045
00046 std::string NamedPipeConnection::_getFilename() const
00047 {
00048 const std::string& filename = _description->getFilename();
00049 if( filename.find( "\\\\.\\pipe\\" ) == 0 )
00050 return filename;
00051
00052 return "\\\\.\\pipe\\" + filename;
00053 }
00054
00055
00056
00057
00058 bool NamedPipeConnection::connect()
00059 {
00060 EQASSERT( _description->type == CONNECTIONTYPE_NAMEDPIPE );
00061
00062 if( _state != STATE_CLOSED )
00063 return false;
00064
00065 _state = STATE_CONNECTING;
00066 _fireStateChanged();
00067
00068 if( !_createNamedPipe( ))
00069 return false;
00070
00071 _initAIORead();
00072 _state = STATE_CONNECTED;
00073 _fireStateChanged();
00074
00075 return true;
00076 }
00077
00078 void NamedPipeConnection::close()
00079 {
00080 if( !(_state == STATE_CONNECTED || _state == STATE_LISTENING ))
00081 return;
00082
00083 EQASSERT( _readFD > 0 );
00084
00085 if( isListening( ))
00086 {
00087 _exitAIOAccept();
00088
00089 if( !DisconnectNamedPipe( _readFD ))
00090 EQERROR << "Could not disconnect named pipe: " << EQ_PIPE_ERROR
00091 << std::endl;
00092 }
00093 else
00094 {
00095 _exitAIORead();
00096 if( !CloseHandle( _readFD ))
00097 EQERROR << "Could not close named pipe: " << EQ_PIPE_ERROR
00098 << std::endl;
00099 }
00100
00101 _readFD = INVALID_HANDLE_VALUE;
00102 _state = STATE_CLOSED;
00103 _fireStateChanged();
00104 }
00105
00106 bool NamedPipeConnection::_createNamedPipe()
00107 {
00108 const std::string filename = _getFilename();
00109 if ( !WaitNamedPipe( filename.c_str(), 20000 ))
00110 {
00111 EQERROR << "Can't create named pipe: " << EQ_PIPE_ERROR << std::endl;
00112 return false;
00113 }
00114
00115 _readFD = CreateFile(
00116 filename.c_str(),
00117 GENERIC_READ |
00118 GENERIC_WRITE,
00119 0,
00120 0,
00121 OPEN_EXISTING,
00122 FILE_FLAG_OVERLAPPED,
00123 0);
00124
00125 if( _readFD != INVALID_HANDLE_VALUE )
00126 return true;
00127
00128 if( GetLastError() != ERROR_PIPE_BUSY )
00129 {
00130 EQERROR << "Can't create named pipe: "
00131 << EQ_PIPE_ERROR << std::endl;
00132 return false;
00133
00134 }
00135
00136 return _readFD != INVALID_HANDLE_VALUE;
00137 }
00138
00139
00140
00141
00142 bool NamedPipeConnection::listen()
00143 {
00144 EQASSERT( _description->type == CONNECTIONTYPE_NAMEDPIPE );
00145
00146 if( _state != STATE_CLOSED )
00147 return false;
00148
00149 _state = STATE_CONNECTING;
00150 _fireStateChanged();
00151
00152 _initAIOAccept();
00153 _state = STATE_LISTENING;
00154 _fireStateChanged();
00155 return true;
00156 }
00157
00158
00159 bool NamedPipeConnection::_connectToNewClient( HANDLE hPipe )
00160 {
00161
00162 const bool fConnected = ConnectNamedPipe( hPipe, &_read );
00163
00164 EQASSERT( !fConnected );
00165
00166 switch( GetLastError() )
00167 {
00168
00169 case ERROR_IO_PENDING:
00170 return true;
00171
00172
00173 case ERROR_PIPE_CONNECTED:
00174 if( SetEvent( _read.hEvent ) )
00175 return true;
00176
00177
00178 default:
00179 {
00180 EQWARN << "ConnectNamedPipe failed : " << EQ_PIPE_ERROR << std::endl;
00181 return false;
00182 }
00183 }
00184 }
00185
00186
00187
00188
00189 void NamedPipeConnection::_initAIORead()
00190 {
00191 _read.hEvent = CreateEvent( 0, FALSE, FALSE, 0 );
00192 EQASSERT( _read.hEvent );
00193 _write.hEvent = CreateEvent( 0, FALSE, FALSE, 0 );
00194 EQASSERT( _write.hEvent );
00195
00196 if( !_read.hEvent || !_write.hEvent )
00197 EQERROR << "Can't create events for AIO notification: "
00198 << EQ_PIPE_ERROR << std::endl;
00199 }
00200
00201 void NamedPipeConnection::_initAIOAccept()
00202 {
00203 _initAIORead();
00204 }
00205
00206 void NamedPipeConnection::_exitAIOAccept()
00207 {
00208 _exitAIORead();
00209 }
00210 void NamedPipeConnection::_exitAIORead()
00211 {
00212 if( _read.hEvent )
00213 {
00214 CloseHandle( _read.hEvent );
00215 _read.hEvent = 0;
00216 }
00217 if( _write.hEvent )
00218 {
00219 CloseHandle( _write.hEvent );
00220 _write.hEvent = 0;
00221 }
00222 }
00223
00224
00225
00226
00227 void NamedPipeConnection::acceptNB()
00228 {
00229 EQASSERT( _state == STATE_LISTENING );
00230 ResetEvent( _read.hEvent );
00231
00232
00233 #if 0
00234 SECURITY_ATTRIBUTES sa;
00235 sa.lpSecurityDescriptor =
00236 ( PSECURITY_DESCRIPTOR )malloc( SECURITY_DESCRIPTOR_MIN_LENGTH );
00237 InitializeSecurityDescriptor( sa.lpSecurityDescriptor,
00238 SECURITY_DESCRIPTOR_REVISION );
00239
00240 SetSecurityDescriptorDacl(sa.lpSecurityDescriptor, true, 0, false);
00241 sa.nLength = sizeof(sa);
00242 sa.bInheritHandle = TRUE;
00243 #endif
00244
00245
00246 const std::string filename = _getFilename();
00247 _readFD = CreateNamedPipe(
00248 filename.c_str(),
00249 PIPE_ACCESS_DUPLEX |
00250 FILE_FLAG_OVERLAPPED,
00251 PIPE_TYPE_BYTE |
00252 PIPE_READMODE_BYTE |
00253 PIPE_WAIT,
00254 PIPE_UNLIMITED_INSTANCES,
00255 EQ_PIPE_BUFFER_SIZE,
00256 EQ_PIPE_BUFFER_SIZE,
00257 0,
00258 0 );
00259
00260 if ( _readFD == INVALID_HANDLE_VALUE )
00261 {
00262 EQERROR << "Could not create named pipe: "
00263 << EQ_PIPE_ERROR << " file : " << filename << std::endl;
00264 close();
00265 return;
00266 }
00267
00268 _connectToNewClient( _readFD );
00269 }
00270
00271 ConnectionPtr NamedPipeConnection::acceptSync()
00272 {
00273 CHECK_THREAD( _recvThread );
00274 if( _state != STATE_LISTENING )
00275 return 0;
00276
00277
00278 DWORD got = 0;
00279 DWORD flags = 0;
00280 if( !GetOverlappedResult( _readFD, &_read, &got, TRUE ))
00281 {
00282 if (GetLastError() == ERROR_PIPE_CONNECTED)
00283 {
00284 return 0;
00285 }
00286 EQWARN << "Accept completion failed: " << EQ_PIPE_ERROR
00287 << ", closing named pipe" << std::endl;
00288
00289 close();
00290 return 0;
00291 }
00292
00293
00294 NamedPipeConnection* newConnection = new NamedPipeConnection;
00295 ConnectionPtr conn( newConnection );
00296
00297 newConnection->setDescription( _description );
00298 newConnection->_readFD = _readFD;
00299 newConnection->_state = STATE_CONNECTED;
00300 newConnection->_initAIORead();
00301
00302 newConnection->_state = STATE_CONNECTED;
00303 _readFD = INVALID_HANDLE_VALUE;
00304
00305 EQINFO << "accepted connection" << std::endl;
00306 return conn;
00307 }
00308
00309
00310
00311
00312 void NamedPipeConnection::readNB( void* buffer, const uint64_t bytes )
00313 {
00314 if( _state == STATE_CLOSED )
00315 return;
00316
00317 ResetEvent( _read.hEvent );
00318 DWORD use = EQ_MIN( bytes, EQ_READ_BUFFER_SIZE );
00319
00320 if( !ReadFile( _readFD, buffer, use, 0, &_read ) &&
00321 GetLastError() != ERROR_IO_PENDING )
00322 {
00323 EQWARN << "Could not start overlapped receive: " << EQ_PIPE_ERROR
00324 << ", closing connection" << std::endl;
00325 close();
00326 }
00327 }
00328
00329 int64_t NamedPipeConnection::readSync( void* buffer, const uint64_t bytes )
00330 {
00331 CHECK_THREAD( _recvThread );
00332
00333 if( _readFD == INVALID_HANDLE_VALUE )
00334 {
00335 EQERROR << "Invalid read handle" << std::endl;
00336 return -1;
00337 }
00338
00339 DWORD got = 0;
00340 if( !GetOverlappedResult( _readFD, &_read, &got, true ))
00341 {
00342 if( GetLastError() == ERROR_PIPE_CONNECTED )
00343 {
00344 return 0;
00345 }
00346
00347 EQWARN << "Read complete failed: " << EQ_PIPE_ERROR
00348 << ", closing connection" << std::endl;
00349 close();
00350 return 0;
00351
00352 }
00353
00354 return got;
00355 }
00356
00357 int64_t NamedPipeConnection::write( const void* buffer, const uint64_t bytes )
00358 {
00359 if( _readFD == INVALID_HANDLE_VALUE )
00360 return -1;
00361
00362 DWORD wrote;
00363 DWORD use = EQ_MIN( bytes, EQ_WRITE_BUFFER_SIZE );
00364
00365 ResetEvent( _write.hEvent );
00366 if( !WriteFile( _readFD, buffer, use, &wrote, &_write ) &&
00367 GetLastError() != ERROR_IO_PENDING )
00368 {
00369 EQWARN << "Could not start write: " << EQ_PIPE_ERROR
00370 << ", closing connection" << std::endl;
00371 close();
00372 }
00373
00374 DWORD got = 0;
00375 if( !GetOverlappedResult( _readFD, &_write, &got, true ))
00376 {
00377 if( GetLastError() == ERROR_PIPE_CONNECTED )
00378 {
00379 return 0;
00380 }
00381
00382 EQWARN << "Write complete failed: " << EQ_PIPE_ERROR
00383 << ", closing connection" << std::endl;
00384 close();
00385 return -1;
00386 }
00387
00388 return got;
00389 }
00390 }
00391 }
00392 #else
00393 # error "File is only for WIN32 builds"
00394 #endif