Skip to content

Commit

Permalink
Merge pull request hpcc-systems#18048 from shamser/issue29657new2
Browse files Browse the repository at this point in the history
HPCC-29657 Produce aggregate stats (e.g. spill, cost) whilst a job is running

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Dec 7, 2023
2 parents 2a66e73 + d45f744 commit 2a959b9
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 178 deletions.
120 changes: 52 additions & 68 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ CWuGraphStats::CWuGraphStats(StatisticCreatorType _creatorType, const char * _cr
{
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(_rootScope));
StatsScopeId wfScopeId(SSTworkflow,wfid);

StatsScopeId rootScopeId(SSTworkflow,wfid);
collector.setown(createStatisticsGatherer(_creatorType, _creator, rootScopeId));
collector.setown(createStatisticsGatherer(_creatorType, _creator, wfScopeId));
collector->beginScope(graphScopeId);
}

Expand Down Expand Up @@ -2659,83 +2659,67 @@ cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope, bool exclu
}
}

//aggregate disk costs from top-level subgraphs (when scope specified) or workflows (scope not specified)
cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
void StatisticsAggregator::loadExistingAggregates(const IConstWorkUnit &workunit)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
filter.addScope(""); // Needed to match scope
// when scope is a workflow, sum graph costs (or subgraph cost when no graph cost) to get workflow cost
// (Costs from child graphs and activities should have been summed up to graph/subgraph level already)
// when isEmptyString(scope), sum workflow costs (or graph cost when no workflow cost) to get global cost
// (Costs from all levels below graph should be summed upto at least graph level already)
// i.e. need 2 levels of nesting
filter.setIncludeNesting(2);
// includeNesting(2) needs just source "global". However, WuScopeFilter is incorrectly inferring the source as "global,stats",
// causing too many of the stats to be pulled in and inefficiency. Here, explicitly set source to "global"
filter.addSource("global");
filter.addOutputStatistic(StCostFileAccess);
filter.addRequiredStat(StCostFileAccess);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
cost_type totalCost = 0;
for (it->first(); it->isValid(); )
StatsScopeId globalScopeId(SSTglobal, (unsigned)0);
statsCollection.setown(createStatisticCollection(globalScopeId));

class StatsCollectionAggregatesLoader : public IWuScopeVisitor
{
cost_type value = 0;
if (it->getStat(StCostFileAccess, value))
{
totalCost += value;
it->nextSibling();
}
else
public:
StatsCollectionAggregatesLoader(IStatisticCollection * _statsCollection) : statsCollection(_statsCollection) {}

virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & extra) override
{
it->next();
statsCollection->setStatistic(extra.queryScope(), kind, value);
}
}
return totalCost;
}
virtual void noteAttribute(WuAttr attr, const char * value) override { throwUnexpected(); }
virtual void noteHint(const char * kind, const char * value) override { throwUnexpected(); }
virtual void noteException(IConstWUException & exception) override { throwUnexpected(); }
private:
Linked<IStatisticCollection> statsCollection;
};

void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
{
filter.addScope("");
filter.addSource("global");
}
filter.setIncludeNesting(1);
filter.addOutputStatistic(StSizeGraphSpill);
filter.addRequiredStat(StSizeGraphSpill);
filter.addScopeType(SSTglobal).addScopeType(SSTworkflow).addScopeType(SSTgraph);
const unsigned numStats = mapping.numStatistics();
for (unsigned i=0; i<numStats; ++i)
filter.addOutputStatistic(mapping.getKind(i));
filter.setDepth(1,3); // 1=global, 2=workflow, 3=graph
filter.setSources(SSFsearchGlobalStats);
filter.setIncludeNesting(0);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
peakSizeSpill = 0;
for (it->first(); it->isValid(); )
{
stat_type value = 0;
if (it->getStat(StSizeGraphSpill, value))
{
if (value>peakSizeSpill)
peakSizeSpill = value;
it->nextSibling();
}
else
{
it->next();
}
}

StatsCollectionAggregatesLoader aggregatesLoader(statsCollection);
Owned<IConstWUScopeIterator> iter = &workunit.getScopeIterator(filter);
ForEach(*iter)
iter->playProperties(aggregatesLoader);
}

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
// Replace the stats at the specified scope level
void StatisticsAggregator::recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned sgId)
{
stat_type peakSizeSpill = 0;
gatherSpillSize(wu, scope, peakSizeSpill);
if (peakSizeSpill)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
StatsScopeId graphScopeId;
verifyex(graphScopeId.setScopeText(graphName));
StatsScopeId wfScopeId(SSTworkflow, wfid);
StatsScopeId sgScopeId(SSTsubgraph, sgId);
statsCollection->recordStats(mapping, sourceStats, {wfScopeId, graphScopeId, sgScopeId});
}

