diff --git a/.buildkite/pipelines/sonarqube.yml.sh b/.buildkite/pipelines/sonarqube.yml.sh index 33c47321e1..f580449b55 100755 --- a/.buildkite/pipelines/sonarqube.yml.sh +++ b/.buildkite/pipelines/sonarqube.yml.sh @@ -15,8 +15,8 @@ steps: soft_fail: true agents: cpu: 6 - ephemeralStorage: "20G" - memory: "8G" + ephemeralStorage: "40G" + memory: "12G" image: "docker.elastic.co/ml-dev/ml-linux-build:33" env: PATH: "/usr/local/gcc133/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" diff --git a/.clang-tidy b/.clang-tidy index a65d6e8e43..146d35bcef 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -8,32 +8,31 @@ Checks: > clang-diagnostic-*, -clang-diagnostic-sign-conversion, + -clang-diagnostic-unused-macros, google-*, -google-build-using-namespace, -google-readability-namespace-comments, -google-runtime-references, + misc-*, + -misc-non-private-member-variables-in-classes, + -misc-include-cleaner, modernize-*, -modernize-use-trailing-return-type, -modernize-concat-nested-namespaces, -modernize-use-nodiscard, - -modernize-replace-random-shuffle, - -modernize-unary-static-assert, - -modernize-use-uncaught-exception, performance-*, readability-*, -readability-magic-numbers, - -readability-named-parameter, -readability-redundant-access-specifiers, -readability-simplify-boolean-expr, -readability-identifier-length, WarningsAsErrors: false -AnalyzeTemporaryDtors: false FormatStyle: file CheckOptions: - key: bugprone-assert-side-effect.AssertMacros @@ -61,4 +60,6 @@ CheckOptions: - key: bugprone-suspicious-string-compare.WarnOnLogicalNotComparison value: 'true' - key: readability-function-cognitive-complexity.IgnoreMacros - value: 'true' \ No newline at end of file + value: 'true' + - key: modernize-include.UseAngleBrackets + value: true \ No newline at end of file diff --git a/cmake/variables.cmake b/cmake/variables.cmake index 140869c48a..811fe35cdc 100644 --- a/cmake/variables.cmake +++ b/cmake/variables.cmake @@ -10,7 +10,7 @@ # # set the C++ standard we need to enforce -set (CMAKE_CXX_STANDARD 17 CACHE STRING "The C++ standard to use") +set (CMAKE_CXX_STANDARD 20 CACHE STRING "The C++ standard to use") set (CMAKE_CXX_STANDARD_REQUIRED ON) set (CMAKE_CXX_EXTENSIONS OFF) diff --git a/include/api/CAnomalyJob.h b/include/api/CAnomalyJob.h index e4f1f452cb..c79ad71bca 100644 --- a/include/api/CAnomalyJob.h +++ b/include/api/CAnomalyJob.h @@ -126,7 +126,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { struct SBackgroundPersistArgs { SBackgroundPersistArgs(core_t::TTime time, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, - const model::CInterimBucketCorrector& interimBucketCorrector, + model::CInterimBucketCorrector interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, @@ -146,12 +146,12 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { using TBackgroundPersistArgsPtr = std::shared_ptr; public: - CAnomalyJob(const std::string& jobId, + CAnomalyJob(std::string jobId, model::CLimits& limits, CAnomalyJobConfig& jobConfig, model::CAnomalyDetectorModelConfig& modelConfig, core::CJsonOutputStreamWrapper& outputBuffer, - const TPersistCompleteFunc& persistCompleteFunc, + TPersistCompleteFunc persistCompleteFunc, CPersistenceManager* persistenceManager, core_t::TTime maxQuantileInterval, const std::string& timeFieldName, @@ -265,7 +265,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { //! This is the function that is called in a different thread to the //! main processing when background persistence is triggered. - bool runBackgroundPersist(TBackgroundPersistArgsPtr args, core::CDataAdder& persister); + bool runBackgroundPersist(const TBackgroundPersistArgsPtr& args, + core::CDataAdder& persister); //! This function is called from the persistence manager when foreground persistence is triggered bool runForegroundPersist(core::CDataAdder& persister); @@ -331,9 +332,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { //! Parses the time range in a control message assuming the time range follows after a //! single character code (e.g. starts with 'i10 20'). - bool parseTimeRangeInControlMessage(const std::string& controlMessage, - core_t::TTime& start, - core_t::TTime& end); + static bool parseTimeRangeInControlMessage(const std::string& controlMessage, + core_t::TTime& start, + core_t::TTime& end); //! Update equalizers if not interim and aggregate. void updateAggregatorAndAggregate(bool isInterim, model::CHierarchicalResults& results); @@ -375,7 +376,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { //! Update configuration void doForecast(const std::string& controlMessage); - TAnomalyDetectorPtr + static TAnomalyDetectorPtr makeDetector(const model::CAnomalyDetectorModelConfig& modelConfig, model::CLimits& limits, const std::string& partitionFieldValue, @@ -383,16 +384,17 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { const model::CAnomalyDetector::TModelFactoryCPtr& modelFactory); //! Populate detector keys from the anomaly job config. - void populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys); + static void populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyVec& keys); //! Extract the field called \p fieldName from \p dataRowFields. - const std::string* fieldValue(const std::string& fieldName, const TStrStrUMap& dataRowFields); + static const std::string* fieldValue(const std::string& fieldName, + const TStrStrUMap& dataRowFields); //! Extract the required fields from \p dataRowFields //! and add the new record to \p detector - void addRecord(const TAnomalyDetectorPtr detector, - core_t::TTime time, - const TStrStrUMap& dataRowFields); + static void addRecord(const TAnomalyDetectorPtr& detector, + core_t::TTime time, + const TStrStrUMap& dataRowFields); //! Parses a control message requesting that model state be persisted. //! Extracts optional arguments to be used for persistence. @@ -420,12 +422,12 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { core_t::TTime time, const model::CSearchKey& key, const std::string& partitionFieldValue, - model::CResourceMonitor& resourceMonitor); + const model::CResourceMonitor& resourceMonitor); //! Prune all the models that exceed \p buckets in age //! A value of 0 for \buckets indicates that only 'obsolete' models will //! be pruned, i.e. those which are so old as to be effectively dead. - void pruneAllModels(std::size_t buckets = 0); + void pruneAllModels(std::size_t buckets = 0) const; private: //! The job ID @@ -454,7 +456,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { model::CAnomalyDetectorModelConfig& m_ModelConfig; //! Keep count of how many records we've handled - std::uint64_t m_NumRecordsHandled; + std::uint64_t m_NumRecordsHandled = 0; //! Detector keys. TKeyVec m_DetectorKeys; @@ -463,7 +465,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { TKeyAnomalyDetectorPtrUMap m_Detectors; //! The end time of the last bucket out of latency window we've seen - core_t::TTime m_LastFinalisedBucketEndTime; + core_t::TTime m_LastFinalisedBucketEndTime = 0; //! Optional function to be called when persistence is complete TPersistCompleteFunc m_PersistCompleteFunc; @@ -486,10 +488,10 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { core_t::TTime m_LastNormalizerPersistTime; //! Latest record time seen. - core_t::TTime m_LatestRecordTime; + core_t::TTime m_LatestRecordTime = 0; //! Last time we sent a finalised result to the API. - core_t::TTime m_LastResultsTime; + core_t::TTime m_LastResultsTime = 0; //! When the model state was restored was it entirely successful. //! Extra information about any errors that may have occurred diff --git a/lib/api/CAnomalyJob.cc b/lib/api/CAnomalyJob.cc index dfb43368b7..61466eba44 100644 --- a/lib/api/CAnomalyJob.cc +++ b/lib/api/CAnomalyJob.cc @@ -47,7 +47,9 @@ #include #include +#include #include +#include namespace ml { namespace api { @@ -110,7 +112,7 @@ class CReadableRapidXmlStatePersistInserter : public core::CRapidXmlStatePersist ~CReadableRapidXmlStatePersistInserter() override { std::string xml; this->toXml(false, xml); - m_WriteStream << "{\"xml\":\"" << xml << "\"}\n"; + m_WriteStream << R"({"xml":")" << xml << "\"}\n"; } bool readableTags() const override { return true; } @@ -126,31 +128,32 @@ const std::string CAnomalyJob::EMPTY_STRING; const CAnomalyJob::TAnomalyDetectorPtr CAnomalyJob::NULL_DETECTOR; -CAnomalyJob::CAnomalyJob(const std::string& jobId, +CAnomalyJob::CAnomalyJob(std::string jobId, model::CLimits& limits, CAnomalyJobConfig& jobConfig, model::CAnomalyDetectorModelConfig& modelConfig, core::CJsonOutputStreamWrapper& outputStream, - const TPersistCompleteFunc& persistCompleteFunc, + TPersistCompleteFunc persistCompleteFunc, CPersistenceManager* persistenceManager, core_t::TTime maxQuantileInterval, const std::string& timeFieldName, const std::string& timeFieldFormat, size_t maxAnomalyRecords) - : CDataProcessor{timeFieldName, timeFieldFormat}, m_JobId{jobId}, m_Limits{limits}, + : CDataProcessor{timeFieldName, timeFieldFormat}, m_JobId{std::move(jobId)}, m_Limits{limits}, m_OutputStream{outputStream}, m_ForecastRunner{m_JobId, m_OutputStream, limits.resourceMonitor()}, m_JsonOutputWriter{m_JobId, m_OutputStream}, m_JobConfig{jobConfig}, - m_ModelConfig{modelConfig}, m_NumRecordsHandled{0}, - m_LastFinalisedBucketEndTime{0}, m_PersistCompleteFunc{persistCompleteFunc}, + m_ModelConfig{modelConfig}, m_PersistCompleteFunc{std::move(persistCompleteFunc)}, m_MaxDetectors{std::numeric_limits::max()}, m_PersistenceManager{persistenceManager}, m_MaxQuantileInterval{maxQuantileInterval}, - m_LastNormalizerPersistTime{core::CTimeUtils::now()}, m_LatestRecordTime{0}, - m_LastResultsTime{0}, m_Aggregator{modelConfig}, m_Normalizer{modelConfig} { + m_LastNormalizerPersistTime{core::CTimeUtils::now()}, + m_Aggregator{modelConfig}, m_Normalizer{modelConfig} { m_JsonOutputWriter.limitNumberRecords(maxAnomalyRecords); - m_Limits.resourceMonitor().memoryUsageReporter(std::bind( - &CJsonOutputWriter::reportMemoryUsage, &m_JsonOutputWriter, std::placeholders::_1)); + m_Limits.resourceMonitor().memoryUsageReporter( + [ObjectPtr = &m_JsonOutputWriter](T && PH1) { + ObjectPtr->reportMemoryUsage(std::forward(PH1)); + }); } CAnomalyJob::~CAnomalyJob() { @@ -159,8 +162,8 @@ CAnomalyJob::~CAnomalyJob() { bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields, TOptionalTime time) { // Non-empty control fields take precedence over everything else - TStrStrUMapCItr iter = dataRowFields.find(CONTROL_FIELD_NAME); - if (iter != dataRowFields.end() && !iter->second.empty()) { + if (TStrStrUMapCItr const iter = dataRowFields.find(CONTROL_FIELD_NAME); + iter != dataRowFields.end() && !iter->second.empty()) { return this->handleControlMessage(iter->second); } @@ -182,7 +185,7 @@ bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields, TOptionalTime t ++core::CProgramCounters::counter(counter_t::E_TSADNumberTimeOrderErrors); std::ostringstream ss; ss << "Records must be in ascending time order. " - << "Record '" << this->debugPrintRecord(dataRowFields) << "' time " + << "Record '" << ml::api::CAnomalyJob::debugPrintRecord(dataRowFields) << "' time " << *time << " is before bucket time " << m_LastFinalisedBucketEndTime; LOG_ERROR(<< ss.str()); return true; @@ -193,30 +196,30 @@ bool CAnomalyJob::handleRecord(const TStrStrUMap& dataRowFields, TOptionalTime t this->outputBucketResultsUntil(*time); if (m_DetectorKeys.empty()) { - this->populateDetectorKeys(m_JobConfig, m_DetectorKeys); + ml::api::CAnomalyJob::populateDetectorKeys(m_JobConfig, m_DetectorKeys); } - for (std::size_t i = 0; i < m_DetectorKeys.size(); ++i) { - const std::string& partitionFieldName(m_DetectorKeys[i].partitionFieldName()); + for (const auto& m_DetectorKey : m_DetectorKeys) { + const std::string& partitionFieldName(m_DetectorKey.partitionFieldName()); // An empty partitionFieldName means no partitioning - TStrStrUMapCItr itr = partitionFieldName.empty() - ? dataRowFields.end() - : dataRowFields.find(partitionFieldName); + TStrStrUMapCItr const itr = partitionFieldName.empty() + ? dataRowFields.end() + : dataRowFields.find(partitionFieldName); const std::string& partitionFieldValue( itr == dataRowFields.end() ? EMPTY_STRING : itr->second); - // TODO - should usenull apply to the partition field too? + // TODO(valeriy): - should usenull apply to the partition field too? const TAnomalyDetectorPtr& detector = this->detectorForKey( false, // not restoring - *time, m_DetectorKeys[i], partitionFieldValue, m_Limits.resourceMonitor()); + *time, m_DetectorKey, partitionFieldValue, m_Limits.resourceMonitor()); if (detector == nullptr) { // There wasn't enough memory to create the detector continue; } - this->addRecord(detector, *time, dataRowFields); + ml::api::CAnomalyJob::addRecord(detector, *time, dataRowFields); } ++core::CProgramCounters::counter(counter_t::E_TSADNumberApiRecordsHandled); @@ -291,21 +294,21 @@ void CAnomalyJob::descriptionAndDebugMemoryUsage() const { this->sortedDetectors(detectors); std::ostringstream ss; - ss << "Anomaly detectors:" << std::endl; + ss << "Anomaly detectors:" << '\n'; TStrCRef partition = detectors[0].first.first; - ss << "\tpartition " << partition.get() << std::endl; - ss << "\t\tkey " << detectors[0].first.second.get() << std::endl; - ss << "\t\t\t" << detectors[0].second->description() << std::endl; + ss << "\tpartition " << partition.get() << '\n'; + ss << "\t\tkey " << detectors[0].first.second.get() << '\n'; + ss << "\t\t\t" << detectors[0].second->description() << '\n'; detectors[0].second->showMemoryUsage(ss); for (std::size_t i = 1; i < detectors.size(); ++i) { - ss << std::endl; + ss << '\n'; if (detectors[i].first.first.get() != partition.get()) { partition = detectors[i].first.first; - ss << "\tpartition " << partition.get() << std::endl; + ss << "\tpartition " << partition.get() << '\n'; } - ss << "\t\tkey " << detectors[i].first.second.get() << std::endl; - ss << "\t\t\t" << detectors[i].second->description() << std::endl; + ss << "\t\tkey " << detectors[i].first.second.get() << '\n'; + ss << "\t\t\t" << detectors[i].second->description() << '\n'; detectors[i].second->showMemoryUsage(ss); } LOG_INFO(<< ss.str()); @@ -391,13 +394,13 @@ bool CAnomalyJob::parsePersistControlMessageArgs(const std::string& controlMessa // snapshotId = short string identifier for snapshot - containing no spaces // snapshotDescription = description of snapshot. May contain spaces. - std::size_t pos{controlMessageArgs.find(' ')}; + std::size_t const pos{controlMessageArgs.find(' ')}; if (pos == std::string::npos) { LOG_ERROR(<< "Invalid control message format: \"" << controlMessageArgs << "\""); return false; } - std::string timestampStr{controlMessageArgs.substr(0, pos)}; + std::string const timestampStr{controlMessageArgs.substr(0, pos)}; if (timestampStr.empty()) { LOG_ERROR(<< "Received empty snapshot timestamp."); return false; @@ -408,7 +411,7 @@ bool CAnomalyJob::parsePersistControlMessageArgs(const std::string& controlMessa return false; } - std::size_t pos2{controlMessageArgs.find(' ', pos + 1)}; + std::size_t const pos2{controlMessageArgs.find(' ', pos + 1)}; if (pos2 == std::string::npos) { LOG_ERROR(<< "Invalid control message format: \"" << controlMessageArgs << "\""); return false; @@ -516,8 +519,8 @@ bool CAnomalyJob::isPersistenceNeeded(const std::string& description) const { void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) { // If the bucket time has increased, output results for all field names - core_t::TTime bucketLength = m_ModelConfig.bucketLength(); - core_t::TTime latency = m_ModelConfig.latency(); + core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); + core_t::TTime const latency = m_ModelConfig.latency(); if (m_LastFinalisedBucketEndTime == 0) { m_LastFinalisedBucketEndTime = std::max( @@ -611,10 +614,11 @@ void CAnomalyJob::generateInterimResults(const std::string& controlMessage) { } core_t::TTime start = m_LastFinalisedBucketEndTime; - core_t::TTime end = m_LastFinalisedBucketEndTime + - (m_ModelConfig.latencyBuckets() + 1) * m_ModelConfig.bucketLength(); + core_t::TTime end = + m_LastFinalisedBucketEndTime + + ((m_ModelConfig.latencyBuckets() + 1) * m_ModelConfig.bucketLength()); - if (this->parseTimeRangeInControlMessage(controlMessage, start, end)) { + if (ml::api::CAnomalyJob::parseTimeRangeInControlMessage(controlMessage, start, end)) { LOG_TRACE(<< "Time range for results: " << start << " : " << end); this->outputResultsWithinRange(true, start, end); } @@ -631,7 +635,7 @@ bool CAnomalyJob::parseTimeRangeInControlMessage(const std::string& controlMessa if (!remainder.empty()) { tokens.push_back(remainder); } - std::size_t tokensSize = tokens.size(); + std::size_t const tokensSize = tokens.size(); if (tokensSize == 0) { // Default range return true; @@ -664,7 +668,7 @@ void CAnomalyJob::doForecast(const std::string& controlMessage) { void CAnomalyJob::outputResults(core_t::TTime bucketStartTime) { core::CStopWatch timer(true); - core_t::TTime bucketLength = m_ModelConfig.bucketLength(); + core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); model::CHierarchicalResults results; TModelPlotDataVec modelPlotData; @@ -705,7 +709,7 @@ void CAnomalyJob::outputResults(core_t::TTime bucketStartTime) { this->updateNormalizerAndNormalizeResults(false, results); } - std::uint64_t processingTime = timer.stop(); + std::uint64_t const processingTime = timer.stop(); // Model plots must be written first so the Java persists them // once the bucket result is processed @@ -718,7 +722,7 @@ void CAnomalyJob::outputResults(core_t::TTime bucketStartTime) { // to the next whole number of buckets (this doesn't really matter if we enforce // that the model prune window always be an exact multiple of bucket span in the // corresponding Java code) - core_t::TTime bucketPruneWindow{ + core_t::TTime const bucketPruneWindow{ (m_ModelConfig.modelPruneWindow() + m_ModelConfig.bucketLength() - 1) / m_ModelConfig.bucketLength()}; this->pruneAllModels(bucketPruneWindow); @@ -731,7 +735,7 @@ void CAnomalyJob::outputResults(core_t::TTime bucketStartTime) { void CAnomalyJob::outputInterimResults(core_t::TTime bucketStartTime) { core::CStopWatch timer(true); - core_t::TTime bucketLength = m_ModelConfig.bucketLength(); + core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); model::CHierarchicalResults results; results.setInterim(); @@ -765,7 +769,7 @@ void CAnomalyJob::outputInterimResults(core_t::TTime bucketStartTime) { this->updateNormalizerAndNormalizeResults(true, results); } - std::uint64_t processingTime = timer.stop(); + std::uint64_t const processingTime = timer.stop(); this->writeOutResults(true, results, bucketStartTime, processingTime); } @@ -780,14 +784,18 @@ void CAnomalyJob::writeOutResults(bool interim, using TScopedAllocator = core::CScopedBoostJsonPoolAllocator; static const std::string ALLOCATOR_ID("CAnomalyJob::writeOutResults"); - TScopedAllocator scopedAllocator(ALLOCATOR_ID, m_JsonOutputWriter); + TScopedAllocator const scopedAllocator(ALLOCATOR_ID, m_JsonOutputWriter); api::CHierarchicalResultsWriter writer( m_Limits, - std::bind(&CJsonOutputWriter::acceptResult, &m_JsonOutputWriter, - std::placeholders::_1), - std::bind(&CJsonOutputWriter::acceptInfluencer, &m_JsonOutputWriter, - std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + [ObjectPtr = &m_JsonOutputWriter](T && PH1) { + return ObjectPtr->acceptResult(std::forward(PH1)); + }, + [ObjectPtr = &m_JsonOutputWriter]( + T && PH1, U && PH2, V && PH3) { + return ObjectPtr->acceptInfluencer( + std::forward(PH1), std::forward(PH2), std::forward(PH3)); + }); results.bottomUpBreadthFirst(writer); results.pivotsBottomUpBreadthFirst(writer); @@ -814,23 +822,25 @@ void CAnomalyJob::resetBuckets(const std::string& controlMessage) { } core_t::TTime start = 0; core_t::TTime end = 0; - if (this->parseTimeRangeInControlMessage(controlMessage, start, end)) { - core_t::TTime bucketLength = m_ModelConfig.bucketLength(); - core_t::TTime time = maths::common::CIntegerTools::floor(start, bucketLength); - core_t::TTime bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); - while (time < bucketEnd) { - for (const auto& detector_ : m_Detectors) { - model::CAnomalyDetector* detector = detector_.second.get(); - if (detector == nullptr) { - LOG_ERROR(<< "Unexpected NULL pointer for key '" - << pairDebug(detector_.first) << '\''); - continue; - } - LOG_TRACE(<< "Resetting bucket = " << time); - detector->resetBucket(time); + if (ml::api::CAnomalyJob::parseTimeRangeInControlMessage(controlMessage, start, + end) == false) { + return; + } + core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); + core_t::TTime time = maths::common::CIntegerTools::floor(start, bucketLength); + core_t::TTime const bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); + while (time < bucketEnd) { + for (const auto& detector_ : m_Detectors) { + model::CAnomalyDetector* detector = detector_.second.get(); + if (detector == nullptr) { + LOG_ERROR(<< "Unexpected NULL pointer for key '" + << pairDebug(detector_.first) << '\''); + continue; } - time += bucketLength; + LOG_TRACE(<< "Resetting bucket = " << time); + detector->resetBucket(time); } + time += bucketLength; } } @@ -859,7 +869,7 @@ bool CAnomalyJob::restoreState(core::CDataSearcher& restoreSearcher, // and substitute decompressor with restoreSearcher two lines below.) core::CStateDecompressor decompressor(restoreSearcher); - core::CDataSearcher::TIStreamP strm(decompressor.search(1, 1)); + core::CDataSearcher::TIStreamP const strm(decompressor.search(1, 1)); if (strm == nullptr) { LOG_ERROR(<< "Unable to connect to data store"); return false; @@ -893,7 +903,7 @@ bool CAnomalyJob::restoreState(core::CDataSearcher& restoreSearcher, } if (completeToTime > 0) { - core_t::TTime lastBucketEndTime(maths::common::CIntegerTools::ceil( + core_t::TTime const lastBucketEndTime(maths::common::CIntegerTools::ceil( completeToTime, m_ModelConfig.bucketLength())); this->setDetectorsLastBucketEndTime(lastBucketEndTime); @@ -964,24 +974,26 @@ bool CAnomalyJob::restoreState(core::CStateRestoreTraverser& traverser, // Note that this has to be persisted and restored before any detectors. auto interimBucketCorrector = std::make_shared( m_ModelConfig.bucketLength()); - if (traverser.traverseSubLevel(std::bind( - &model::CInterimBucketCorrector::acceptRestoreTraverser, - interimBucketCorrector.get(), std::placeholders::_1)) == false) { + if (traverser.traverseSubLevel( + [capture0 = interimBucketCorrector.get()](T && PH1) { + return capture0->acceptRestoreTraverser(std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Cannot restore interim bucket corrector"); return false; } m_ModelConfig.interimBucketCorrector(interimBucketCorrector); } else if (name == TOP_LEVEL_DETECTOR_TAG) { - if (traverser.traverseSubLevel(std::bind(&CAnomalyJob::restoreSingleDetector, - this, std::placeholders::_1)) == false) { + if (traverser.traverseSubLevel([this](T && PH1) { + return restoreSingleDetector(std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector"); return false; } ++numDetectors; } else if (name == RESULTS_AGGREGATOR_TAG) { - if (traverser.traverseSubLevel(std::bind( - &model::CHierarchicalResultsAggregator::acceptRestoreTraverser, - &m_Aggregator, std::placeholders::_1)) == false) { + if (traverser.traverseSubLevel([ObjectPtr = &m_Aggregator](T && PH1) { + return ObjectPtr->acceptRestoreTraverser(std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Cannot restore results aggregator"); return false; } @@ -1010,8 +1022,10 @@ bool CAnomalyJob::restoreSingleDetector(core::CStateRestoreTraverser& traverser) } model::CSearchKey key; - if (traverser.traverseSubLevel(std::bind(&model::CAnomalyDetector::keyAcceptRestoreTraverser, - std::placeholders::_1, std::ref(key))) == false) { + if (traverser.traverseSubLevel([&key](T && PH1) { + return model::CAnomalyDetector::keyAcceptRestoreTraverser( + std::forward(PH1), key); + }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector - no key found in " << KEY_TAG); m_RestoredStateDetail.s_RestoredStateStatus = E_UnexpectedTag; @@ -1035,9 +1049,10 @@ bool CAnomalyJob::restoreSingleDetector(core::CStateRestoreTraverser& traverser) } std::string partitionFieldValue; - if (traverser.traverseSubLevel(std::bind( - &model::CAnomalyDetector::partitionFieldAcceptRestoreTraverser, - std::placeholders::_1, std::ref(partitionFieldValue))) == false) { + if (traverser.traverseSubLevel([&partitionFieldValue](T && PH1) { + return model::CAnomalyDetector::partitionFieldAcceptRestoreTraverser( + std::forward(PH1), partitionFieldValue); + }) == false) { LOG_ERROR(<< "Cannot restore anomaly detector - " "no partition field value found in " << PARTITION_FIELD_TAG); @@ -1091,9 +1106,11 @@ bool CAnomalyJob::restoreDetectorState(const model::CSearchKey& key, LOG_DEBUG(<< "Restoring state for detector with key '" << key.debug() << '/' << partitionFieldValue << '\''); - if (traverser.traverseSubLevel(std::bind( - &model::CAnomalyDetector::acceptRestoreTraverser, detector.get(), - std::cref(partitionFieldValue), std::placeholders::_1)) == false) { + if (traverser.traverseSubLevel([ + capture0 = detector.get(), capture1 = std::cref(partitionFieldValue) + ](T && PH1) { + return capture0->acceptRestoreTraverser(capture1, std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Error restoring anomaly detector for key '" << key.debug() << '/' << partitionFieldValue << '\''); return false; @@ -1123,7 +1140,7 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister, return true; } - core_t::TTime snapshotTimestamp{core::CTimeUtils::now()}; + core_t::TTime const snapshotTimestamp{core::CTimeUtils::now()}; const std::string snapshotId{core::CStringUtils::typeToString(snapshotTimestamp)}; const std::string description{descriptionPrefix + core::CTimeUtils::toIso8601(snapshotTimestamp)}; @@ -1173,7 +1190,7 @@ bool CAnomalyJob::backgroundPersistState() { // passing to a new thread. // Do NOT add std::ref wrappers around these arguments - they // MUST be copied for thread safety - TBackgroundPersistArgsPtr args = std::make_shared( + auto const args = std::make_shared( m_LastFinalisedBucketEndTime, m_Limits.resourceMonitor().createMemoryUsageReport( m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()), @@ -1194,21 +1211,23 @@ bool CAnomalyJob::backgroundPersistState() { << pairDebug(detector_.first) << '\''); continue; } - model::CSearchKey::TStrCRefKeyCRefPr key(std::cref(detector_.first.first), - std::cref(detector_.first.second)); + model::CSearchKey::TStrCRefKeyCRefPr const key( + std::cref(detector_.first.first), std::cref(detector_.first.second)); if (detector->isSimpleCount()) { - copiedDetectors.push_back(TKeyCRefAnomalyDetectorPtrPr( - key, TAnomalyDetectorPtr(new model::CSimpleCountDetector(true, *detector)))); + copiedDetectors.emplace_back( + key, TAnomalyDetectorPtr(std::make_shared( + true, *detector))); } else { - copiedDetectors.push_back(TKeyCRefAnomalyDetectorPtrPr( - key, TAnomalyDetectorPtr(new model::CAnomalyDetector(true, *detector)))); + copiedDetectors.emplace_back( + key, std::make_shared(true, *detector)); } } std::sort(copiedDetectors.begin(), copiedDetectors.end(), maths::common::COrderings::SFirstLess()); - if (m_PersistenceManager->addPersistFunc(std::bind( - &CAnomalyJob::runBackgroundPersist, this, args, std::placeholders::_1)) == false) { + if (m_PersistenceManager->addPersistFunc([ this, args ](T && PH1) { + return runBackgroundPersist(args, std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Failed to add anomaly detector background persistence function"); return false; } @@ -1227,14 +1246,14 @@ bool CAnomalyJob::runForegroundPersist(core::CDataAdder& persister) { return this->persistStateInForeground(persister, "Periodic foreground persist at "); } -bool CAnomalyJob::runBackgroundPersist(TBackgroundPersistArgsPtr args, +bool CAnomalyJob::runBackgroundPersist(const TBackgroundPersistArgsPtr& args, core::CDataAdder& persister) { if (!args) { LOG_ERROR(<< "Unexpected NULL pointer passed to background persist"); return false; } - core_t::TTime snapshotTimestamp(core::CTimeUtils::now()); + core_t::TTime const snapshotTimestamp(core::CTimeUtils::now()); const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp)); const std::string description{"Periodic background persist at " + core::CTimeUtils::toIso8601(snapshotTimestamp)}; @@ -1313,7 +1332,7 @@ bool CAnomalyJob::persistCopiedState(const std::string& description, // As the cache is cleared when the simple count detector is persisted this may seem // unnecessary at first, but there are occasions when the simple count detector does not exist, // e.g. when no data is seen but time is advanced. - core::CProgramCounters::CCacheManager cacheMgr; + core::CProgramCounters::CCacheManager const cacheMgr; // Persist state for each detector separately by streaming try { @@ -1334,8 +1353,9 @@ bool CAnomalyJob::persistCopiedState(const std::string& description, inserter.insertValue(VERSION_TAG, model::CAnomalyDetector::STATE_VERSION); inserter.insertLevel( INTERIM_BUCKET_CORRECTOR_TAG, - std::bind(&model::CInterimBucketCorrector::acceptPersistInserter, - &interimBucketCorrector, std::placeholders::_1)); + [ObjectPtr = &interimBucketCorrector](T && PH1) { + ObjectPtr->acceptPersistInserter(std::forward(PH1)); + }); for (const auto& detector_ : detectors) { const model::CAnomalyDetector* detector(detector_.second.get()); @@ -1351,15 +1371,18 @@ bool CAnomalyJob::persistCopiedState(const std::string& description, } inserter.insertLevel( TOP_LEVEL_DETECTOR_TAG, - std::bind(&CAnomalyJob::persistIndividualDetector, - std::cref(*detector), std::placeholders::_1)); + [capture0 = std::cref(*detector)](T && PH1) { + CAnomalyJob::persistIndividualDetector( + capture0, std::forward(PH1)); + }); LOG_DEBUG(<< "Persisted state for '" << detector->description() << "'"); } - inserter.insertLevel(RESULTS_AGGREGATOR_TAG, - std::bind(&model::CHierarchicalResultsAggregator::acceptPersistInserter, - &aggregator, std::placeholders::_1)); + inserter.insertLevel( + RESULTS_AGGREGATOR_TAG, [ObjectPtr = &aggregator](T && PH1) { + ObjectPtr->acceptPersistInserter(std::forward(PH1)); + }); core::CPersistUtils::persist(LATEST_RECORD_TIME_TAG, latestRecordTime, inserter); @@ -1375,7 +1398,7 @@ bool CAnomalyJob::persistCopiedState(const std::string& description, } if (m_PersistCompleteFunc) { - CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport{ + CModelSnapshotJsonWriter::SModelSnapshotReport const modelSnapshotReport{ MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, description, snapshotId, compressor.numCompressedDocs(), modelSizeStats, normalizerState, latestRecordTime, @@ -1422,8 +1445,9 @@ bool CAnomalyJob::periodicPersistStateInForeground() { return false; } - if (m_PersistenceManager->addPersistFunc(std::bind( - &CAnomalyJob::runForegroundPersist, this, std::placeholders::_1)) == false) { + if (m_PersistenceManager->addPersistFunc([this](T && PH1) { + return runForegroundPersist(std::forward(PH1)); + }) == false) { LOG_ERROR(<< "Failed to add anomaly detector foreground persistence function"); return false; } @@ -1486,9 +1510,9 @@ void CAnomalyJob::outputResultsWithinRange(bool isInterim, core_t::TTime start, << "): Start time is later than end time."); return; } - core_t::TTime bucketLength = m_ModelConfig.bucketLength(); + core_t::TTime const bucketLength = m_ModelConfig.bucketLength(); core_t::TTime time = maths::common::CIntegerTools::floor(start, bucketLength); - core_t::TTime bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); + core_t::TTime const bucketEnd = maths::common::CIntegerTools::ceil(end, bucketLength); while (time < bucketEnd) { if (isInterim) { this->outputInterimResults(time); @@ -1504,7 +1528,7 @@ void CAnomalyJob::generateModelPlot(core_t::TTime startTime, core_t::TTime endTime, const model::CAnomalyDetector& detector, TModelPlotDataVec& modelPlotData) { - double modelPlotBoundsPercentile(m_ModelConfig.modelPlotBoundsPercentile()); + double const modelPlotBoundsPercentile(m_ModelConfig.modelPlotBoundsPercentile()); if (modelPlotBoundsPercentile > 0.0) { LOG_TRACE(<< "Generating model debug data at " << startTime); detector.generateModelPlot(startTime, endTime, @@ -1528,7 +1552,7 @@ void CAnomalyJob::writeOutAnnotations(const TAnnotationVec& annotations) { } void CAnomalyJob::refreshMemoryAndReport() { - core_t::TTime bucketLength{m_ModelConfig.bucketLength()}; + core_t::TTime const bucketLength{m_ModelConfig.bucketLength()}; if (m_LastFinalisedBucketEndTime < bucketLength) { LOG_ERROR(<< "Cannot report memory usage because last finalized bucket end time (" << m_LastFinalisedBucketEndTime @@ -1552,13 +1576,15 @@ void CAnomalyJob::refreshMemoryAndReport() { void CAnomalyJob::persistIndividualDetector(const model::CAnomalyDetector& detector, core::CStatePersistInserter& inserter) { - inserter.insertLevel(KEY_TAG, std::bind(&model::CAnomalyDetector::keyAcceptPersistInserter, - &detector, std::placeholders::_1)); - inserter.insertLevel(PARTITION_FIELD_TAG, - std::bind(&model::CAnomalyDetector::partitionFieldAcceptPersistInserter, - &detector, std::placeholders::_1)); - inserter.insertLevel(DETECTOR_TAG, std::bind(&model::CAnomalyDetector::acceptPersistInserter, - &detector, std::placeholders::_1)); + inserter.insertLevel(KEY_TAG, [ObjectPtr = &detector](T && PH1) { + ObjectPtr->keyAcceptPersistInserter(std::forward(PH1)); + }); + inserter.insertLevel(PARTITION_FIELD_TAG, [ObjectPtr = &detector](T && PH1) { + ObjectPtr->partitionFieldAcceptPersistInserter(std::forward(PH1)); + }); + inserter.insertLevel(DETECTOR_TAG, [ObjectPtr = &detector](T && PH1) { + ObjectPtr->acceptPersistInserter(std::forward(PH1)); + }); } void CAnomalyJob::detectors(TAnomalyDetectorPtrVec& detectors) const { @@ -1572,10 +1598,10 @@ void CAnomalyJob::detectors(TAnomalyDetectorPtrVec& detectors) const { void CAnomalyJob::sortedDetectors(TKeyCRefAnomalyDetectorPtrPrVec& detectors) const { detectors.reserve(m_Detectors.size()); for (const auto& detector : m_Detectors) { - detectors.push_back(TKeyCRefAnomalyDetectorPtrPr( + detectors.emplace_back( model::CSearchKey::TStrCRefKeyCRefPr(std::cref(detector.first.first), std::cref(detector.first.second)), - detector.second)); + detector.second); } std::sort(detectors.begin(), detectors.end(), maths::common::COrderings::SFirstLess()); } @@ -1589,7 +1615,7 @@ CAnomalyJob::detectorForKey(bool isRestoring, core_t::TTime time, const model::CSearchKey& key, const std::string& partitionFieldValue, - model::CResourceMonitor& resourceMonitor) { + const model::CResourceMonitor& resourceMonitor) { // The simple count detector always lives in a special null partition. const std::string& partition = key.isSimpleCount() ? EMPTY_STRING : partitionFieldValue; @@ -1610,8 +1636,8 @@ CAnomalyJob::detectorForKey(bool isRestoring, << partition << '\'' << ", time " << time); LOG_TRACE(<< "Detector count " << m_Detectors.size()); - detector = this->makeDetector(m_ModelConfig, m_Limits, partition, time, - m_ModelConfig.factory(key)); + detector = ml::api::CAnomalyJob::makeDetector( + m_ModelConfig, m_Limits, partition, time, m_ModelConfig.factory(key)); if (detector == nullptr) { // This should never happen as CAnomalyDetectorUtils::makeDetector() // contracts to never return NULL @@ -1625,7 +1651,8 @@ CAnomalyJob::detectorForKey(bool isRestoring, m_Limits.resourceMonitor().forceRefresh(*detector); } return detector; - } else if (itr == m_Detectors.end()) { + } + if (itr == m_Detectors.end()) { LOG_TRACE(<< "No memory to create new detector for key '" << key.debug() << '/' << partition << '\''); return NULL_DETECTOR; @@ -1634,7 +1661,7 @@ CAnomalyJob::detectorForKey(bool isRestoring, return itr->second; } -void CAnomalyJob::pruneAllModels(std::size_t buckets) { +void CAnomalyJob::pruneAllModels(std::size_t buckets) const { if (buckets == 0) { LOG_INFO(<< "Pruning obsolete models"); } else { @@ -1684,20 +1711,20 @@ void CAnomalyJob::populateDetectorKeys(const CAnomalyJobConfig& jobConfig, TKeyV const std::string* CAnomalyJob::fieldValue(const std::string& fieldName, const TStrStrUMap& dataRowFields) { - TStrStrUMapCItr itr = fieldName.empty() ? dataRowFields.end() - : dataRowFields.find(fieldName); + TStrStrUMapCItr const itr = fieldName.empty() ? dataRowFields.end() + : dataRowFields.find(fieldName); const std::string& fieldValue(itr == dataRowFields.end() ? EMPTY_STRING : itr->second); return !fieldName.empty() && fieldValue.empty() ? nullptr : &fieldValue; } -void CAnomalyJob::addRecord(const TAnomalyDetectorPtr detector, +void CAnomalyJob::addRecord(const TAnomalyDetectorPtr& detector, core_t::TTime time, const TStrStrUMap& dataRowFields) { model::CAnomalyDetector::TStrCPtrVec fieldValues; const TStrVec& fieldNames = detector->fieldsOfInterest(); fieldValues.reserve(fieldNames.size()); - for (std::size_t i = 0; i < fieldNames.size(); ++i) { - fieldValues.push_back(fieldValue(fieldNames[i], dataRowFields)); + for (const auto& fieldName : fieldNames) { + fieldValues.push_back(fieldValue(fieldName, dataRowFields)); } detector->addRecord(time, fieldValues); @@ -1706,14 +1733,15 @@ void CAnomalyJob::addRecord(const TAnomalyDetectorPtr detector, CAnomalyJob::SBackgroundPersistArgs::SBackgroundPersistArgs( core_t::TTime time, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, - const model::CInterimBucketCorrector& interimBucketCorrector, + model::CInterimBucketCorrector interimBucketCorrector, const model::CHierarchicalResultsAggregator& aggregator, core_t::TTime latestRecordTime, core_t::TTime lastResultsTime, core_t::TTime initialLastFinalisedBucketEndTime) : s_Time(time), s_ModelSizeStats(modelSizeStats), - s_InterimBucketCorrector(interimBucketCorrector), s_Aggregator(aggregator), - s_LatestRecordTime(latestRecordTime), s_LastResultsTime(lastResultsTime), + s_InterimBucketCorrector(std::move(interimBucketCorrector)), + s_Aggregator(aggregator), s_LatestRecordTime(latestRecordTime), + s_LastResultsTime(lastResultsTime), s_InitialLastFinalizedBucketEndTime(initialLastFinalisedBucketEndTime) { } } diff --git a/lib/maths/common/unittest/CKMeansOnlineTest.cc b/lib/maths/common/unittest/CKMeansOnlineTest.cc index c90e4a8f54..f9e366bb52 100644 --- a/lib/maths/common/unittest/CKMeansOnlineTest.cc +++ b/lib/maths/common/unittest/CKMeansOnlineTest.cc @@ -205,8 +205,8 @@ BOOST_AUTO_TEST_CASE(testDeduplicate) { 0.0); CKMeansOnlineForTest::deduplicate(points); BOOST_REQUIRE_EQUAL(1, points.size()); - BOOST_REQUIRE_EQUAL(TFloatVector2{0.0}, - maths::common::CBasicStatistics::mean(points[0].first)); + BOOST_ASSERT(TFloatVector2{0.0} == + maths::common::CBasicStatistics::mean(points[0].first)); BOOST_REQUIRE_EQUAL(4.0, maths::common::CBasicStatistics::count(points[0].first)); BOOST_REQUIRE_EQUAL(0.0, points[0].second); points.clear(); @@ -222,16 +222,16 @@ BOOST_AUTO_TEST_CASE(testDeduplicate) { 0.0); CKMeansOnlineForTest::deduplicate(points); BOOST_REQUIRE_EQUAL(3, points.size()); - BOOST_REQUIRE_EQUAL(TFloatVector2{0.0}, - maths::common::CBasicStatistics::mean(points[0].first)); + BOOST_ASSERT(TFloatVector2{0.0} == + maths::common::CBasicStatistics::mean(points[0].first)); BOOST_REQUIRE_EQUAL(1.0, maths::common::CBasicStatistics::count(points[0].first)); BOOST_REQUIRE_EQUAL(0.0, points[0].second); - BOOST_REQUIRE_EQUAL(TFloatVector2{1.0}, - maths::common::CBasicStatistics::mean(points[1].first)); + BOOST_ASSERT(TFloatVector2{1.0} == + maths::common::CBasicStatistics::mean(points[1].first)); BOOST_REQUIRE_EQUAL(2.0, maths::common::CBasicStatistics::count(points[1].first)); BOOST_REQUIRE_EQUAL(0.0, points[1].second); - BOOST_REQUIRE_EQUAL(TFloatVector2{2.0}, - maths::common::CBasicStatistics::mean(points[2].first)); + BOOST_ASSERT(TFloatVector2{2.0} == + maths::common::CBasicStatistics::mean(points[2].first)); BOOST_REQUIRE_EQUAL(1.0, maths::common::CBasicStatistics::count(points[2].first)); BOOST_REQUIRE_EQUAL(0.0, points[2].second); points.clear(); @@ -259,16 +259,16 @@ BOOST_AUTO_TEST_CASE(testDeduplicate) { CKMeansOnlineForTest::deduplicate(points); BOOST_REQUIRE_EQUAL(3, points.size()); - BOOST_REQUIRE_EQUAL(TFloatVector2{0.0}, - maths::common::CBasicStatistics::mean(points[0].first)); + BOOST_ASSERT(TFloatVector2{0.0} == + maths::common::CBasicStatistics::mean(points[0].first)); BOOST_REQUIRE_EQUAL(10.0, maths::common::CBasicStatistics::count(points[0].first)); - BOOST_REQUIRE_EQUAL(TFloatVector2{1.0}, - maths::common::CBasicStatistics::mean(points[1].first)); + BOOST_ASSERT(TFloatVector2{1.0} == + maths::common::CBasicStatistics::mean(points[1].first)); BOOST_REQUIRE_EQUAL(7.0, maths::common::CBasicStatistics::count(points[1].first)); - BOOST_REQUIRE_EQUAL(TFloatVector2{2.0}, - maths::common::CBasicStatistics::mean(points[2].first)); + BOOST_ASSERT(TFloatVector2{2.0} == + maths::common::CBasicStatistics::mean(points[2].first)); BOOST_REQUIRE_EQUAL(6.0, maths::common::CBasicStatistics::count(points[2].first)); diff --git a/lib/maths/time_series/unittest/CTimeSeriesDecompositionTest.cc b/lib/maths/time_series/unittest/CTimeSeriesDecompositionTest.cc index ec2b11579c..ee6c90bb21 100644 --- a/lib/maths/time_series/unittest/CTimeSeriesDecompositionTest.cc +++ b/lib/maths/time_series/unittest/CTimeSeriesDecompositionTest.cc @@ -252,9 +252,9 @@ class CComponentsTest : public CTestFixture { TSeasonalDecomposition::TPeriodDescriptor::E_Week}; TSeasonalDecomposition::TOptionalTime startOfWeekTime; TSeasonalDecomposition::TFloatMeanAccumulatorVec seasonalValues; - seasonalDecompositionComponents.add( - "Test component 2", secondSeasonalComponent, 0.0, - periodDescriptor, 0.0, 0.0, 1.0, startOfWeekTime, seasonalValues); + seasonalDecompositionComponents.add("Test component 2", secondSeasonalComponent, + 0, periodDescriptor, 0.0, 0.0, 1.0, + startOfWeekTime, seasonalValues); CConfigurableMemoryCircuitBreaker allocator{false};