Skip to content

Commit c167963

Browse files
committed
HPCC-30993 Fix file access for index read activity
Serialize jhtree and file IO stats to the Thor manager. Record jhtree and file IO stats for the Index Read activity (at activity scope). Correct cost calculation for index reads requires jhtree stats, so with this change the file access cost for index reads should be correct. Signed-off-by: Shamser Ahmed <[email protected]>
1 parent 57bdd34 commit c167963

File tree

1 file changed

+102
-59
lines changed

1 file changed

+102
-59
lines changed

thorlcr/activities/indexread/thindexreadslave.cpp

+102-59
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
4444
protected:
4545
StringAttr logicalFilename;
4646
IArrayOf<IPartDescriptor> partDescs;
47+
bool isSuperFile = false;
4748
IHThorIndexReadBaseArg *helper;
4849
IHThorSourceLimitTransformExtra * limitTransformExtra;
4950
Owned<IEngineRowAllocator> allocator;
@@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
7677
rowcount_t rowLimit = RCMAX;
7778
bool useRemoteStreaming = false;
7879
Owned<IFileIO> lazyIFileIO;
79-
mutable CriticalSection ioStatsCS;
80+
mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
8081
unsigned fileTableStart = NotFound;
81-
CStatsContextLogger contextLogger;
82-
CStatsCtxLoggerDeltaUpdater statsUpdater;
82+
std::vector<Owned<CStatsContextLogger>> contextLoggers;
8383

8484
class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
8585
{
@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
9898
if (!keyManager)
9999
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
100100
needsBlobCleaning = true;
101-
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
101+
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
102102
}
103103
void prepareManager(IKeyManager *_keyManager)
104104
{
@@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
166166
unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
167167
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();
168168

169-
unsigned p = partNum;
170-
while (p<partDescs.ordinality()) // will process all parts if localMerge
169+
for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
171170
{
172-
IPartDescriptor &part = partDescs.item(p++);
171+
IPartDescriptor &part = partDescs.item(p);
173172
unsigned crc=0;
174173
part.getCrc(crc);
175174

@@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
273272
continue; // try next copy and ultimately failover to local when no more copies
274273
}
275274
ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
276-
partNum = p;
275+
partNum = (p+1); // NB: returning next part number to process.
277276
return indexLookup.getClear();
278277
}
279278
}
@@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
296295
part.queryOwner().getClusterLabel(0, planeName);
297296
blockedSize = getBlockedFileIOSize(planeName);
298297
}
299-
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
298+
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));
300299

301300
RemoteFilename rfn;
302301
part.getFilename(0, rfn);
303302
StringBuffer path;
304303
rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
305304

