diff --git a/common/workunit/wujobq.cpp b/common/workunit/wujobq.cpp index 47a35e8f1dd..cd1f0d5b330 100644 --- a/common/workunit/wujobq.cpp +++ b/common/workunit/wujobq.cpp @@ -1264,6 +1264,20 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue } + IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay) override + { + Owned item; + if (prioritytransitiondelay) + { + unsigned timeout = prioritytransitiondelay; + bool usePrevPrio = true; + item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr)); + } + if (!item) + item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr)); + return item.getClear(); + } + void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem { Owned qi = qitem; diff --git a/common/workunit/wujobq.hpp b/common/workunit/wujobq.hpp index c3209904a50..4fdcda7a837 100644 --- a/common/workunit/wujobq.hpp +++ b/common/workunit/wujobq.hpp @@ -120,6 +120,7 @@ interface IJobQueue: extends IJobQueueConst virtual void connect(bool validateitemsessions)=0; // must be called before dequeueing // validateitemsessions ensures that all queue items have running session virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0; + virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0; virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release) virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running virtual bool waitStatsChange(unsigned timeout)=0; diff --git a/testing/unittests/CMakeLists.txt b/testing/unittests/CMakeLists.txt index 59498fb0d2f..ee8278a30c3 100644 --- a/testing/unittests/CMakeLists.txt +++ b/testing/unittests/CMakeLists.txt @@ -78,6 +78,7 @@ include_directories ( ./../../dali/base ./../../system/security/shared ./../../common/deftype + ./../../common/workunit ./../../system/security/cryptohelper ./../../configuration/configmgr/configmgrlib ${HPCC_SOURCE_DIR}/system/masking/include @@ -118,6 +119,7 @@ target_link_libraries ( unittests esphttp esdllib logginglib + workunit ${CppUnit_LIBRARIES} ) diff --git a/testing/unittests/dalitests.cpp b/testing/unittests/dalitests.cpp index c10a7529d2d..2a08b473519 100644 --- a/testing/unittests/dalitests.cpp +++ b/testing/unittests/dalitests.cpp @@ -31,11 +31,14 @@ #include "dasds.hpp" #include "danqs.hpp" #include "dautils.hpp" +#include "wujobq.hpp" #include #include #include +#include "jthread.hpp" + #include "unittests.hpp" #include "sysinfologger.hpp" @@ -3262,4 +3265,386 @@ class DaliSysInfoLoggerTester : public CppUnit::TestFixture CPPUNIT_TEST_SUITE_REGISTRATION( DaliSysInfoLoggerTester ); CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( DaliSysInfoLoggerTester, "DaliSysInfoLoggerTester" ); + + + +static constexpr bool traceJobQueue = false; +static unsigned jobQueueStartTick; +static constexpr unsigned tickScaling = 1; +static unsigned getJobQueueTick() +{ + return (msTick() - jobQueueStartTick) / tickScaling; +} +static void JobQueueSleep(unsigned ms) +{ + MilliSleep(ms * tickScaling); +} +class JobQueueTester : public CppUnit::TestFixture +{ + /* Note: global messages will be written for dates between 2000-02-04 and 2000-02-05 */ + /* Note: All global messages with time stamp before 2000-03-31 will be deleted */ + CPPUNIT_TEST_SUITE(JobQueueTester); + CPPUNIT_TEST(testInit); + CPPUNIT_TEST(testSingle); + CPPUNIT_TEST(testDouble); + CPPUNIT_TEST(testMany); + CPPUNIT_TEST(testCleanup); + CPPUNIT_TEST_SUITE_END(); + + struct JobEntry + { + unsigned delayMs; + const char * name; + unsigned processingMs; + int priority; + }; + + class JobProcessor : public Thread + { + public: + JobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : startedSem(_startedSem), processedSem(_processedSem), queue(_queue), id(_id) + { + } + + virtual int run() override + { + startedSem.signal(); + try + { + processAll(); + } + catch (IException * _e) + { + e.setown(_e); + } + return 0; + } + + bool processItem(IJobQueueItem * item) + { + assertex(item); + const char * name = item->queryWUID(); + if (traceJobQueue) + DBGLOG("===%s===@%u", name, getJobQueueTick()); + if (name[0] == '!') + return false; + output.append(name); + if (!log.isEmpty()) + log.append(","); + log.append(name).append("@").append(getJobQueueTick()); + unsigned delay = item->getPort(); + JobQueueSleep(delay); + processedSem.signal(); + return true; + } + + const char * queryOutput() + { + if (e) + throw e.getClear(); + return output.str(); + } + + const char * queryLog() + { + return log.str(); + } + + virtual void processAll() = 0; + + protected: + Semaphore & startedSem; + Semaphore & processedSem; + Linked queue; + StringBuffer output; + StringBuffer log; + Owned e; + unsigned id; + }; + + class StandardJobProcessor : public JobProcessor + { + public: + StandardJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(); + if (!processItem(item)) + break; + } + } + + }; + + class ThorJobProcessor : public JobProcessor + { + public: + ThorJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(0, INFINITE, 200*tickScaling); + bool ret = processItem(item); + if (!ret) + break; + } + } + }; + + class NewThorJobProcessor : public JobProcessor + { + public: + NewThorJobProcessor(Semaphore & _startedSem, Semaphore & _processedSem, IJobQueue * _queue, unsigned _id) + : JobProcessor(_startedSem, _processedSem, _queue, _id) + { + } + + virtual void processAll() override + { + for (;;) + { + Owned item = queue->dequeue(lastPrio, 200*tickScaling, 0); + if (!item) + item.setown(queue->dequeue(0, INFINITE, 0)); + lastPrio = item->getPriority(); + bool ret = processItem(item); + if (!ret) + break; + } + } + + protected: + int lastPrio = 0; + + }; + + enum JobProcessorType + { + StandardProcessor, + ThorProcessor, + NewThorProcessor, + }; + + void testInit() + { + daliClientInit(); + } + + void testCleanup() + { + daliClientEnd(); + } + + void runTestCase(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults, bool uniqueQueues) + { + Owned queue = createJobQueue("JobQueueTester"); + Semaphore startedSem; + Semaphore processedSem; + + CIArrayOf jobProcessors; + for (auto & processor : processors) + { + JobProcessor * cur = nullptr; + Owned localQueue; + IJobQueue * processorQueue = queue; + if (uniqueQueues) + { + localQueue.setown(createJobQueue("JobQueueTester")); + processorQueue = localQueue; + } + + switch (processor) + { + case StandardProcessor: + cur = new StandardJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case ThorProcessor: + cur = new ThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + case NewThorProcessor: + cur = new NewThorJobProcessor(startedSem, processedSem, processorQueue, jobProcessors.ordinality()); + break; + default: + UNIMPLEMENTED; + } + jobProcessors.append(*cur); + cur->start(true); + } + + for (auto & processor : processors) + startedSem.wait(); + + IArrayOf conversations; + jobQueueStartTick = msTick(); + for (auto & job : jobs) + { + JobQueueSleep(job.delayMs); + if (traceJobQueue) + DBGLOG("Add (%s, %d, %d) @%u", job.name, job.delayMs, job.processingMs, getJobQueueTick()); + Owned item = createJobQueueItem(job.name); + item->setPort(job.processingMs); + item->setPriority(job.priority); + + queue->enqueue(item.getClear()); + } + + ForEachItemIn(i1, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Add (eoj) @%u", getJobQueueTick()); + + //The queue code dedups by "wuid", so we need to add a unique "stop" entry + std::string end = std::string("!") + std::to_string(i1); + Owned item = createJobQueueItem(end.c_str()); + queue->enqueue(item.getClear()); + } + + ForEachItemIn(i2, jobProcessors) + { + if (traceJobQueue) + DBGLOG("Wait for %u", i2); + jobProcessors.item(i2).join(); + } + + DBGLOG("%s:%s, %ums", name, uniqueQueues ? " unique queues" : "", getJobQueueTick()); + ForEachItemIn(i3, jobProcessors) + { + JobProcessor & cur = jobProcessors.item(i3); + DBGLOG(" Result: '%s' '%s'", cur.queryOutput(), cur.queryLog()); +// if (i3 < expectedResults.size()) +// CPPUNIT_ASSERT_EQUAL(std::string(expectedResults.begin()[i3]), std::string(cur.queryOutput())); + } + } + + void runTestCaseX2(const char * name, const std::initializer_list & jobs, const std::initializer_list & processors, const std::initializer_list & expectedResults) + { + runTestCase(name, jobs, processors, expectedResults, false); + runTestCase(name, jobs, processors, expectedResults, true); + } + + static constexpr std::initializer_list singleWuTest = { + { 0, "a", 90, 0 }, + { 100, "b", 90, 0 }, + { 100, "c", 90, 0 }, + { 100, "d", 90, 0 }, + }; + + static constexpr std::initializer_list twoWuTest = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 0}, + { 50, "b", 90, 0 }, + { 50, "B", 90, 0 }, + { 50, "c", 90, 0 }, + { 50, "C", 90, 0 }, + { 50, "d", 90, 0 }, + { 50, "D", 90, 0 }, + }; + + static constexpr std::initializer_list lowHighTest = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 50, "b", 90, 0 }, + { 50, "B", 90, 1 }, + { 50, "c", 90, 0 }, + { 50, "C", 90, 1 }, + { 50, "d", 90, 0 }, + { 50, "D", 90, 1 }, + }; + + static constexpr std::initializer_list lowHigh2Test = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 10, "b", 90, 0 }, + { 10, "B", 90, 1 }, + { 10, "c", 90, 0 }, + { 10, "C", 90, 1 }, + { 10, "d", 90, 0 }, + { 10, "D", 90, 1 }, + }; + + static constexpr std::initializer_list lowHigh3Test = { + { 0, "a", 90, 0 }, + { 50, "A", 90, 1}, + { 10, "b", 90, 0 }, + }; + + static constexpr std::initializer_list dripFeedTest = { + { 0, "a", 10, 0 }, + { 100, "b", 10, 0}, + { 100, "c", 10, 0}, + { 100, "d", 10, 0}, + { 100, "e", 10, 0}, + { 100, "f", 10, 0}, + { 100, "g", 10, 0}, + { 100, "h", 10, 0}, + { 100, "i", 10, 0}, + { 100, "j", 10, 0}, + }; + + static constexpr std::initializer_list drip2FeedTest = { + { 0, "a", 60, 0 }, + { 50, "b", 60, 0}, + { 50, "c", 60, 0}, + { 50, "d", 60, 0}, + { 50, "e", 60, 0}, + { 50, "f", 60, 0}, + { 50, "g", 60, 0}, + { 50, "h", 60, 0}, + { 50, "i", 60, 0}, + { 50, "j", 60, 0}, + { 50, "k", 60, 0}, + { 50, "l", 60, 0}, + { 50, "m", 60, 0}, + { 50, "n", 60, 0}, + { 50, "o", 60, 0}, + }; + + void testSingle() + { + runTestCase("1 wu, 1 standard", singleWuTest, { StandardProcessor }, { "abcd" }, false); + runTestCase("2 wu, 1 standard", twoWuTest, { StandardProcessor }, { "aAbBcCdD" }, false); + runTestCase("lo hi wu, 1 standard", lowHighTest, { StandardProcessor }, { "aABCDbcd" }, false); + runTestCase("lo hi2 wu, 1 standard", lowHigh2Test, { StandardProcessor }, { "aABCDbcd" }, false); + runTestCase("lo hi2 wu, 1 thor", lowHigh2Test, { ThorProcessor }, {}, false); + runTestCase("lo hi2 wu, 1 newthor", lowHigh2Test, { NewThorProcessor }, {}, false); + runTestCase("drip wu, 1 std", dripFeedTest, { StandardProcessor }, {}, false); + + } + + void testDouble() + { + runTestCaseX2("2 wu, 2 standard", twoWuTest, { StandardProcessor, StandardProcessor }, { "abcd", "ABCD" }); + runTestCaseX2("lo hi wu, 2 standard", lowHighTest, { StandardProcessor, StandardProcessor }, { "aBDc" "ACbd" }); + runTestCaseX2("lo hi2 wu, 2 standard", lowHigh2Test, { StandardProcessor, StandardProcessor }, { "a"}); + runTestCaseX2("lo hi2 wu, 2 thor", lowHigh2Test, { ThorProcessor, ThorProcessor }, {}); + runTestCaseX2("lo hi2 wu, 2 newthor", lowHigh2Test, { NewThorProcessor, NewThorProcessor }, {}); + + runTestCaseX2("lo hi3 wu, 2 thor", lowHigh3Test, { ThorProcessor, ThorProcessor }, {}); + runTestCaseX2("lo hi3 wu, 2 newthor", lowHigh3Test, { NewThorProcessor, NewThorProcessor }, {}); + runTestCaseX2("drip wu, 2 std", dripFeedTest, { StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip wu, 2 newthor", dripFeedTest, { NewThorProcessor, NewThorProcessor }, {}); + } + + void testMany() + { + runTestCaseX2("drip wu, 3 std", dripFeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + runTestCaseX2("drip2 wu, 3 std", drip2FeedTest, { StandardProcessor, StandardProcessor, StandardProcessor }, {}); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( JobQueueTester ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JobQueueTester, "JobQueueTester" ); + #endif // _USE_CPPUNIT