From d6dcd8c76bd6cb49cefd69e56a04595a57d0c80d Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Wed, 6 Nov 2024 10:14:31 +0000 Subject: [PATCH 1/3] HPCC-32954 Add unit tests for the jobqueue Signed-off-by: Gavin Halliday --- common/workunit/wujobq.cpp | 14 ++ common/workunit/wujobq.hpp | 1 + testing/unittests/CMakeLists.txt | 2 + testing/unittests/dalitests.cpp | 385 +++++++++++++++++++++++++++++++ 4 files changed, 402 insertions(+) diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 47a35e8f1dd..cd1f0d5b330 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -1264,6 +1264,20 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } + IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay) override + { + Owned item; + if (prioritytransitiondelay) + { + unsigned timeout = prioritytransitiondelay; + bool usePrevPrio = true; + item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr)); + } + if (!item) + item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr)); + return item.getClear(); + } + void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem { Owned qi = qitem; diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index c3209904a50..4fdcda7a837 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -120,6 +120,7 @@ interface IJobQueue: extends IJobQueueConst virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; + virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; diff --git a/testing/unittests/CMakeLists.txt b/testing/unittests/CMakeLists.txt index 59498fb0d2f..ee8278a30c3 100644 --- a/testing/unittests/CMakeLists.txt +++ b/testing/unittests/CMakeLists.txt @@ -78,6 +78,7 @@ include_directories ( ./../../dali/base ./../../system/security/shared ./../../common/deftype + ./../../common/workunit ./../../system/security/cryptohelper ./../../configuration/configmgr/configmgrlib ${HPCC_SOURCE_DIR}/system/masking/include @@ -118,6 +119,7 @@ target_link_libraries ( unittests esphttp esdllib logginglib + workunit ${CppUnit_LIBRARIES} ) diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index c10a7529d2d..2a08b473519 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -31,11 +31,14 @@ #include "dasds.hpp" #include "danqs.hpp" #include "dautils.hpp" +#include "wujobq.hpp" #include #include #include +#include "jthread.hpp" + #include "unittests.hpp" #include "sysinfologger.hpp" @@ -3262,4 +3265,386 @@ class DaliSysInfoLoggerTester : public CppUnit::TestFixture CPPUNIT_TEST_SUITE_REGISTRATION( DaliSysInfoLoggerTester ); CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( DaliSysInfoLoggerTester, "DaliSysInfoLoggerTester" ); + + + +static constexpr bool traceJobQueue = false; +static unsigned jobQueueStartTick; +static constexpr unsigned tickScaling = 1; +static unsigned getJobQueueTick() +{ + return (msTick() - jobQueueStartTick) / tickScaling; +} +static void JobQueueSleep(unsigned ms) +{ + MilliSleep(ms * tickScaling); +} +class JobQueueTester : public CppUnit::TestFixture +{ + /* Note: global messages will be written for dates between 2000-02-04 and 2000-02-05 */ + /* Note: All global messages with time stamp before 2000-03-31 will be deleted */ + CPPUNIT_TEST_SUITE(JobQueueTester); + CPPUNIT_TEST(testInit); + CPPUNIT_TEST(testSingle); + CPPUNIT_TEST(testDouble); + CPPUNIT_TEST(testMany); + CPPUNIT_TEST(testCleanup); + CPPUNIT_TEST_SUITE_END(); + + struct JobEntry + { + unsigned delayMs; + const char * name; + unsigned processingMs; + int priority; + }; + + class JobProcessor : public Thread + { + public: + JobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : startedSem(_startedSem), processedSem(_processedSem), queue(_queue), id(_id) + { + } + + virtual int run() override + { + startedSem.signal(); + try + { + processAll(); + } + catch (IException * _e) + { + e.setown(_e); + } + return 0; + } + + bool processItem(IJobQueueItem * item) + { + assertex(item); + const char * name = item->queryWUID(); + if (traceJobQueue) + DBGLOG("===%s===@%u", name, getJobQueueTick()); + if (name[0] == '!') + return false; + output.append(name); + if (!log.isEmpty()) + log.append(","); + log.append(name).append("@").append(getJobQueueTick()); + unsigned delay = item->getPort(); + JobQueueSleep(delay); + processedSem.signal(); + return true; + } + + const char * queryOutput() + { + if (e) + throw e.getClear(); + return output.str(); + } + + const char * queryLog() + { + return log.str(); + } + + virtual void processAll() = 0; + + protected: + Semaphore & startedSem; + Semaphore & processedSem; + Linked queue; + StringBuffer output; + StringBuffer log; + Owned e; + unsigned id; + }; + + class StandardJobProcessor : public JobProcessor + { + public: + StandardJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(); + if (!processItem(item)) + break; + } + } + + }; + + class ThorJobProcessor : public JobProcessor + { + public: + ThorJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(0, INFINITE, 200*tickScaling); + bool ret = processItem(item); + if (!ret) + break; + } + } + }; + + class NewThorJobProcessor : public JobProcessor + { + public: + NewThorJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(lastPrio, 200*tickScaling, 0); + if (!item) + item.setown(queue->dequeue(0, INFINITE, 0)); + lastPrio = item->getPriority(); + bool ret = processItem(item); + if (!ret) + break; + } + } + + protected: + int lastPrio = 0; + + }; + + enum JobProcessorType + { + StandardProcessor, + ThorProcessor, + NewThorProcessor, + }; + + void testInit() + { + daliClientInit(); + } + + void testCleanup() + { + daliClientEnd(); + } + + void runTestCase(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults, bool uniqueQueues) + { + Owned queue = createJobQueue("JobQueueTester"); + Semaphore startedSem; + Semaphore processedSem; + + CIArrayOf jobProcessors; + for (auto & processor : processors) + { + JobProcessor * cur = nullptr; + Owned localQueue; + IJobQueue * processorQueue = queue; + if (uniqueQueues) + { + localQueue.setown(createJobQueue("JobQueueTester")); + processorQueue = localQueue; + } + + switch (processor) + { + case StandardProcessor: + cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case ThorProcessor: + cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case NewThorProcessor: + cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + default: + UNIMPLEMENTED; + } + jobProcessors.append(*cur); + cur->start(true); + } + + for (auto & processor : processors) + startedSem.wait(); + + IArrayOf conversations; + jobQueueStartTick = msTick(); + for (auto & job : jobs) + { + JobQueueSleep(job.delayMs); + if (traceJobQueue) + DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); + Owned item = createJobQueueItem(job.name); + item->setPort(job.processingMs); + item->setPriority(job.priority); + + queue->enqueue(item.getClear()); + } + + ForEachItemIn(i1, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Add (eoj) @%u", getJobQueueTick()); + + //The queue code dedups by "wuid", so we need to add a unique "stop" entry + std::string end = std::string("!") + std::to_string(i1); + Owned item = createJobQueueItem(end.c_str()); + queue->enqueue(item.getClear()); + } + + ForEachItemIn(i2, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Wait for %u", i2); + jobProcessors.item(i2).join(); + } + + DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); +// if (i3 < expectedResults.size()) +// CPPUNIT_ASSERT_EQUAL(std::string(expectedResults.begin()[i3]), std::string(cur.queryOutput())); + } + } + + void runTestCaseX2(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults) + { + runTestCase(name, jobs, processors, expectedResults, false); + runTestCase(name, jobs, processors, expectedResults, true); + } + + static constexpr std::initializer_list singleWuTest = { + { 0, "a", 90, 0 }, + { 100, "b", 90, 0 }, + { 100, "c", 90, 0 }, + { 100, "d", 90, 0 }, + }; + + static constexpr std::initializer_list twoWuTest = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 0}, + { 50, "b", 90, 0 }, + { 50, "B", 90, 0 }, + { 50, "c", 90, 0 }, + { 50, "C", 90, 0 }, + { 50, "d", 90, 0 }, + { 50, "D", 90, 0 }, + }; + + static constexpr std::initializer_list lowHighTest = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 50, "b", 90, 0 }, + { 50, "B", 90, 1 }, + { 50, "c", 90, 0 }, + { 50, "C", 90, 1 }, + { 50, "d", 90, 0 }, + { 50, "D", 90, 1 }, + }; + + static constexpr std::initializer_list lowHigh2Test = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 10, "b", 90, 0 }, + { 10, "B", 90, 1 }, + { 10, "c", 90, 0 }, + { 10, "C", 90, 1 }, + { 10, "d", 90, 0 }, + { 10, "D", 90, 1 }, + }; + + static constexpr std::initializer_list lowHigh3Test = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 10, "b", 90, 0 }, + }; + + static constexpr std::initializer_list dripFeedTest = { + { 0, "a", 10, 0 }, + { 100, "b", 10, 0}, + { 100, "c", 10, 0}, + { 100, "d", 10, 0}, + { 100, "e", 10, 0}, + { 100, "f", 10, 0}, + { 100, "g", 10, 0}, + { 100, "h", 10, 0}, + { 100, "i", 10, 0}, + { 100, "j", 10, 0}, + }; + + static constexpr std::initializer_list drip2FeedTest = { + { 0, "a", 60, 0 }, + { 50, "b", 60, 0}, + { 50, "c", 60, 0}, + { 50, "d", 60, 0}, + { 50, "e", 60, 0}, + { 50, "f", 60, 0}, + { 50, "g", 60, 0}, + { 50, "h", 60, 0}, + { 50, "i", 60, 0}, + { 50, "j", 60, 0}, + { 50, "k", 60, 0}, + { 50, "l", 60, 0}, + { 50, "m", 60, 0}, + { 50, "n", 60, 0}, + { 50, "o", 60, 0}, + }; + + void testSingle() + { + runTestCase("1 wu, 1 standard", singleWuTest, { StandardProcessor }, { "abcd" }, false); + runTestCase("2 wu, 1 standard", twoWuTest, { StandardProcessor }, { "aAbBcCdD" }, false); + runTestCase("lo hi wu, 1 standard", lowHighTest, { StandardProcessor }, { "aABCDbcd" }, false); + runTestCase("lo hi2 wu, 1 standard", lowHigh2Test, { StandardProcessor }, { "aABCDbcd" }, false); + runTestCase("lo hi2 wu, 1 thor", lowHigh2Test, { ThorProcessor }, {}, false); + runTestCase("lo hi2 wu, 1 newthor", lowHigh2Test, { NewThorProcessor }, {}, false); + runTestCase("drip wu, 1 std", dripFeedTest, { StandardProcessor }, {}, false); + + } + + void testDouble() + { + runTestCaseX2("2 wu, 2 standard", twoWuTest, { StandardProcessor, StandardProcessor }, { "abcd", "ABCD" }); + runTestCaseX2("lo hi wu, 2 standard", lowHighTest, { StandardProcessor, StandardProcessor }, { "aBDc" "ACbd" }); + runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { "a"}); + runTestCaseX2("lo hi2 wu, 2 thor", lowHigh2Test, { ThorProcessor, ThorProcessor }, {}); + runTestCaseX2("lo hi2 wu, 2 newthor", lowHigh2Test, { NewThorProcessor, NewThorProcessor }, {}); + + runTestCaseX2("lo hi3 wu, 2 thor", lowHigh3Test, { ThorProcessor, ThorProcessor }, {}); + runTestCaseX2("lo hi3 wu, 2 newthor", lowHigh3Test, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("drip wu, 2 std", dripFeedTest, { StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip wu, 2 newthor", dripFeedTest, { NewThorProcessor, NewThorProcessor }, {}); + } + + void testMany() + { + runTestCaseX2("drip wu, 3 std", dripFeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip2 wu, 3 std", drip2FeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( JobQueueTester ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JobQueueTester, "JobQueueTester" ); + #endif // _USE_CPPUNIT From 5e124d3011f3f66aaff4ed37cdb664645d7344e1 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 12 Nov 2024 20:08:42 +0000 Subject: [PATCH 2/3] HPCC-32945 Add support for queue clients with priorities Signed-off-by: Gavin Halliday --- common/workunit/wujobq.cpp | 137 ++++++++++++++------ common/workunit/wujobq.hpp | 1 + testing/unittests/dalitests.cpp | 217 ++++++++++++++++++++++---------- 3 files changed, 253 insertions(+), 102 deletions(-) diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index cd1f0d5b330..fe20091c17a 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -18,6 +18,7 @@ #include "platform.h" #include +#include #include "limits.h" #include "jlib.hpp" #include "jbuff.hpp" @@ -51,13 +52,12 @@ JobQueues JobQueue @name= @count= @state=active|paused|stopped Edition - Client @session= @connected= @waiting= -- connections and waiting can be > 1 (multiple threads) + Client @session= @connected= [@priority=n] @waiting= -- connections and waiting can no longer be > 1 Item* @wuid @owner @node @port @priority @session #endif - class CJobQueueItem: implements IJobQueueItem, public CInterface { int priority; @@ -789,16 +789,17 @@ class CJobQueueConst: public CJobQueueBase class CJobQueue: public CJobQueueBase, implements IJobQueue { public: - sQueueData *activeq; + sQueueData *activeq = nullptr; SessionId sessionid; - unsigned locknest; - bool writemode; - bool connected; + unsigned locknest = 0; + bool writemode = false; + bool connected = false; Owned initiateconv; StringAttr initiatewu; - bool dequeuestop; - bool cancelwaiting; - bool validateitemsessions; + std::atomic isProcessingDequeue = 0; + bool dequeuestop = false; + bool cancelwaiting = false; + bool validateitemsessions = false; class csubs: implements ISDSSubscription, public CInterface { @@ -811,15 +812,21 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { - CriticalBlock block(parent->crit); + //There is a race condition - a callback may be at this point while the CJobQueue is deleted. + //Adding a critical section in parent makes it much more likely to be hit. + //Ultimately the semaphore should be moved to this class instead + //CriticalBlock block(parent->crit); parent->notifysem.signal(); } - } subs; + }; - IMPLEMENT_IINTERFACE; + Owned subs; + + IMPLEMENT_IINTERFACE_USING(CJobQueueBase); - CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this) + CJobQueue(const char *_qname) : CJobQueueBase(_qname) { + subs.setown(new csubs(this)); activeq = qdata; sessionid = myProcessSession(); validateitemsessions = false; @@ -1037,7 +1044,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); } } @@ -1048,7 +1055,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (!qd->subscriberid) { StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); } unsigned e = (unsigned)qd->root->getPropInt("Edition", 1); if (e!=qd->lastWaitEdition) { @@ -1128,7 +1135,24 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } } - sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues) + bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold) + { + unsigned higher = 0; + Owned iter = queueTree->getElements("Client"); + ForEach(*iter) + { + unsigned __int64 priority = iter->query().getPropInt64("@priority", 0); + if (priority > clientPrio) + { + higher++; + if (higher >= threshold) + return true; + } + } + return false; + } + + sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues) { if (numqueues==0) return NULL; @@ -1139,7 +1163,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue for (unsigned i=0;iroot->getPropInt("@count"); - if (count) { + if (count) + { + if (hasHigherPriorityClients(qd->root, clientPrio, count)) + continue; + int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio; if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) { StringBuffer path; @@ -1160,17 +1188,33 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return best; } - void setWaiting(unsigned numqueues,sQueueData **queues, bool set) + void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set) { for (unsigned i=0; isetPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1)); + //If a non-zero client priority has been specified, add (or remove) it from the list of priorities + if (clientPrio) + { + if (set) + croot->setPropInt64("@priority", clientPrio); + else + croot->removeProp("@priority"); + } } } // 'simple' queuing - IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL) + IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout) { + //If more than one thread is waiting on the queue, then the queue code does not work correctly + //It is undefined which thread the semaphore signal will wake up. + //E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of + //priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued. + //Similar problems occur when the clientPriority is mixed. + if (isProcessingDequeue.exchange(true)) + throw MakeStringException(0, "Multiple concurrent dequeue not supported"); + bool hasminprio=(minprio!=INT_MIN); if (timedout) *timedout = false; @@ -1200,23 +1244,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue active.append(qd); } if (stopped==total) + { + isProcessingDequeue.store(false); return NULL; // all stopped + } sQueueData **activeqds = (sQueueData **)active.getArray(); unsigned activenum = active.ordinality(); if (activenum) { - sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds); + sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds); unsigned count = bestqd?bestqd->root->getPropInt("@count"):0; // load minp from cache - if (count) { - int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; - if (!hasminprio||checkprio(*bestqd,mpr)) { - block.setRollback(false); - ret = dotake(*bestqd,NULL,true,hasminprio,mpr); - if (ret) // think it must be! - timeout = 0; // so mark that done - else if (!hasminprio) { - WARNLOG("Resetting queue %s",bestqd->qname.get()); - clear(*bestqd); // reset queue as seems to have become out of sync + if (count) + { + if (!hasHigherPriorityClients(bestqd->root, clientPrio, count)) + { + int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio; + if (!hasminprio||checkprio(*bestqd,mpr)) { + block.setRollback(false); + ret = dotake(*bestqd,NULL,true,hasminprio,mpr); + if (ret) // think it must be! + timeout = 0; // so mark that done + else if (!hasminprio) { + WARNLOG("Resetting queue %s",bestqd->qname.get()); + clear(*bestqd); // reset queue as seems to have become out of sync + } } } } @@ -1226,7 +1277,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue block.setRollback(false); } if (!waitingset) { - setWaiting(activenum,activeqds,true); + setWaiting(activenum, activeqds, clientPrio, true); block.commit(); waitingset = true; } @@ -1234,7 +1285,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } if (timeout==0) { if (waitingset) { - setWaiting(activenum,activeqds,false); + setWaiting(activenum, activeqds, clientPrio, false); block.commit(); } if (timedout) @@ -1255,12 +1306,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue timeout = 0; } } + + isProcessingDequeue.store(false); return ret; } IJobQueueItem *dequeue(unsigned timeout=INFINITE) { - return dodequeue(INT_MIN,timeout); + return dodequeue(INT_MIN, 0, timeout, false, nullptr); } @@ -1271,13 +1324,18 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue { unsigned timeout = prioritytransitiondelay; bool usePrevPrio = true; - item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr)); + item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr)); } if (!item) - item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr)); + item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr)); return item.getClear(); } + IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE) + { + return dodequeue(INT_MIN, priority, timeout, false, nullptr); + } + void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem { Owned qi = qitem; @@ -1628,6 +1686,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue return (state&&(strcmp(state,"stopped")==0)); } + void removeClient(sQueueData & qd, IPropertyTree * croot) + { + qd.root->removeTree(croot); + } + void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued) { Cconnlockblock block(this,false); @@ -1640,7 +1703,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue break; if (validateitemsessions && !validSession(croot)) { Cconnlockblock block(this,true); - qd.root->removeTree(croot); + removeClient(qd, croot); } else { waiting += croot->getPropInt("@waiting"); @@ -1772,7 +1835,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue int minprio = 0; unsigned timeout = prioritytransitiondelay; bool usePrevPrio = true; - item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout)); + item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout)); } else item.setown(dequeue(INFINITE)); diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index 4fdcda7a837..ed7cc2e2257 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0; + virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index 2a08b473519..d82f2eb1192 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -3430,11 +3430,36 @@ class JobQueueTester : public CppUnit::TestFixture }; + class PriorityJobProcessor : public JobProcessor + { + public: + PriorityJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + __uint64 priority = 0; + for (;;) + { + Owned item = queue->dequeuePriority(priority); + if (!item) + item.setown(queue->dequeue(0, INFINITE, 0)); + bool ret = processItem(item); + if (!ret) + break; + priority = getTimeStampNowValue(); + } + } + }; + enum JobProcessorType { StandardProcessor, ThorProcessor, NewThorProcessor, + PriorityProcessor, }; void testInit() @@ -3449,88 +3474,139 @@ class JobQueueTester : public CppUnit::TestFixture void runTestCase(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults, bool uniqueQueues) { - Owned queue = createJobQueue("JobQueueTester"); - Semaphore startedSem; - Semaphore processedSem; - - CIArrayOf jobProcessors; - for (auto & processor : processors) + try { - JobProcessor * cur = nullptr; - Owned localQueue; - IJobQueue * processorQueue = queue; - if (uniqueQueues) + Owned queue = createJobQueue("JobQueueTester"); + queue->connect(true); + queue->clear(); + + Semaphore startedSem; + Semaphore processedSem; + + CIArrayOf jobProcessors; + for (auto & processor : processors) { - localQueue.setown(createJobQueue("JobQueueTester")); - processorQueue = localQueue; + JobProcessor * cur = nullptr; + Owned localQueue; + IJobQueue * processorQueue = queue; + if (uniqueQueues) + { + localQueue.setown(createJobQueue("JobQueueTester")); + processorQueue = localQueue; + } + + switch (processor) + { + case StandardProcessor: + cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case ThorProcessor: + cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case NewThorProcessor: + cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case PriorityProcessor: + cur = new PriorityJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + default: + UNIMPLEMENTED; + } + jobProcessors.append(*cur); + cur->start(true); } - switch (processor) + for (auto & processor : processors) + startedSem.wait(); + + IArrayOf conversations; + jobQueueStartTick = msTick(); + for (auto & job : jobs) { - case StandardProcessor: - cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case ThorProcessor: - cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - case NewThorProcessor: - cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); - break; - default: - UNIMPLEMENTED; + JobQueueSleep(job.delayMs); + if (traceJobQueue) + DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); + Owned item = createJobQueueItem(job.name); + item->setPort(job.processingMs); + item->setPriority(job.priority); + + queue->enqueue(item.getClear()); } - jobProcessors.append(*cur); - cur->start(true); - } - for (auto & processor : processors) - startedSem.wait(); + for (;;) + { + //Wait until all the items have been processed before adding the special end markers + //otherwise the ends will be interpreted as valid items, and may cause the items to + //be dequeued by the wrong thread. + unsigned connected; + unsigned waiting; + unsigned enqueued; + queue->getStats(connected,waiting,enqueued); + if (enqueued == 0) + break; + MilliSleep(100 * tickScaling); + } - IArrayOf conversations; - jobQueueStartTick = msTick(); - for (auto & job : jobs) - { - JobQueueSleep(job.delayMs); - if (traceJobQueue) - DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); - Owned item = createJobQueueItem(job.name); - item->setPort(job.processingMs); - item->setPriority(job.priority); + ForEachItemIn(i1, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Add (eoj) @%u", getJobQueueTick()); - queue->enqueue(item.getClear()); - } + //The queue code dedups by "wuid", so we need to add a unique "stop" entry + std::string end = std::string("!") + std::to_string(i1); + Owned item = createJobQueueItem(end.c_str()); + queue->enqueue(item.getClear()); + } - ForEachItemIn(i1, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Add (eoj) @%u", getJobQueueTick()); + ForEachItemIn(i2, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Wait for %u", i2); + jobProcessors.item(i2).join(); + } - //The queue code dedups by "wuid", so we need to add a unique "stop" entry - std::string end = std::string("!") + std::to_string(i1); - Owned item = createJobQueueItem(end.c_str()); - queue->enqueue(item.getClear()); - } + DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); + unsigned numProcessors = processors.size(); + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); + } - ForEachItemIn(i2, jobProcessors) - { - if (traceJobQueue) - DBGLOG("Wait for %u", i2); - jobProcessors.item(i2).join(); + if (numProcessors == expectedResults.size()) + { + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + unsigned matchedPos = numProcessors; + for (unsigned i =0; i < numProcessors; i++) + { + if (streq(expectedResults.begin()[i], cur.queryOutput())) + { + matchedPos = i; + break; + } + } + if (matchedPos == numProcessors) + { + VStringBuffer msg("Test %s: No match for output %u", name, i3); + CPPUNIT_ASSERT_MESSAGE(msg.str(), 0); + } + } + } } - - DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); - ForEachItemIn(i3, jobProcessors) + catch (IException * e) { - JobProcessor & cur = jobProcessors.item(i3); - DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); -// if (i3 < expectedResults.size()) -// CPPUNIT_ASSERT_EQUAL(std::string(expectedResults.begin()[i3]), std::string(cur.queryOutput())); + StringBuffer msg("Fail: "); + e->errorMessage(msg); + e->Release(); + CPPUNIT_ASSERT_MESSAGE(msg.str(), 0); } } void runTestCaseX2(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults) { - runTestCase(name, jobs, processors, expectedResults, false); + //runTestCase(name, jobs, processors, expectedResults, false); runTestCase(name, jobs, processors, expectedResults, true); } @@ -3620,28 +3696,39 @@ class JobQueueTester : public CppUnit::TestFixture runTestCase("lo hi2 wu, 1 thor", lowHigh2Test, { ThorProcessor }, {}, false); runTestCase("lo hi2 wu, 1 newthor", lowHigh2Test, { NewThorProcessor }, {}, false); runTestCase("drip wu, 1 std", dripFeedTest, { StandardProcessor }, {}, false); - + runTestCase("drip wu, 1 std", dripFeedTest, { PriorityProcessor }, {}, false); } void testDouble() { runTestCaseX2("2 wu, 2 standard", twoWuTest, { StandardProcessor, StandardProcessor }, { "abcd", "ABCD" }); runTestCaseX2("lo hi wu, 2 standard", lowHighTest, { StandardProcessor, StandardProcessor }, { "aBDc" "ACbd" }); - runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { "a"}); + runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { }); runTestCaseX2("lo hi2 wu, 2 thor", lowHigh2Test, { ThorProcessor, ThorProcessor }, {}); runTestCaseX2("lo hi2 wu, 2 newthor", lowHigh2Test, { NewThorProcessor, NewThorProcessor }, {}); runTestCaseX2("lo hi3 wu, 2 thor", lowHigh3Test, { ThorProcessor, ThorProcessor }, {}); runTestCaseX2("lo hi3 wu, 2 newthor", lowHigh3Test, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("lo hi3 wu, 2 prio", lowHigh3Test, { PriorityProcessor, PriorityProcessor }, {}); runTestCaseX2("drip wu, 2 std", dripFeedTest, { StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip wu, 2 newthor", dripFeedTest, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("drip wu, 2 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor }, { "abcdefghij", "" }); } void testMany() { runTestCaseX2("drip wu, 3 std", dripFeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); runTestCaseX2("drip2 wu, 3 std", drip2FeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip wu, 3 prio", dripFeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "abcdefghij", "", "" }); + runTestCaseX2("drip2 wu, 3 prio", drip2FeedTest, { PriorityProcessor, PriorityProcessor, PriorityProcessor }, { "acegikmo", "bdfhjln", ""}); } + + //MORE Tests: + //Many requests at a time in waves + //Priority 1,2,3 fixed - not dynamic + //Stopping listening after N to check priorities removed correctly + //Mix standard and priority + //Priority with expiring and gaps to ensure the correct client picks up the items. }; CPPUNIT_TEST_SUITE_REGISTRATION( JobQueueTester ); From dde7ce47cf6c1f457584b97c0fb8362f06b28c67 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 19 Nov 2024 15:38:27 +0000 Subject: [PATCH 3/3] Fix race condition Signed-off-by: Gavin Halliday --- common/workunit/wujobq.cpp | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index fe20091c17a..1250d513f49 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -395,7 +395,6 @@ class CJobQueueBase: implements IJobQueueConst, public CInterface } public: sQueueData *qdata; - Semaphore notifysem; CriticalSection crit; IMPLEMENT_IINTERFACE; @@ -801,32 +800,31 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue bool cancelwaiting = false; bool validateitemsessions = false; - class csubs: implements ISDSSubscription, public CInterface + class QueueChangeSubscription : implements ISDSSubscription, public CInterface { - CJobQueue *parent; + public: + //If this semaphone is in the CJobQueue class then there is a race condition + //A callback may be at this point while the CJobQueue is deleted - causing it to signal + //a deleted semaphore + Semaphore notifysem; public: IMPLEMENT_IINTERFACE; - csubs(CJobQueue *_parent) - { - parent = _parent; - } + void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData) { - //There is a race condition - a callback may be at this point while the CJobQueue is deleted. - //Adding a critical section in parent makes it much more likely to be hit. - //Ultimately the semaphore should be moved to this class instead - //CriticalBlock block(parent->crit); - parent->notifysem.signal(); + notifysem.signal(); } }; - Owned subs; + //This must be an owned pointer, rather than a member, to avoid it being deleted while the notify() + //callback is being called. + Owned notifySubscription; IMPLEMENT_IINTERFACE_USING(CJobQueueBase); CJobQueue(const char *_qname) : CJobQueueBase(_qname) { - subs.setown(new csubs(this)); + notifySubscription.setown(new QueueChangeSubscription); activeq = qdata; sessionid = myProcessSession(); validateitemsessions = false; @@ -1044,7 +1042,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); } } @@ -1055,7 +1053,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (!qd->subscriberid) { StringBuffer path; path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get()); - qd->subscriberid = querySDS().subscribe(path.str(), *subs, false); + qd->subscriberid = querySDS().subscribe(path.str(), *notifySubscription, false); } unsigned e = (unsigned)qd->root->getPropInt("Edition", 1); if (e!=qd->lastWaitEdition) { @@ -1297,7 +1295,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue // check every 5 mins independant of notify (in case subscription lost for some reason) if (to>timeout) to = timeout; - notifysem.wait(to); + notifySubscription->notifysem.wait(to); if (timeout!=(unsigned)INFINITE) { t = msTick()-t; if (tnotifysem.signal(); } bool cancelInitiateConversation(sQueueData &qd,const char *wuid) @@ -1915,7 +1913,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue if (haschanged()) return true; } - if (!notifysem.wait(timeout)) + if (!notifySubscription->notifysem.wait(timeout)) break; } return false; @@ -1924,7 +1922,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue { CriticalBlock block(crit); cancelwaiting = true; - notifysem.signal(); + notifySubscription->notifysem.signal(); } virtual void enqueue(IJobQueueItem *qitem)