server/server.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "server.h"
00019
00020 #include "channel.h"
00021 #include "compound.h"
00022 #include "config.h"
00023 #include "global.h"
00024 #include "loader.h"
00025 #include "node.h"
00026 #include "pipe.h"
00027 #include "serverVisitor.h"
00028 #include "window.h"
00029
00030 #include <eq/base/refPtr.h>
00031 #include <eq/net/command.h>
00032 #include <eq/net/connectionDescription.h>
00033 #include <eq/net/init.h>
00034 #include <eq/net/node.h>
00035 #include <eq/client/packets.h>
00036
00037 #include <sstream>
00038
00039 namespace eq
00040 {
00041 namespace server
00042 {
00043 typedef net::CommandFunc<Server> ServerFunc;
00044
00045 Server::Server()
00046 : _configID(0)
00047 {
00048 registerCommand( eq::CMD_SERVER_CHOOSE_CONFIG,
00049 ServerFunc( this, &Server::_cmdChooseConfig ),
00050 &_serverThreadQueue );
00051 registerCommand( eq::CMD_SERVER_USE_CONFIG,
00052 ServerFunc( this, &Server::_cmdUseConfig ),
00053 &_serverThreadQueue );
00054 registerCommand( eq::CMD_SERVER_RELEASE_CONFIG,
00055 ServerFunc( this, &Server::_cmdReleaseConfig ),
00056 &_serverThreadQueue );
00057 registerCommand( eq::CMD_SERVER_SHUTDOWN,
00058 ServerFunc( this, &Server::_cmdShutdown ),
00059 &_serverThreadQueue );
00060 EQINFO << "New server @" << (void*)this << std::endl;
00061 }
00062
00063 Server::~Server()
00064 {
00065 for( ConfigVector::const_iterator i = _configs.begin();
00066 i != _configs.end(); ++i )
00067 {
00068 Config* config = *i;
00069
00070 config->_server = 0;
00071 delete config;
00072 }
00073
00074 _configs.clear();
00075 }
00076
00077 namespace
00078 {
00079 template< class C, class V >
00080 VisitorResult _accept( C* server, V& visitor )
00081 {
00082 VisitorResult result = visitor.visitPre( server );
00083 if( result != TRAVERSE_CONTINUE )
00084 return result;
00085
00086 const ConfigVector& configs = server->getConfigs();
00087 for( ConfigVector::const_iterator i = configs.begin();
00088 i != configs.end(); ++i )
00089 {
00090 switch( (*i)->accept( visitor ))
00091 {
00092 case TRAVERSE_TERMINATE:
00093 return TRAVERSE_TERMINATE;
00094
00095 case TRAVERSE_PRUNE:
00096 result = TRAVERSE_PRUNE;
00097 break;
00098
00099 case TRAVERSE_CONTINUE:
00100 default:
00101 break;
00102 }
00103 }
00104
00105 switch( visitor.visitPost( server ))
00106 {
00107 case TRAVERSE_TERMINATE:
00108 return TRAVERSE_TERMINATE;
00109
00110 case TRAVERSE_PRUNE:
00111 return TRAVERSE_PRUNE;
00112
00113 case TRAVERSE_CONTINUE:
00114 default:
00115 break;
00116 }
00117
00118 return result;
00119 }
00120 }
00121
00122 VisitorResult Server::accept( ServerVisitor& visitor )
00123 {
00124 return _accept( this, visitor );
00125 }
00126
00127 VisitorResult Server::accept( ConstServerVisitor& visitor ) const
00128 {
00129 return _accept( this, visitor );
00130 }
00131
00132 bool Server::run()
00133 {
00134 EQASSERT( getState() == net::Node::STATE_LISTENING );
00135
00136 if( _configs.empty( ))
00137 {
00138 EQERROR << "No configurations loaded" << std::endl;
00139 return false;
00140 }
00141
00142 EQINFO << base::disableFlush << "Running server: " << std::endl
00143 << base::indent << Global::instance() << this << base::exdent
00144 << base::enableFlush;
00145
00146 _handleCommands();
00147 return true;
00148 }
00149
00150 void Server::addConfig( Config* config )
00151 {
00152 config->_server = this;
00153 _configs.push_back( config );
00154 }
00155
00156 void Server::registerConfig( Config* config )
00157 {
00158 if( config->getName().empty( ))
00159 {
00160 std::ostringstream stringStream;
00161 stringStream << "EQ_CONFIG_" << (++_configID);
00162 config->setName( stringStream.str( ));
00163 }
00164
00165 EQCHECK( registerSession( config ));
00166 }
00167
00168
00169
00170
00171 bool Server::dispatchCommand( net::Command& command )
00172 {
00173 switch( command->datatype )
00174 {
00175 case eq::DATATYPE_EQ_SERVER:
00176 return net::Dispatcher::dispatchCommand( command );
00177
00178 default:
00179 return net::Node::dispatchCommand( command );
00180 }
00181 }
00182
00183 net::CommandResult Server::invokeCommand( net::Command& command )
00184 {
00185 switch( command->datatype )
00186 {
00187 case eq::DATATYPE_EQ_SERVER:
00188 return net::Dispatcher::invokeCommand( command );
00189
00190 default:
00191 return net::Node::invokeCommand( command );
00192 }
00193 }
00194
00195 void Server::_handleCommands()
00196 {
00197 _running = true;
00198 while( _running )
00199 {
00200 net::Command* command = _serverThreadQueue.pop();
00201
00202 switch( invokeCommand( *command ))
00203 {
00204 case net::COMMAND_HANDLED:
00205 case net::COMMAND_DISCARD:
00206 break;
00207
00208 case net::COMMAND_ERROR:
00209 EQABORT( "Error handling command " << command );
00210 break;
00211 default:
00212 EQABORT( "Unknown command result" );
00213 break;
00214 }
00215
00216 command->release();
00217 }
00218 _serverThreadQueue.flush();
00219 }
00220
00221 net::CommandResult Server::_cmdChooseConfig( net::Command& command )
00222 {
00223 const eq::ServerChooseConfigPacket* packet =
00224 command.getPacket<eq::ServerChooseConfigPacket>();
00225 EQINFO << "Handle choose config " << packet << std::endl;
00226
00227
00228 Config* config = _configs.empty() ? 0 : _configs[0];
00229
00230 eq::ServerChooseConfigReplyPacket reply( packet );
00231 net::NodePtr node = command.getNode();
00232
00233 if( !config )
00234 {
00235 reply.configID = EQ_ID_INVALID;
00236 node->send( reply );
00237 return net::COMMAND_HANDLED;
00238 }
00239
00240 Config* appConfig = new Config( *config );
00241 appConfig->setApplicationNetNode( node );
00242
00243 registerConfig( appConfig );
00244
00245
00246 const std::string rendererInfo = packet->rendererInfo;
00247 const size_t colonPos = rendererInfo.find( '#' );
00248 const std::string workDir = rendererInfo.substr( 0, colonPos );
00249 const std::string renderClient = rendererInfo.substr( colonPos + 1 );
00250
00251 const uint32_t configID = appConfig->getID();
00252 appConfig->setWorkDir( workDir );
00253 appConfig->setRenderClient( renderClient );
00254 _appConfigs[configID] = appConfig;
00255
00256 const std::string& name = appConfig->getName();
00257
00258 eq::ServerCreateConfigPacket createConfigPacket;
00259 createConfigPacket.configID = configID;
00260 createConfigPacket.objectID = appConfig->getDistributorID();
00261 createConfigPacket.appNodeID = node->getNodeID();
00262 createConfigPacket.appNodeID.convertToNetwork();
00263 node->send( createConfigPacket, name );
00264
00265 reply.configID = configID;
00266 node->send( reply );
00267
00268 return net::COMMAND_HANDLED;
00269 }
00270
00271 net::CommandResult Server::_cmdUseConfig( net::Command& command )
00272 {
00273 const eq::ServerUseConfigPacket* packet =
00274 command.getPacket<eq::ServerUseConfigPacket>();
00275 EQINFO << "Handle use config " << packet << std::endl;
00276
00277 std::string configInfo = packet->configInfo;
00278 size_t colonPos = configInfo.find( '#' );
00279 const std::string workDir = configInfo.substr( 0, colonPos );
00280
00281 configInfo = configInfo.substr( colonPos + 1 );
00282 colonPos = configInfo.find( '#' );
00283 const std::string renderClient = configInfo.substr( 0, colonPos );
00284 const std::string configData = configInfo.substr( colonPos + 1 );
00285
00286 Loader loader;
00287 Config* config = loader.parseConfig( configData.c_str( ));
00288
00289 eq::ServerChooseConfigReplyPacket reply( packet );
00290 net::NodePtr node = command.getNode();
00291
00292 if( !config )
00293 {
00294 EQWARN << "Use config parsing failed " << std::endl;
00295 reply.configID = EQ_ID_INVALID;
00296 node->send( reply );
00297 return net::COMMAND_HANDLED;
00298 }
00299
00300 EQINFO << "Using config: " << std::endl << Global::instance() << config
00301 << std::endl;
00302 config->setApplicationNetNode( node );
00303 config->_server = this;
00304 registerConfig( config );
00305
00306 const uint32_t configID = config->getID();
00307 config->setWorkDir( workDir );
00308 config->setRenderClient( renderClient );
00309 _appConfigs[configID] = config;
00310
00311 const std::string& name = config->getName();
00312
00313 eq::ServerCreateConfigPacket createConfigPacket;
00314 createConfigPacket.configID = configID;
00315 createConfigPacket.objectID = config->getDistributorID();
00316 createConfigPacket.appNodeID = node->getNodeID();
00317 createConfigPacket.appNodeID.convertToNetwork();
00318 node->send( createConfigPacket, name );
00319
00320 reply.configID = configID;
00321 node->send( reply );
00322 return net::COMMAND_HANDLED;
00323 }
00324
00325 net::CommandResult Server::_cmdReleaseConfig( net::Command& command )
00326 {
00327 const eq::ServerReleaseConfigPacket* packet =
00328 command.getPacket<eq::ServerReleaseConfigPacket>();
00329 EQINFO << "Handle release config " << packet << std::endl;
00330
00331 eq::ServerReleaseConfigReplyPacket reply( packet );
00332 Config* config = _appConfigs[packet->configID];
00333 net::NodePtr node = command.getNode();
00334
00335 if( !config )
00336 {
00337 EQWARN << "Release request for unknown config" << std::endl;
00338 node->send( reply );
00339 return net::COMMAND_HANDLED;
00340 }
00341
00342 if( config->isRunning( ))
00343 {
00344 EQWARN << "Release of running configuration" << std::endl;
00345 config->exit();
00346 }
00347
00348 config->unmap();
00349
00350 eq::ServerDestroyConfigPacket destroyConfigPacket;
00351 destroyConfigPacket.configID = config->getID();
00352 node->send( destroyConfigPacket );
00353
00354 EQCHECK( deregisterSession( config ));
00355
00356 _appConfigs.erase( packet->configID );
00357 delete config;
00358
00359 node->send( reply );
00360 EQLOG( base::LOG_ANY ) << "----- Released Config -----" << std::endl;
00361
00362 return net::COMMAND_HANDLED;
00363 }
00364
00365 net::CommandResult Server::_cmdShutdown( net::Command& command )
00366 {
00367 const eq::ServerShutdownPacket* packet =
00368 command.getPacket< eq::ServerShutdownPacket >();
00369
00370 eq::ServerShutdownReplyPacket reply( packet );
00371
00372 reply.result = _appConfigs.empty();
00373 if( reply.result )
00374 {
00375 _running = false;
00376 EQINFO << "Shutting down server" << std::endl;
00377 }
00378 else
00379 EQWARN << "Ignoring shutdown request, " << _appConfigs.size()
00380 << " configs still active" << std::endl;
00381
00382 net::NodePtr node = command.getNode();
00383 node->send( reply );
00384 return net::COMMAND_HANDLED;
00385 }
00386
00387 std::ostream& operator << ( std::ostream& os, const Server* server )
00388 {
00389 if( !server )
00390 return os;
00391
00392 os << base::disableFlush << base::disableHeader << "server " << std::endl;
00393 os << "{" << std::endl << base::indent;
00394
00395 const net::ConnectionDescriptionVector& cds =
00396 server->getConnectionDescriptions();
00397 for( net::ConnectionDescriptionVector::const_iterator i = cds.begin();
00398 i != cds.end(); ++i )
00399
00400 os << static_cast< const ConnectionDescription* >( (*i).get( ));
00401
00402 const ConfigVector& configs = server->getConfigs();
00403 for( ConfigVector::const_iterator i = configs.begin();
00404 i != configs.end(); ++i )
00405
00406 os << *i;
00407
00408 os << base::exdent << "}" << base::enableHeader << base::enableFlush
00409 << std::endl;
00410
00411 return os;
00412 }
00413
00414 }
00415 }