Skip to content

Commit 21dcbb7

Browse files
committed
HPCC-30993 Fix file access for index read activity
Signed-off-by: Shamser Ahmed <[email protected]>
1 parent e67d16a commit 21dcbb7

File tree

1 file changed

+98
-59
lines changed

1 file changed

+98
-59
lines changed

thorlcr/activities/indexread/thindexreadslave.cpp

+98-59
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
4444
protected:
4545
StringAttr logicalFilename;
4646
IArrayOf<IPartDescriptor> partDescs;
47+
bool isSuperFile = false;
4748
IHThorIndexReadBaseArg *helper;
4849
IHThorSourceLimitTransformExtra * limitTransformExtra;
4950
Owned<IEngineRowAllocator> allocator;
@@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
7677
rowcount_t rowLimit = RCMAX;
7778
bool useRemoteStreaming = false;
7879
Owned<IFileIO> lazyIFileIO;
79-
mutable CriticalSection ioStatsCS;
80+
mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
8081
unsigned fileTableStart = NotFound;
81-
CStatsContextLogger contextLogger;
82-
CStatsCtxLoggerDeltaUpdater statsUpdater;
82+
std::vector<Owned<CStatsContextLogger>> contextLoggers;
8383

8484
class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
8585
{
@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
9898
if (!keyManager)
9999
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
100100
needsBlobCleaning = true;
101-
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
101+
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
102102
}
103103
void prepareManager(IKeyManager *_keyManager)
104104
{
@@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
166166
unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
167167
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();
168168

169-
unsigned p = partNum;
170-
while (p<partDescs.ordinality()) // will process all parts if localMerge
169+
for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
171170
{
172-
IPartDescriptor &part = partDescs.item(p++);
171+
IPartDescriptor &part = partDescs.item(p);
173172
unsigned crc=0;
174173
part.getCrc(crc);
175174

@@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
273272
continue; // try next copy and ultimately failover to local when no more copies
274273
}
275274
ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
276-
partNum = p;
275+
partNum = (p+1);
277276
return indexLookup.getClear();
278277
}
279278
}
@@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
296295
part.queryOwner().getClusterLabel(0, planeName);
297296
blockedSize = getBlockedFileIOSize(planeName);
298297
}
299-
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
298+
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));
300299

301300
RemoteFilename rfn;
302301
part.getFilename(0, rfn);
303302
StringBuffer path;
304303
rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
305304

