Skip to content

Commit 961044a

Browse files
committed
HPCC-30993 Fix file access for index read activity
Signed-off-by: Shamser Ahmed <[email protected]>
1 parent e67d16a commit 961044a

File tree

1 file changed

+77
-59
lines changed

1 file changed

+77
-59
lines changed

thorlcr/activities/indexread/thindexreadslave.cpp

+77-59
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
4444
protected:
4545
StringAttr logicalFilename;
4646
IArrayOf<IPartDescriptor> partDescs;
47+
bool isSuperFile = false;
4748
IHThorIndexReadBaseArg *helper;
4849
IHThorSourceLimitTransformExtra * limitTransformExtra;
4950
Owned<IEngineRowAllocator> allocator;
@@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
7677
rowcount_t rowLimit = RCMAX;
7778
bool useRemoteStreaming = false;
7879
Owned<IFileIO> lazyIFileIO;
79-
mutable CriticalSection ioStatsCS;
80+
mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
8081
unsigned fileTableStart = NotFound;
81-
CStatsContextLogger contextLogger;
82-
CStatsCtxLoggerDeltaUpdater statsUpdater;
82+
std::vector<Owned<CStatsContextLogger>> contextLoggers;
8383

8484
class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
8585
{
@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
9898
if (!keyManager)
9999
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
100100
needsBlobCleaning = true;
101-
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
101+
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
102102
}
103103
void prepareManager(IKeyManager *_keyManager)
104104
{
@@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
166166
unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
167167
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();
168168

169-
unsigned p = partNum;
170-
while (p<partDescs.ordinality()) // will process all parts if localMerge
169+
for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
171170
{
172-
IPartDescriptor &part = partDescs.item(p++);
171+
IPartDescriptor &part = partDescs.item(p);
173172
unsigned crc=0;
174173
part.getCrc(crc);
175174

@@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
273272
continue; // try next copy and ultimately failover to local when no more copies
274273
}
275274
ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
276-
partNum = p;
275+
partNum = (p+1);
277276
return indexLookup.getClear();
278277
}
279278
}
@@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
296295
part.queryOwner().getClusterLabel(0, planeName);
297296
blockedSize = getBlockedFileIOSize(planeName);
298297
}
299-
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
298+
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));
300299

301300
RemoteFilename rfn;
302301
part.getFilename(0, rfn);
303302
StringBuffer path;
304303
rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
305304

