Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30993 Fix file access for index read activity #18144

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 102 additions & 59 deletions thorlcr/activities/indexread/thindexreadslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
protected:
StringAttr logicalFilename;
IArrayOf<IPartDescriptor> partDescs;
bool isSuperFile = false;
IHThorIndexReadBaseArg *helper;
IHThorSourceLimitTransformExtra * limitTransformExtra;
Owned<IEngineRowAllocator> allocator;
Expand Down Expand Up @@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
rowcount_t rowLimit = RCMAX;
bool useRemoteStreaming = false;
Owned<IFileIO> lazyIFileIO;
mutable CriticalSection ioStatsCS;
mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
unsigned fileTableStart = NotFound;
CStatsContextLogger contextLogger;
CStatsCtxLoggerDeltaUpdater statsUpdater;
std::vector<Owned<CStatsContextLogger>> contextLoggers;

class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
{
Expand All @@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (!keyManager)
throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
needsBlobCleaning = true;
return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
return (byte *) keyManager->loadBlob(id, dummy, nullptr);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blob stats will be addressed in this ticket: https://track.hpccsystems.com/browse/HPCC-31002

}
void prepareManager(IKeyManager *_keyManager)
{
Expand Down Expand Up @@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();

unsigned p = partNum;
while (p<partDescs.ordinality()) // will process all parts if localMerge
for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
{
IPartDescriptor &part = partDescs.item(p++);
IPartDescriptor &part = partDescs.item(p);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in lines 169-171 is so that the correct contextLogger can be referenced with [p]. If p++ remains here then contextLoggers[p] cannot be used in line 306.

unsigned crc=0;
part.getCrc(crc);

Expand Down Expand Up @@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
continue; // try next copy and ultimately failover to local when no more copies
}
ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
partNum = p;
partNum = (p+1); // NB: returning next part number to process.
return indexLookup.getClear();
}
}
Expand All @@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
part.queryOwner().getClusterLabel(0, planeName);
blockedSize = getBlockedFileIOSize(planeName);
}
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));

RemoteFilename rfn;
part.getFilename(0, rfn);
StringBuffer path;
rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy

Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0];
Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false);
if (localMerge)
{
if (!keyIndexSet)
Expand All @@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
translators.append(translator.getClear());
}
keyIndexSet->addIndex(keyIndex.getClear());
keyManagers.append(*klManager.getLink());
{
CriticalBlock b(keyManagersCS);
keyManagers.append(*klManager.getLink());
}
keyManager = klManager;
}
else
Expand All @@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (translator)
klManager->setLayoutTranslator(&translator->queryTranslator());
translators.append(translator.getClear());
keyManagers.append(*klManager.getLink());
{
CriticalBlock b(keyManagersCS);
keyManagers.append(*klManager.getLink());
}
keyManager = klManager;
partNum = p;
partNum = (p+1); // NB: returning next part number to process.
return createIndexLookup(keyManager);
}
}
keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
//Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jira has been created to address KeyMerger stats: https://track.hpccsystems.com/browse/HPCC-30999

keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));
const ITranslator *translator = translators.item(0);
if (translator)
keyMergerManager->setLayoutTranslator(&translator->queryTranslator());
Expand All @@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
else
return nullptr;
}
void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
{
if (fileStats.size()>0)
{
ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
if (superFDesc)
{
unsigned subfile, lnum;
if(superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
mergeStats(*fileStats[fileTableStart+subfile], partIO);
}
else
mergeStats(*fileStats[fileTableStart], partIO);
}
}
void updateStats()
{
// NB: updateStats() should always be called whilst ioStatsCS is held.
if (lazyIFileIO)
{
mergeStats(inactiveStats, lazyIFileIO);
if (currentPart<partDescs.ordinality())
mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
}
}
void configureNextInput()
{
if (currentManager)
{
resetManager(currentManager);
currentManager = nullptr;

CriticalBlock b(ioStatsCS);
updateStats();
lazyIFileIO.clear();
}
IKeyManager *keyManager = nullptr;
Expand Down Expand Up @@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
if (eoi)
return nullptr;
dbgassertex(currentInput);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
const void *ret = nullptr;
while (true)
{
Expand Down Expand Up @@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
public:
CIndexReadSlaveBase(CGraphElementBase *container)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
: CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
{
helper = (IHThorIndexReadBaseArg *)container->queryHelper();
limitTransformExtra = nullptr;
Expand All @@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
break;
if (keyManager)
prepareManager(keyManager);
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
if (hard) // checkCount checks hard key count only.
count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
else
Expand Down Expand Up @@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
partDescs.kill();
keyIndexSet.clear();
translators.kill();
keyManagers.kill();
{
CriticalBlock b(keyManagersCS);
keyManagers.kill();
}
keyMergerManager.clear();
}
else
Expand All @@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
IPartDescriptor &part0 = partDescs.item(0);
IFileDescriptor &fileDesc = part0.queryOwner();
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
isSuperFile = super != nullptr;

if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
{
Expand Down Expand Up @@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
}
data.read(fileTableStart);
setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
if (isSuperFile)
{
// Only superfiles required multiple context loggers to track sublevel stats
for(unsigned i = 0; i < parts; ++i)
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
}
else
{
contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
}
}
}
// IThorDataLink
Expand Down Expand Up @@ -719,10 +710,70 @@ class CIndexReadSlaveBase : public CSlaveActivity
virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
{
PARENT::gatherActiveStats(activeStats);
// *) jhtree stats will have been tracked as follows
// - For superfiles, there will be 1 IContextLogger in contextLoggers[] for each part
// - In all other cases, use 1st IContextLogger in contextLoggers[] for all parts
// *) File IO stats will have been tracked as follows
// - the io stats will be tracked in keyManagers[]
// - (There will be exactly 1 keyManager per part)
// Note: the KeyManagers[], contextLoggers[] and fileStats[] are not necessarilly the same size
// - KeyManagers[] will always be the same size as partDescs[]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't seem to be quite true, since line 742 is testing that it is true. Worth adding a comment explaining where/when it might not be true.

// - contextLoggers[] will be the size of the number of subfiles for superfiles, otherwise 1
// - fileStats[] may be size of #subfiles for superfiles, size 1 for dynamic/variable files, or
// zero for everything else
// This function will:
// 1) In all cases, activeStats will have activity level jhtree & io stats
// 2) For superfiles and variable/dynamic files, fileStats[] will have jhtree & io stats
// (If there are no fileStats, the file level stats can be extracted from activeStats)
// TODO:
// * When localMerge==true or when useMerger==true, stats may not correct. Revisit
// this section to ensure that the code handles these correctly.
if (partDescs.ordinality())
{
CriticalBlock b(ioStatsCS);
if (lazyIFileIO)
mergeStats(activeStats, lazyIFileIO);
// reset required because within loop below, mergeStats() is used to build up stats for each file
for (auto & fileStatItem: fileStats)
fileStatItem->reset();
ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
{
IKeyManager * keyManager;
{
CriticalBlock b(keyManagersCS);
if (!keyManagers.isItem(partNum))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a test (earlier) to exclude (and later handle differently) the merger case?
The test here may mean it passes for part 1, and bails out for part 2+ in the merge case, which will only have 1 IKeyManager in keyManagers.

Copy link
Contributor Author

@shamser shamser Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would merging stats even when merger is used be a problem? The merger was going to handled separately with https://track.hpccsystems.com/browse/HPCC-30999

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it will cause exceptions, and it looks like stats will be correct in the non-super case.
But it is not clear to the reader what will happen in the localMerge=true case at the moment.
I think worth a comment block that explains current caveat and that it needs refactoring, in if (localMerge) case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added line 728-730.

continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shamser could this break rather than continuing? partNum will only get larger, and the test above will always fail. (If something else is adding them at the same time I guess it could succeed, but will be blank.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a critical section to protect against changes to KeyManagers.

keyManager = &keyManagers.item(partNum);
}
if (fileStats.size()>0)
{
unsigned subfile = 0;
// For super file workout entry number in filestats for subfile
// (For non-superfiles, subfile will be zero, so first entry from fileStats will be used)
if (isSuperFile)
{
unsigned lnum;
if(!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
continue; // should not happen
}
CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
keyManager->mergeStats(*fileStatItem); // file level IO stats
// Merge subfile jhtree stats into fileStats and activeStats
if (isSuperFile)
{
fileStatItem->merge(contextLoggers[partNum]->queryStats());
activeStats.merge(contextLoggers[partNum]->queryStats());
}
}
// IO stats always needed in activeStats for activity level stats
keyManager->mergeStats(activeStats);
}
// ContextLoggers for non-superfile, not merged to activeStats or fileStats
// (Avoid doing in loop above, to avoid merging same jhtree stats multiple times)
if (!isSuperFile)
{
activeStats.merge(contextLoggers[0]->queryStats());
if (fileStats.size()>0)
fileStats[fileTableStart]->merge(contextLoggers[0]->queryStats());
}
}
activeStats.setStatistic(StNumRowsProcessed, progress);
}
Expand All @@ -735,11 +786,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
}
virtual void done() override
{
{
CriticalBlock b(ioStatsCS);
updateStats();
lazyIFileIO.clear();
}
lazyIFileIO.clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stats gathering has been moved to gatherActiveStats so that the stats are updated periodically whilst the activity is running.

PARENT::done();
}
};
Expand Down Expand Up @@ -819,7 +866,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
rawSeek = (byte *)temp;
}
CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
return NULL;
const byte *row = currentManager->queryKeyBuffer();
Expand Down Expand Up @@ -972,7 +1018,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
// IRowStream
virtual void stop() override
{
CStatsScopedDeltaUpdater scoped(statsUpdater);
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
{
keyedLimitCount = sendGetCount(keyedProcessed);
Expand Down Expand Up @@ -1142,7 +1187,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
if (keyManager)
prepareManager(keyManager);

CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
while (true)
{
const void *key = indexInput->nextKey();
Expand Down Expand Up @@ -1301,7 +1345,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
if (keyManager)
prepareManager(keyManager);

CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
while (true)
{
const void *key = indexInput->nextKey();
Expand Down
Loading