diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 0675947fb51..ca9d21dd6a2 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -188,9 +188,9 @@ CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _cr { StatsScopeId graphScopeId; verifyex(graphScopeId.setScopeText(_rootScope)); + StatsScopeId wfScopeId(SSTworkflow,wfid); - StatsScopeId rootScopeId(SSTworkflow,wfid); - collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId)); + collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId)); collector->beginScope(graphScopeId); } @@ -2659,83 +2659,67 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu } } -//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified) -cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope) +void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit) { - WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - filter.addScope(""); // Needed to match scope - // when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost - // (Costs from child graphs and activities should have been summed up to graph/subgraph level already) - // when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost - // (Costs from all levels below graph should be summed upto at least graph level already) - // i.e. need 2 levels of nesting - filter.setIncludeNesting(2); - // includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats", - // causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global" - filter.addSource("global"); - filter.addOutputStatistic(StCostFileAccess); - filter.addRequiredStat(StCostFileAccess); - filter.finishedFilter(); - Owned it = &wu->getScopeIterator(filter); - cost_type totalCost = 0; - for (it->first(); it->isValid(); ) + StatsScopeId globalScopeId(SSTglobal, (unsigned)0); + statsCollection.setown(createStatisticCollection(globalScopeId)); + + class StatsCollectionAggregatesLoader : public IWuScopeVisitor { - cost_type value = 0; - if (it->getStat(StCostFileAccess, value)) - { - totalCost += value; - it->nextSibling(); - } - else + public: + StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {} + + virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override { - it->next(); + statsCollection->setStatistic(extra.queryScope(), kind, value); } - } - return totalCost; -} + virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); } + virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); } + virtual void noteException(IConstWUException & exception) override { throwUnexpected(); } + private: + Linked statsCollection; + }; -void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill) -{ WuScopeFilter filter; - if (!isEmptyString(scope)) - filter.addScope(scope); - else - { - filter.addScope(""); - filter.addSource("global"); - } - filter.setIncludeNesting(1); - filter.addOutputStatistic(StSizeGraphSpill); - filter.addRequiredStat(StSizeGraphSpill); + filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph); + const unsigned numStats = mapping.numStatistics(); + for (unsigned i=0; i it = &wu->getScopeIterator(filter); - peakSizeSpill = 0; - for (it->first(); it->isValid(); ) - { - stat_type value = 0; - if (it->getStat(StSizeGraphSpill, value)) - { - if (value>peakSizeSpill) - peakSizeSpill = value; - it->nextSibling(); - } - else - { - it->next(); - } - } + + StatsCollectionAggregatesLoader aggregatesLoader(statsCollection); + Owned iter = &workunit.getScopeIterator(filter); + ForEach(*iter) + iter->playProperties(aggregatesLoader); } -void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType) +// Replace the stats at the specified scope level +void StatisticsAggregator::recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned sgId) { - stat_type peakSizeSpill = 0; - gatherSpillSize(wu, scope, peakSizeSpill); - if (peakSizeSpill) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax); + StatsScopeId graphScopeId; + verifyex(graphScopeId.setScopeText(graphName)); + StatsScopeId wfScopeId(SSTworkflow, wfid); + StatsScopeId sgScopeId(SSTsubgraph, sgId); + statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId}); +} + +// Recalculate aggregates and then write the aggregates to global stats (dali) +void StatisticsAggregator::updateAggregates(IWorkUnit *wu) +{ + if (!statsCollection) + return; + + AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value) + { + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace); + }; + + statsCollection->refreshAggregates(mapping, f); } + //--------------------------------------------------------------------------------------------------------------------- diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp index 02543eb5eba..05eab985f42 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator }; //--------------------------------------------------------------------------------------------------------------------- - //! IWorkUnit //! Provides high level access to WorkUnit "header" data. interface IWorkUnit; @@ -1726,8 +1725,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer); extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); -extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); -extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 @@ -1786,4 +1783,17 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName); + +class WORKUNIT_API StatisticsAggregator : public CInterface +{ +public: + StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {} + void loadExistingAggregates(const IConstWorkUnit &workunit); + void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId); + void updateAggregates(IWorkUnit *wu); +private: + Owned statsCollection; + const StatisticsMapping & mapping; +}; + #endif diff --git a/ecl/eclagent/agentctx.hpp b/ecl/eclagent/agentctx.hpp index a016679fecf..73444006563 100644 --- a/ecl/eclagent/agentctx.hpp +++ b/ecl/eclagent/agentctx.hpp @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext virtual bool forceNewDiskReadActivity() const = 0; virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0; virtual double queryAgentMachineCost() const = 0; + virtual void updateAggregates(IWorkUnit* lockedwu) = 0; + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0; }; #endif // AGENTCTX_HPP_INCL diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index d8ea4e78835..1f83ea2b440 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -509,7 +509,7 @@ class EclAgentPluginCtx : public SimplePluginCtx //======================================================================================= EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler) - : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler) + : wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), statsAggregator(stdAggregateKindStatistics) { isAborting = false; isStandAloneExe = false; @@ -1988,10 +1988,6 @@ void EclAgent::doProcess() const cost_type cost = aggregateCost(w, nullptr, false); if (cost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr); - if (diskAccessCost) - w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(w, nullptr, SSTglobal); addTimings(w); switch (w->getState()) @@ -2245,12 +2241,15 @@ void EclAgent::runProcess(IEclProcess *process) ForEachItemIn(i2, queryLibraries) queryLibraries.item(i2).destroyGraph(); - if (rowManager) { WorkunitUpdate wu = updateWorkUnit(); - WuStatisticTarget statsTarget(wu, "eclagent"); - rowManager->reportPeakStatistics(statsTarget, 0); - rowManager->getMemoryUsage();//Causes statistics to be written to logfile + updateAggregates(wu); + if (rowManager) + { + WuStatisticTarget statsTarget(wu, "eclagent"); + rowManager->reportPeakStatistics(statsTarget, 0); + rowManager->getMemoryUsage();//Causes statistics to be written to logfile + } } rowManager.clear(); // Must go before the allocatorCache @@ -2513,10 +2512,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope); - if (diskAccessCost) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, scope, SSTworkflow); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/ecl/eclagent/eclagent.ipp b/ecl/eclagent/eclagent.ipp index 10c86cc6ebb..3aafca17a3e 100644 --- a/ecl/eclagent/eclagent.ipp +++ b/ecl/eclagent/eclagent.ipp @@ -249,7 +249,15 @@ public: virtual double queryAgentMachineCost() const override { return ctx->queryAgentMachineCost(); - }; + } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + ctx->updateAggregates(lockedwu); + } + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override + { + ctx->mergeAggregatorStats(stats, wfid, graphname, sgId); + } protected: IAgentContext * ctx; @@ -392,6 +400,7 @@ private: Owned outputSerializer; int retcode; double agentMachineCost = 0; + StatisticsAggregator statsAggregator; private: void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val); @@ -705,6 +714,14 @@ public: { return agentMachineCost; } + virtual void updateAggregates(IWorkUnit* lockedwu) override + { + statsAggregator.updateAggregates(lockedwu); + } + virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override + { + statsAggregator.recordStats(&stats, wfid, graphname, sgId); + } }; //--------------------------------------------------------------------------- @@ -1055,7 +1072,7 @@ public: void executeLibrary(const byte * parentExtract, IHThorGraphResults * results); IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph); void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value); - + void updateAggregates(IWorkUnit* lockedwu); EclSubGraph * idToGraph(unsigned id); EclGraphElement * idToActivity(unsigned id); const char *queryGraphName() { return graphName; } diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index d4ffdd99c42..e1de571cecd 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -879,10 +879,12 @@ void EclSubGraph::updateProgress() Owned progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id); IStatisticGatherer & stats = progress->queryStatsBuilder(); updateProgress(stats); - + Owned statsCollection = stats.getResult(); + agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id); if (startGraphTime || elapsedGraphCycles) { WorkunitUpdate lockedwu(agent->updateWorkUnit()); + agent->updateAggregates(lockedwu); StringBuffer subgraphid; subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id); if (startGraphTime) @@ -897,10 +899,6 @@ void EclSubGraph::updateProgress() if (cost) lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); } - Owned statsCollection = stats.getResult(); - const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ; - if (costDiskAccess) - lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } } } @@ -927,6 +925,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress) } ForEachItemIn(i2, subgraphs) subgraphs.item(i2).updateProgress(progress); + + Owned statsCollection = progress.getResult(); + const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess); + if (costDiskAccess) + progress.addStatistic(StCostFileAccess, costDiskAccess); } bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies) @@ -1277,10 +1280,6 @@ void EclGraph::execute(const byte * parentExtract) const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - - const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } if (agent->queryRemoteWorkunit()) @@ -1349,8 +1348,12 @@ void EclGraph::updateLibraryProgress() { EclSubGraph & cur = graphs.item(idx); unsigned wfid = cur.parent.queryWfid(); - Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); - cur.updateProgress(progress->queryStatsBuilder()); + + Owned progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false); + IStatisticGatherer & stats = progress->queryStatsBuilder(); + cur.updateProgress(stats); + Owned statsCollection = stats.getResult(); + agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id); } } @@ -1492,7 +1495,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result) IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph) { - return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false); + return wu->updateStats(queryGraphName(), creatorType, creator, activeWfid, subgraph, false); } void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value) @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa Owned eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid()); eclGraph->createFromXGMML(dll, xgmml); + statsAggregator.loadExistingAggregates(*wu); return eclGraph.getClear(); } diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 4dafcc75ed2..6b694eb46c5 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -1331,6 +1331,7 @@ const StatisticsMapping diskLocalStatistics({StCycleDiskReadIOCycles, StSizeDisk const StatisticsMapping diskRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries}); const StatisticsMapping diskReadRemoteStatistics({StTimeDiskReadIO, StSizeDiskRead, StNumDiskReads, StNumDiskRetries, StCycleDiskReadIOCycles}); const StatisticsMapping diskWriteRemoteStatistics({StTimeDiskWriteIO, StSizeDiskWrite, StNumDiskWrites, StNumDiskRetries, StCycleDiskWriteIOCycles}); +const StatisticsMapping stdAggregateKindStatistics({StCostExecute, StCostFileAccess, StSizeGraphSpill, StSizeSpillFile}); const StatisticsMapping * queryStatsMapping(const StatsScopeId & scope, unsigned hashcode) { @@ -1430,6 +1431,8 @@ StringBuffer & StatsScopeId::getScopeText(StringBuffer & out) const return out.append(ChannelScopePrefix).append(id); case SSTunknown: return out.append(name); + case SSTglobal: + return out; default: #ifdef _DEBUG throwUnexpected(); @@ -1783,11 +1786,78 @@ class SortedCollectionIterator : public ArrayIIteratorOf { friend class CollectionHashTable; + + CStatisticCollection * ensureSubScopePath(std::initializer_list path) + { + CStatisticCollection * curScope = this; + for (const auto & scopeItem: path) + curScope = curScope->ensureSubScope(scopeItem, true); // n.b. this will always return a valid pointer + return curScope; + } + void markDirty() + { + isDirty=true; + if (parent) parent->markDirty(); + } + void refreshAggregates(CRuntimeStatisticCollection & parentTotals, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) + { + const StatisticsMapping & mapping = parentTotals.queryMapping(); + // if this scope is not dirty, the aggregates are accurate at this level so return totals (no need to descend) + // Also there is no stats below sg scope, so no need to descend + if (isDirty==false || id.queryScopeType()==SSTsubgraph) + { + // return the aggregates at this scope level + ForEachItemIn(i, stats) + { + Statistic & stat = stats.element(i); + StatisticKind kind = stat.queryKind(); + if (queryStatsVariant(kind) != 0) + continue; // ignore variants (shouldn't happen as the mapping ensure only the aggregator kinds are present) + if (mapping.hasKind(kind)) + { + // Totals required from this level by parent, even if they are not dirty + parentTotals.mergeStatistic(kind, stat.queryValue()); + } + } + isDirty=false; + return; + } + else + { + // descend down to lower level to obtain totals required for aggregation and then aggregate + CRuntimeStatisticCollection childTotals(mapping); + for (auto & child : children) + { + child.refreshAggregates(childTotals, fWhenAggregateUpdated); + } + // 1) Set any values that has changed for this scope and 2) update ALL totals for parent + const unsigned numStats = mapping.numStatistics(); + for (unsigned i=0; i children.add(*next); } } - virtual byte getCollectionType() const { return SCintermediate; } - - //interface IStatisticCollection: virtual StringBuffer &toXML(StringBuffer &out) const override; virtual StatisticScopeType queryScopeType() const override @@ -1835,7 +1902,8 @@ class CStatisticCollection : public CInterfaceOf if (parent) { parent->getFullScope(str); - str.append(':'); + if (!str.isEmpty()) + str.append(':'); } id.getScopeText(str); return str; @@ -1881,7 +1949,6 @@ class CStatisticCollection : public CInterfaceOf return *hashIter.getClear(); return * new SortedCollectionIterator(*hashIter); } - virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const override { if (id.queryScopeType() == searchScopeType) @@ -1899,7 +1966,6 @@ class CStatisticCollection : public CInterfaceOf for (auto & curChild : children) curChild.getMinMaxScope(minValue, maxValue, searchScopeType); } - virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const override { unsigned activityId = id.queryActivity(); @@ -1915,6 +1981,25 @@ class CStatisticCollection : public CInterfaceOf for (iter.first(); iter.isValid(); iter.next()) iter.query().getMinMaxActivity(minValue, maxValue); } + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) override + { + if (*scope=='\0') + { + return updateStatistic(kind, value, StatsMergeReplace); + } + else + { + StatsScopeId childScopeId; + const char * next; + if (!childScopeId.setScopeText(scope, &next) || (*next!=':' && *next!='\0')) + throw makeStringExceptionV(JLIBERR_UnexpectedValue, "'%s' does not appear to be a valid scope id", scope); + CStatisticCollection * child = ensureSubScope(childScopeId, true); + + if (*next==':') + next++; + return child->setStatistic(next, kind, value); + } + } //other public interface functions void addStatistic(StatisticKind kind, unsigned __int64 value) @@ -1922,8 +2007,7 @@ class CStatisticCollection : public CInterfaceOf Statistic s(kind, value); stats.append(s); } - - void updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) + bool updateStatistic(StatisticKind kind, unsigned __int64 value, StatsMergeAction mergeAction) { if (mergeAction != StatsMergeAppend) { @@ -1932,15 +2016,20 @@ class CStatisticCollection : public CInterfaceOf Statistic & cur = stats.element(i); if (cur.kind == kind) { + if (mergeAction==StatsMergeReplace) + { + if (cur.value==value) + return false; + } cur.value = mergeStatisticValue(cur.value, value, mergeAction); - return; + return true; } } } Statistic s(kind, value); stats.append(s); + return true; } - CStatisticCollection * ensureSubScope(const StatsScopeId & search, bool hasChildren) { //Once the CStatisticCollection is created it should not be replaced - so that returned pointers remain valid. @@ -1952,8 +2041,7 @@ class CStatisticCollection : public CInterfaceOf children.add(*ret); return ret; } - - virtual void serialize(MemoryBuffer & out) const + virtual void serialize(MemoryBuffer & out) const override { out.append(getCollectionType()); id.serialize(out); @@ -1967,9 +2055,7 @@ class CStatisticCollection : public CInterfaceOf for (iter.first(); iter.isValid(); iter.next()) iter.query().serialize(out); } - inline const StatsScopeId & queryScopeId() const { return id; } - virtual void mergeInto(IStatisticGatherer & target) const { StatsOptScope block(target, id); @@ -1979,7 +2065,6 @@ class CStatisticCollection : public CInterfaceOf for (auto const & cur : children) cur.mergeInto(target); } - virtual void visit(IStatisticVisitor & visitor) const { if (visitor.visitScope(*this)) @@ -1988,19 +2073,71 @@ class CStatisticCollection : public CInterfaceOf cur.visit(visitor); } } - virtual void visitChildren(IStatisticVisitor & visitor) const { for (auto const & cur : children) cur.visit(visitor); } + virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) override + { + if (isDirty) + { + CRuntimeStatisticCollection totals(mapping); + refreshAggregates(totals, fWhenAggregateUpdated); + } + } + virtual stat_type aggregateStatistic(StatisticKind kind) const override + { + stat_type sum = 0; + if (!getStatistic(kind, sum)) // get sum of statistics at this level + { + // if no stats at this level, then get sum of stats from children + for (auto & child : children) + sum += child.aggregateStatistic(kind); + } + return sum; + } + virtual void recordStats(const StatisticsMapping & mapping, IStatisticCollection * sourceStatsCollection, std::initializer_list path) override + { + CStatisticCollection * curSrcCollection = static_cast(sourceStatsCollection); + const StatsScopeId * scopeItem = path.begin(); + // n.b. sourceStatsCollection has workflow as root but this collection has global as root + // Locate the collection with the stats and make curSrcCollection point to that + if (!curSrcCollection || curSrcCollection->queryScopeId().compare(*scopeItem)!=0) + return; // Required path doesn't exist in source collection so nothing more to do here + ++scopeItem; + while (scopeItem!=path.end()) + { + curSrcCollection = curSrcCollection->children.find(scopeItem); + if (!curSrcCollection) + return; // Required path doesn't exist in source collection so nothing more to do here + ++scopeItem; + } + CStatisticCollection * tgtScopeCollection = ensureSubScopePath(path); + bool wasUpdated = false; + // More efficient to iterate over stats rather than mapping... + ForEachItemIn(i, curSrcCollection->stats) + { + Statistic & cur = curSrcCollection->stats.element(i); + if (queryStatsVariant(cur.kind) != 0) + continue; // ignore variants + if (mapping.hasKind(cur.kind)) + { + if (tgtScopeCollection->updateStatistic(cur.kind, cur.value, StatsMergeReplace)) + wasUpdated=true; + } + } + if (wasUpdated) + tgtScopeCollection->markDirty(); + } private: StatsScopeId id; CStatisticCollection * parent; protected: CollectionHashTable children; StatsArray stats; + bool isDirty = false; // used to track which scope has changed (used to workout what aggregates to recalculate) }; StringBuffer &CStatisticCollection::toXML(StringBuffer &out) const @@ -2114,52 +2251,6 @@ class CRootStatisticCollection : public CStatisticCollection unsigned __int64 whenCreated; }; - -class StatAggregator : implements IStatisticVisitor -{ -public: - StatAggregator(StatisticKind _kind) : kind(_kind) {} - - virtual bool visitScope(const IStatisticCollection & cur) - { - switch (cur.queryScopeType()) - { - //If there is a match for the stat in any of these containers, then avoid summing any child scopes - case SSTglobal: - case SSTgraph: - case SSTsubgraph: - case SSTsection: - case SSTchildgraph: - case SSTworkflow: - { - stat_type value; - if (cur.getStatistic(kind, value)) - { - total += value; - return false; - } - return true; - } - //Default is to sum the value for this scope and children => recurse. E.g. activity and any child activities. - default: - total += cur.queryStatistic(kind); - return true; - } - } - stat_type getTotal() const { return total; } -private: - stat_type total = 0; - StatisticKind kind; -}; - - -stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection) -{ - StatAggregator aggregator(kind); - statsCollection->visit(aggregator); - return aggregator.getTotal(); -} - //--------------------------------------------------------------------------------------------------------------------- void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection) @@ -2191,6 +2282,11 @@ IStatisticCollection * createStatisticCollection(MemoryBuffer & in) return deserializeCollection(NULL, in, version); } +IStatisticCollection * createStatisticCollection(const StatsScopeId & scopeId) +{ + return new CStatisticCollection(nullptr, scopeId); +} + //-------------------------------------------------------------------------------------------------------------------- @@ -2610,7 +2706,7 @@ StringBuffer & CRuntimeStatisticCollection::toStr(StringBuffer &str) const unsigned __int64 rawValue = getStatisticValue(rawKind); if (rawValue) value += convertMeasure(rawKind, kind, rawValue); - } + } if (value) { const char * name = queryStatisticName(serialKind); diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index c86d18c1937..cacdb23684e 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -104,6 +104,9 @@ interface IStatisticCollectionIterator; interface IStatisticGatherer; interface IStatisticVisitor; +class jlib_decl StatisticsMapping; +typedef std::function AggregateUpdatedCallBackFunc; + interface IStatisticCollection : public IInterface { public: @@ -117,12 +120,16 @@ interface IStatisticCollection : public IInterface virtual IStatisticCollectionIterator & getScopes(const char * filter, bool sorted) = 0; virtual void getMinMaxScope(IStringVal & minValue, IStringVal & maxValue, StatisticScopeType searchScopeType) const = 0; virtual void getMinMaxActivity(unsigned & minValue, unsigned & maxValue) const = 0; + virtual bool setStatistic(const char *scope, StatisticKind kind, unsigned __int64 value) = 0; virtual void serialize(MemoryBuffer & out) const = 0; virtual unsigned __int64 queryWhenCreated() const = 0; virtual void mergeInto(IStatisticGatherer & target) const = 0; virtual StringBuffer &toXML(StringBuffer &out) const = 0; virtual void visit(IStatisticVisitor & target) const = 0; virtual void visitChildren(IStatisticVisitor & target) const = 0; + virtual void refreshAggregates(const StatisticsMapping & mapping, AggregateUpdatedCallBackFunc & fWhenAggregateUpdated) = 0; + virtual stat_type aggregateStatistic(StatisticKind kind) const = 0; + virtual void recordStats(const StatisticsMapping & mapping, IStatisticCollection * statsCollection, std::initializer_list path) = 0; }; interface IStatisticCollectionIterator : public IIteratorOf @@ -456,6 +463,10 @@ class jlib_decl StatisticsMapping dbgassertex(kind >= StKindNone && kind < StMax); return kindToIndex.item(kind); } + inline bool hasKind(StatisticKind kind) const + { + return kindToIndex.item(kind) != numStatistics(); + } inline StatisticKind getKind(unsigned index) const { return (StatisticKind)indexToKind.item(index); } inline unsigned numStatistics() const { return indexToKind.ordinality(); } inline unsigned getUniqueHash() const { return hashcode; } @@ -493,6 +504,7 @@ extern const jlib_decl StatisticsMapping diskRemoteStatistics; extern const jlib_decl StatisticsMapping diskReadRemoteStatistics; extern const jlib_decl StatisticsMapping diskWriteRemoteStatistics; extern const jlib_decl StatisticsMapping jhtreeCacheStatistics; +extern const jlib_decl StatisticsMapping stdAggregateKindStatistics; //--------------------------------------------------------------------------------------------------------------------- @@ -886,6 +898,7 @@ extern jlib_decl IStatisticGatherer * createStatisticsGatherer(StatisticCreatorT extern jlib_decl void serializeStatisticCollection(MemoryBuffer & out, IStatisticCollection * collection); extern jlib_decl IStatisticCollection * createStatisticCollection(MemoryBuffer & in); +extern jlib_decl IStatisticCollection * createStatisticCollection(const StatsScopeId & scopeId); inline unsigned __int64 milliToNano(unsigned __int64 value) { return value * 1000000; } // call avoids need to upcast values inline unsigned __int64 nanoToMilli(unsigned __int64 value) { return value / 1000000; } @@ -942,6 +955,5 @@ class jlib_decl RuntimeStatisticTarget : implements IStatisticTarget }; extern jlib_decl StringBuffer & formatMoney(StringBuffer &out, unsigned __int64 value); -extern jlib_decl stat_type aggregateStatistic(StatisticKind kind, IStatisticCollection * statsCollection); #endif diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 678ea25a8a6..9230dbe112d 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2902,9 +2902,11 @@ bool CMasterGraph::deserializeStats(unsigned node, MemoryBuffer &mb) void CMasterGraph::getStats(IStatisticGatherer &stats) { stats.addStatistic(StNumSlaves, queryClusterWidth()); + cost_type costDiskAccess = getDiskAccessCost(); + if (costDiskAccess) + stats.addStatistic(StCostFileAccess, costDiskAccess); // graph specific stats - graphStats.getStats(stats); Owned iter; diff --git a/thorlcr/master/thdemonserver.cpp b/thorlcr/master/thdemonserver.cpp index 5de19dced0b..5b4094c3315 100644 --- a/thorlcr/master/thdemonserver.cpp +++ b/thorlcr/master/thdemonserver.cpp @@ -43,6 +43,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned numberOfMachines = 0; cost_type costLimit = 0; cost_type workunitCost = 0; + StatisticsAggregator statsAggregator; void doReportGraph(IStatisticGatherer & stats, CGraphBase *graph) { @@ -91,16 +92,13 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer if (costLimit || finished) { const cost_type sgCost = money2cost_type(calcCost(thorManagerRate, duration) + calcCost(thorWorkerRate, duration) * numberOfMachines); - cost_type costDiskAccess = graph.getDiskAccessCost(); if (finished) { if (sgCost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostExecute, NULL, sgCost, 1, 0, StatsMergeReplace); - if (costDiskAccess) - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace); } - const cost_type totalCost = workunitCost + sgCost + costDiskAccess; + const cost_type totalCost = workunitCost + sgCost + graph.getDiskAccessCost(); if (costLimit>0 && totalCost > costLimit) { LOG(MCwarning, thorJob, "ABORT job cost exceeds limit"); @@ -131,7 +129,15 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer queryServerStatus().queryProperties()->setPropInt("@sg_duration", (duration+59999)/60000); // round it up } } - + void updateGraphStats(IConstWorkUnit ¤tWU, const char *graphName, unsigned wfid, CGraphBase & graph) + { + Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); + IStatisticGatherer & statsBuilder = stats->queryStatsBuilder(); + reportGraph(statsBuilder, &graph); + // Merge only the stats at the specified scope level + Owned statsCollection = statsBuilder.getResult(); + statsAggregator.recordStats(statsCollection, wfid, graphName, graph.queryGraphId()); + } void reportActiveGraphs(bool finished, bool success=true) { if (activeGraphs.ordinality()) @@ -145,8 +151,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer ForEachItemIn (g, activeGraphs) { CGraphBase &graph = activeGraphs.item(g); - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph.queryGraphId(), false); - reportGraph(stats->queryStatsBuilder(), &graph); + updateGraphStats(currentWU, graphName, wfid, graph); } Owned wu = ¤tWU.lock(); ForEachItemIn (g2, activeGraphs) @@ -155,6 +160,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer unsigned startTime = graphStarts.item(g2); reportStatus(wu, graph, startTime, finished, success); } + updateAggregates(wu); queryServerStatus().commitProperties(); } catch (IException *E) @@ -172,10 +178,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer IConstWorkUnit ¤tWU = graph->queryJob().queryWorkUnit(); const char *graphName = ((CJobMaster &)activeGraphs.item(0).queryJob()).queryGraphName(); unsigned wfid = graph->queryJob().getWfid(); - { - Owned stats = currentWU.updateStats(graphName, SCTthor, queryStatisticsComponentName(), wfid, graph->queryGraphId(), false); - reportGraph(stats->queryStatsBuilder(), graph); - } + updateGraphStats(currentWU, graphName, wfid, *graph); Owned wu = ¤tWU.lock(); if (startTimeStamp) @@ -186,7 +189,6 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, graphScope, StWhenStarted, NULL, getTimeStampNowValue(), 1, 0, StatsMergeAppend); } reportStatus(wu, *graph, startTime, finished, success); - queryServerStatus().commitProperties(); } catch (IException *e) @@ -199,7 +201,7 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - DeMonServer() + DeMonServer() : statsAggregator(stdAggregateKindStatistics) { lastReport = msTick(); reportRate = globals->getPropInt("@watchdogProgressInterval", 30); @@ -300,6 +302,14 @@ class DeMonServer : public CSimpleInterface, implements IDeMonServer reportActiveGraphs(true, false); activeGraphs.kill(); } + virtual void updateAggregates(IWorkUnit * lockedWu) override + { + statsAggregator.updateAggregates(lockedWu); + } + virtual void loadExistingAggregates(IConstWorkUnit &workunit) override + { + statsAggregator.loadExistingAggregates(workunit); + } }; diff --git a/thorlcr/master/thdemonserver.hpp b/thorlcr/master/thdemonserver.hpp index 32453c92764..3931b540ded 100644 --- a/thorlcr/master/thdemonserver.hpp +++ b/thorlcr/master/thdemonserver.hpp @@ -24,12 +24,15 @@ interface IWUGraphProgress; class CGraphBase; +interface IConstWorkUnit; interface IDeMonServer : extends IInterface { virtual void takeHeartBeat(MemoryBuffer &progressMbb) = 0; virtual void startGraph(CGraphBase *graph) = 0; virtual void endGraph(CGraphBase *graph, bool success) = 0; virtual void endGraphs() = 0; + virtual void updateAggregates(IWorkUnit * lockedWu) = 0; + virtual void loadExistingAggregates(IConstWorkUnit &workunit) = 0; }; diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 95c38c1c93f..ae1d3ded4c4 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1131,6 +1131,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, if (isContainerized() && podInfo.hasStdDev()) podInfo.report(wu); } + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->loadExistingAggregates(workunit); setWuid(workunit.queryWuid(), workunit.queryClusterName()); @@ -1148,7 +1150,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - updateSpillSize(wu, graphScope, SSTgraph); + if (globals->getPropBool("@watchdogProgressEnabled")) + queryDeMonServer()->updateAggregates(wu); + removeJob(*job); } catch (IException *e)