306305
Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
307-
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
306+
IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0];
307+
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false);
308308
if (localMerge)
309309
{
310310
if (!keyIndexSet)
@@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
315315
translators.append(translator.getClear());
316316
}
317317
keyIndexSet->addIndex(keyIndex.getClear());
318-
keyManagers.append(*klManager.getLink());
318+
{
319+
CriticalBlock b(keyManagersCS);
320+
keyManagers.append(*klManager.getLink());
321+
}
319322
keyManager = klManager;
320323
}
321324
else
@@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
325328
if (translator)
326329
klManager->setLayoutTranslator(&translator->queryTranslator());
327330
translators.append(translator.getClear());
328-
keyManagers.append(*klManager.getLink());
331+
{
332+
CriticalBlock b(keyManagersCS);
333+
keyManagers.append(*klManager.getLink());
334+
}
329335
keyManager = klManager;
330-
partNum = p;
336+
partNum = (p+1);
331337
return createIndexLookup(keyManager);
332338
}
333339
}
334-
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
340+
//Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
341+
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));
335342
const ITranslator *translator = translators.item(0);
336343
if (translator)
337344
keyMergerManager->setLayoutTranslator(&translator->queryTranslator());
@@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
348355
else
349356
return nullptr;
350357
}
351-
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
352-
{
353-
if (fileStats.size()>0)
354-
{
355-
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
356-
if (superFDesc)
357-
{
358-
unsigned subfile, lnum;
359-
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
360-
mergeStats(*fileStats[fileTableStart+subfile], partIO);
361-
}
362-
else
363-
mergeStats(*fileStats[fileTableStart], partIO);
364-
}
365-
}
366-
void updateStats()
367-
{
368-
// NB: updateStats() should always be called whilst ioStatsCS is held.
369-
if (lazyIFileIO)
370-
{
371-
mergeStats(inactiveStats, lazyIFileIO);
372-
if (currentPart<partDescs.ordinality())
373-
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
374-
}
375-
}
376358
void configureNextInput()
377359
{
378360
if (currentManager)
379361
{
380362
resetManager(currentManager);
381363
currentManager = nullptr;
382-
383-
CriticalBlock b(ioStatsCS);
384-
updateStats();
385364
lazyIFileIO.clear();
386365
}
387366
IKeyManager *keyManager = nullptr;
@@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
433412
if (eoi)
434413
return nullptr;
435414
dbgassertex(currentInput);
436-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
437415
const void *ret = nullptr;
438416
while (true)
439417
{
@@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
528506
}
529507
public:
530508
CIndexReadSlaveBase(CGraphElementBase *container)
531-
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
509+
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
532510
{
533511
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
534512
limitTransformExtra = nullptr;
@@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
555533
break;
556534
if (keyManager)
557535
prepareManager(keyManager);
558-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
559536
if (hard) // checkCount checks hard key count only.
560537
count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
561538
else
@@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
589566
partDescs.kill();
590567
keyIndexSet.clear();
591568
translators.kill();
592-
keyManagers.kill();
569+
{
570+
CriticalBlock b(keyManagersCS);
571+
keyManagers.kill();
572+
}
593573
keyMergerManager.clear();
594574
}
595575
else
@@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
607587
IPartDescriptor &part0 = partDescs.item(0);
608588
IFileDescriptor &fileDesc = part0.queryOwner();
609589
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
590+
isSuperFile = super != nullptr;
610591

611592
if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
612593
{
@@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
684665
}
685666
}
686667
data.read(fileTableStart);
687-
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
668+
setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
669+
if (isSuperFile)
670+
{
671+
// Only superfiles required multiple context loggers to track sublevel stats
672+
for(unsigned i = 0; i < parts; ++i)
673+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
674+
}
675+
else
676+
{
677+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
678+
}
688679
}
689680
}
690681
// IThorDataLink
@@ -719,10 +710,66 @@ class CIndexReadSlaveBase : public CSlaveActivity
719710
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
720711
{
721712
PARENT::gatherActiveStats(activeStats);
713+
// *) jhtree stats will have been tracked as follows
714+
// - For superfiles, there will be 1 IContextLogger in contextLoggers[] for each subfile
715+
// - In all other cases, use 1st IContextLogger in contextLoggers[] for all parts
716+
// *) File IO stats will have been tracked as follows
717+
// - the io stats will be tracked in keyManagers[]
718+
// - (There will be exactly 1 keyManager per part)
719+
// Note: the KeyManagers[], contextLoggers[] and fileStats[] are not necessarilly the same size
720+
// - KeyManagers[] will always be the same size as partDescs[]
721+
// - contextLoggers[] will be the size of the number of subfiles for superfiles, otherwise 1
722+
// - fileStats[] may be size of #subfiles for superfiles, size 1 for dynamic/variable files, or zero for everything else
723+
// This function will:
724+
// 1) In all cases, activeStats will have activity level jhtree & io stats
725+
// 2) For superfiles and variable/dynamic files, fileStats[] will have jhtree & io stats
726+
// (If there are no fileStats, the file level stats can be extracted from activeStats)
727+
if (partDescs.ordinality())
722728
{
723-
CriticalBlock b(ioStatsCS);
724-
if (lazyIFileIO)
725-
mergeStats(activeStats, lazyIFileIO);
729+
// reset required because within loop below, mergeStats() is used to build up stats for each file
730+
for (auto & fileStatItem: fileStats)
731+
fileStatItem->reset();
732+
ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
733+
for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
734+
{
735+
IKeyManager * keyManager;
736+
{
737+
CriticalBlock b(keyManagersCS);
738+
if (!keyManagers.isItem(partNum))
739+
continue;
740+
keyManager = &keyManagers.item(partNum);
741+
}
742+
if (fileStats.size()>0)
743+
{
744+
unsigned subfile = 0;
745+
// For super file workout entry number in filestats for subfile
746+
// (For non-superfiles, subfile will be zero, so first entry from fileStats will be used)
747+
if (isSuperFile)
748+
{
749+
unsigned lnum;
750+
if(!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
751+
continue; // should not happen
752+
}
753+
CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
754+
keyManager->mergeStats(*fileStatItem); // file level IO stats
755+
// Merge subfile jhtree stats into fileStats and activeStats
756+
if (isSuperFile)
757+
{
758+
fileStatItem->merge(contextLoggers[partNum]->queryStats());
759+
activeStats.merge(contextLoggers[partNum]->queryStats());
760+
}
761+
}
762+
// IO stats always needed in activeStats for activity level stats
763+
keyManager->mergeStats(activeStats);
764+
}
765+
// ContextLoggers for non-superfile, not merged to activeStats or fileStats
766+
// (Avoid doing in loop above, to avoid merging same jhtree stats multiple times)
767+
if (!isSuperFile)
768+
{
769+
activeStats.merge(contextLoggers[0]->queryStats());
770+
if (fileStats.size()>0)
771+
fileStats[fileTableStart]->merge(contextLoggers[0]->queryStats());
772+
}
726773
}
727774
activeStats.setStatistic(StNumRowsProcessed, progress);
728775
}
@@ -735,11 +782,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
735782
}
736783
virtual void done() override
737784
{
738-
{
739-
CriticalBlock b(ioStatsCS);
740-
updateStats();
741-
lazyIFileIO.clear();
742-
}
785+
lazyIFileIO.clear();
743786
PARENT::done();
744787
}
745788
};
@@ -819,7 +862,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
819862
helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
820863
rawSeek = (byte *)temp;
821864
}
822-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
823865
if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
824866
return NULL;
825867
const byte *row = currentManager->queryKeyBuffer();
@@ -972,7 +1014,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
9721014
// IRowStream
9731015
virtual void stop() override
9741016
{
975-
CStatsScopedDeltaUpdater scoped(statsUpdater);
9761017
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
9771018
{
9781019
keyedLimitCount = sendGetCount(keyedProcessed);
@@ -1142,7 +1183,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
11421183
if (keyManager)
11431184
prepareManager(keyManager);
11441185

1145-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
11461186
while (true)
11471187
{
11481188
const void *key = indexInput->nextKey();
@@ -1301,7 +1341,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
13011341
if (keyManager)
13021342
prepareManager(keyManager);
13031343

1304-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
13051344
while (true)
13061345
{
13071346
const void *key = indexInput->nextKey();

0 commit comments

Comments
 (0)