306305
Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
307-
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
306+
IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0];
307+
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextlogger, helper->hasNewSegmentMonitors(), false);
308308
if (localMerge)
309309
{
310310
if (!keyIndexSet)
@@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
315315
translators.append(translator.getClear());
316316
}
317317
keyIndexSet->addIndex(keyIndex.getClear());
318-
keyManagers.append(*klManager.getLink());
318+
{
319+
CriticalBlock b(keyManagersCS);
320+
keyManagers.append(*klManager.getLink());
321+
}
319322
keyManager = klManager;
320323
}
321324
else
@@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
325328
if (translator)
326329
klManager->setLayoutTranslator(&translator->queryTranslator());
327330
translators.append(translator.getClear());
328-
keyManagers.append(*klManager.getLink());
331+
{
332+
CriticalBlock b(keyManagersCS);
333+
keyManagers.append(*klManager.getLink());
334+
}
329335
keyManager = klManager;
330-
partNum = p;
336+
partNum = (p+1);
331337
return createIndexLookup(keyManager);
332338
}
333339
}
334-
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
340+
//Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
341+
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));
335342
const ITranslator *translator = translators.item(0);
336343
if (translator)
337344
keyMergerManager->setLayoutTranslator(&translator->queryTranslator());
@@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
348355
else
349356
return nullptr;
350357
}
351-
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
352-
{
353-
if (fileStats.size()>0)
354-
{
355-
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
356-
if (superFDesc)
357-
{
358-
unsigned subfile, lnum;
359-
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
360-
mergeStats(*fileStats[fileTableStart+subfile], partIO);
361-
}
362-
else
363-
mergeStats(*fileStats[fileTableStart], partIO);
364-
}
365-
}
366-
void updateStats()
367-
{
368-
// NB: updateStats() should always be called whilst ioStatsCS is held.
369-
if (lazyIFileIO)
370-
{
371-
mergeStats(inactiveStats, lazyIFileIO);
372-
if (currentPart<partDescs.ordinality())
373-
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
374-
}
375-
}
376358
void configureNextInput()
377359
{
378360
if (currentManager)
379361
{
380362
resetManager(currentManager);
381363
currentManager = nullptr;
382-
383-
CriticalBlock b(ioStatsCS);
384-
updateStats();
385364
lazyIFileIO.clear();
386365
}
387366
IKeyManager *keyManager = nullptr;
@@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
433412
if (eoi)
434413
return nullptr;
435414
dbgassertex(currentInput);
436-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
437415
const void *ret = nullptr;
438416
while (true)
439417
{
@@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
528506
}
529507
public:
530508
CIndexReadSlaveBase(CGraphElementBase *container)
531-
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
509+
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
532510
{
533511
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
534512
limitTransformExtra = nullptr;
@@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
555533
break;
556534
if (keyManager)
557535
prepareManager(keyManager);
558-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
559536
if (hard) // checkCount checks hard key count only.
560537
count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
561538
else
@@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
589566
partDescs.kill();
590567
keyIndexSet.clear();
591568
translators.kill();
592-
keyManagers.kill();
569+
{
570+
CriticalBlock b(keyManagersCS);
571+
keyManagers.kill();
572+
}
593573
keyMergerManager.clear();
594574
}
595575
else
@@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
607587
IPartDescriptor &part0 = partDescs.item(0);
608588
IFileDescriptor &fileDesc = part0.queryOwner();
609589
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
590+
isSuperFile = super != nullptr;
610591

611592
if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
612593
{
@@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
684665
}
685666
}
686667
data.read(fileTableStart);
687-
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
668+
setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
669+
// One contextLogger required for non-super files. Superfiles require one context logger per subfile
670+
if (isSuperFile)
671+
{
672+
for(unsigned i = 0; i < parts; ++i)
673+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
674+
}
675+
else
676+
{
677+
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
678+
}
688679
}
689680
}
690681
// IThorDataLink
@@ -719,11 +710,46 @@ class CIndexReadSlaveBase : public CSlaveActivity
719710
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
720711
{
721712
PARENT::gatherActiveStats(activeStats);
713+
if (partDescs.ordinality())
722714
{
723-
CriticalBlock b(ioStatsCS);
724-
if (lazyIFileIO)
725-
mergeStats(activeStats, lazyIFileIO);
715+
// reset required because within loop below, mergeStats() is used to build up stats for each file
716+
for (auto & fileStatItem: fileStats)
717+
fileStatItem->reset();
718+
ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
719+
for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
720+
{
721+
// If it is a superfile, track stats at subfile level with fileStats[fileTableStart+subfile]
722+
// if not, one set of stats for the whole file with fileStats[fileTableStart]
723+
unsigned subfile = 0;
724+
if (isSuperFile) // if it's not superfile, subfile is always 0 (otherwise it is the subfile number)
725+
{
726+
unsigned lnum;
727+
if(!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
728+
continue; // should not happen
729+
}
730+
IKeyManager * keyManager;
731+
{
732+
CriticalBlock b(keyManagersCS);
733+
if (!keyManagers.isItem(partNum))
734+
continue;
735+
keyManager = &keyManagers.item(partNum);
736+
}
737+
if (fileStats.size()>0) // fileStats used for superfiles where stats are tracked at subfile level
738+
{
739+
CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
740+
keyManager->mergeStats(*fileStatItem); // for file level stats
741+
fileStatItem->merge(contextLoggers[partNum]->queryStats());
742+
activeStats.merge(*fileStatItem); // for activity level stats
743+
}
744+
else
745+
{
746+
// when just 1 file, merge into activeStats (can use activeStats for file level stats)
747+
keyManager->mergeStats(activeStats);
748+
}
749+
}
726750
}
751+
if (fileStats.size()==0) // for non-superfiles, merge jhtree stats into activeStats
752+
activeStats.merge(contextLoggers[0]->queryStats());
727753
activeStats.setStatistic(StNumRowsProcessed, progress);
728754
}
729755
virtual void serializeStats(MemoryBuffer &mb) override
@@ -735,11 +761,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
735761
}
736762
virtual void done() override
737763
{
738-
{
739-
CriticalBlock b(ioStatsCS);
740-
updateStats();
741-
lazyIFileIO.clear();
742-
}
764+
lazyIFileIO.clear();
743765
PARENT::done();
744766
}
745767
};
@@ -819,7 +841,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
819841
helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
820842
rawSeek = (byte *)temp;
821843
}
822-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
823844
if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
824845
return NULL;
825846
const byte *row = currentManager->queryKeyBuffer();
@@ -972,7 +993,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
972993
// IRowStream
973994
virtual void stop() override
974995
{
975-
CStatsScopedDeltaUpdater scoped(statsUpdater);
976996
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
977997
{
978998
keyedLimitCount = sendGetCount(keyedProcessed);
@@ -1142,7 +1162,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
11421162
if (keyManager)
11431163
prepareManager(keyManager);
11441164

1145-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
11461165
while (true)
11471166
{
11481167
const void *key = indexInput->nextKey();
@@ -1301,7 +1320,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
13011320
if (keyManager)
13021321
prepareManager(keyManager);
13031322

1304-
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
13051323
while (true)
13061324
{
13071325
const void *key = indexInput->nextKey();

0 commit comments

Comments
 (0)