HPCC-30993 Fix file access for index read activity #18144
Changes from all commits
@@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
 protected:
     StringAttr logicalFilename;
     IArrayOf<IPartDescriptor> partDescs;
+    bool isSuperFile = false;
     IHThorIndexReadBaseArg *helper;
     IHThorSourceLimitTransformExtra * limitTransformExtra;
     Owned<IEngineRowAllocator> allocator;

@@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
     rowcount_t rowLimit = RCMAX;
     bool useRemoteStreaming = false;
     Owned<IFileIO> lazyIFileIO;
-    mutable CriticalSection ioStatsCS;
+    mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
     unsigned fileTableStart = NotFound;
-    CStatsContextLogger contextLogger;
-    CStatsCtxLoggerDeltaUpdater statsUpdater;
+    std::vector<Owned<CStatsContextLogger>> contextLoggers;

     class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
     {

@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
             if (!keyManager)
                 throw MakeActivityException(&activity, 0, "Callback attempting to read blob with no key manager - index being read remotely?");
             needsBlobCleaning = true;
-            return (byte *) keyManager->loadBlob(id, dummy, &activity.contextLogger);
+            return (byte *) keyManager->loadBlob(id, dummy, nullptr);
         }
         void prepareManager(IKeyManager *_keyManager)
         {

@@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
         unsigned projectedFormatCrc = helper->getProjectedFormatCrc();
         IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize();

-        unsigned p = partNum;
-        while (p<partDescs.ordinality()) // will process all parts if localMerge
+        for (unsigned p = partNum; p<partDescs.ordinality(); p++) // will process all parts if localMerge
         {
-            IPartDescriptor &part = partDescs.item(p++);
+            IPartDescriptor &part = partDescs.item(p);

Review comment: The changes in lines 169-171 are so that the correct contextLogger can be referenced with [p]. If p++ remains here, then contextLoggers[p] cannot be used in line 306.

             unsigned crc=0;
             part.getCrc(crc);

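The hazard the review comment describes can be reproduced standalone. Below is a minimal sketch (hypothetical part/logger arrays, not the HPCC types) contrasting the old post-increment shape with the new for-loop shape:

    #include <cstdio>
    #include <vector>

    int main()
    {
        std::vector<const char *> parts   = {"part0", "part1", "part2"};
        std::vector<const char *> loggers = {"logger0", "logger1", "logger2"}; // one per part

        // Old shape: the index advances as the part is fetched, so any later use
        // of p in the loop body (e.g. loggers[p]) names the NEXT part's entry.
        unsigned p = 0;
        while (p < parts.size())
        {
            const char *part = parts[p++];
            printf("%s would pair with logger index %u (off by one)\n", part, p);
        }

        // New shape: p names the current part for the whole loop body, so
        // parts[p] and loggers[p] always refer to the same part.
        for (p = 0; p < parts.size(); p++)
            printf("%s pairs with %s\n", parts[p], loggers[p]);
        return 0;
    }
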
@@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
                     continue; // try next copy and ultimately failover to local when no more copies
                 }
                 ActPrintLog("[part=%d]: reading remote dafilesrv index '%s' (logical file = %s)", partNum, path.str(), logicalFilename.get());
-                partNum = p;
+                partNum = (p+1); // NB: returning next part number to process.
                 return indexLookup.getClear();
             }
         }

@@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
                 part.queryOwner().getClusterLabel(0, planeName);
                 blockedSize = getBlockedFileIOSize(planeName);
             }
-            lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadActivityStatistics, blockedSize));
+            lazyIFileIO.setown(queryThor().queryFileCache().lookupIFileIO(*this, logicalFilename, part, nullptr, indexReadFileStatistics, blockedSize));

             RemoteFilename rfn;
             part.getFilename(0, rfn);
             StringBuffer path;
             rfn.getPath(path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy

             Owned<IKeyIndex> keyIndex = createKeyIndex(path, crc, *lazyIFileIO, (unsigned) -1, false);
-            Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, &contextLogger, helper->hasNewSegmentMonitors(), false);
+            IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0];
+            Owned<IKeyManager> klManager = createLocalKeyManager(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndex, contextLogger, helper->hasNewSegmentMonitors(), false);
             if (localMerge)
             {
                 if (!keyIndexSet)

@@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
                     translators.append(translator.getClear());
                 }
                 keyIndexSet->addIndex(keyIndex.getClear());
-                keyManagers.append(*klManager.getLink());
+                {
+                    CriticalBlock b(keyManagersCS);
+                    keyManagers.append(*klManager.getLink());
+                }
                 keyManager = klManager;
             }
             else

@@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
                 if (translator)
                     klManager->setLayoutTranslator(&translator->queryTranslator());
                 translators.append(translator.getClear());
-                keyManagers.append(*klManager.getLink());
+                {
+                    CriticalBlock b(keyManagersCS);
+                    keyManagers.append(*klManager.getLink());
+                }
                 keyManager = klManager;
-                partNum = p;
+                partNum = (p+1); // NB: returning next part number to process.
                 return createIndexLookup(keyManager);
             }
         }
-        keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors(), false));
+        //Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
+        keyMergerManager.setown(createKeyMerger(helper->queryDiskRecordSize()->queryRecordAccessor(true), keyIndexSet, seekGEOffset, nullptr, helper->hasNewSegmentMonitors(), false));

Review comment: A Jira has been created to address KeyMerger stats: https://track.hpccsystems.com/browse/HPCC-30999

         const ITranslator *translator = translators.item(0);
         if (translator)
             keyMergerManager->setLayoutTranslator(&translator->queryTranslator());

@@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
         else
             return nullptr;
     }
-    void mergeFileStats(IPartDescriptor *partDesc, IFileIO *partIO)
-    {
-        if (fileStats.size()>0)
-        {
-            ISuperFileDescriptor * superFDesc = partDesc->queryOwner().querySuperFileDescriptor();
-            if (superFDesc)
-            {
-                unsigned subfile, lnum;
-                if (superFDesc->mapSubPart(partDesc->queryPartIndex(), subfile, lnum))
-                    mergeStats(*fileStats[fileTableStart+subfile], partIO);
-            }
-            else
-                mergeStats(*fileStats[fileTableStart], partIO);
-        }
-    }
-    void updateStats()
-    {
-        // NB: updateStats() should always be called whilst ioStatsCS is held.
-        if (lazyIFileIO)
-        {
-            mergeStats(inactiveStats, lazyIFileIO);
-            if (currentPart<partDescs.ordinality())
-                mergeFileStats(&partDescs.item(currentPart), lazyIFileIO);
-        }
-    }
     void configureNextInput()
     {
         if (currentManager)
         {
             resetManager(currentManager);
             currentManager = nullptr;
-
-            CriticalBlock b(ioStatsCS);
-            updateStats();
             lazyIFileIO.clear();
         }
         IKeyManager *keyManager = nullptr;

@@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
         if (eoi)
             return nullptr;
         dbgassertex(currentInput);
-        CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
         const void *ret = nullptr;
         while (true)
         {

@@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
     }
 public:
     CIndexReadSlaveBase(CGraphElementBase *container)
-        : CSlaveActivity(container, indexReadActivityStatistics), callback(*this), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, *this, contextLogger)
+        : CSlaveActivity(container, indexReadActivityStatistics), callback(*this)
     {
         helper = (IHThorIndexReadBaseArg *)container->queryHelper();
         limitTransformExtra = nullptr;

@@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
                 break;
             if (keyManager)
                 prepareManager(keyManager);
-            CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
             if (hard) // checkCount checks hard key count only.
                 count += indexInput->checkCount(keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
             else

@@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
             partDescs.kill();
             keyIndexSet.clear();
             translators.kill();
-            keyManagers.kill();
+            {
+                CriticalBlock b(keyManagersCS);
+                keyManagers.kill();
+            }
             keyMergerManager.clear();
         }
         else

@@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
         IPartDescriptor &part0 = partDescs.item(0);
         IFileDescriptor &fileDesc = part0.queryOwner();
         ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor();
+        isSuperFile = super != nullptr;

         if ((0 == (helper->getFlags() & TIRusesblob)) && !localMerge)
         {

@@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
                 }
             }
             data.read(fileTableStart);
-            setupSpace4FileStats(fileTableStart, reInit, super!=nullptr, super?super->querySubFiles():0, indexReadActivityStatistics);
+            setupSpace4FileStats(fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles():0, indexReadFileStatistics);
+            if (isSuperFile)
+            {
+                // Only superfiles require multiple context loggers to track sublevel stats
+                for (unsigned i = 0; i < parts; ++i)
+                    contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
+            }
+            else
+            {
+                contextLoggers.push_back(new CStatsContextLogger(jhtreeCacheStatistics, thorJob));
+            }
         }
     }
     // IThorDataLink

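The allocation rule in this hunk pairs with the selection expression in the earlier hunk (isSuperFile?contextLoggers[p]:contextLoggers[0]). A minimal sketch of both halves, using simplified stand-ins (std::unique_ptr and a plain Logger struct are assumptions, not the HPCC Owned/CStatsContextLogger types):

    #include <memory>
    #include <vector>

    // Stand-in for CStatsContextLogger (hypothetical, for illustration only)
    struct Logger { unsigned seeks = 0; };

    int main()
    {
        const bool isSuperFile = true; // assumption: a superfile with 3 parts
        const unsigned parts = 3;

        // Allocation rule from the hunk above: one logger per part for
        // superfiles, a single shared logger otherwise.
        std::vector<std::unique_ptr<Logger>> contextLoggers;
        if (isSuperFile)
        {
            for (unsigned i = 0; i < parts; ++i)
                contextLoggers.push_back(std::make_unique<Logger>());
        }
        else
            contextLoggers.push_back(std::make_unique<Logger>());

        // Selection rule from the earlier hunk: a superfile part p tracks into
        // its own logger; all other cases share contextLoggers[0].
        for (unsigned p = 0; p < parts; ++p)
        {
            Logger *logger = isSuperFile ? contextLoggers[p].get() : contextLoggers[0].get();
            logger->seeks++; // stand-in for jhtree activity tracked via the logger
        }
        return 0;
    }
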
@@ -719,10 +710,70 @@ class CIndexReadSlaveBase : public CSlaveActivity
     virtual void gatherActiveStats(CRuntimeStatisticCollection &activeStats) const
     {
         PARENT::gatherActiveStats(activeStats);
+        // *) jhtree stats will have been tracked as follows:
+        //    - for superfiles, there will be 1 IContextLogger in contextLoggers[] for each part
+        //    - in all other cases, the 1st IContextLogger in contextLoggers[] is used for all parts
+        // *) file IO stats will have been tracked as follows:
+        //    - the IO stats will be tracked in keyManagers[]
+        //    - (there will be exactly 1 keyManager per part)
+        // Note: keyManagers[], contextLoggers[] and fileStats[] are not necessarily the same size:
+        //    - keyManagers[] will always be the same size as partDescs[]

Review comment: This comment doesn't seem to be quite true, since line 742 is testing whether it is true. Worth adding a comment about where/when it might not be true.

+        //    - contextLoggers[] will have 1 entry per part for superfiles, otherwise 1 entry in total
+        //    - fileStats[] may be sized #subfiles for superfiles, size 1 for dynamic/variable files,
+        //      or zero for everything else
+        // This function will:
+        // 1) in all cases, populate activeStats with activity-level jhtree & IO stats
+        // 2) for superfiles and variable/dynamic files, populate fileStats[] with jhtree & IO stats
+        //    (if there are no fileStats, the file-level stats can be extracted from activeStats)
+        // TODO:
+        // * when localMerge==true or useMerger==true, stats may not be correct. Revisit
+        //   this section to ensure that the code handles these cases correctly.
         if (partDescs.ordinality())
         {
-            CriticalBlock b(ioStatsCS);
             if (lazyIFileIO)
                 mergeStats(activeStats, lazyIFileIO);
+            // reset required because, within the loop below, mergeStats() is used to build up stats for each file
+            for (auto & fileStatItem: fileStats)
+                fileStatItem->reset();
+            ISuperFileDescriptor *superFDesc = partDescs.item(0).queryOwner().querySuperFileDescriptor();
+            for (unsigned partNum=0; partNum<partDescs.ordinality(); partNum++)
+            {
+                IKeyManager * keyManager;
+                {
+                    CriticalBlock b(keyManagersCS);
+                    if (!keyManagers.isItem(partNum))
+                        continue;
+                    keyManager = &keyManagers.item(partNum);
+                }

Review comment: should there be a test (earlier) to exclude (and later handle differently) the merger case?
Reply: Would merging stats even when the merger is used be a problem? The merger was going to be handled separately with https://track.hpccsystems.com/browse/HPCC-30999
Reply: I don't think it will cause exceptions, and it looks like stats will be correct in the non-super case.
Reply: Comment added at lines 728-730.

Review comment: @shamser could this break rather than continuing? partNum will only get larger, and the test above will always fail. (If something else is adding them at the same time I guess it could succeed, but will be blank.)
Reply: There is a critical section to protect against changes to keyManagers.

+                if (fileStats.size()>0)
+                {
+                    unsigned subfile = 0;
+                    // For superfiles, work out the entry in fileStats for this subfile
+                    // (for non-superfiles, subfile stays zero, so the first fileStats entry is used)
+                    if (isSuperFile)
+                    {
+                        unsigned lnum;
+                        if (!superFDesc->mapSubPart(partDescs.item(partNum).queryPartIndex(), subfile, lnum))
+                            continue; // should not happen
+                    }
+                    CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
+                    keyManager->mergeStats(*fileStatItem); // file-level IO stats
+                    // merge subfile jhtree stats into fileStats and activeStats
+                    if (isSuperFile)
+                    {
+                        fileStatItem->merge(contextLoggers[partNum]->queryStats());
+                        activeStats.merge(contextLoggers[partNum]->queryStats());
+                    }
+                }
+                // IO stats are always needed in activeStats for activity-level stats
+                keyManager->mergeStats(activeStats);
+            }
+            // ContextLoggers for non-superfiles are merged into activeStats and fileStats here, outside
+            // the loop above, to avoid merging the same jhtree stats multiple times
+            if (!isSuperFile)
+            {
+                activeStats.merge(contextLoggers[0]->queryStats());
+                if (fileStats.size()>0)
+                    fileStats[fileTableStart]->merge(contextLoggers[0]->queryStats());
+            }
         }
         activeStats.setStatistic(StNumRowsProcessed, progress);
     }
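On the continue-versus-break thread above: a hedged sketch of the pattern as committed, with std::mutex standing in for CriticalSection and a sentinel value standing in for a part whose keyManager is not yet present. The walker checks each slot under the lock and skips it, rather than stopping, even though in the real code slots are appended in part order (the reviewer's point):

    #include <cstdio>
    #include <mutex>
    #include <vector>

    int main()
    {
        std::mutex keyManagersCS;            // mirrors the CriticalSection in the patch
        std::vector<int> slots = {5, -1, 7}; // -1: part not opened yet (hypothetical sentinel)

        for (unsigned partNum = 0; partNum < slots.size(); partNum++)
        {
            int stats;
            {
                std::lock_guard<std::mutex> b(keyManagersCS); // mirrors CriticalBlock
                if (slots[partNum] < 0)
                    continue; // skip this part; later parts may still have a manager
                stats = slots[partNum];
            }
            printf("part %u contributes %d\n", partNum, stats);
        }
        return 0;
    }
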
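The comment block at the top of gatherActiveStats describes how per-part jhtree stats fold into per-subfile and activity-level collections. A hedged sketch of that flow, with a simplified stats type, fileTableStart omitted, and a hypothetical part-to-subfile table standing in for ISuperFileDescriptor::mapSubPart:

    #include <cstdio>
    #include <vector>

    struct Stats { unsigned value = 0; void merge(const Stats &o) { value += o.value; } };

    int main()
    {
        const bool isSuperFile = true;
        const unsigned numParts = 4;
        const std::vector<unsigned> partToSubfile = {0, 0, 1, 1}; // stand-in for mapSubPart()

        // one logger per part (superfile case), as allocated in the earlier hunk
        std::vector<Stats> contextLoggers(isSuperFile ? numParts : 1);
        for (unsigned p = 0; p < contextLoggers.size(); p++)
            contextLoggers[p].value = 10 * (p + 1); // stand-in for tracked jhtree stats

        std::vector<Stats> fileStats(2); // one slot per subfile
        Stats activeStats;               // activity-level rollup

        for (unsigned partNum = 0; partNum < numParts; partNum++)
        {
            if (isSuperFile)
            {
                unsigned subfile = partToSubfile[partNum];
                fileStats[subfile].merge(contextLoggers[partNum]); // file-level jhtree stats
                activeStats.merge(contextLoggers[partNum]);        // activity-level rollup
            }
        }
        if (!isSuperFile) // single shared logger merged exactly once, outside the loop
        {
            activeStats.merge(contextLoggers[0]);
            fileStats[0].merge(contextLoggers[0]);
        }

        for (unsigned s = 0; s < fileStats.size(); s++)
            printf("subfile %u stats=%u\n", s, fileStats[s].value);
        printf("activity stats=%u\n", activeStats.value);
        return 0;
    }
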
@@ -735,11 +786,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
     }
     virtual void done() override
     {
-        {
-            CriticalBlock b(ioStatsCS);
-            updateStats();
-            lazyIFileIO.clear();
-        }
+        lazyIFileIO.clear();

Review comment: The stats gathering has been moved to gatherActiveStats so that the stats are updated periodically whilst the activity is running.

         PARENT::done();
     }
 };

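The review note above explains the motivation for the done() change: merging was moved into gatherActiveStats() so that a periodic poll sees fresh numbers rather than a single update at completion. A minimal sketch (hypothetical engine loop and activity, not the Thor framework) of the difference:

    #include <cstdio>

    struct Activity
    {
        unsigned rowsSoFar = 0;
        // Called periodically by the engine while the activity runs, and once more
        // at the end; publishing here keeps reported stats current.
        void gatherActiveStats(unsigned &published) const { published = rowsSoFar; }
        // done() now only releases resources; it no longer folds in stats.
        void done() {}
    };

    int main()
    {
        Activity a;
        unsigned published = 0;
        for (unsigned batch = 0; batch < 3; batch++)
        {
            a.rowsSoFar += 100;             // the activity makes progress
            a.gatherActiveStats(published); // periodic poll observes fresh stats
            printf("after batch %u: published=%u\n", batch, published);
        }
        a.done();
        return 0;
    }
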
@@ -819,7 +866,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
             helper->mapOutputToInput(tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
             rawSeek = (byte *)temp;
         }
-        CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
         if (!currentManager->lookupSkip(rawSeek, seekGEOffset, seekSize))
             return NULL;
         const byte *row = currentManager->queryKeyBuffer();

@@ -972,7 +1018,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
     // IRowStream
     virtual void stop() override
     {
-        CStatsScopedDeltaUpdater scoped(statsUpdater);
         if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
         {
             keyedLimitCount = sendGetCount(keyedProcessed);

@@ -1142,7 +1187,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
         if (keyManager)
             prepareManager(keyManager);

-        CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
         while (true)
         {
             const void *key = indexInput->nextKey();

@@ -1301,7 +1345,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
         if (keyManager)
            prepareManager(keyManager);

-        CStatsScopedThresholdDeltaUpdater scoped(statsUpdater);
         while (true)
         {
             const void *key = indexInput->nextKey();

Review comment: Blob stats will be addressed in this ticket: https://track.hpccsystems.com/browse/HPCC-31002