thread.cpp

00001 
00002 /* Copyright (c) 2005-2009, Stefan Eilemann <eile@equalizergraphics.com> 
00003  *
00004  * This library is free software; you can redistribute it and/or modify it under
00005  * the terms of the GNU Lesser General Public License version 2.1 as published
00006  * by the Free Software Foundation.
00007  *  
00008  * This library is distributed in the hope that it will be useful, but WITHOUT
00009  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00010  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
00011  * details.
00012  * 
00013  * You should have received a copy of the GNU Lesser General Public License
00014  * along with this library; if not, write to the Free Software Foundation, Inc.,
00015  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016  */
00017 
00018 #include "thread.h"
00019 
00020 #include "base.h"
00021 #include "debug.h"
00022 #include "lock.h"
00023 #include "log.h"
00024 #include "rng.h"
00025 #include "scopedMutex.h"
00026 #include "executionListener.h"
00027 
00028 #include <eq/base/lock.h>
00029 
00030 #include <errno.h>
00031 #include <string.h>
00032 #include <pthread.h>
00033 #include <algorithm>
00034 
00035 using namespace std;
00036 
00037 /* 
00038  * EQ_WIN32_SDP_JOIN_WAR: When using SDP connections on Win32, the join() of the
00039  * receiver thread blocks indefinitely, even after the thread has exited. This
00040  * workaround uses a monitor to implement the join functionality independently
00041  * of pthreads.
00042  */
00043 
00044 // Experimental Win32 thread pinning
00045 #ifdef WIN32
00046 //#  define EQ_WIN32_THREAD_AFFINITY
00047 #endif
00048 
00049 namespace eq
00050 {
00051 namespace base
00052 {
00053 namespace
00054 {
00055 static Lock& _listenerLock()
00056 {
00057     static Lock lock;
00058     return lock;
00059 }
00060 
00061 static std::vector<ExecutionListener*>& _listeners()
00062 {
00063     static std::vector<ExecutionListener*> listeners;
00064     return listeners;
00065 }
00066 }
00067 
00068 static pthread_key_t _createCleanupKey();
00069 void                 _notifyStopping( void* arg );
00070 
00071 static pthread_key_t _cleanupKey = _createCleanupKey();
00072 
00073 pthread_key_t _createCleanupKey()
00074 {
00075     const int error = pthread_key_create( &_cleanupKey, _notifyStopping );
00076     if( error )
00077     {
00078         EQERROR
00079             << "Can't create thread-specific key for thread cleanup handler: " 
00080             << strerror( error ) << std::endl;
00081         EQASSERT( !error );
00082     }
00083     return _cleanupKey;
00084 }
00085 
00086 class ThreadPrivate
00087 {
00088 public:
00089     pthread_t threadID;
00090 };
00091 
00092 Thread::Thread()
00093         : _data( new ThreadPrivate )
00094         , _state( STATE_STOPPED )
00095 #ifdef EQ_WIN32_SDP_JOIN_WAR
00096         , _retVal( 0 )
00097 #endif
00098 {
00099     memset( &_data->threadID, 0, sizeof( pthread_t ));
00100     _syncChild.set();
00101 }
00102 
00103 Thread::~Thread()
00104 {
00105     delete _data;
00106     _data = 0;
00107 }
00108 
00109 void* Thread::runChild( void* arg )
00110 {
00111     Thread* thread = static_cast<Thread*>(arg);
00112     thread->_runChild();
00113     return 0; // not reached
00114 }
00115 
00116 void Thread::_runChild()
00117 {
00118 #ifdef EQ_WIN32_SDP_JOIN_WAR
00119     _running = true;
00120 #endif
00121     pinCurrentThread();
00122 
00123     _data->threadID = pthread_self(); // XXX remove, set during create already?
00124 
00125     if( !init( ))
00126     {
00127         EQWARN << "Thread failed to initialize" << endl;
00128         _state = STATE_STOPPED;
00129         _syncChild.unset();
00130         pthread_exit( 0 );
00131     }
00132 
00133     _state    = STATE_RUNNING;
00134     EQINFO << "Thread successfully initialized" << endl;
00135     pthread_setspecific( _cleanupKey, this ); // install cleanup handler
00136     _notifyStarted();
00137     _syncChild.unset(); // sync w/ parent
00138 
00139     void* result = run();
00140     EQINFO << "Thread finished with result " << result << endl;
00141     this->exit( result );
00142 
00143     EQUNREACHABLE;
00144 }
00145 
00146 void Thread::_notifyStarted()
00147 {
00148     // make copy of vector so that listeners can add/remove listeners.
00149     _listenerLock().set();
00150     const std::vector<ExecutionListener*> listeners = _listeners();
00151     _listenerLock().unset();
00152 
00153     EQVERB << "Calling " << listeners.size() << " thread started listeners"
00154            << endl;
00155     for( vector<ExecutionListener*>::const_iterator i = listeners.begin();
00156          i != listeners.end(); ++i )
00157         
00158         (*i)->notifyExecutionStarted();
00159 }
00160 
00161 void _notifyStopping( void* )
00162 { 
00163     Thread::_notifyStopping();
00164 }
00165 
00166 void Thread::_notifyStopping()
00167 {
00168     pthread_setspecific( _cleanupKey, 0 );
00169 
00170     // make copy of vector so that listeners can add/remove listeners.
00171     _listenerLock().set();
00172     std::vector< ExecutionListener* > listeners = _listeners();
00173     _listenerLock().unset();
00174 
00175     // Call them in reverse order so that symmetry is kept
00176     for( vector< ExecutionListener* >::reverse_iterator i = listeners.rbegin();
00177          i != listeners.rend(); ++i )
00178         
00179         (*i)->notifyExecutionStopping();
00180 }
00181 
00182 bool Thread::start()
00183 {
00184     if( _state != STATE_STOPPED )
00185         return false;
00186 
00187     _state = STATE_STARTING;
00188 
00189     pthread_attr_t attributes;
00190     pthread_attr_init( &attributes );
00191     pthread_attr_setscope( &attributes, PTHREAD_SCOPE_SYSTEM );
00192 
00193     int nTries = 10;
00194     while( nTries-- )
00195     {
00196         const int error = pthread_create( &_data->threadID, &attributes,
00197                                           runChild, this );
00198 
00199         if( error == 0 ) // succeeded
00200         {
00201             EQVERB << "Created pthread " << this << endl;
00202             break;
00203         }
00204         if( error != EAGAIN || nTries==0 )
00205         {
00206             EQWARN << "Could not create thread: " << strerror( error )
00207                    << endl;
00208             return false;
00209         }
00210     }
00211 
00212     _syncChild.set(); // sync with child's entry func
00213     _state = STATE_RUNNING;
00214     return true;
00215 }
00216 
00217 void Thread::exit( void* retVal )
00218 {
00219     EQASSERTINFO( isCurrent(), "Thread::exit not called from child thread" );
00220 
00221     EQINFO << "Exiting thread" << endl;
00222     _state = STATE_STOPPING;
00223 
00224 #ifdef EQ_WIN32_SDP_JOIN_WAR
00225     _running = false;
00226     _retVal  = retVal;
00227 #endif
00228 
00229     pthread_exit( (void*)retVal );
00230     EQUNREACHABLE;
00231 }
00232 
00233 void Thread::cancel()
00234 {
00235     EQASSERTINFO( !isCurrent(), "Thread::cancel called from child thread" );
00236 
00237     EQINFO << "Cancelling thread" << endl;
00238     _state = STATE_STOPPING;
00239 
00240     pthread_cancel( _data->threadID );
00241     EQUNREACHABLE;
00242 }
00243 
00244 bool Thread::join( void** retVal )
00245 {
00246     if( _state == STATE_STOPPED )
00247         return false;
00248     if( isCurrent( )) // can't join self
00249         return false;
00250 
00251     EQVERB << "Joining thread" << endl;
00252 #ifdef EQ_WIN32_SDP_JOIN_WAR
00253     _running.waitEQ( false );
00254 #else
00255     void *_retVal;
00256     const int error = pthread_join( _data->threadID, &_retVal);
00257     if( error != 0 )
00258     {
00259         EQWARN << "Error joining thread: " << strerror(error) << endl;
00260         return false;
00261     }
00262 #endif
00263 
00264     _state = STATE_STOPPED;
00265     if( retVal )
00266         *retVal = _retVal;
00267     return true;
00268 }
00269 
00270 bool Thread::isCurrent() const
00271 {
00272     return pthread_equal( pthread_self(), _data->threadID );
00273 }
00274 
00275 size_t Thread::getSelfThreadID()
00276 {
00277 #ifdef PTW32_VERSION
00278     return reinterpret_cast< size_t >( pthread_self().p );
00279 #else
00280     return ( size_t )( pthread_self( ));
00281 #endif
00282 }
00283 
00284 void Thread::addListener( ExecutionListener* listener )
00285 {
00286     ScopedMutex mutex( _listenerLock() );
00287     _listeners().push_back( listener );
00288 }
00289 
00290 bool Thread::removeListener( ExecutionListener* listener )
00291 {
00292     ScopedMutex mutex( _listenerLock() );
00293 
00294     vector< ExecutionListener* >::iterator i = find( _listeners().begin(),
00295                                                      _listeners().end(),
00296                                                      listener );
00297     if( i == _listeners().end( ))
00298         return false;
00299 
00300     _listeners().erase( i );
00301     return true;
00302 }
00303 
00304 void Thread::removeAllListeners()
00305 {
00306     _listenerLock().set();
00307 
00308     EQINFO << _listeners().size() << " thread listeners active" << endl;
00309     for( vector<ExecutionListener*>::const_iterator i = _listeners().begin();
00310          i != _listeners().end(); ++i )
00311 
00312         EQINFO << "    " << typeid( **i ).name() << endl;
00313 
00314     _listenerLock().unset();
00315     
00316     _notifyStopping();
00317 
00318     _listenerLock().set();
00319     _listeners().clear();
00320     _listenerLock().unset();
00321 }
00322 
00323 
00324 void Thread::pinCurrentThread()
00325 {
00326 #ifdef EQ_WIN32_THREAD_AFFINITY
00327     static Lock lock;
00328     ScopedMutex mutex( lock );
00329 
00330     static DWORD_PTR processMask = 0;
00331     static DWORD_PTR processor   = 0;
00332     if( processMask == 0 )
00333     {
00334         // Get available processors
00335         DWORD_PTR systemMask;
00336         if( GetProcessAffinityMask( GetCurrentProcess(), &processMask, 
00337             &systemMask ) == 0 )
00338         {
00339             EQWARN << "Can't get usable processor mask" << endl;
00340             return;
00341         }
00342         EQINFO << "Available processors 0x" << hex << processMask << dec <<endl;
00343 
00344         // Choose random starting processor: Multiple Eq apps on the same node
00345         // would otherwise use the same processor for the same thread
00346         unsigned nProcessors = 0;
00347         for( DWORD_PTR i = 1; i != 0; i <<= 1 )
00348         {
00349             if( processMask & i )
00350                 ++nProcessors;
00351         }
00352         EQINFO << nProcessors << " available processors" << endl;
00353 
00354         unsigned chance = RNG().get< unsigned >();
00355         processor = 1 << (chance % nProcessors);
00356         EQINFO << "Starting with processor " << processor << endl;
00357     }
00358     EQASSERT( processMask != 0 );
00359 
00360     while( true )
00361     {
00362         processor <<= 1;
00363         if( processor == 0 ) // wrap around
00364             processor = 1;
00365 
00366         if( processor & processMask ) // processor is available
00367         {
00368             if( SetThreadAffinityMask( GetCurrentThread(), processor ) == 0 )
00369                 EQWARN << "Can't set thread processor" << endl;
00370             EQINFO << "Pinned thread to processor 0x" << hex << processor << dec
00371                    << endl;
00372             return;
00373         }
00374     }
00375 #endif
00376 }
00377 
00378 std::ostream& operator << ( std::ostream& os, const Thread* thread )
00379 {
00380 #ifdef PTW32_VERSION
00381     os << "Thread " << thread->_data->threadID.p;
00382 #else
00383     os << "Thread " << thread->_data->threadID;
00384 #endif
00385     os << " state " 
00386         << ( thread->_state == Thread::STATE_STOPPED ? "stopped" :
00387             thread->_state == Thread::STATE_STARTING ? "starting" :
00388             thread->_state == Thread::STATE_RUNNING ? "running" :
00389             thread->_state == Thread::STATE_STOPPING ? "stopping" : "unknown" );
00390 
00391 #ifdef PTW32_VERSION
00392     os << " called from " << pthread_self().p;
00393 #else
00394     os << " called from " << pthread_self();
00395 #endif
00396 
00397     return os;
00398 }
00399 }
00400 }
Generated on Mon Aug 10 18:58:41 2009 for Equalizer 0.9 by  doxygen 1.5.8