lib/client/client.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "client.h"
00019
00020 #include "commandQueue.h"
00021 #include "commands.h"
00022 #include "global.h"
00023 #include "init.h"
00024 #include "nodeFactory.h"
00025 #include "packets.h"
00026 #include "server.h"
00027
00028 #include <eq/net/command.h>
00029 #include <eq/net/connection.h>
00030 #include <eq/base/dso.h>
00031
00032 namespace eq
00033 {
00035 typedef net::CommandFunc<Client> ClientFunc;
00036
00037 static net::ConnectionPtr _startLocalServer();
00038 static void _joinLocalServer();
00039
00040 typedef net::ConnectionPtr (*eqsStartLocalServer_t)( const std::string& file );
00041 typedef void (*eqsJoinLocalServer_t)();
00044 Client::Client()
00045 : _nodeThreadQueue( 0 )
00046 , _running( false )
00047 {
00048 EQINFO << "New client at " << (void*)this << std::endl;
00049 }
00050
00051 Client::~Client()
00052 {
00053 EQINFO << "Delete client at " << (void*)this << std::endl;
00054 stopListening();
00055 }
00056
00057 bool Client::listen()
00058 {
00059 EQASSERT( !_nodeThreadQueue );
00060 _nodeThreadQueue = new CommandQueue;
00061
00062 registerCommand( CMD_CLIENT_EXIT, ClientFunc( this, &Client::_cmdExit ),
00063 _nodeThreadQueue );
00064
00065 return net::Node::listen();
00066 }
00067
00068 void Client::setWindowSystem( const WindowSystem windowSystem )
00069 {
00070
00071 if( _nodeThreadQueue->getWindowSystem() == WINDOW_SYSTEM_NONE )
00072 {
00073 _nodeThreadQueue->setWindowSystem( windowSystem );
00074 EQINFO << "Client command message pump set up for " << windowSystem
00075 << std::endl;
00076 }
00077 else if( _nodeThreadQueue->getWindowSystem() != windowSystem )
00078 EQWARN << "Can't switch to window system " << windowSystem
00079 << ", already using " << _nodeThreadQueue->getWindowSystem()
00080 << std::endl;
00081 }
00082
00083 bool Client::stopListening()
00084 {
00085 if( !net::Node::stopListening( ))
00086 return false;
00087
00088 delete _nodeThreadQueue;
00089 _nodeThreadQueue = 0;
00090 return true;
00091 }
00092
00093 bool Client::connectServer( ServerPtr server )
00094 {
00095 if( server->isConnected( ))
00096 return true;
00097 bool explicitAddress = true;
00098
00099 if( server->getConnectionDescriptions().empty( ))
00100 {
00101 net::ConnectionDescriptionPtr connDesc =
00102 new net::ConnectionDescription;
00103 connDesc->TCPIP.port = EQ_DEFAULT_PORT;
00104
00105 const std::string globalServer = Global::getServer();
00106 const char* envServer = getenv( "EQ_SERVER" );
00107 std::string address = !globalServer.empty() ? globalServer :
00108 envServer ? envServer : "localhost";
00109
00110 if( !connDesc->fromString( address ))
00111 EQWARN << "Can't parse server address " << address << std::endl;
00112 EQASSERT( address.empty( ));
00113 EQINFO << "Connecting to " << connDesc->toString() << std::endl;
00114
00115 server->addConnectionDescription( connDesc );
00116
00117 if( globalServer.empty() && !envServer )
00118 explicitAddress = false;
00119 }
00120
00121 if( connect( net::NodePtr( server.get( )) ))
00122 {
00123 server->setClient( this );
00124 return true;
00125 }
00126
00127 if( explicitAddress )
00128 return false;
00129
00130
00131 net::ConnectionPtr connection = _startLocalServer();
00132 if( !connection )
00133 return false;
00134
00135 if( connect( server.get(), connection ))
00136 {
00137 server->setClient( this );
00138 server->_localServer = true;
00139 return true;
00140 }
00141
00142
00143 return false;
00144 }
00145
00147 namespace
00148 {
00149 base::DSO _libeqserver;
00150 }
00151
00152 net::ConnectionPtr _startLocalServer()
00153 {
00154 if( !_libeqserver.open(
00155 #ifdef WIN32_VC
00156 "EqualizerServer.dll"
00157 #elif defined (WIN32)
00158 "libeqserver.dll"
00159 #elif defined (Darwin)
00160 "libeqserver.dylib"
00161 #else
00162 "libeqserver.so"
00163 #endif
00164 ))
00165 {
00166 EQWARN << "Can't open Equalizer server library" << std::endl;
00167 return 0;
00168 }
00169
00170 eqsStartLocalServer_t eqsStartLocalServer = (eqsStartLocalServer_t)
00171 _libeqserver.getFunctionPointer( "eqsStartLocalServer" );
00172
00173 if( !eqsStartLocalServer )
00174 {
00175 EQWARN << "Can't find server entry function eqsStartLocalServer"
00176 << std::endl;
00177 return 0;
00178 }
00179
00180 return eqsStartLocalServer( Global::getConfigFile( ));
00181 }
00182
00183 static void _joinLocalServer()
00184 {
00185 eqsJoinLocalServer_t eqsJoinLocalServer = (eqsJoinLocalServer_t)
00186 _libeqserver.getFunctionPointer( "eqsJoinLocalServer" );
00187
00188 if( !eqsJoinLocalServer )
00189 {
00190 EQWARN << "Can't find server entry function eqsJoinLocalServer"
00191 << std::endl;
00192 return;
00193 }
00194
00195 eqsJoinLocalServer();
00196 _libeqserver.close();
00197 }
00200 bool Client::disconnectServer( ServerPtr server )
00201 {
00202 if( !server->isConnected( ))
00203 {
00204 EQWARN << "Trying to disconnect unconnected server" << std::endl;
00205 return false;
00206 }
00207
00208
00209 if( server->_localServer )
00210 {
00211 server->shutdown();
00212 _joinLocalServer();
00213 }
00214
00215 server->setClient( 0 );
00216 server->_localServer = false;
00217
00218 const int success = disconnect( server.get( ));
00219 if( !success )
00220 EQWARN << "Server disconnect failed" << std::endl;
00221
00222
00223 _nodeThreadQueue->flush();
00224 return success;
00225 }
00226
00227
00228 net::NodePtr Client::createNode( const uint32_t type )
00229 {
00230 switch( type )
00231 {
00232 case TYPE_EQ_SERVER:
00233 {
00234 Server* server = new Server;
00235 server->setClient( this );
00236 return server;
00237 }
00238
00239 default:
00240 return net::Node::createNode( type );
00241 }
00242 }
00243
00244 bool Client::clientLoop()
00245 {
00246 EQINFO << "Entered client loop" << std::endl;
00247
00248 _running = true;
00249 while( _running )
00250 processCommand();
00251
00252
00253 _nodeThreadQueue->flush();
00254 EQASSERT( !hasSessions( ));
00255
00256 return true;
00257 }
00258
00259 bool Client::exitClient()
00260 {
00261 return (net::Node::exitClient() & eq::exit( ));
00262 }
00263
00264 bool Client::hasCommands()
00265 {
00266 return !_nodeThreadQueue->isEmpty();
00267 }
00268
00269 void Client::processCommand()
00270 {
00271 net::Command* command = _nodeThreadQueue->pop();
00272 if( !command )
00273 return;
00274
00275 switch( invokeCommand( *command ))
00276 {
00277 case net::COMMAND_HANDLED:
00278 case net::COMMAND_DISCARD:
00279 break;
00280
00281 case net::COMMAND_ERROR:
00282 EQABORT( "Error handling command packet" );
00283 break;
00284 }
00285 command->release();
00286 }
00287
00288 bool Client::dispatchCommand( net::Command& command )
00289 {
00290 EQVERB << "dispatchCommand " << command << std::endl;
00291
00292 switch( command->datatype )
00293 {
00294 case DATATYPE_EQ_CLIENT:
00295 return net::Dispatcher::dispatchCommand( command );
00296
00297 case DATATYPE_EQ_SERVER:
00298 {
00299 net::NodePtr node = command.getNode();
00300
00301 EQASSERT( dynamic_cast< Server* >( node.get( )) );
00302 ServerPtr server = static_cast< Server* >( node.get( ));
00303
00304 return server->net::Dispatcher::dispatchCommand( command );
00305 }
00306
00307 default:
00308 return net::Node::dispatchCommand( command );
00309 }
00310 }
00311
00312 net::CommandResult Client::invokeCommand( net::Command& command )
00313 {
00314 EQVERB << "invokeCommand " << command << std::endl;
00315
00316 switch( command->datatype )
00317 {
00318 case DATATYPE_EQ_CLIENT:
00319 return net::Dispatcher::invokeCommand( command );
00320
00321 case DATATYPE_EQ_SERVER:
00322 {
00323 net::NodePtr node = command.getNode();
00324
00325 EQASSERT( dynamic_cast<Server*>( node.get( )) );
00326 ServerPtr server = static_cast<Server*>( node.get( ));
00327
00328 return server->net::Dispatcher::invokeCommand( command );
00329 }
00330 default:
00331 return net::Node::invokeCommand( command );
00332 }
00333 }
00334
00335 net::CommandResult Client::_cmdExit( net::Command& command )
00336 {
00337 _running = false;
00338
00339 command.getLocalNode()->disconnect( command.getNode( ));
00340 return net::COMMAND_HANDLED;
00341 }
00342 }