From d7dffba4bd27bbca2707c8c0d9efdb9a25a99b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CGe?= Date: Thu, 16 May 2024 01:49:41 -0700 Subject: [PATCH] partial revert to XMutex implementation (note: avert chunity hang-on-exit) --- src/core/chuck_errmsg.cpp | 31 ++++++--- src/core/chuck_oo.cpp | 136 +++++++++++++++++++++++++------------- src/core/chuck_oo.h | 9 ++- src/core/util_buffers.cpp | 79 +++++++++++++++++----- src/core/util_buffers.h | 63 +++++++----------- 5 files changed, 206 insertions(+), 112 deletions(-) diff --git a/src/core/chuck_errmsg.cpp b/src/core/chuck_errmsg.cpp index 5755b2b6d..52831f81c 100644 --- a/src/core/chuck_errmsg.cpp +++ b/src/core/chuck_errmsg.cpp @@ -37,13 +37,16 @@ #include "util_platforms.h" #include "util_string.h" +#ifndef __DISABLE_THREADS__ +#include "util_thread.h" +#endif + #include #include #include #include #include #include -#include // c++11 using namespace std; @@ -72,7 +75,9 @@ static std::string g_error2str = ""; // log globals t_CKINT g_loglevel = CK_LOG_CORE; t_CKINT g_logstack = 0; -std::mutex g_logmutex; +#ifndef __DISABLE_THREADS__ +XMutex g_logmutex; +#endif // more local globals std::stringstream g_stdout_stream; @@ -877,9 +882,6 @@ void EM_print2magenta( const char * message, ... ) //----------------------------------------------------------------------------- void EM_log( t_CKINT level, const char * message, ... ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(g_logmutex); - va_list ap; if( level > CK_LOG_ALL ) level = CK_LOG_ALL; @@ -888,6 +890,10 @@ void EM_log( t_CKINT level, const char * message, ... ) // check level if( level > g_loglevel ) return; + #ifndef __DISABLE_THREADS__ + g_logmutex.acquire(); + #endif + TC::off(); CK_FPRINTF_STDERR( "[%s:%s:%s]: ", TC::blue("chuck", true).c_str(), @@ -905,6 +911,10 @@ void EM_log( t_CKINT level, const char * message, ... ) CK_FPRINTF_STDERR( "\n" ); CK_FFLUSH_STDERR(); + + #ifndef __DISABLE_THREADS__ + g_logmutex.release(); + #endif } @@ -916,9 +926,6 @@ void EM_log( t_CKINT level, const char * message, ... ) //----------------------------------------------------------------------------- void EM_log_opts( t_CKINT level, enum em_LogOpts options, const char * message, ... ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(g_logmutex); - va_list ap; if( level > CK_LOG_ALL ) level = CK_LOG_ALL; @@ -932,6 +939,10 @@ void EM_log_opts( t_CKINT level, enum em_LogOpts options, const char * message, // whether to print newline at end t_CKBOOL nl = !(options & EM_LOG_NO_NEWLINE); + #ifndef __DISABLE_THREADS__ + g_logmutex.acquire(); + #endif + // check option if( prefix ) { @@ -956,6 +967,10 @@ void EM_log_opts( t_CKINT level, enum em_LogOpts options, const char * message, // flush CK_FFLUSH_STDERR(); + + #ifndef __DISABLE_THREADS__ + g_logmutex.release(); + #endif } diff --git a/src/core/chuck_oo.cpp b/src/core/chuck_oo.cpp index 55ae385dc..e2559ebc4 100644 --- a/src/core/chuck_oo.cpp +++ b/src/core/chuck_oo.cpp @@ -3103,19 +3103,19 @@ t_CKUINT Chuck_Event::our_waiting_on = 0; //----------------------------------------------------------------------------- void Chuck_Event::signal_local() { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - - // if queue not empty + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif if( !m_queue.empty() ) { // get the shred on top of the queue Chuck_VM_Shred * shred = m_queue.front(); // pop the top m_queue.pop(); - // release lock - m_queue_lock.unlock(); - + // release it! + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif // REFACTOR-2017: BUG-FIX // release the extra ref we added when we started waiting for this event CK_SAFE_RELEASE( shred->event ); @@ -3129,6 +3129,12 @@ void Chuck_Event::signal_local() t_CKTIME *& sp = (t_CKTIME *&)shred->reg->sp; push_( sp, shreduler->now_system ); } + else + { + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif + } } @@ -3140,13 +3146,13 @@ void Chuck_Event::signal_local() //----------------------------------------------------------------------------- t_CKBOOL Chuck_Event::remove( Chuck_VM_Shred * shred ) { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - - // queue of shred pointers queue temp; t_CKBOOL removed = FALSE; + // lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif // while something in queue while( !m_queue.empty() ) { @@ -3175,6 +3181,10 @@ t_CKBOOL Chuck_Event::remove( Chuck_VM_Shred * shred ) // copy temp back to queue m_queue = temp; + // release lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif return removed; } @@ -3256,12 +3266,13 @@ void Chuck_Event::global_listen( t_CKINT id, void (* cb)(t_CKINT), //----------------------------------------------------------------------------- t_CKBOOL Chuck_Event::remove_listen( void (* cb)(void) ) { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - // temp queue std::queue temp; t_CKBOOL removed = FALSE; + // lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif // while something in queue while( !m_global_queue.empty() ) { @@ -3283,6 +3294,10 @@ t_CKBOOL Chuck_Event::remove_listen( void (* cb)(void) ) // copy temp back to queue m_global_queue = temp; + // release lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif return removed; } @@ -3296,12 +3311,13 @@ t_CKBOOL Chuck_Event::remove_listen( void (* cb)(void) ) //----------------------------------------------------------------------------- t_CKBOOL Chuck_Event::remove_listen( std::string name, void (* cb)(const char *) ) { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - std::queue temp; t_CKBOOL removed = FALSE; + // lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif // while something in queue while( !m_global_queue.empty() ) { @@ -3323,6 +3339,10 @@ t_CKBOOL Chuck_Event::remove_listen( std::string name, void (* cb)(const char *) // copy temp back to queue m_global_queue = temp; + // release lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif return removed; } @@ -3336,12 +3356,13 @@ t_CKBOOL Chuck_Event::remove_listen( std::string name, void (* cb)(const char *) //----------------------------------------------------------------------------- t_CKBOOL Chuck_Event::remove_listen( t_CKINT id, void (* cb)(t_CKINT) ) { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - // temp queue of listeners std::queue temp; t_CKBOOL removed = FALSE; + // lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif // while something in queue while( !m_global_queue.empty() ) { @@ -3363,6 +3384,10 @@ t_CKBOOL Chuck_Event::remove_listen( t_CKINT id, void (* cb)(t_CKINT) ) // copy temp back to queue m_global_queue = temp; + // release lock + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif return removed; } @@ -3376,10 +3401,10 @@ t_CKBOOL Chuck_Event::remove_listen( t_CKINT id, void (* cb)(t_CKINT) ) //----------------------------------------------------------------------------- void Chuck_Event::signal_global() { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif - // if global queue not empty if( !m_global_queue.empty() ) { // get the listener on top of the queue @@ -3414,6 +3439,10 @@ void Chuck_Event::signal_global() m_global_queue.push( listener ); } } + + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif } @@ -3425,12 +3454,11 @@ void Chuck_Event::signal_global() //----------------------------------------------------------------------------- void Chuck_Event::broadcast_global() { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - // queue of event listeners + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif std::queue< Chuck_Global_Event_Listener > call_again; - // check globals queue while( !m_global_queue.empty() ) { // get the listener on top of the queue @@ -3468,6 +3496,10 @@ void Chuck_Event::broadcast_global() // for those that should be called again, store them again m_global_queue = call_again; + + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif } @@ -3481,22 +3513,27 @@ void Chuck_Event::broadcast_global() //----------------------------------------------------------------------------- void Chuck_Event::queue_broadcast( CBufferSimple * event_buffer ) { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); // TODO: handle multiple VM - - // if not empty + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif if( !m_queue.empty() ) { // get shred (only to get the VM ref) Chuck_VM_Shred * shred = m_queue.front(); - // release lock - m_queue_lock.unlock(); - + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif // queue the event on the vm (added 1.3.0.0: event_buffer) shred->vm_ref->queue_event( this, 1, event_buffer ); } + else + { + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif + } } @@ -3509,21 +3546,28 @@ void Chuck_Event::queue_broadcast( CBufferSimple * event_buffer ) //----------------------------------------------------------------------------- void Chuck_Event::broadcast_local() { - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); - + // lock queue + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif // while not empty while( !m_queue.empty() ) { // release first - m_queue_lock.unlock(); - + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif // signal the next shred this->signal_local(); - // lock again - m_queue_lock.lock(); + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif } + // release + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif } @@ -3548,12 +3592,14 @@ void Chuck_Event::wait( Chuck_VM_Shred * shred, Chuck_VM * vm ) // suspend shred->is_running = FALSE; - // 1.5.2.5 (ge) updated to std::mutex - std::lock_guard lock(m_queue_lock); // add to waiting list + #ifndef __DISABLE_THREADS__ + m_queue_lock.acquire(); + #endif m_queue.push( shred ); - // release - m_queue_lock.unlock(); + #ifndef __DISABLE_THREADS__ + m_queue_lock.release(); + #endif // add event to shred assert( shred->event == NULL ); diff --git a/src/core/chuck_oo.h b/src/core/chuck_oo.h index 82d8d2852..3f2af3f31 100644 --- a/src/core/chuck_oo.h +++ b/src/core/chuck_oo.h @@ -36,12 +36,14 @@ #include "chuck_def.h" #include "chuck_carrier.h" +#ifndef __DISABLE_THREADS__ +#include "util_thread.h" // added 1.3.0.0 +#endif #include #include #include #include -#include // c++11 @@ -788,10 +790,11 @@ struct Chuck_Event : public Chuck_Object std::queue m_queue; std::queue m_global_queue; + #ifndef __DISABLE_THREADS__ // 1.4.1.0 (ge/jack) TODO: rewrite queue_broadcast to use a lock-free queue // and avoid the use of a lock in events - // 1.5.2.5 (ge) updated from XMutex to std::mutex - std::mutex m_queue_lock; + XMutex m_queue_lock; + #endif }; diff --git a/src/core/util_buffers.cpp b/src/core/util_buffers.cpp index 9fd9e0e2a..6536f744b 100644 --- a/src/core/util_buffers.cpp +++ b/src/core/util_buffers.cpp @@ -120,8 +120,10 @@ void CBufferAdvance::cleanup() //----------------------------------------------------------------------------- UINT__ CBufferAdvance::join( Chuck_Event * event ) { - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.acquire(); + #endif // index of new pointer that will be pushed back UINT__ read_offset_index; @@ -142,32 +144,42 @@ UINT__ CBufferAdvance::join( Chuck_Event * event ) m_read_offsets.push_back( ReadOffset( m_write_offset, event ) ); } + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif + // return index return read_offset_index; } - - //----------------------------------------------------------------------------- // name: resign // desc: shred quits buffer; frees its index //----------------------------------------------------------------------------- void CBufferAdvance::resign( UINT__ read_offset_index ) { - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // make sure read_offset_index passed in is valid if( read_offset_index >= m_read_offsets.size() ) return; + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.acquire(); + #endif + // add this index to free queue m_free.push( read_offset_index ); // "invalidate" the pointer at that index m_read_offsets[read_offset_index].read_offset = -1; m_read_offsets[read_offset_index].event = NULL; + + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif } @@ -200,12 +212,14 @@ void CBufferAdvance::resign( UINT__ read_offset_index ) void CBufferAdvance::put( void * data, UINT__ num_elem ) { - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - UINT__ i, j; BYTE__ * d = (BYTE__ *)data; + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.acquire(); + #endif + // copy for( i = 0; i < num_elem; i++ ) { @@ -234,6 +248,11 @@ void CBufferAdvance::put( void * data, UINT__ num_elem ) m_read_offsets[j].event->queue_broadcast( m_event_buffer ); } } + + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif } @@ -300,16 +319,24 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind UINT__ i, j; BYTE__ * d = (BYTE__ *)data; - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.acquire(); + #endif // make sure index is valid if( read_offset_index >= m_read_offsets.size() ) { + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif return 0; } if( m_read_offsets[read_offset_index].read_offset < 0 ) { + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif return 0; } @@ -318,6 +345,9 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind // read catch up with write if( m_read_offset == m_write_offset ) { + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif return 0; } @@ -346,6 +376,11 @@ UINT__ CBufferAdvance::get( void * data, UINT__ num_elem, UINT__ read_offset_ind // update read offset at given index m_read_offsets[read_offset_index].read_offset = m_read_offset; + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + m_mutex.release(); + #endif + // return number of elems return i; } @@ -430,8 +465,9 @@ void CBufferSimple::put( void * data, UINT__ num_elem ) UINT__ i, j; BYTE__ * d = (BYTE__ *)data; - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); +#ifndef __DISABLE_THREADS__ + m_mutex.acquire(); +#endif // copy for( i = 0; i < num_elem; i++ ) @@ -446,6 +482,10 @@ void CBufferSimple::put( void * data, UINT__ num_elem ) // change to fully "atomic" increment+wrap m_write_offset = (m_write_offset + 1) % m_max_elem; } + +#ifndef __DISABLE_THREADS__ + m_mutex.release(); +#endif } @@ -460,13 +500,14 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem ) UINT__ i, j; BYTE__ * d = (BYTE__ *)data; - // lock | 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // read catch up with write if( m_read_offset == m_write_offset ) return 0; +#ifndef __DISABLE_THREADS__ + m_mutex.acquire(); +#endif + // copy for( i = 0; i < num_elem; i++ ) { @@ -488,6 +529,10 @@ UINT__ CBufferSimple::get( void * data, UINT__ num_elem ) } } +#ifndef __DISABLE_THREADS__ + m_mutex.release(); +#endif + // return number of elems return 1; // shouldn't it return i? } diff --git a/src/core/util_buffers.h b/src/core/util_buffers.h index 679e7a88b..1407c4d17 100644 --- a/src/core/util_buffers.h +++ b/src/core/util_buffers.h @@ -37,11 +37,13 @@ #include "chuck_oo.h" #include "chuck_errmsg.h" +#ifndef __DISABLE_THREADS__ +#include "util_thread.h" +#endif #include #include #include -#include // added 1.5.2.5 (ge) for "lock-free" circle buffer | c++11 -#include // added 1.5.2.5 (ge) hmm so much for "lock-free"... | c++11 +#include // c++11 #define DWORD__ t_CKUINT #define SINT__ t_CKINT @@ -82,7 +84,7 @@ class CBufferAdvance protected: BYTE__ * m_data; UINT__ m_data_width; - // UINT__ m_read_offset; + //UINT__ m_read_offset; // this holds the offset allocated by join(), paired with an optional // Chuck_Event to notify when things are put in the buffer @@ -95,12 +97,15 @@ class CBufferAdvance }; std::vector m_read_offsets; std::queue m_free; + SINT__ m_write_offset; SINT__ m_max_elem; - // updated | 1.5.2.5 (ge) from XMutex to std::mutex - std::mutex m_mutex; // TODO necessary? - // buffer + // TODO: necessary? + #ifndef __DISABLE_THREADS__ + XMutex m_mutex; + #endif + CBufferSimple * m_event_buffer; }; @@ -127,14 +132,15 @@ class CBufferSimple protected: BYTE__ * m_data; - std::atomic_ulong m_data_width; - std::atomic_ulong m_read_offset; - std::atomic_ulong m_write_offset; - std::atomic_ulong m_max_elem; + UINT__ m_data_width; + UINT__ m_read_offset; + UINT__ m_write_offset; + UINT__ m_max_elem; +#ifndef __DISABLE_THREADS__ // added | 1.5.1.5 (ge & andrew) twilight zone - // updated | 1.5.2.5 (ge) to std::mutex - std::mutex m_mutex; + XMutex m_mutex; +#endif }; @@ -338,6 +344,8 @@ class FastCircularBuffer //----------------------------------------------------------------------------- // name: class XCircleBuffer // desc: templated circular buffer class (different impl from ge-X-lib) +// NOTE: the lock-free queue DOES NOT WORK; can crash; keeping for study +// NOTE: use FinalRingBuffer instead | 1.5.2.5 (ge) //----------------------------------------------------------------------------- template class XCircleBuffer @@ -374,6 +382,8 @@ class XCircleBuffer inline void advanceRead(); protected: + // the buffer + T * m_buffer; // the buffer length (capacity) std::atomic_ulong m_length; // write index @@ -382,12 +392,6 @@ class XCircleBuffer std::atomic_ulong m_readIndex; // num elements std::atomic_ulong m_numElements; - - // the buffer - T * m_buffer; - - // mutex | 1.5.2.5 (ge) - std::mutex m_mutex; // TODO necessary? review code }; @@ -418,8 +422,7 @@ XCircleBuffer::XCircleBuffer( long length ) template XCircleBuffer::~XCircleBuffer() { - CK_SAFE_DELETE_ARRAY( m_buffer ); - m_length = m_readIndex = m_writeIndex = m_numElements = 0; + } @@ -432,9 +435,6 @@ XCircleBuffer::~XCircleBuffer() template void XCircleBuffer::init( long length ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // clean up is necessary if( m_buffer ) { @@ -498,9 +498,6 @@ long XCircleBuffer::length() const template void XCircleBuffer::clear() { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // zero out m_readIndex = m_writeIndex = m_numElements = 0; } @@ -564,9 +561,6 @@ void XCircleBuffer::advanceRead() template void XCircleBuffer::put( const T & item ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // sanity check if( m_buffer == NULL ) return; @@ -623,9 +617,6 @@ bool XCircleBuffer::more() const template long XCircleBuffer::peek( T * array, long numItems, unsigned long stride ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // sanity check if( m_buffer == NULL ) return 0; @@ -677,9 +668,6 @@ long XCircleBuffer::peek( T * array, long numItems, unsigned long stride ) template long XCircleBuffer::pop( long numItems ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // sanity check if( m_buffer == NULL ) return 0; @@ -708,9 +696,6 @@ long XCircleBuffer::pop( long numItems ) template bool XCircleBuffer::get( T * result ) { - // 1.5.2.5 (ge) added - std::lock_guard lock(m_mutex); - // sanity check if( m_buffer == NULL || m_readIndex == m_writeIndex ) return false; @@ -799,7 +784,7 @@ void FinalRingBuffer::init( t_CKUINT capacity ) template FinalRingBuffer::~FinalRingBuffer() { - CK_SAFE_DELETE_ARRAY(m_buf); + CK_SAFE_DELETE_ARRAY( m_buf ); m_end = m_head = m_tail = 0; }