diff --git a/platform/gucefMT/src/gucefMT_DVRWLOCK.c b/platform/gucefMT/src/gucefMT_DVRWLOCK.c index 4da28abff..bdb303a63 100644 --- a/platform/gucefMT/src/gucefMT_DVRWLOCK.c +++ b/platform/gucefMT/src/gucefMT_DVRWLOCK.c @@ -43,7 +43,7 @@ // // //-------------------------------------------------------------------------*/ -#define RWLOCK_DEFAULT_ACTIVE_READER_STACK_GROW_AMT 16 +#define RWLOCK_DEFAULT_THREAD_INFO_STACK_GROW_AMT 24 /*-------------------------------------------------------------------------// // // @@ -55,7 +55,6 @@ struct SThreadSlot { UInt32 threadId; Int32 reentrancyCount; - char isQueuedReader; struct SThreadSlot* nextSlot; }; @@ -69,6 +68,7 @@ struct SRWLock UInt8 delflag; Int32 activeReaderCount; TThreadSlot* activeReaders; + TThreadSlot* queuedWriters; TThreadSlot* freeThreadInfo; Int32 queuedReaderCount; UInt32 lastQueuedReaderThreadId; @@ -97,7 +97,7 @@ rwl_impl_get_free_slot( TRWLock* rwlock ) if ( GUCEF_NULL == rwlock->freeThreadInfo ) { - for ( Int32 i=0; iqueuedWriters; + + /* + * Try to find an existing slot for this thread + */ + while ( GUCEF_NULL != queuedWriterSlot ) + { + if ( queuedWriterSlot->threadId == threadId ) + return queuedWriterSlot; + queuedWriterSlot = queuedWriterSlot->nextSlot; + } + + /* + * We dont have a reader slot for this thread. + * Its new to this lock. As such grab a new free slot and use that instead + */ + slot = rwl_impl_get_free_slot( rwlock ); + if ( GUCEF_NULL != slot ) + { + slot->threadId = threadId; + queuedWriterSlot = rwlock->queuedWriters; + rwlock->queuedWriters = slot; + slot->nextSlot = queuedWriterSlot; + + /* + * We are adding to the total queued writer threads + */ + ++rwlock->queuedWriterCount; + + GUCEF_END; + return slot; + } + + GUCEF_END; + return slot; +} + +/*--------------------------------------------------------------------------*/ + GUCEF_MT_PRIVATE_C void rwl_impl_free_reader_slot( TRWLock* rwlock, UInt32 threadId ) @@ -225,6 +271,50 @@ rwl_impl_free_reader_slot( TRWLock* rwlock, UInt32 threadId ) /*--------------------------------------------------------------------------*/ +GUCEF_MT_PRIVATE_C +void +rwl_impl_free_queued_writer_slot( TRWLock* rwlock, UInt32 threadId ) +{ + GUCEF_BEGIN; + + TThreadSlot* prevSlot = GUCEF_NULL; + TThreadSlot* queuedWriterSlot = rwlock->queuedWriters; + + /* + * Try to find the existing slot for this thread + */ + while ( GUCEF_NULL != queuedWriterSlot ) + { + if ( queuedWriterSlot->threadId == threadId ) + { + /* + * Remove the thread's slot from the chain of queued writers + */ + if ( GUCEF_NULL != prevSlot ) + prevSlot->nextSlot = queuedWriterSlot->nextSlot; + else + rwlock->queuedWriters = queuedWriterSlot->nextSlot; + + /* + * Reset the slot for re-use and add to the free slot list + */ + queuedWriterSlot->threadId = 0; + queuedWriterSlot->reentrancyCount = 0; + queuedWriterSlot->nextSlot = rwlock->freeThreadInfo; + rwlock->freeThreadInfo = queuedWriterSlot; + + --rwlock->queuedWriterCount; + break; + } + prevSlot = queuedWriterSlot; + queuedWriterSlot = queuedWriterSlot->nextSlot; + } + + GUCEF_END; +} + +/*--------------------------------------------------------------------------*/ + GUCEF_MT_PRIVATE_C char rwl_impl_push_active_reader( TRWLock* rwlock ) @@ -284,6 +374,65 @@ rwl_impl_pop_active_reader( TRWLock* rwlock ) /*--------------------------------------------------------------------------*/ +GUCEF_MT_PRIVATE_C +char +rwl_impl_push_queued_writer( TRWLock* rwlock ) +{ + GUCEF_BEGIN; + + UInt32 threadId = GetCurrentTaskID(); + + TThreadSlot* threadSlot = 
rwl_impl_get_queued_writer_slot( rwlock, threadId ); + if ( threadSlot == GUCEF_NULL ) + return 0; + + ++threadSlot->reentrancyCount; + + GUCEF_END; + return 1; +} + +/*--------------------------------------------------------------------------*/ + +GUCEF_MT_PRIVATE_C +char +rwl_impl_pop_queued_writer( TRWLock* rwlock ) +{ + GUCEF_BEGIN; + + if ( rwlock->queuedWriterCount > 0 ) + { + UInt32 threadId = GetCurrentTaskID(); + + TThreadSlot* threadSlot = rwl_impl_get_queued_writer_slot( rwlock, threadId ); + if ( threadSlot == GUCEF_NULL ) + return 0; + + --threadSlot->reentrancyCount; + + if ( 0 > threadSlot->reentrancyCount ) + { + /* we should never get here if your push/pop counts match */ + GUCEF_ASSERT_ALWAYS; + } + + if ( 0 == threadSlot->reentrancyCount ) + rwl_impl_free_queued_writer_slot( rwlock, threadId ); + + GUCEF_END; + return 1; + } + else + { + /* we should never get here if your push/pop counts match */ + GUCEF_ASSERT_ALWAYS; + GUCEF_END; + return 0; + } +} + +/*--------------------------------------------------------------------------*/ + GUCEF_MT_PRIVATE_C char rwl_impl_has_foreign_reader( TRWLock* rwlock ) @@ -309,6 +458,82 @@ /*--------------------------------------------------------------------------*/ +GUCEF_MT_PRIVATE_C +char +rwl_impl_has_special_access_for_multiple_readers_escalating_to_writers( TRWLock* rwlock ) +{ + GUCEF_BEGIN; + + UInt32 calledThreadId = GetCurrentTaskID(); + + /* + * We need to verify if ALL active readers are also queued writers + * no exceptions allowed. If true then that tells you that all those readers are waiting for the other readers to finish + * in order to grab the write lock. That will never happen since they are all waiting on each other. + * To alleviate this situation we will pick a winner which will be allowed to proceed, being granted 'special access' to + * escalate their read lock to a write lock + */ + + TThreadSlot* lastReaderSlot = GUCEF_NULL; + TThreadSlot* readerSlot = rwlock->activeReaders; + + while ( GUCEF_NULL != readerSlot ) + { + TThreadSlot* matchingQueuedWriter = GUCEF_NULL; + TThreadSlot* queuedWriterSlot = rwlock->queuedWriters; + while ( GUCEF_NULL != queuedWriterSlot ) + { + if ( readerSlot->threadId == queuedWriterSlot->threadId ) + { + matchingQueuedWriter = queuedWriterSlot; + break; + } + queuedWriterSlot = queuedWriterSlot->nextSlot; + } + + if ( GUCEF_NULL == matchingQueuedWriter ) + { + /* + * We have an active reader which does not have a matching queued writer + * This active reader should resolve their work normally and as such no special access will be granted at this time + * No point checking any of the other readers + */ + GUCEF_END; + return 0; + } + + lastReaderSlot = readerSlot; + readerSlot = readerSlot->nextSlot; + } + + /* + * If we get here we are in the scenario we are looking for + * ALL currently active readers are also queued for writing + * + * The way we resolve this and the purpose of this function is to pick a winner + * In order to make it first-come first-serve we will use the first active reader as the winner + * This is because the read attempt presumably is earlier in the workflow and the last reader in our linked list + * is thus our first reader thread regardless of when that thread tried to escalate to being a writer + */ + if ( GUCEF_NULL != lastReaderSlot ) + { + if ( lastReaderSlot->threadId == calledThreadId ) + { + /* + * The calling thread is the first active reader + * You are the winner!
+ */ + GUCEF_END; + return 1; + } + } + + GUCEF_END; + return 0; +} + +/*--------------------------------------------------------------------------*/ + /** * Function that creates a readers/writers lock data storage struct * writer_overrules is a boolean. When non zero writers will be given @@ -332,6 +557,7 @@ rwl_create( UInt32 writer_overrules ) rwlock->activeWriterCount = 0; rwlock->activeWriterReentrancyCount = 0; rwlock->queuedWriterCount = 0; + rwlock->queuedWriters = GUCEF_NULL; rwlock->writeLockAquisitionInProgress = 0; rwlock->writeLockAquisitionThreadId = 0; rwlock->wpriority = writer_overrules; @@ -339,7 +565,7 @@ rwl_create( UInt32 writer_overrules ) rwlock->datalock = MutexCreate(); rwlock->writerlock = MutexCreate(); - for ( i; ifreeThreadInfo; TThreadSlot* slot = (TThreadSlot*) calloc( 1, sizeof(TThreadSlot) ); @@ -421,6 +647,15 @@ rwl_destroy( TRWLock* rwlock ) } rwlock->activeReaders = GUCEF_NULL; + slot = rwlock->queuedWriters; + while ( GUCEF_NULL != slot ) + { + TThreadSlot* nextSlot = slot->nextSlot; + free( slot ); + slot = nextSlot; + } + rwlock->queuedWriters = GUCEF_NULL; + slot = rwlock->freeThreadInfo; while ( GUCEF_NULL != slot ) { @@ -558,7 +793,9 @@ rwl_active_reader_reentrancy_depth( const TRWLock *rwlock, UInt32 threadId ) UInt32 rwl_queued_writers( const TRWLock* rwlock ) { - if ( GUCEF_NULL != rwlock ) + GUCEF_BEGIN; + + if ( GUCEF_NULL != rwlock ) { do { @@ -566,11 +803,15 @@ rwl_queued_writers( const TRWLock* rwlock ) { UInt32 queuedWriterCount = rwlock->queuedWriterCount; MutexUnlock( rwlock->datalock ); + + GUCEF_END; return queuedWriterCount; } } while ( 0 == rwlock->delflag ); } + + GUCEF_END; return 0; } @@ -579,7 +820,9 @@ rwl_queued_writers( const TRWLock* rwlock ) UInt32 rwl_active_readers( const TRWLock *rwlock ) { - if ( GUCEF_NULL != rwlock ) + GUCEF_BEGIN; + + if ( GUCEF_NULL != rwlock ) { do { @@ -587,11 +830,15 @@ rwl_active_readers( const TRWLock *rwlock ) { UInt32 activeReaderCount = rwlock->activeReaderCount; MutexUnlock( rwlock->datalock ); + + GUCEF_END; return activeReaderCount; } } while ( 0 == rwlock->delflag ); } + + GUCEF_END; return 0; } @@ -1120,7 +1367,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) if ( rwl_impl_pop_active_reader( rwlock ) != 0 ) { - rwlock->queuedWriterCount++; + if ( rwl_impl_push_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } } else { @@ -1136,7 +1390,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ while ( GUCEF_MUTEX_OPERATION_SUCCESS != MutexLock( rwlock->writerlock, GUCEF_MUTEX_INFINITE_TIMEOUT ) ) {}; rwlock->activeWriterCount++; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); rwlock->lastWriterThreadId = GetCurrentTaskID(); MutexUnlock( rwlock->datalock ); @@ -1197,7 +1458,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) rwlock->activeWriterCount++; rwlock->lastWriterThreadId = GetCurrentTaskID(); rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); MutexUnlock( rwlock->datalock ); GUCEF_END; @@ -1211,7 +1479,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ GUCEF_ASSERT_ALWAYS; rwlock->writeLockAquisitionInProgress = 0; - 
rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->writerlock ); GUCEF_END; return GUCEF_RWLOCK_OPERATION_FAILED; @@ -1228,7 +1503,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->writeLockAquisitionInProgress = 0; MutexUnlock( rwlock->datalock ); break; @@ -1264,7 +1546,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); break; } @@ -1335,7 +1624,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) rwlock->lastWriterThreadId = GetCurrentTaskID(); rwlock->writeLockAquisitionInProgress = 0; rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); GUCEF_END; return GUCEF_RWLOCK_OPERATION_SUCCESS; @@ -1348,7 +1644,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ GUCEF_ASSERT_ALWAYS; rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); MutexUnlock( rwlock->writerlock ); GUCEF_END; @@ -1366,7 +1669,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->writeLockAquisitionInProgress = 0; MutexUnlock( rwlock->datalock ); break; @@ -1402,7 +1712,14 @@ rwl_reader_transition_to_writer( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); break; } @@ -1445,7 +1762,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) return GUCEF_RWLOCK_OPERATION_FAILED; } - ++rwlock->queuedWriterCount; + if ( rwl_impl_push_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } if ( 0 < rwlock->activeWriterCount ) { /* @@ -1459,7 +1783,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) * The caller is already the currently active writer thread * allow it to proceed since we already have write access anyway */ - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } do { lockResult = MutexLock( rwlock->writerlock, GUCEF_MT_SHORT_LOCK_TIMEOUT ); @@ -1521,7 +1852,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) case GUCEF_MUTEX_ABANDONED : { rwlock->activeWriterCount++; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->lastWriterThreadId = GetCurrentTaskID(); 
rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); MutexUnlock( rwlock->datalock ); @@ -1560,14 +1898,19 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) MutexUnlock( rwlock->datalock ); /* - * Wait for a moment with no active readers + * Wait for the right moment with high likelyhood of getting the lock * We dont care about queued readers since writers take priority * The queued writer counter/flag will prevent new readers from becoming active */ do { - if ( 1 >= rwlock->activeReaderCount && - 0 == rwlock->writeLockAquisitionInProgress ) + if ( ( rwlock->queuedWriterCount > 0 && + rwlock->activeReaderCount > 0 && + rwlock->activeReaderCount <= rwlock->queuedWriterCount && /* first 5 checks are for the special secenario where all active readers are trying to get a write lock */ + 0 == rwlock->activeWriterCount && + 0 == rwlock->writeLockAquisitionInProgress ) || + ( 1 >= rwlock->activeReaderCount && /* last 2 are for the more commen case where the reader is trying to escalate to a write lock implicitly */ + 0 == rwlock->writeLockAquisitionInProgress ) ) { /* * Reverify with a data lock @@ -1585,6 +1928,7 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) if ( 0 == rwlock->writeLockAquisitionInProgress ) { UInt32 hasOtherReaderThreadsActive = 0; + char specialAccessGranted = 0; if ( 1 == rwlock->activeReaderCount ) { /* @@ -1597,12 +1941,26 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) else { hasOtherReaderThreadsActive = 0 != rwlock->activeReaderCount; + + /* + * Check for possibility of the special secenario where all active readers are trying to + * get a write lock and they end up waiting on eachother to stop reading + */ + if ( rwlock->queuedWriterCount > 0 && + rwlock->activeReaderCount > 0 && + 0 == rwlock->activeWriterCount && + rwlock->activeReaderCount <= rwlock->queuedWriterCount ) + { + /* its possible: do a more comprehensive check */ + specialAccessGranted = rwl_impl_has_special_access_for_multiple_readers_escalating_to_writers( rwlock ); + } } - if ( 0 == hasOtherReaderThreadsActive ) + if ( 0 == hasOtherReaderThreadsActive || 0 != specialAccessGranted ) { /* * We found a moment without any active readers at all, excluding this thread + * or special access was granted * Now we just have to wait for other writers if any * * The 'queuedWriterCount' already blocks new active readers but for consistency we also @@ -1647,7 +2005,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ MutexUnlock( rwlock->writerlock ); rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_OPERATION_FAILED; } @@ -1659,7 +2024,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) rwlock->activeWriterCount++; rwlock->lastWriterThreadId = GetCurrentTaskID(); rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->activeWriterReentrancyCount = MutexReentrancy( rwlock->writerlock ); MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_OPERATION_SUCCESS; @@ -1689,7 +2061,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) * before exiting and reporting a wait timeout */ rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } 
MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_WAIT_TIMEOUT; } @@ -1701,7 +2080,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ GUCEF_ASSERT_ALWAYS; rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->writerlock ); return GUCEF_RWLOCK_OPERATION_FAILED; } @@ -1719,7 +2105,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) * before exiting and reporting a wait timeout */ rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_OPERATION_FAILED; } @@ -1731,7 +2124,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ GUCEF_ASSERT_ALWAYS; rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } return GUCEF_RWLOCK_OPERATION_FAILED; } } @@ -1747,7 +2147,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->writeLockAquisitionInProgress = 0; MutexUnlock( rwlock->datalock ); break; @@ -1799,7 +2206,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); break; } @@ -1872,7 +2286,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ MutexUnlock( rwlock->writerlock ); rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_OPERATION_FAILED; } @@ -1884,7 +2305,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) rwlock->activeWriterCount++; rwlock->lastWriterThreadId = GetCurrentTaskID(); rwlock->writeLockAquisitionInProgress = 0; - rwlock->queuedWriterCount--; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); return GUCEF_RWLOCK_OPERATION_SUCCESS; } @@ -1912,7 +2340,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } rwlock->writeLockAquisitionInProgress = 0; MutexUnlock( rwlock->datalock ); break; @@ -1947,7 +2382,14 @@ rwl_writer_start( TRWLock* rwlock, UInt32 lockWaitTimeoutInMs ) */ if ( MutexLock( rwlock->datalock, GUCEF_MUTEX_INFINITE_TIMEOUT ) == GUCEF_MUTEX_OPERATION_SUCCESS ) { - --rwlock->queuedWriterCount; + if ( rwl_impl_pop_queued_writer( rwlock ) != 0 ) + { + + } + else + { + + } MutexUnlock( rwlock->datalock ); break; } @@ -2158,7 +2600,6 @@ rwl_signal_thread_killed( TRWLock *rwlock, UInt32 killedThreadId ) { if ( readerSlot->threadId == killedThreadId ) { - readerSlot->isQueuedReader = 0; readerSlot->reentrancyCount = 0; readerSlot->threadId = 0; if ( GUCEF_NULL != prevSlot ) @@ -2176,6 +2617,35 @@ 
rwl_signal_thread_killed( TRWLock *rwlock, UInt32 killedThreadId ) } } } + + /* + * Check if the killed thread was a queued writer + */ + if ( rwlock->queuedWriterCount > 0 ) + { + TThreadSlot* prevSlot = GUCEF_NULL; + TThreadSlot* queuedWriterSlot = rwlock->queuedWriters; + while ( GUCEF_NULL != queuedWriterSlot ) + { + if ( queuedWriterSlot->threadId == killedThreadId ) + { + queuedWriterSlot->reentrancyCount = 0; + queuedWriterSlot->threadId = 0; + if ( GUCEF_NULL != prevSlot ) + prevSlot->nextSlot = queuedWriterSlot->nextSlot; + else + rwlock->queuedWriters = queuedWriterSlot->nextSlot; + queuedWriterSlot->nextSlot = rwlock->freeThreadInfo; + rwlock->freeThreadInfo = queuedWriterSlot; + break; + } + else + { + prevSlot = queuedWriterSlot; + queuedWriterSlot = queuedWriterSlot->nextSlot; + } + } + } MutexUnlock( rwlock->datalock ); GUCEF_END; diff --git a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp index a53ef04c4..80b826ef3 100644 --- a/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp +++ b/platform/gucefPUBSUB/src/gucefPUBSUB_CPubSubClientSide.cpp @@ -272,7 +272,7 @@ CPubSubClientSide::TopicLink::AddInFlightMsgs( const CPubSubClientTopic::TPublis size_t max = SMALLEST( publishActionIds.size(), msgs.size() ); if ( publishActionIds.size() != msgs.size() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::ToString( this ) + "):TopicLink:AddInFlightMsgs: Nr of publishActionIds (" + CORE::ToString( publishActionIds.size() ) + ") does not match Nr of msgs (" + CORE::ToString( msgs.size() ) + "). Will proceed best effort but this will likely cause issues" ); } @@ -296,7 +296,7 @@ CPubSubClientSide::TopicLink::AddInFlightMsgs( const CPubSubClientTopic::TPublis size_t max = SMALLEST( publishActionIds.size(), msgs.size() ); if ( publishActionIds.size() != msgs.size() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::ToString( this ) + "):TopicLink:AddInFlightMsgs: Nr of publishActionIds (" + CORE::ToString( publishActionIds.size() ) + ") does not match Nr of msgs (" + CORE::ToString( msgs.size() ) + "). Will proceed best effort but this will likely cause issues" ); } @@ -700,7 +700,7 @@ CPubSubClientSide::OnTopicAccessDestroyed( CORE::CNotifier* notifier , if ( pubsubClient != m_pubsubClient ) { // the pointer is suspect, this should not happen - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTopicAccessDestroyed: client pointer doesnt match. Event=" + CORE::ToString( pubsubClient ) + " Ours=" + CORE::ToString( m_pubsubClient.GetPointerAlways() ) ); return; } @@ -716,7 +716,7 @@ CPubSubClientSide::OnTopicAccessDestroyed( CORE::CNotifier* notifier , { TopicLinkPtr topicLink = (*i).second; - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTopicAccessDestroyed: Removing topic link info for destroyed topic " + topicAccess->GetTopicName() + ". 
There are " + CORE::ToString( topicLink->GetTotalMsgsInFlight() ) + " messages left in-flight" ); @@ -726,7 +726,7 @@ CPubSubClientSide::OnTopicAccessDestroyed( CORE::CNotifier* notifier , } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTopicAccessDestroyed: Cannot find topic link info for destroyed topic " + topicAccess->GetTopicName() ); } @@ -809,7 +809,7 @@ CPubSubClientSide::OnTopicsAccessAutoDestroyed( CORE::CNotifier* notifier , { TopicLinkPtr topicLink = (*i).second; - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTopicsAccessAutoDestroyed: Removing topic link info for destroyed topic " + topicAccess->GetTopicName() + ". There are " + CORE::ToString( topicLink->GetTotalMsgsInFlight() ) + " messages left in-flight" ); @@ -821,7 +821,7 @@ CPubSubClientSide::OnTopicsAccessAutoDestroyed( CORE::CNotifier* notifier , } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTopicAccessDestroyed: Cannot find topic link info for destroyed topic " + topicAccess->GetTopicName() ); } } @@ -939,13 +939,13 @@ CPubSubClientSide::BroadcastPublishMsgsSync( const TMsgCollection& msgs ) if ( !totalSuccess ) { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):BroadcastPublishMsgsSync: Failures encountered broadcasting messages to " + CORE::ToString( topicsToPublishOn ) + " successfully broadcast to " + CORE::ToString( topicsPublishedOn ) + " topics" ); } else { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):BroadcastPublishMsgsSync: Successfully broadcasted messages to " + CORE::ToString( topicsPublishedOn ) + " topics, out of " + CORE::ToString( topicsToPublishOn ) + " topics available for publishing" ); } @@ -994,14 +994,14 @@ CPubSubClientSide::PublishMsgsSync( const TMsgCollection& msgs , // depending on thread timings ConfigureTopicLink( m_sideSettings, newTopicObj, true ); - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgsASync: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " was not registered yet, will do so now to instance " + CORE::ToString( newTopicObj.GetPointerAlways() ) + " for topic " + newTopicObj->GetTopicName() ); } else { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgsSync: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " seems to have been replaced by a new instance " + CORE::ToString( newTopicObj.GetPointerAlways() ) + " for topic " + newTopicObj->GetTopicName() ); @@ -1010,7 +1010,7 @@ CPubSubClientSide::PublishMsgsSync( const TMsgCollection& msgs , } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + 
CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgsSync: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " is not a known topic nor does a topic by that name exist: " + specificTargetTopic->GetTopicName() ); return false; @@ -1019,7 +1019,7 @@ CPubSubClientSide::PublishMsgsSync( const TMsgCollection& msgs , } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgsSync: exception occured: " + CORE::ToString( e.what() ) ); } @@ -1332,19 +1332,19 @@ CPubSubClientSide::PublishMsgs( const CPubSubClientTopic::TPubSubMsgsRefVector& // depending on thread timings ConfigureTopicLink( m_sideSettings, newTopicObj, true ); - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgs: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " was not registered yet, will do so now to instance " + CORE::ToString( newTopicObj.GetPointerAlways() ) + " for topic " + newTopicObj->GetTopicName() ); i = m_topicPtrs.find( specificTargetTopic ); - if ( i == m_topicPtrs.end() ) + if ( i != m_topicPtrs.end() ) { topicLink = (*i).second; } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgs: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " was not registered yet, failed to register instance " + CORE::ToString( newTopicObj.GetPointerAlways() ) + " for topic " + newTopicObj->GetTopicName() ); @@ -1354,7 +1354,7 @@ CPubSubClientSide::PublishMsgs( const CPubSubClientTopic::TPubSubMsgsRefVector& } else { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgs: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " seems to have been replaced by a new instance " + CORE::ToString( newTopicObj.GetPointerAlways() ) + " for topic " + newTopicObj->GetTopicName() ); @@ -1362,7 +1362,7 @@ CPubSubClientSide::PublishMsgs( const CPubSubClientTopic::TPubSubMsgsRefVector& } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PublishMsgs: specificTargetTopic passed " + CORE::ToString( specificTargetTopic ) + " is not a known topic nor does a topic by that name exist: " + specificTargetTopic->GetTopicName() ); return false; @@ -1845,7 +1845,7 @@ CPubSubClientSide::TopicLink::RetryPublishFailedMsgs( void ) } if ( discardedMsgs > 0 ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "TopicLink(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "TopicLink(" + CORE::ToString( this ) + "):RetryPublishFailedMsgs: For topic " + topic->GetTopicName() + " we discarded a total of " + CORE::ToString( discardedMsgs ) + " messages due to exceeding the max retry attempts and/or sanity checks" ); } @@ -2114,7 +2114,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsReceived( CORE::CNotifier* notifi { // 
Better luck next batch, we will potentially not be able to use bookmarking to cover the current batch in isolation // if derivation of a bookmark from a message is not supported - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsReceived: Current bookmark is temp not available for the current message batch" ); break; } @@ -2122,7 +2122,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsReceived( CORE::CNotifier* notifi { // We should not get here, if we do there is some logic error in feature checking here or in // bookmark management in the backend code - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsReceived: Current bookmark obtained from backend came back as not applicable. This should not happen based on the feature set!" ); break; } @@ -2130,7 +2130,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsReceived( CORE::CNotifier* notifi { // We should not get here, if we do there is some logic error in feature checking here or in // bookmark management in the backend code - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsReceived: Current bookmark obtained from backend came back as not initialized. This should not happen!" ); break; } @@ -2151,7 +2151,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsReceived( CORE::CNotifier* notifi // Note that entries added here are removed at a later time via calls to CleanupMsgBatchBookmarksUpTo() bookmarksOnMsgReceived[ lastMsgRef->GetReceiveActionId() ] = currentBookmark; - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsReceived: Retaining bookmark at recieveActionId " + CORE::ToString( lastMsgRef->GetReceiveActionId() ) ); } } @@ -2172,7 +2172,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsReceived( CORE::CNotifier* notifi } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + "):OnPubSubTopicMsgsReceived: exception: " + CORE::CString( e.what() ) ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsReceived: exception: " + CORE::CString( e.what() ) ); } } @@ -2593,7 +2593,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsPublishFailure( CORE::CNotifier* msgTrackingEntry.readyToAckPublishSuccessButAckFailed = false; publishFailedMsgs.insert( publishActionId ); - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsPublishFailure: Message with publishActionId of " + CORE::ToString( publishActionId ) + " queued for retry, publish failed" + ". 
receiveActionId=" + CORE::ToString( msgTrackingEntry.msg->GetReceiveActionId() ) ); } @@ -2604,7 +2604,7 @@ CPubSubClientSide::TopicLink::OnPubSubTopicMsgsPublishFailure( CORE::CNotifier* // This should not happen // Only understandable reasons are bad eventing from the backend or if the in-flight time-out is configured too aggressively - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "PubSubClientSide(" + CORE::ToString( this ) + "):OnPubSubTopicMsgsPublishFailure: Failed to locate original in-flight message related to publishActionId " + CORE::ToString( (*n) ) ); } ++n; @@ -2697,7 +2697,7 @@ CPubSubClientSide::DisconnectPubSubClient( bool destroyClient ) if ( !m_pubsubClient->Disconnect() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Failed to disconnect the pub-sub client" ); return false; } @@ -2898,7 +2898,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic bool subscribeSuccess = false; if ( !clientFeatures.supportsBookmarkingConcept ) // We have no control bookmark wise with this backend, just subscribe and hope for the best { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClientTopic: Bookmarking concept is not supported by the backend, we will attempt to subscribe as-is" ); subscribeSuccess = topic.Subscribe(); @@ -2906,7 +2906,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic else if ( clientFeatures.supportsServerSideBookmarkPersistance ) // first preference is always backend managed bookmarking if available { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClientTopic: Bookmarking concept is natively supported and managed by the backend independently and we will attempt to subscribe as such" ); subscribeSuccess = topic.Subscribe(); @@ -2918,7 +2918,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic CPubSubBookmark bookmark; if ( GetLatestBookmark( topic, bookmark ) ) { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClientTopic: Bookmarking concept is supported by the backend via a client-side bookmark. Bookmark type=" + CORE::ToString( bookmark.GetBookmarkType() ) + ". 
Bookmark=" + bookmark.GetBookmarkData().AsString() ); subscribeSuccess = topic.SubscribeStartingAtBookmark( bookmark ); @@ -2927,7 +2927,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic { // GUCEF_ERROR_LOG // This is not fully supported yet, make it a non-error log statement for now - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClientTopic: Bookmarking concept is supported by the backend via a client-side message index marker but we failed at obtaining the last used message index" ); if ( pubSubSideSettings.subscribeWithoutBookmarkIfNoneIsPersisted ) @@ -2935,7 +2935,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic subscribeSuccess = topic.Subscribe(); if ( !subscribeSuccess ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClientTopic: Also unable to subscribe using the default bookmark as a fallback" ); return false; } @@ -2947,7 +2947,7 @@ CPubSubClientSide::ConnectPubSubClientTopic( CPubSubClientTopic& topic if ( !subscribeSuccess ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Failed to subscribe to topic: " + topic.GetTopicName() ); return false; } @@ -3042,12 +3042,12 @@ CPubSubClientSide::PerformPubSubClientSetup( bool hardReset ) pubSubConfig.journal = CPubSubGlobal::Instance()->GetPubSubJournalFactory().Create( pubSubConfig.journalConfig.journalType, pubSubConfig.journalConfig ); if ( pubSubConfig.journal.IsNULL() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_VERY_IMPORTANT, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_VERY_IMPORTANT, "PubSubClientSide(" + CORE::ToString( this ) + "):PerformPubSubClientSetup: Failed to create a pub-sub journal of type \"" + pubSubConfig.journalConfig.journalType + "\". No journaling capability" ); } else { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PerformPubSubClientSetup: Created a pub-sub journal of type \"" + pubSubConfig.journalConfig.journalType + "\" for side with id " + GetSideId() ); } @@ -3058,7 +3058,7 @@ CPubSubClientSide::PerformPubSubClientSetup( bool hardReset ) if ( m_pubsubClient.IsNULL() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PerformPubSubClientSetup: Failed to create a pub-sub client of type \"" + pubSubConfig.pubsubClientType + "\". 
Cannot proceed" ); return false; } @@ -3075,7 +3075,7 @@ CPubSubClientSide::PerformPubSubClientSetup( bool hardReset ) // Refresh our client features cache if ( !m_pubsubClient->GetSupportedFeatures( m_clientFeatures ) ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PerformPubSubClientSetup: Failed to obtain supported a pub-sub client supported features" ); return false; } @@ -3087,7 +3087,7 @@ CPubSubClientSide::PerformPubSubClientSetup( bool hardReset ) if ( clientSetupWasNeeded ) { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):PerformPubSubClientSetup: Setup completed for pub-sub client of type \"" + pubSubConfig.pubsubClientType + "\" for side with id " + GetSideId() ); @@ -3120,7 +3120,7 @@ CPubSubClientSide::ConnectPubSubClient( bool reset ) if ( !IsPubSubClientInfraReadyToConnect() ) { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Deferring pubsub client connect awaiting prereqs" ); m_pubsubClientReconnectTimer.SetEnabled( true ); @@ -3137,7 +3137,7 @@ CPubSubClientSide::ConnectPubSubClient( bool reset ) if ( GUCEF_NULL != m_flowRouter ) m_sideSettings.needToTrackInFlightPublishedMsgsForAck = m_flowRouter->IsTrackingInFlightPublishedMsgsForAcksNeeded( this ); - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: needToTrackInFlightPublishedMsgsForAck=" + CORE::ToString( m_sideSettings.needToTrackInFlightPublishedMsgsForAck ) ); if ( m_pubsubBookmarkPersistence.IsNULL() || reset ) @@ -3147,7 +3147,7 @@ CPubSubClientSide::ConnectPubSubClient( bool reset ) if ( m_pubsubBookmarkPersistence.IsNULL() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_CRITICAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Failed to create bookmark persistance access of type \"" + pubsubBookmarkPersistenceConfig.bookmarkPersistenceType + "\". Cannot proceed" ); return false; } @@ -3155,7 +3155,7 @@ CPubSubClientSide::ConnectPubSubClient( bool reset ) if ( !m_pubsubClient->Connect( reset ) ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Failed to connect the pub-sub client" ); return false; } @@ -3203,14 +3203,14 @@ CPubSubClientSide::ConnectPubSubClient( bool reset ) { if ( !(*i)->isOptional ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Failed to create a pub-sub client topic access for topic \"" + (*i)->topicName + "\". 
Cannot proceed" ); DisconnectPubSubClient( true ); return false; } else { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):ConnectPubSubClient: Unable to create a pub-sub client topic access for optional topic \"" + (*i)->topicName + "\". Proceeding" ); } } @@ -3348,12 +3348,12 @@ CPubSubClientSide::OnTaskStart( CORE::CICloneable* taskData ) { if ( SetCpuAffinityByCpuId( m_sideSettings.cpuAffinityForPubSubThread ) ) { - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTaskStart: Successfully set a CPU affinity for logical CPU " + CORE::UInt32ToString( m_sideSettings.cpuAffinityForPubSubThread ) ); } else { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTaskStart: Failed to set a CPU affinity for logical CPU " + CORE::UInt32ToString( m_sideSettings.cpuAffinityForPubSubThread ) + ". Proceeding without affinity"); } @@ -3366,14 +3366,14 @@ CPubSubClientSide::OnTaskStart( CORE::CICloneable* taskData ) { if ( !ConnectPubSubClient( false ) ) { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTaskStart: Failed initial connection attempt on task start, will rely on auto-reconnect" ); m_pubsubClientReconnectTimer.SetEnabled( true ); } } else { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::PointerToString( this ) + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "PubSubClientSide(" + CORE::ToString( this ) + "):OnTaskStart: Deferring pubsub client connect awaiting prereqs" ); m_pubsubClientReconnectTimer.SetEnabled( true ); diff --git a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp index b506028dc..82d76b3bd 100644 --- a/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp +++ b/plugins/PUBSUB/pubsubpluginREDISCLUSTER/src/pubsubpluginREDISCLUSTER_CRedisClusterPubSubClientTopic.cpp @@ -318,7 +318,7 @@ CRedisClusterPubSubClientTopic::LoadConfig( const PUBSUB::CPubSubClientTopicConf // note that a connection timeout of 0 means infinite for redis++ if ( m_client->GetConfig().redisConnectionOptionSocketTimeoutInMs > 0 && m_config.redisXReadBlockTimeoutInMs > m_client->GetConfig().redisConnectionOptionSocketTimeoutInMs ) { - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):LoadConfig: redisXReadBlockTimeoutInMs (" + CORE::ToString( m_config.redisXReadBlockTimeoutInMs ) + + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):LoadConfig: redisXReadBlockTimeoutInMs (" + CORE::ToString( m_config.redisXReadBlockTimeoutInMs ) + ") at the topic level should be configured to a lower value than the client's overall socket timeout (" + CORE::ToString( m_client->GetConfig().redisConnectionOptionSocketTimeoutInMs ) + "). Will clamp the value." 
); @@ -367,13 +367,13 @@ CRedisClusterPubSubClientTopic::LoadConfig( const PUBSUB::CPubSubClientTopicConf { if ( !m_client->GetThreadPool()->SetupTask( m_readerThread ) ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):LoadConfig: Failed to start blocking reader thread for async subscription" ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):LoadConfig: Failed to start blocking reader thread for async subscription" ); return false; } } else { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):LoadConfig: blocking reader thread for async subscription was already active, no need to activate" ); + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):LoadConfig: blocking reader thread for async subscription was already active, no need to activate" ); } } } @@ -452,7 +452,7 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId { case REDIS_OK: { - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Successfully sent message with " + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Successfully sent message with " + CORE::ToString( kvPairs.size() ) + " fields. MsgID=" + CORE::ToString( reply.str ) ); ++m_redisMsgsTransmitted; @@ -465,7 +465,7 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId { ++m_redisErrorReplies; - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Error sending message with " + + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Error sending message with " + CORE::ToString( kvPairs.size() ) + " fields. Error=" + CORE::ToString( reply.str ) ); break; } @@ -483,12 +483,12 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId catch ( const sw::redis::TimeoutError& e ) { ++m_redisTimeoutErrors; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ Timeout exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ Timeout exception: " + e.what() ); } catch ( const sw::redis::MovedError& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ MovedError (Redirect failed?) . Current slot: " + + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ MovedError (Redirect failed?) . 
Current slot: " + CORE::ToString( m_redisHashSlot ) + ", new slot: " + CORE::ToString( e.slot() ) + " at node " + e.node().host + ":" + CORE::ToString( e.node().port ) + " exception: " + e.what() ); Reconnect(); @@ -496,7 +496,7 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId catch ( const sw::redis::RedirectionError& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ RedirectionError (rebalance? node failure?). Current slot: " + + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ RedirectionError (rebalance? node failure?). Current slot: " + CORE::ToString( m_redisHashSlot ) + ", new slot: " + CORE::ToString( e.slot() ) + " at node " + e.node().host + ":" + CORE::ToString( e.node().port ) + " exception: " + e.what() ); @@ -505,22 +505,22 @@ CRedisClusterPubSubClientTopic::RedisSendSyncImpl( CORE::UInt64& publishActionId catch ( const sw::redis::ReplyError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ Reply error exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ Reply error exception: " + e.what() ); } catch ( const sw::redis::OomError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ OOM exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ OOM exception: " + e.what() ); } catch ( const sw::redis::Error& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: Redis++ exception: " + e.what() ); Reconnect(); } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisSendSyncImpl: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisSendSyncImpl: exception: " + e.what() ); Reconnect(); } @@ -782,7 +782,7 @@ CRedisClusterPubSubClientTopic::RedisRead( void ) if ( REDIS_REPLY_ERROR == type ) { ++m_redisErrorReplies; - GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Error using pipeline to receive messages. Error=" + CORE::ToString( reply.str ) ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Error using pipeline to receive messages. 
Error=" + CORE::ToString( reply.str ) ); return false; } } @@ -875,7 +875,7 @@ CRedisClusterPubSubClientTopic::RedisRead( void ) ++i; } - GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: read " + + GUCEF_DEBUG_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: read " + CORE::ToString( m_readVars.m_pubsubMsgsRefs.size() ) + " messages" ); // Communicate all the messages received via an event notification @@ -928,17 +928,17 @@ CRedisClusterPubSubClientTopic::RedisRead( void ) catch ( const sw::redis::TimeoutError& e ) { ++m_redisTimeoutErrors; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Redis++ Timeout exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Redis++ Timeout exception: " + e.what() ); } catch ( const sw::redis::OomError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Redis++ OOM exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Redis++ OOM exception: " + e.what() ); } catch ( const sw::redis::MovedError& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Redis++ MovedError (Redirect failed?) . Current slot: " + + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Redis++ MovedError (Redirect failed?) . Current slot: " + CORE::ToString( m_redisHashSlot ) + ", new slot: " + CORE::ToString( e.slot() ) + " at node " + e.node().host + ":" + CORE::ToString( e.node().port ) + " exception: " + e.what() ); Reconnect(); @@ -946,7 +946,7 @@ CRedisClusterPubSubClientTopic::RedisRead( void ) catch ( const sw::redis::RedirectionError& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Redis++ RedirectionError (rebalance? node failure?). Current slot: " + + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Redis++ RedirectionError (rebalance? node failure?). 
Current slot: " + CORE::ToString( m_redisHashSlot ) + ", new slot: " + CORE::ToString( e.slot() ) + " at node " + e.node().host + ":" + CORE::ToString( e.node().port ) + " exception: " + e.what() ); @@ -955,13 +955,13 @@ CRedisClusterPubSubClientTopic::RedisRead( void ) catch ( const sw::redis::Error& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: Redis++ exception: " + e.what() ); Reconnect(); UpdateIsHealthyStatus( false ); } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):RedisRead: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisRead: exception: " + e.what() ); Reconnect(); UpdateIsHealthyStatus( false ); } @@ -1023,7 +1023,7 @@ CRedisClusterPubSubClientTopic::Disconnect( void ) RedisClusterPubSubClientTopicReaderPtr redisReader = m_readerThread; if ( !redisReader.IsNULL() ) { - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Beginning cleanup" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Beginning cleanup" ); lock.EarlyUnlock(); @@ -1043,30 +1043,30 @@ CRedisClusterPubSubClientTopic::Disconnect( void ) // the parent client owns the context, we just null it m_redisContext.Unlink(); - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):Disconnect: Finished cleanup" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Finished cleanup" ); } } catch ( const sw::redis::TimeoutError& e ) { ++m_redisTimeoutErrors; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):Disconnect: Redis++ Timeout exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Redis++ Timeout exception: " + e.what() ); return false; } catch ( const sw::redis::OomError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):Disconnect: Redis++ OOM exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Redis++ OOM exception: " + e.what() ); return false; } catch ( const sw::redis::Error& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):Disconnect: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: Redis++ exception: " + e.what() ); return false; } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):Disconnect: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):Disconnect: exception: 
" + e.what() ); return false; } @@ -1080,7 +1080,7 @@ CRedisClusterPubSubClientTopic::Reconnect( void ) {GUCEF_TRACE; Disconnect(); - GUCEF_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisReconnect: starting reconnect timer" ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):RedisReconnect: starting reconnect timer" ); m_redisReconnectTimer->SetEnabled( true ); } @@ -1118,14 +1118,14 @@ CRedisClusterPubSubClientTopic::CleanupRedisReaderThread( void ) { if ( !m_readerThread->RequestTaskToStop( true ) ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):CleanupRedisReaderThread: Failed to stop dedicated redis reader thread" ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):CleanupRedisReaderThread: Failed to stop dedicated redis reader thread" ); } m_readerThread.Unlink(); } } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):CleanupRedisReaderThread: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):CleanupRedisReaderThread: exception: " + e.what() ); } } @@ -1141,14 +1141,14 @@ CRedisClusterPubSubClientTopic::SubscribeImpl( const std::string& readOffset ) { if ( m_redisContext.IsNULL() ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: No redis context is available" ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: No redis context is available" ); return false; } sw::redis::StringView topicNameSV( m_config.topicName.C_String(), m_config.topicName.Length() ); UInt64 streamLength = m_redisContext->xlen( topicNameSV ); - GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: current stream length for steam with name \"" + + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_BELOW_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: current stream length for steam with name \"" + m_config.topicName + "\" is " + CORE::ToString( streamLength ) ); if ( 0 == streamLength ) @@ -1179,7 +1179,7 @@ CRedisClusterPubSubClientTopic::SubscribeImpl( const std::string& readOffset ) { if ( !m_client->GetThreadPool()->StartTask( m_readerThread ) ) { - GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: Failed to start blocking reader thread for async subscription" ); + GUCEF_ERROR_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: Failed to start blocking reader thread for async subscription" ); return false; } } @@ -1190,31 +1190,31 @@ CRedisClusterPubSubClientTopic::SubscribeImpl( const std::string& readOffset ) catch ( const sw::redis::TimeoutError& e ) { ++m_redisTimeoutErrors; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: Redis++ Timeout exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: Redis++ Timeout exception: " + 
e.what() ); return false; } catch ( const sw::redis::OomError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: Redis++ OOM exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: Redis++ OOM exception: " + e.what() ); return false; } catch ( const sw::redis::Error& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: Redis++ exception: " + e.what() ); Reconnect(); return false; } catch ( const GUCEF::timeout_exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: timeout exception" ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: timeout exception" ); Reconnect(); return false; } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):SubscribeImpl: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):SubscribeImpl: exception: " + e.what() ); Reconnect(); return false; } @@ -1397,26 +1397,30 @@ CRedisClusterPubSubClientTopic::InitializeConnectivity( bool reset ) MT::CScopeMutex lock( m_lock ); try { + RedisClusterPtr lastRedisContext = m_redisContext; m_redisContext = m_client->GetRedisContext(); if ( m_redisContext.IsNULL() ) return false; - // The following is not a must-have for connectivity - const RedisNodeMap& nodeMap = m_client->GetRedisNodeMap(); - RedisNodeMap::const_iterator i = nodeMap.begin(); - while ( i != nodeMap.end() ) + if ( lastRedisContext != m_redisContext ) { - if ( (*i).first > m_redisHashSlot ) - break; - if ( m_redisHashSlot >= (*i).first && m_redisHashSlot <= (*i).second.endSlot ) + // The following is not a must-have for connectivity + const RedisNodeMap& nodeMap = m_client->GetRedisNodeMap(); + RedisNodeMap::const_iterator i = nodeMap.begin(); + while ( i != nodeMap.end() ) { - m_redisShardHost = (*i).second.host; - m_redisShardNodeId = (*i).second.nodeId; - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: Stream \"" + m_config.topicName + - "\" hashes to hash slot " + CORE::ToString( m_redisHashSlot ) + " which lives at " + (*i).second.host.HostnameAndPortAsString() + " with node id " + (*i).second.nodeId ); - break; + if ( (*i).first > m_redisHashSlot ) + break; + if ( m_redisHashSlot >= (*i).first && m_redisHashSlot <= (*i).second.endSlot ) + { + m_redisShardHost = (*i).second.host; + m_redisShardNodeId = (*i).second.nodeId; + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: Stream \"" + m_config.topicName + + "\" hashes to hash slot " + CORE::ToString( m_redisHashSlot ) + " which lives at " + (*i).second.host.HostnameAndPortAsString() + " with node id " + (*i).second.nodeId ); + break; + } + ++i; } - ++i; } if ( m_config.preferDedicatedConnection ) @@ -1428,7 +1432,7 @@ 
CRedisClusterPubSubClientTopic::InitializeConnectivity( bool reset ) GUCEF_DELETE m_redisPipeline; m_redisPipeline = GUCEF_NEW sw::redis::Pipeline( m_redisContext->pipeline( cnSV ) ); - GUCEF_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: Successfully created a Redis pipeline. Hash Slot " + CORE::ToString( m_redisHashSlot ) ); + GUCEF_SYSTEM_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: Successfully created a Redis pipeline. Hash Slot " + CORE::ToString( m_redisHashSlot ) ); } } @@ -1437,25 +1441,25 @@ CRedisClusterPubSubClientTopic::InitializeConnectivity( bool reset ) catch ( const sw::redis::TimeoutError& e ) { ++m_redisTimeoutErrors; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: Redis++ Timeout exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_NORMAL, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: Redis++ Timeout exception: " + e.what() ); return false; } catch ( const sw::redis::OomError& e ) { ++m_redisErrorReplies; - GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: Redis++ OOM exception: " + e.what() ); + GUCEF_WARNING_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: Redis++ OOM exception: " + e.what() ); return false; } catch ( const sw::redis::Error& e ) { ++m_redisErrorReplies; - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: Redis++ exception: " + e.what() ); m_redisReconnectTimer->SetEnabled( true ); return false; } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):InitializeConnectivity: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):InitializeConnectivity: exception: " + e.what() ); m_redisReconnectTimer->SetEnabled( true ); return false; } @@ -1555,20 +1559,20 @@ CRedisClusterPubSubClientTopic::GetRedisClusterNodeMap( RedisNodeMap& nodeMap ) } catch ( const sw::redis::TimeoutError& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):GetRedisClusterNodeMap: Redis++ Timeout exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):GetRedisClusterNodeMap: Redis++ Timeout exception: " + e.what() ); ++m_redisTimeoutErrors; return false; } catch ( const sw::redis::Error& e ) { - GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):GetRedisClusterNodeMap: Redis++ exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):GetRedisClusterNodeMap: Redis++ exception: " + e.what() ); ++m_redisErrorReplies; Reconnect(); return false; } catch ( const std::exception& e ) { - GUCEF_EXCEPTION_LOG( 
CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::PointerToString( this ) + "):GetRedisClusterNodeMap: exception: " + e.what() ); + GUCEF_EXCEPTION_LOG( CORE::LOGLEVEL_IMPORTANT, "RedisClusterPubSubClientTopic(" + CORE::ToString( this ) + "):GetRedisClusterNodeMap: exception: " + e.what() ); Reconnect(); return false; }