@@ -44,6 +44,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
44
44
protected:
45
45
StringAttr logicalFilename;
46
46
IArrayOf<IPartDescriptor> partDescs;
47
+ bool isSuperFile = false ;
47
48
IHThorIndexReadBaseArg *helper;
48
49
IHThorSourceLimitTransformExtra * limitTransformExtra;
49
50
Owned<IEngineRowAllocator> allocator;
@@ -76,10 +77,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
76
77
rowcount_t rowLimit = RCMAX;
77
78
bool useRemoteStreaming = false ;
78
79
Owned<IFileIO> lazyIFileIO;
79
- mutable CriticalSection ioStatsCS;
80
+ mutable CriticalSection keyManagersCS; // CS for any updates to keyManagers
80
81
unsigned fileTableStart = NotFound;
81
- CStatsContextLogger contextLogger;
82
- CStatsCtxLoggerDeltaUpdater statsUpdater;
82
+ std::vector<Owned<CStatsContextLogger>> contextLoggers;
83
83
84
84
class TransformCallback : implements IThorIndexCallback , public CSimpleInterface
85
85
{
@@ -98,7 +98,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
98
98
if (!keyManager)
99
99
throw MakeActivityException (&activity, 0 , " Callback attempting to read blob with no key manager - index being read remotely?" );
100
100
needsBlobCleaning = true ;
101
- return (byte *) keyManager->loadBlob (id, dummy, &activity. contextLogger );
101
+ return (byte *) keyManager->loadBlob (id, dummy, nullptr );
102
102
}
103
103
void prepareManager (IKeyManager *_keyManager)
104
104
{
@@ -166,10 +166,9 @@ class CIndexReadSlaveBase : public CSlaveActivity
166
166
unsigned projectedFormatCrc = helper->getProjectedFormatCrc ();
167
167
IOutputMetaData *projectedFormat = helper->queryProjectedDiskRecordSize ();
168
168
169
- unsigned p = partNum;
170
- while (p<partDescs.ordinality ()) // will process all parts if localMerge
169
+ for (unsigned p = partNum; p<partDescs.ordinality (); p++) // will process all parts if localMerge
171
170
{
172
- IPartDescriptor &part = partDescs.item (p++ );
171
+ IPartDescriptor &part = partDescs.item (p);
173
172
unsigned crc=0 ;
174
173
part.getCrc (crc);
175
174
@@ -273,7 +272,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
273
272
continue ; // try next copy and ultimately failover to local when no more copies
274
273
}
275
274
ActPrintLog (" [part=%d]: reading remote dafilesrv index '%s' (logical file = %s)" , partNum, path.str (), logicalFilename.get ());
276
- partNum = p;
275
+ partNum = (p+ 1 ); // NB: returning next part number to process.
277
276
return indexLookup.getClear ();
278
277
}
279
278
}
@@ -296,15 +295,16 @@ class CIndexReadSlaveBase : public CSlaveActivity
296
295
part.queryOwner ().getClusterLabel (0 , planeName);
297
296
blockedSize = getBlockedFileIOSize (planeName);
298
297
}
299
- lazyIFileIO.setown (queryThor ().queryFileCache ().lookupIFileIO (*this , logicalFilename, part, nullptr , indexReadActivityStatistics , blockedSize));
298
+ lazyIFileIO.setown (queryThor ().queryFileCache ().lookupIFileIO (*this , logicalFilename, part, nullptr , indexReadFileStatistics , blockedSize));
300
299
301
300
RemoteFilename rfn;
302
301
part.getFilename (0 , rfn);
303
302
StringBuffer path;
304
303
rfn.getPath (path); // NB: use for tracing only, IDelayedFile uses IPartDescriptor and any copy
305
304
306
305
Owned<IKeyIndex> keyIndex = createKeyIndex (path, crc, *lazyIFileIO, (unsigned ) -1 , false );
307
- Owned<IKeyManager> klManager = createLocalKeyManager (helper->queryDiskRecordSize ()->queryRecordAccessor (true ), keyIndex, &contextLogger, helper->hasNewSegmentMonitors (), false );
306
+ IContextLogger * contextLogger = isSuperFile?contextLoggers[p]:contextLoggers[0 ];
307
+ Owned<IKeyManager> klManager = createLocalKeyManager (helper->queryDiskRecordSize ()->queryRecordAccessor (true ), keyIndex, contextLogger, helper->hasNewSegmentMonitors (), false );
308
308
if (localMerge)
309
309
{
310
310
if (!keyIndexSet)
@@ -315,7 +315,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
315
315
translators.append (translator.getClear ());
316
316
}
317
317
keyIndexSet->addIndex (keyIndex.getClear ());
318
- keyManagers.append (*klManager.getLink ());
318
+ {
319
+ CriticalBlock b (keyManagersCS);
320
+ keyManagers.append (*klManager.getLink ());
321
+ }
319
322
keyManager = klManager;
320
323
}
321
324
else
@@ -325,13 +328,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
325
328
if (translator)
326
329
klManager->setLayoutTranslator (&translator->queryTranslator ());
327
330
translators.append (translator.getClear ());
328
- keyManagers.append (*klManager.getLink ());
331
+ {
332
+ CriticalBlock b (keyManagersCS);
333
+ keyManagers.append (*klManager.getLink ());
334
+ }
329
335
keyManager = klManager;
330
- partNum = p;
336
+ partNum = (p+ 1 ); // NB: returning next part number to process.
331
337
return createIndexLookup (keyManager);
332
338
}
333
339
}
334
- keyMergerManager.setown (createKeyMerger (helper->queryDiskRecordSize ()->queryRecordAccessor (true ), keyIndexSet, seekGEOffset, &contextLogger, helper->hasNewSegmentMonitors (), false ));
340
+ // Not tracking jhtree cache stats in KeyMerger at the moment. Future: something to consider
341
+ keyMergerManager.setown (createKeyMerger (helper->queryDiskRecordSize ()->queryRecordAccessor (true ), keyIndexSet, seekGEOffset, nullptr , helper->hasNewSegmentMonitors (), false ));
335
342
const ITranslator *translator = translators.item (0 );
336
343
if (translator)
337
344
keyMergerManager->setLayoutTranslator (&translator->queryTranslator ());
@@ -348,40 +355,12 @@ class CIndexReadSlaveBase : public CSlaveActivity
348
355
else
349
356
return nullptr ;
350
357
}
351
- void mergeFileStats (IPartDescriptor *partDesc, IFileIO *partIO)
352
- {
353
- if (fileStats.size ()>0 )
354
- {
355
- ISuperFileDescriptor * superFDesc = partDesc->queryOwner ().querySuperFileDescriptor ();
356
- if (superFDesc)
357
- {
358
- unsigned subfile, lnum;
359
- if (superFDesc->mapSubPart (partDesc->queryPartIndex (), subfile, lnum))
360
- mergeStats (*fileStats[fileTableStart+subfile], partIO);
361
- }
362
- else
363
- mergeStats (*fileStats[fileTableStart], partIO);
364
- }
365
- }
366
- void updateStats ()
367
- {
368
- // NB: updateStats() should always be called whilst ioStatsCS is held.
369
- if (lazyIFileIO)
370
- {
371
- mergeStats (inactiveStats, lazyIFileIO);
372
- if (currentPart<partDescs.ordinality ())
373
- mergeFileStats (&partDescs.item (currentPart), lazyIFileIO);
374
- }
375
- }
376
358
void configureNextInput ()
377
359
{
378
360
if (currentManager)
379
361
{
380
362
resetManager (currentManager);
381
363
currentManager = nullptr ;
382
-
383
- CriticalBlock b (ioStatsCS);
384
- updateStats ();
385
364
lazyIFileIO.clear ();
386
365
}
387
366
IKeyManager *keyManager = nullptr ;
@@ -433,7 +412,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
433
412
if (eoi)
434
413
return nullptr ;
435
414
dbgassertex (currentInput);
436
- CStatsScopedThresholdDeltaUpdater scoped (statsUpdater);
437
415
const void *ret = nullptr ;
438
416
while (true )
439
417
{
@@ -528,7 +506,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
528
506
}
529
507
public:
530
508
CIndexReadSlaveBase (CGraphElementBase *container)
531
- : CSlaveActivity(container, indexReadActivityStatistics), callback(*this ), contextLogger(jhtreeCacheStatistics, thorJob), statsUpdater(jhtreeCacheStatistics, * this , contextLogger)
509
+ : CSlaveActivity(container, indexReadActivityStatistics), callback(*this )
532
510
{
533
511
helper = (IHThorIndexReadBaseArg *)container->queryHelper ();
534
512
limitTransformExtra = nullptr ;
@@ -555,7 +533,6 @@ class CIndexReadSlaveBase : public CSlaveActivity
555
533
break ;
556
534
if (keyManager)
557
535
prepareManager (keyManager);
558
- CStatsScopedThresholdDeltaUpdater scoped (statsUpdater);
559
536
if (hard) // checkCount checks hard key count only.
560
537
count += indexInput->checkCount (keyedLimit-count); // part max, is total limit [keyedLimit] minus total so far [count]
561
538
else
@@ -589,7 +566,10 @@ class CIndexReadSlaveBase : public CSlaveActivity
589
566
partDescs.kill ();
590
567
keyIndexSet.clear ();
591
568
translators.kill ();
592
- keyManagers.kill ();
569
+ {
570
+ CriticalBlock b (keyManagersCS);
571
+ keyManagers.kill ();
572
+ }
593
573
keyMergerManager.clear ();
594
574
}
595
575
else
@@ -607,6 +587,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
607
587
IPartDescriptor &part0 = partDescs.item (0 );
608
588
IFileDescriptor &fileDesc = part0.queryOwner ();
609
589
ISuperFileDescriptor *super = fileDesc.querySuperFileDescriptor ();
590
+ isSuperFile = super != nullptr ;
610
591
611
592
if ((0 == (helper->getFlags () & TIRusesblob)) && !localMerge)
612
593
{
@@ -684,7 +665,17 @@ class CIndexReadSlaveBase : public CSlaveActivity
684
665
}
685
666
}
686
667
data.read (fileTableStart);
687
- setupSpace4FileStats (fileTableStart, reInit, super!=nullptr , super?super->querySubFiles ():0 , indexReadActivityStatistics);
668
+ setupSpace4FileStats (fileTableStart, reInit, isSuperFile, isSuperFile?super->querySubFiles ():0 , indexReadFileStatistics);
669
+ if (isSuperFile)
670
+ {
671
+ // Only superfiles require multiple context loggers (one per part) to track subfile-level stats
672
+ for (unsigned i = 0 ; i < parts; ++i)
673
+ contextLoggers.push_back (new CStatsContextLogger (jhtreeCacheStatistics, thorJob));
674
+ }
675
+ else
676
+ {
677
+ contextLoggers.push_back (new CStatsContextLogger (jhtreeCacheStatistics, thorJob));
678
+ }
688
679
}
689
680
}
690
681
// IThorDataLink
@@ -719,10 +710,70 @@ class CIndexReadSlaveBase : public CSlaveActivity
719
710
virtual void gatherActiveStats (CRuntimeStatisticCollection &activeStats) const
720
711
{
721
712
PARENT::gatherActiveStats (activeStats);
713
+ // *) jhtree stats will have been tracked as follows
714
+ // - For superfiles, there will be 1 IContextLogger in contextLoggers[] for each part
715
+ // - In all other cases, use 1st IContextLogger in contextLoggers[] for all parts
716
+ // *) File IO stats will have been tracked as follows
717
+ // - the io stats will be tracked in keyManagers[]
718
+ // - (There will be exactly 1 keyManager per part)
719
+ // Note: keyManagers[], contextLoggers[] and fileStats[] are not necessarily the same size
720
+ // - keyManagers[] holds at most one entry per partDescs[] item (entries are appended as parts are opened)
721
+ // - contextLoggers[] will have one entry per part for superfiles, otherwise 1
722
+ // - fileStats[] may be size of #subfiles for superfiles, size 1 for dynamic/variable files, or
723
+ // zero for everything else
724
+ // This function will:
725
+ // 1) In all cases, activeStats will have activity level jhtree & io stats
726
+ // 2) For superfiles and variable/dynamic files, fileStats[] will have jhtree & io stats
727
+ // (If there are no fileStats, the file level stats can be extracted from activeStats)
728
+ // TODO:
729
+ // * When localMerge==true or when useMerger==true, stats may not be correct. Revisit
730
+ // this section to ensure that the code handles these correctly.
731
+ if (partDescs.ordinality ())
722
732
{
723
- CriticalBlock b (ioStatsCS);
724
- if (lazyIFileIO)
725
- mergeStats (activeStats, lazyIFileIO);
733
+ // reset required because within loop below, mergeStats() is used to build up stats for each file
734
+ for (auto & fileStatItem: fileStats)
735
+ fileStatItem->reset ();
736
+ ISuperFileDescriptor *superFDesc = partDescs.item (0 ).queryOwner ().querySuperFileDescriptor ();
737
+ for (unsigned partNum=0 ; partNum<partDescs.ordinality (); partNum++)
738
+ {
739
+ IKeyManager * keyManager;
740
+ {
741
+ CriticalBlock b (keyManagersCS);
742
+ if (!keyManagers.isItem (partNum))
743
+ continue ;
744
+ keyManager = &keyManagers.item (partNum);
745
+ }
746
+ if (fileStats.size ()>0 )
747
+ {
748
+ unsigned subfile = 0 ;
749
+ // For a superfile, work out the index into fileStats[] for this part's subfile
750
+ // (For non-superfiles, subfile will be zero, so first entry from fileStats will be used)
751
+ if (isSuperFile)
752
+ {
753
+ unsigned lnum;
754
+ if (!superFDesc->mapSubPart (partDescs.item (partNum).queryPartIndex (), subfile, lnum))
755
+ continue ; // should not happen
756
+ }
757
+ CRuntimeStatisticCollection * fileStatItem = fileStats[fileTableStart+subfile];
758
+ keyManager->mergeStats (*fileStatItem); // file level IO stats
759
+ // Merge subfile jhtree stats into fileStats and activeStats
760
+ if (isSuperFile)
761
+ {
762
+ fileStatItem->merge (contextLoggers[partNum]->queryStats ());
763
+ activeStats.merge (contextLoggers[partNum]->queryStats ());
764
+ }
765
+ }
766
+ // IO stats always needed in activeStats for activity level stats
767
+ keyManager->mergeStats (activeStats);
768
+ }
769
+ // For non-superfiles, the single context logger has not yet been merged into activeStats or fileStats
770
+ // (Avoid doing in loop above, to avoid merging same jhtree stats multiple times)
771
+ if (!isSuperFile)
772
+ {
773
+ activeStats.merge (contextLoggers[0 ]->queryStats ());
774
+ if (fileStats.size ()>0 )
775
+ fileStats[fileTableStart]->merge (contextLoggers[0 ]->queryStats ());
776
+ }
726
777
}
727
778
activeStats.setStatistic (StNumRowsProcessed, progress);
728
779
}
@@ -735,11 +786,7 @@ class CIndexReadSlaveBase : public CSlaveActivity
735
786
}
736
787
virtual void done () override
737
788
{
738
- {
739
- CriticalBlock b (ioStatsCS);
740
- updateStats ();
741
- lazyIFileIO.clear ();
742
- }
789
+ lazyIFileIO.clear ();
743
790
PARENT::done ();
744
791
}
745
792
};
@@ -819,7 +866,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
819
866
helper->mapOutputToInput (tempBuilder, seek, numFields); // NOTE - weird interface to mapOutputToInput means that it STARTS writing at seekGEOffset...
820
867
rawSeek = (byte *)temp;
821
868
}
822
- CStatsScopedThresholdDeltaUpdater scoped (statsUpdater);
823
869
if (!currentManager->lookupSkip (rawSeek, seekGEOffset, seekSize))
824
870
return NULL ;
825
871
const byte *row = currentManager->queryKeyBuffer ();
@@ -972,7 +1018,6 @@ class CIndexReadSlaveActivity : public CIndexReadSlaveBase
972
1018
// IRowStream
973
1019
virtual void stop () override
974
1020
{
975
- CStatsScopedDeltaUpdater scoped (statsUpdater);
976
1021
if (RCMAX != keyedLimit) // NB: will not be true if nextRow() has handled
977
1022
{
978
1023
keyedLimitCount = sendGetCount (keyedProcessed);
@@ -1142,7 +1187,6 @@ class CIndexGroupAggregateSlaveActivity : public CIndexReadSlaveBase, implements
1142
1187
if (keyManager)
1143
1188
prepareManager (keyManager);
1144
1189
1145
- CStatsScopedThresholdDeltaUpdater scoped (statsUpdater);
1146
1190
while (true )
1147
1191
{
1148
1192
const void *key = indexInput->nextKey ();
@@ -1301,7 +1345,6 @@ class CIndexCountSlaveActivity : public CIndexReadSlaveBase
1301
1345
if (keyManager)
1302
1346
prepareManager (keyManager);
1303
1347
1304
- CStatsScopedThresholdDeltaUpdater scoped (statsUpdater);
1305
1348
while (true )
1306
1349
{
1307
1350
const void *key = indexInput->nextKey ();
0 commit comments