306305
Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
307-
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
306+
IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0];
307+
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false);
308308
if (localMerge)
309309
{
310310
if (!keyIndexSet)
@@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
315315
translators.append(translator.getClear());
316316
}
317317
keyIndexSet->addIndex(keyIndex.getClear());
318-
keyManagers.append(*klManager.getLink());
318+
{
319+
CriticalBlock b(keyManagersCS);
320+
keyManagers.append(*klManager.getLink());
321+
}
319322
keyManager = klManager;
320323
}
321324
else
@@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
325328
if (translator)
326329
klManager->setLayoutTranslator(&translator->queryTranslator());
327330
translators.append(translator.getClear());
328-
keyManagers.append(*klManager.getLink());
331+
{
332+
CriticalBlock b(keyManagersCS);
333+
keyManagers.append(*klManager.getLink());
334+
}
329335
keyManager = klManager;
330-
partNum = p;
336+
partNum = (p+1); // NB: returning next part number to process.
331337
return createIndexLookup(keyManager);
332338
}
333339
}
334-
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
340+
//Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
341+
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));
335342
const ITranslator *translator = translators.item(0);
336343
if (translator)
337344
keyMergerManager->setLayoutTranslator(&translator->queryTranslator());
@@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
348355
else
349356
return nullptr;
350357
}
351-
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
352-
{
353-
if (fileStats.size()>0)
354-
{
355-
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
356-
if (superFDesc)
357-
{
358-
unsigned subfile, lnum;
359-
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
360-
mergeStats(*fileStats[fileTableStart+subfile], partIO);
361-
}
362-
else
363-
mergeStats(*fileStats[fileTableStart], partIO);
364-
}
365-
}
366-
void updateStats()
367-
{
368-
// NB: updateStats() should always be called whilst ioStatsCS is held.
369-
if (lazyIFileIO)
370-
{
371-
mergeStats(inactiveStats, lazyIFileIO);
372-
if (currentPart<partDescs.ordinality())
373-
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
374-
}
375-
}
376358
void configureNextInput()
377359
{
378360
if (currentManager)
379361
{
380362
resetManager(currentManager);
381363
currentManager = nullptr;
382-
383-
CriticalBlock b(ioStatsCS);
384-
updateStats();
385364
lazyIFileIO.clear();
386365
}
387366
IKeyManager *keyManager = nullptr;
@@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
433412
if (eoi)
434413
return nullptr;
435414
dbgassertex(currentInput);
436-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
437415
const void *ret = nullptr;
438416
while (true)
439417
{
@@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
528506
}
529507
public:
530508
CIndexReadSlaveBase(CGraphElementBase *container)
531-
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
509+
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
532510
{
533511
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
534512
limitTransformExtra = nullptr;
@@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
555533
break;
556534
if (keyManager)
557535
prepareManager(keyManager);
558-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
559536
if (hard) // checkCount checks hard key count only.
560537
count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
561538
else
@@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
589566
partDescs.kill();
590567
keyIndexSet.clear();
591568
translators.kill();
592-
keyManagers.kill();
569+
{
570+
CriticalBlock b(keyManagersCS);
571+
keyManagers.kill();
572+
}
593573
keyMergerManager.clear();
594574
}
595575
else
@@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
607587
IPartDescriptor &part0 = partDescs.item(0);
608588
IFileDescriptor &fileDesc = part0.queryOwner();
609589
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
590+
isSuperFile = super != nullptr;
610591

611592
if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
612593
{
@@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
684665
}
685666
}
686667
data.read(fileTableStart);
687-
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
668+
setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
669+
if (isSuperFile)
670+
{
671+
// Only superfiles require multiple context loggers (one per part) to track subfile-level stats
672+
for(unsigned i = 0; i < parts; ++i)
673+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
674+
}
675+
else
676+
{
677+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
678+
}
688679
}
689680
}
690681
// IThorDataLink
@@ -719,10 +710,70 @@ class CIndexReadSlaveBase : public CSlaveActivity
719710
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
720711
{
721712
PARENT::gatherActiveStats(activeStats);
713+
// *) jhtree stats will have been tracked as follows
714+
// - For superfiles, there will be 1 IContextLogger in contextLoggers[] for each part
715+
// - In all other cases, use 1st IContextLogger in contextLoggers[] for all parts
716+
// *) File IO stats will have been tracked as follows
717+
// - the io stats will be tracked in keyManagers[]
718+
// - (There will be exactly 1 keyManager per part)
719+
// Note: keyManagers[], contextLoggers[] and fileStats[] are not necessarily the same size
720+
// - KeyManagers[] will always be the same size as partDescs[]
721+
// - contextLoggers[] will have one entry per part for superfiles, otherwise 1
722+
// - fileStats[] may be size of #subfiles for superfiles, size 1 for dynamic/variable files, or
723+
// zero for everything else
724+
// This function will:
725+
// 1) In all cases, activeStats will have activity level jhtree & io stats
726+
// 2) For superfiles and variable/dynamic files, fileStats[] will have jhtree & io stats
727+
// (If there are no fileStats, the file level stats can be extracted from activeStats)
728+
// TODO:
729+
// * When localMerge==true or when useMerger==true, stats may not be correct. Revisit
730+
// this section to ensure that the code handles these correctly.
731+
if (partDescs.ordinality())
722732
{
723-
CriticalBlock b(ioStatsCS);
724-
if (lazyIFileIO)
725-
mergeStats(activeStats, lazyIFileIO);
733+
// reset required because within loop below, mergeStats() is used to build up stats for each file
734+
for (auto & fileStatItem: fileStats)
735+
fileStatItem->reset();
736+
ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
737+
for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
738+
{
739+
IKeyManager * keyManager;
740+
{
741+
CriticalBlock b(keyManagersCS);
742+
if (!keyManagers.isItem(partNum))
743+
continue;
744+
keyManager = &keyManagers.item(partNum);
745+
}
746+
if (fileStats.size()>0)
747+
{
748+
unsigned subfile = 0;
749+
// For superfiles, work out the entry number in fileStats for the subfile
750+
// (For non-superfiles, subfile will be zero, so first entry from fileStats will be used)
751+
if (isSuperFile)
752+
{
753+
unsigned lnum;
754+
if(!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
755+
continue; // should not happen
756+
}
757+
CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
758+
keyManager->mergeStats(*fileStatItem); // file level IO stats
759+
// Merge subfile jhtree stats into fileStats and activeStats
760+
if (isSuperFile)
761+
{
762+
fileStatItem->merge(contextLoggers[partNum]->queryStats());
763+
activeStats.merge(contextLoggers[partNum]->queryStats());
764+
}
765+
}
766+
// IO stats always needed in activeStats for activity level stats
767+
keyManager->mergeStats(activeStats);
768+
}
769+
// For non-superfiles, the single context logger has not yet been merged into activeStats or fileStats
770+
// (Avoid doing in loop above, to avoid merging same jhtree stats multiple times)
771+
if (!isSuperFile)
772+
{
773+
activeStats.merge(contextLoggers[0]->queryStats());
774+
if (fileStats.size()>0)
775+
fileStats[fileTableStart]->merge(contextLoggers[0]->queryStats());
776+
}
726777
}
727778
activeStats.setStatistic(StNumRowsProcessed, progress);
728779
}
@@ -735,11 +786,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
735786
}
736787
virtual void done() override
737788
{
738-
{
739-
CriticalBlock b(ioStatsCS);
740-
updateStats();
741-
lazyIFileIO.clear();
742-
}
789+
lazyIFileIO.clear();
743790
PARENT::done();
744791
}
745792
};
@@ -819,7 +866,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
819866
helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
820867
rawSeek = (byte *)temp;
821868
}
822-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
823869
if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
824870
return NULL;
825871
const byte *row = currentManager->queryKeyBuffer();
@@ -972,7 +1018,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
9721018
// IRowStream
9731019
virtual void stop() override
9741020
{
975-
CStatsScopedDeltaUpdater scoped(statsUpdater);
9761021
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
9771022
{
9781023
keyedLimitCount = sendGetCount(keyedProcessed);
@@ -1142,7 +1187,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
11421187
if (keyManager)
11431188
prepareManager(keyManager);
11441189

1145-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
11461190
while (true)
11471191
{
11481192
const void *key = indexInput->nextKey();
@@ -1301,7 +1345,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
13011345
if (keyManager)
13021346
prepareManager(keyManager);
13031347

1304-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
13051348
while (true)
13061349
{
13071350
const void *key = indexInput->nextKey();

0 commit comments

Comments
 (0)