// Recalculate aggregates and then write the aggregates to global stats (dali)
void StatisticsAggregator::updateAggregates(IWorkUnit *wu)
{
if (!statsCollection)
return;

AggregateUpdatedCallBackFunc f = [&](const char * scope, StatisticScopeType sst, StatisticKind kind, stat_type value)
{
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), sst, scope, kind, nullptr, value, 1, 0, StatsMergeReplace);
};

statsCollection->refreshAggregates(mapping, f);
}

//---------------------------------------------------------------------------------------------------------------------


Expand Down
16 changes: 13 additions & 3 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ interface IConstWUScopeIterator : extends IScmIterator
};

//---------------------------------------------------------------------------------------------------------------------

//! IWorkUnit
//! Provides high level access to WorkUnit "header" data.
interface IWorkUnit;
Expand Down Expand Up @@ -1726,8 +1725,6 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, ITimeReporter *ti
extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeType scopeType, StatisticKind kind, ITimeReporter *timer);
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down Expand Up @@ -1786,4 +1783,17 @@ extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::in

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);


class WORKUNIT_API StatisticsAggregator : public CInterface
{
public:
StatisticsAggregator(const StatisticsMapping & _mapping) : mapping(_mapping) {}
void loadExistingAggregates(const IConstWorkUnit &workunit);
void recordStats(IStatisticCollection * sourceStats, unsigned wfid, const char * graphName, unsigned graphId);
void updateAggregates(IWorkUnit *wu);
private:
Owned<IStatisticCollection> statsCollection;
const StatisticsMapping & mapping;
};

#endif
2 changes: 2 additions & 0 deletions ecl/eclagent/agentctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ struct IAgentContext : extends IGlobalCodeContext
virtual bool forceNewDiskReadActivity() const = 0;
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, char const * source) = 0;
virtual double queryAgentMachineCost() const = 0;
virtual void updateAggregates(IWorkUnit* lockedwu) = 0;
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) = 0;
};

#endif // AGENTCTX_HPP_INCL
21 changes: 8 additions & 13 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ class EclAgentPluginCtx : public SimplePluginCtx
//=======================================================================================

EclAgent::EclAgent(IConstWorkUnit *wu, const char *_wuid, bool _checkVersion, bool _resetWorkflow, bool _noRetry, char const * _logname, const char *_allowedPipeProgs, IPropertyTree *_queryXML, ILogMsgHandler * _logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler)
: wuRead(wu), wuid(_wuid), checkVersion(_checkVersion), resetWorkflow(_resetWorkflow), noRetry(_noRetry), allowedPipeProgs(_allowedPipeProgs), logMsgHandler(_logMsgHandler), statsAggregator(stdAggregateKindStatistics)
{
isAborting = false;
isStandAloneExe = false;
Expand Down Expand Up @@ -1988,10 +1988,6 @@ void EclAgent::doProcess()
const cost_type cost = aggregateCost(w, nullptr, false);
if (cost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
if (diskAccessCost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(w, nullptr, SSTglobal);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2245,12 +2241,15 @@ void EclAgent::runProcess(IEclProcess *process)
ForEachItemIn(i2, queryLibraries)
queryLibraries.item(i2).destroyGraph();

if (rowManager)
{
WorkunitUpdate wu = updateWorkUnit();
WuStatisticTarget statsTarget(wu, "eclagent");
rowManager->reportPeakStatistics(statsTarget, 0);
rowManager->getMemoryUsage();//Causes statistics to be written to logfile
updateAggregates(wu);
if (rowManager)
{
WuStatisticTarget statsTarget(wu, "eclagent");
rowManager->reportPeakStatistics(statsTarget, 0);
rowManager->getMemoryUsage();//Causes statistics to be written to logfile
}
}

rowManager.clear(); // Must go before the allocatorCache
Expand Down Expand Up @@ -2513,10 +2512,6 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type cost = money2cost_type(calcCost(agent.queryAgentMachineCost(), nanoToMilli(elapsedNs))) + aggregateCost(wu, scope, true);
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
if (diskAccessCost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(wu, scope, SSTworkflow);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
21 changes: 19 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,15 @@ public:
virtual double queryAgentMachineCost() const override
{
return ctx->queryAgentMachineCost();
};
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
ctx->updateAggregates(lockedwu);
}
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
ctx->mergeAggregatorStats(stats, wfid, graphname, sgId);
}

protected:
IAgentContext * ctx;
Expand Down Expand Up @@ -392,6 +400,7 @@ private:
Owned<IOrderedOutputSerializer> outputSerializer;
int retcode;
double agentMachineCost = 0;
StatisticsAggregator statsAggregator;

private:
void doSetResultString(type_t type, const char * stepname, unsigned sequence, int len, const char *val);
Expand Down Expand Up @@ -705,6 +714,14 @@ public:
{
return agentMachineCost;
}
virtual void updateAggregates(IWorkUnit* lockedwu) override
{
statsAggregator.updateAggregates(lockedwu);
}
virtual void mergeAggregatorStats(IStatisticCollection & stats, unsigned wfid, const char *graphname, unsigned sgId) override
{
statsAggregator.recordStats(&stats, wfid, graphname, sgId);
}
};

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -1055,7 +1072,7 @@ public:
void executeLibrary(const byte * parentExtract, IHThorGraphResults * results);
IWUGraphStats *updateStats(StatisticCreatorType creatorType, const char * creator, unsigned wfid, unsigned subgraph);
void updateWUStatistic(IWorkUnit* lockedwu, StatisticScopeType scopeType, const char* scope, StatisticKind kind, const char* descr, long long unsigned int value);

void updateAggregates(IWorkUnit* lockedwu);
EclSubGraph * idToGraph(unsigned id);
EclGraphElement * idToActivity(unsigned id);
const char *queryGraphName() { return graphName; }
Expand Down
28 changes: 16 additions & 12 deletions ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,12 @@ void EclSubGraph::updateProgress()
Owned<IWUGraphStats> progress = parent.updateStats(queryStatisticsComponentType(), queryStatisticsComponentName(), parent.queryWfid(), id);
IStatisticGatherer & stats = progress->queryStatsBuilder();
updateProgress(stats);

Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, parent.queryWfid(), parent.queryGraphName(), id);
if (startGraphTime || elapsedGraphCycles)
{
WorkunitUpdate lockedwu(agent->updateWorkUnit());
agent->updateAggregates(lockedwu);
StringBuffer subgraphid;
subgraphid.append(parent.queryGraphName()).append(":").append(SubGraphScopePrefix).append(id);
if (startGraphTime)
Expand All @@ -897,10 +899,6 @@ void EclSubGraph::updateProgress()
if (cost)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
}
Owned<IStatisticCollection> statsCollection = stats.getResult();
const cost_type costDiskAccess = aggregateStatistic(StCostFileAccess, statsCollection) ;
if (costDiskAccess)
lockedwu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTsubgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}
}
}
Expand All @@ -927,6 +925,11 @@ void EclSubGraph::updateProgress(IStatisticGatherer &progress)
}
ForEachItemIn(i2, subgraphs)
subgraphs.item(i2).updateProgress(progress);

