thread.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "thread.h"
00019
00020 #include "base.h"
00021 #include "debug.h"
00022 #include "lock.h"
00023 #include "log.h"
00024 #include "rng.h"
00025 #include "scopedMutex.h"
00026 #include "executionListener.h"
00027
00028 #include <eq/base/lock.h>
00029
00030 #include <errno.h>
00031 #include <string.h>
00032 #include <pthread.h>
00033 #include <algorithm>
00034
00035 using namespace std;
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 #ifdef WIN32
00046
00047 #endif
00048
00049 namespace eq
00050 {
00051 namespace base
00052 {
00053 namespace
00054 {
00055 static Lock& _listenerLock()
00056 {
00057 static Lock lock;
00058 return lock;
00059 }
00060
00061 static std::vector<ExecutionListener*>& _listeners()
00062 {
00063 static std::vector<ExecutionListener*> listeners;
00064 return listeners;
00065 }
00066 }
00067
00068 static pthread_key_t _createCleanupKey();
00069 void _notifyStopping( void* arg );
00070
00071 static pthread_key_t _cleanupKey = _createCleanupKey();
00072
00073 pthread_key_t _createCleanupKey()
00074 {
00075 const int error = pthread_key_create( &_cleanupKey, _notifyStopping );
00076 if( error )
00077 {
00078 EQERROR
00079 << "Can't create thread-specific key for thread cleanup handler: "
00080 << strerror( error ) << std::endl;
00081 EQASSERT( !error );
00082 }
00083 return _cleanupKey;
00084 }
00085
00086 class ThreadPrivate
00087 {
00088 public:
00089 pthread_t threadID;
00090 };
00091
00092 Thread::Thread()
00093 : _data( new ThreadPrivate )
00094 , _state( STATE_STOPPED )
00095 #ifdef EQ_WIN32_SDP_JOIN_WAR
00096 , _retVal( 0 )
00097 #endif
00098 {
00099 memset( &_data->threadID, 0, sizeof( pthread_t ));
00100 _syncChild.set();
00101 }
00102
00103 Thread::~Thread()
00104 {
00105 delete _data;
00106 _data = 0;
00107 }
00108
00109 void* Thread::runChild( void* arg )
00110 {
00111 Thread* thread = static_cast<Thread*>(arg);
00112 thread->_runChild();
00113 return 0;
00114 }
00115
00116 void Thread::_runChild()
00117 {
00118 #ifdef EQ_WIN32_SDP_JOIN_WAR
00119 _running = true;
00120 #endif
00121 pinCurrentThread();
00122
00123 _data->threadID = pthread_self();
00124
00125 if( !init( ))
00126 {
00127 EQWARN << "Thread failed to initialize" << endl;
00128 _state = STATE_STOPPED;
00129 _syncChild.unset();
00130 pthread_exit( 0 );
00131 }
00132
00133 _state = STATE_RUNNING;
00134 EQINFO << "Thread successfully initialized" << endl;
00135 pthread_setspecific( _cleanupKey, this );
00136 _notifyStarted();
00137 _syncChild.unset();
00138
00139 void* result = run();
00140 EQINFO << "Thread finished with result " << result << endl;
00141 this->exit( result );
00142
00143 EQUNREACHABLE;
00144 }
00145
00146 void Thread::_notifyStarted()
00147 {
00148
00149 _listenerLock().set();
00150 const std::vector<ExecutionListener*> listeners = _listeners();
00151 _listenerLock().unset();
00152
00153 EQVERB << "Calling " << listeners.size() << " thread started listeners"
00154 << endl;
00155 for( vector<ExecutionListener*>::const_iterator i = listeners.begin();
00156 i != listeners.end(); ++i )
00157
00158 (*i)->notifyExecutionStarted();
00159 }
00160
00161 void _notifyStopping( void* )
00162 {
00163 Thread::_notifyStopping();
00164 }
00165
00166 void Thread::_notifyStopping()
00167 {
00168 pthread_setspecific( _cleanupKey, 0 );
00169
00170
00171 _listenerLock().set();
00172 std::vector< ExecutionListener* > listeners = _listeners();
00173 _listenerLock().unset();
00174
00175
00176 for( vector< ExecutionListener* >::reverse_iterator i = listeners.rbegin();
00177 i != listeners.rend(); ++i )
00178
00179 (*i)->notifyExecutionStopping();
00180 }
00181
00182 bool Thread::start()
00183 {
00184 if( _state != STATE_STOPPED )
00185 return false;
00186
00187 _state = STATE_STARTING;
00188
00189 pthread_attr_t attributes;
00190 pthread_attr_init( &attributes );
00191 pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
00192
00193 int nTries = 10;
00194 while( nTries-- )
00195 {
00196 const int error = pthread_create( &_data->threadID, &attributes,
00197 runChild, this );
00198
00199 if( error == 0 )
00200 {
00201 EQVERB << "Created pthread " << this << endl;
00202 break;
00203 }
00204 if( error != EAGAIN || nTries==0 )
00205 {
00206 EQWARN << "Could not create thread: " << strerror( error )
00207 << endl;
00208 return false;
00209 }
00210 }
00211
00212 _syncChild.set();
00213 _state = STATE_RUNNING;
00214 return true;
00215 }
00216
00217 void Thread::exit( void* retVal )
00218 {
00219 EQASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );
00220
00221 EQINFO << "Exiting thread" << endl;
00222 _state = STATE_STOPPING;
00223
00224 #ifdef EQ_WIN32_SDP_JOIN_WAR
00225 _running = false;
00226 _retVal = retVal;
00227 #endif
00228
00229 pthread_exit( (void*)retVal );
00230 EQUNREACHABLE;
00231 }
00232
00233 void Thread::cancel()
00234 {
00235 EQASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );
00236
00237 EQINFO << "Cancelling thread" << endl;
00238 _state = STATE_STOPPING;
00239
00240 pthread_cancel( _data->threadID );
00241 EQUNREACHABLE;
00242 }
00243
00244 bool Thread::join( void** retVal )
00245 {
00246 if( _state == STATE_STOPPED )
00247 return false;
00248 if( isCurrent( ))
00249 return false;
00250
00251 EQVERB << "Joining thread" << endl;
00252 #ifdef EQ_WIN32_SDP_JOIN_WAR
00253 _running.waitEQ( false );
00254 #else
00255 void *_retVal;
00256 const int error = pthread_join( _data->threadID, &_retVal);
00257 if( error != 0 )
00258 {
00259 EQWARN << "Error joining thread: " << strerror(error) << endl;
00260 return false;
00261 }
00262 #endif
00263
00264 _state = STATE_STOPPED;
00265 if( retVal )
00266 *retVal = _retVal;
00267 return true;
00268 }
00269
00270 bool Thread::isCurrent() const
00271 {
00272 return pthread_equal( pthread_self(), _data->threadID );
00273 }
00274
00275 size_t Thread::getSelfThreadID()
00276 {
00277 #ifdef PTW32_VERSION
00278 return reinterpret_cast< size_t >( pthread_self().p );
00279 #else
00280 return ( size_t )( pthread_self( ));
00281 #endif
00282 }
00283
00284 void Thread::addListener( ExecutionListener* listener )
00285 {
00286 ScopedMutex mutex( _listenerLock() );
00287 _listeners().push_back( listener );
00288 }
00289
00290 bool Thread::removeListener( ExecutionListener* listener )
00291 {
00292 ScopedMutex mutex( _listenerLock() );
00293
00294 vector< ExecutionListener* >::iterator i = find( _listeners().begin(),
00295 _listeners().end(),
00296 listener );
00297 if( i == _listeners().end( ))
00298 return false;
00299
00300 _listeners().erase( i );
00301 return true;
00302 }
00303
00304 void Thread::removeAllListeners()
00305 {
00306 _listenerLock().set();
00307
00308 EQINFO << _listeners().size() << " thread listeners active" << endl;
00309 for( vector<ExecutionListener*>::const_iterator i = _listeners().begin();
00310 i != _listeners().end(); ++i )
00311
00312 EQINFO << " " << typeid( **i ).name() << endl;
00313
00314 _listenerLock().unset();
00315
00316 _notifyStopping();
00317
00318 _listenerLock().set();
00319 _listeners().clear();
00320 _listenerLock().unset();
00321 }
00322
00323
00324 void Thread::pinCurrentThread()
00325 {
00326 #ifdef EQ_WIN32_THREAD_AFFINITY
00327 static Lock lock;
00328 ScopedMutex mutex( lock );
00329
00330 static DWORD_PTR processMask = 0;
00331 static DWORD_PTR processor = 0;
00332 if( processMask == 0 )
00333 {
00334
00335 DWORD_PTR systemMask;
00336 if( GetProcessAffinityMask( GetCurrentProcess(), &processMask,
00337 &systemMask ) == 0 )
00338 {
00339 EQWARN << "Can't get usable processor mask" << endl;
00340 return;
00341 }
00342 EQINFO << "Available processors 0x" << hex << processMask << dec <<endl;
00343
00344
00345
00346 unsigned nProcessors = 0;
00347 for( DWORD_PTR i = 1; i != 0; i <<= 1 )
00348 {
00349 if( processMask & i )
00350 ++nProcessors;
00351 }
00352 EQINFO << nProcessors << " available processors" << endl;
00353
00354 unsigned chance = RNG().get< unsigned >();
00355 processor = 1 << (chance % nProcessors);
00356 EQINFO << "Starting with processor " << processor << endl;
00357 }
00358 EQASSERT( processMask != 0 );
00359
00360 while( true )
00361 {
00362 processor <<= 1;
00363 if( processor == 0 )
00364 processor = 1;
00365
00366 if( processor & processMask )
00367 {
00368 if( SetThreadAffinityMask( GetCurrentThread(), processor ) == 0 )
00369 EQWARN << "Can't set thread processor" << endl;
00370 EQINFO << "Pinned thread to processor 0x" << hex << processor << dec
00371 << endl;
00372 return;
00373 }
00374 }
00375 #endif
00376 }
00377
00378 std::ostream& operator << ( std::ostream& os, const Thread* thread )
00379 {
00380 #ifdef PTW32_VERSION
00381 os << "Thread " << thread->_data->threadID.p;
00382 #else
00383 os << "Thread " << thread->_data->threadID;
00384 #endif
00385 os << " state "
00386 << ( thread->_state == Thread::STATE_STOPPED ? "stopped" :
00387 thread->_state == Thread::STATE_STARTING ? "starting" :
00388 thread->_state == Thread::STATE_RUNNING ? "running" :
00389 thread->_state == Thread::STATE_STOPPING ? "stopping" : "unknown" );
00390
00391 #ifdef PTW32_VERSION
00392 os << " called from " << pthread_self().p;
00393 #else
00394 os << " called from " << pthread_self();
00395 #endif
00396
00397 return os;
00398 }
00399 }
00400 }