Owned<IStatisticCollection> statsCollection = progress.getResult();
const cost_type costDiskAccess = statsCollection->aggregateStatistic(StCostFileAccess);
if (costDiskAccess)
progress.addStatistic(StCostFileAccess, costDiskAccess);
}

bool EclSubGraph::prepare(const byte * parentExtract, bool checkDependencies)
Expand Down Expand Up @@ -1277,10 +1280,6 @@ void EclGraph::execute(const byte * parentExtract)
const cost_type cost = money2cost_type(calcCost(agent->queryAgentMachineCost(), elapsed));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

const cost_type costDiskAccess = aggregateDiskAccessCost(wu, scope);
if (costDiskAccess)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, scope, StCostFileAccess, NULL, costDiskAccess, 1, 0, StatsMergeReplace);
}

if (agent->queryRemoteWorkunit())
Expand Down Expand Up @@ -1349,8 +1348,12 @@ void EclGraph::updateLibraryProgress()
{
EclSubGraph & cur = graphs.item(idx);
unsigned wfid = cur.parent.queryWfid();
Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
cur.updateProgress(progress->queryStatsBuilder());

Owned<IWUGraphStats> progress = wu->updateStats(queryGraphName(), queryStatisticsComponentType(), queryStatisticsComponentName(), wfid, cur.id, false);
IStatisticGatherer & stats = progress->queryStatsBuilder();
cur.updateProgress(stats);
Owned<IStatisticCollection> statsCollection = stats.getResult();
agent->mergeAggregatorStats(*statsCollection, wfid, queryGraphName(), cur.id);
}
}

Expand Down Expand Up @@ -1492,7 +1495,7 @@ void GraphResults::setResult(unsigned id, IHThorGraphResult * result)

IWUGraphStats *EclGraph::updateStats(StatisticCreatorType creatorType, const char * creator, unsigned activeWfid, unsigned subgraph)
{
return wu->updateStats (queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
return wu->updateStats(queryGraphName(), creatorType, creator, activeWfid, subgraph, false);
}

void EclGraph::updateWUStatistic(IWorkUnit *lockedwu, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * descr, unsigned __int64 value)
Expand Down Expand Up @@ -1544,6 +1547,7 @@ EclGraph * EclAgent::loadGraph(const char * graphName, IConstWorkUnit * wu, ILoa

Owned<EclGraph> eclGraph = new EclGraph(*this, graphName, wu, isLibrary, debugContext, probeManager, wuGraph->getWfid());
eclGraph->createFromXGMML(dll, xgmml);
statsAggregator.loadExistingAggregates(*wu);
return eclGraph.getClear();
}

Expand Down
Loading

0 comments on commit 2a959b9

Please sign in to comment.