Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
GordonSmith committed Nov 12, 2023
2 parents 45e67dc + cef0bf9 commit ab2b917
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 36 deletions.
54 changes: 54 additions & 0 deletions docs/EN_US/ECLStandardLibraryReference/SLR-Mods/getElapsedMs.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE sect1 PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
"http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
<sect1 id="getElapsedMs">
<title>getElapsedMs</title>

<para><emphasis>result</emphasis> <emphasis role="bold"> :=
STD.System.Log.getElapsedMs<indexterm>
<primary>STD.System.Log.getElapsedMs</primary>
</indexterm> <indexterm>
<primary>System.Log.getElapsedMs</primary>
</indexterm> <indexterm>
<primary>Log.getElapsedMs</primary>
</indexterm> <indexterm>
<primary>getElapsedMs</primary>
</indexterm></emphasis><emphasis></emphasis><emphasis role="bold">
();</emphasis></para>

<informaltable colsep="1" frame="all" rowsep="1">
<tgroup cols="2">
<colspec colwidth="80.50pt" />

<colspec />

<tbody>
<row>
<entry>Return:</entry>

<entry>getElapsedMs returns returns the elapsed time in
milliseconds.</entry>
</row>
</tbody>
</tgroup>
</informaltable>

<para>The <emphasis role="bold">getElapsedMs </emphasis>function returns the
current elapsed query time (in ms) in Roxie. </para>

<para>This is the elapsed time when STD.System.Log.getElapsedMs() is called.
Because ECL is a declarative language, code is not necessarily executed in
sequence. You have to be careful when trying to get the elapsed time for a
particular point in your code. You can look at the Workunit graphs to see
the exact point at which the activity executes. </para>

<para><emphasis role="bold">For use in Roxie only</emphasis>. An error is
returned if you try to run on Thor or hThor.</para>

<para>Example:</para>

<programlisting format="linespecific" lang="ECL" role="runnable">IMPORT STD;
STD.System.Debug.Sleep (1054); // pause processing for 1054 milliseconds.
OUTPUT(STD.System.Log.getElapsedMs(), NAMED('Elapsed')); //returns total time elapsed
</programlisting>
</sect1>
3 changes: 3 additions & 0 deletions docs/EN_US/ECLStandardLibraryReference/SLR-includer.xml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,9 @@

<xi:include href="ECLStandardLibraryReference/SLR-Mods/GenerateGloballyUniqueID.xml"
xmlns:xi="http://www.w3.org/2001/XInclude" />

<xi:include href="ECLStandardLibraryReference/SLR-Mods/getElapsedMs.xml"
xmlns:xi="http://www.w3.org/2001/XInclude" />
</chapter>

<chapter id="Auditing">
Expand Down
6 changes: 3 additions & 3 deletions esp/src/eclwatch/stub.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ define([
if (modernMode === String(true) && hpccWidget !== "IFrameWidget") {
switch (hpccWidget) {
case "WUDetailsWidget":
window.location.replace(`/#/workunits/${params.Wuid}`);
window.location.replace(`/esp/files/index.html#/workunits/${params.Wuid}`);
break;
case "GraphsWUWidget":
window.location.replace(`/#/workunits/${params.Wuid}/metrics`);
window.location.replace(`/esp/files/index.html#/workunits/${params.Wuid}/metrics`);
break;
case "TopologyWidget":
case "DiskUsageWidget":
Expand All @@ -48,7 +48,7 @@ define([
loadUI();
break;
default:
window.location.replace("/");
window.location.replace("/esp/files/index.html");
}
} else {
loadUI();
Expand Down
5 changes: 5 additions & 0 deletions roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,11 @@ class CMessagePacker : implements IMessagePacker, public CInterface
}
else
{
//Edge case - ensure that the length of a variable length record does not span a packet
//this can occur when large rows > data_buffer_size are being appended to the buffer.
if (variable && ((data_buffer_size - data_used) < sizeof(RecordLengthType)))
flush(false);

while (len)
{
if (!part_buffer)
Expand Down
3 changes: 2 additions & 1 deletion system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,8 @@ const StatisticsMapping noStatistics({});
const StatisticsMapping jhtreeCacheStatistics({ StNumIndexSeeks, StNumIndexScans, StNumPostFiltered, StNumIndexWildSeeks,
StNumNodeCacheAdds, StNumLeafCacheAdds, StNumBlobCacheAdds, StNumNodeCacheHits, StNumLeafCacheHits, StNumBlobCacheHits, StCycleNodeLoadCycles, StCycleLeafLoadCycles,
StCycleBlobLoadCycles, StCycleNodeReadCycles, StCycleLeafReadCycles, StCycleBlobReadCycles, StNumNodeDiskFetches, StNumLeafDiskFetches, StNumBlobDiskFetches,
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles});
StCycleNodeFetchCycles, StCycleLeafFetchCycles, StCycleBlobFetchCycles, StCycleIndexCacheBlockedCycles, StNumIndexMergeCompares, StNumIndexMerges, StNumIndexSkips, StNumIndexNullSkips,
StTimeLeafLoad, StTimeLeafRead, StTimeLeafFetch, StTimeIndexCacheBlocked, StTimeNodeFetch});

const StatisticsMapping allStatistics(StKindAll);
const StatisticsMapping heapStatistics({StNumAllocations, StNumAllocationScans});
Expand Down
165 changes: 133 additions & 32 deletions testing/unittests/jlibtests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class JlibTraceTest : public CppUnit::TestFixture
CPPUNIT_TEST(testInternalSpan);
CPPUNIT_TEST(testMultiNestedSpanTraceOutput);
CPPUNIT_TEST(testNullSpan);
CPPUNIT_TEST(testClientSpanGlobalID);
CPPUNIT_TEST_SUITE_END();

const char * simulatedGlobalYaml = R"!!(global:
Expand Down Expand Up @@ -416,6 +417,31 @@ class JlibTraceTest : public CppUnit::TestFixture
}
}

void testClientSpanGlobalID()
{
Owned<IProperties> mockHTTPHeaders = createProperties();
createMockHTTPHeaders(mockHTTPHeaders, true); //includes global ID

Owned<ISpan> serverSpan = queryTraceManager().createServerSpan("propegatedServerSpan", mockHTTPHeaders);
Owned<ISpan> clientSpan = serverSpan->createClientSpan("clientSpanWithGlobalID");

//retrieve serverSpan context with the intent to interrogate attributes
{
Owned<IProperties> retrievedClientSpanCtxAttributes = createProperties();
bool getClientSpanCtxSuccess = clientSpan->getSpanContext(retrievedClientSpanCtxAttributes.get(), false);

CPPUNIT_ASSERT_EQUAL_MESSAGE("Unexpected getSpanContext failure detected on client span", true, getClientSpanCtxSuccess);

CPPUNIT_ASSERT_MESSAGE("Unexpected GlobalID detected",
strsame("IncomingUGID", retrievedClientSpanCtxAttributes->queryProp(kGlobalIdHttpHeaderName)));
CPPUNIT_ASSERT_MESSAGE("Unexpected CallerID detected",
strsame("IncomingCID", retrievedClientSpanCtxAttributes->queryProp(kCallerIdHttpHeaderName)));

CPPUNIT_ASSERT_EQUAL_MESSAGE("Unexpected empty TraceID detected", false, isEmptyString(retrievedClientSpanCtxAttributes->queryProp("traceID")));
CPPUNIT_ASSERT_EQUAL_MESSAGE("Unexpected empty SpanID detected", false, isEmptyString(retrievedClientSpanCtxAttributes->queryProp("spanID")));
}
}

void testPropegatedServerSpan()
{
Owned<IProperties> mockHTTPHeaders = createProperties();
Expand Down Expand Up @@ -3109,15 +3135,15 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
CPPUNIT_TEST(test);
CPPUNIT_TEST_SUITE_END();

static constexpr size32_t sz = 100*0x100000; // 100MB
enum CompressOpt { RowCompress, AllRowCompress, BlockCompress };
public:
void test()
{
try
{
size32_t sz = 100*0x100000; // 100MB
MemoryBuffer src;
src.ensureCapacity(sz);
MemoryBuffer compressed;
const char *aesKey = "012345678901234567890123";
Owned<ICompressHandlerIterator> iter = getCompressHandlerIterator();

Expand Down Expand Up @@ -3147,53 +3173,128 @@ class JlibCompressionTestsStress : public CppUnit::TestFixture
}
}

DBGLOG("Algorithm || Compression Time (ms) || Decompression Time (ms) || Compression Ratio");
DBGLOG("Algorithm(options) || Comp(ms) || Deco(ms) || 200MB/s (w,r) || 1GB/s (w,r) || 5GB/s (w,r) || Ratio [cLen]");
DBGLOG(" || || || 2Gb/s || 10Gb/s || 50Gb/s ||");

unsigned time200MBs = transferTimeMs(sz, 200000000);
unsigned time1GBs = transferTimeMs(sz, 1000000000);
unsigned time5GBs = transferTimeMs(sz, 5000000000);
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", "uncompressed", 0, 0,
time200MBs, time200MBs, time200MBs, time1GBs, time1GBs, time1GBs, time5GBs, time5GBs, time5GBs, 1.0, sz);
ForEach(*iter)
{
compressed.clear();
ICompressHandler &handler = iter->query();
const char * type = handler.queryType();
//Ignore unusual compressors with no expanders...
if (strieq(handler.queryType(), "randrow"))
if (strieq(type, "randrow"))
continue;
Owned<ICompressor> compressor = handler.getCompressor(streq("AES", handler.queryType()) ? aesKey: nullptr);
const char * options = streq("AES", handler.queryType()) ? aesKey: "";
if (streq(type, "LZ4HC"))
{
testCompressor(handler, "hclevel=3", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=4", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=5", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=6", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=8", rowSz, src.length(), src.bytes(), RowCompress);
testCompressor(handler, "hclevel=10", rowSz, src.length(), src.bytes(), RowCompress);
}
testCompressor(handler, options, rowSz, src.length(), src.bytes(), RowCompress);
if (streq(type, "LZ4"))
{
testCompressor(handler, "allrow", rowSz, src.length(), src.bytes(), AllRowCompress); // block doesn't affect the compressor, just tracing
testCompressor(handler, "block", rowSz, src.length(), src.bytes(), BlockCompress); // block doesn't affect the compressor, just tracing
}
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
}
}

unsigned transferTimeMs(__int64 size, __int64 bytesPerSecond)
{
return (unsigned)((size * 1000) / bytesPerSecond);
}

CCycleTimer timer;
void testCompressor(ICompressHandler &handler, const char * options, size32_t rowSz, size32_t srcLen, const byte * src, CompressOpt opt)
{
Owned<ICompressor> compressor = handler.getCompressor(options);

MemoryBuffer compressed;
CCycleTimer timer;
const byte * ptr = src;
switch (opt)
{
case RowCompress:
{
compressor->open(compressed, sz);
compressor->startblock();
const byte *ptr = src.bytes();
const byte *ptrEnd = ptr + src.length();
const byte *ptrEnd = ptr + srcLen;
while (ptr != ptrEnd)
{
compressor->write(ptr, rowSz);
ptr += rowSz;
}
compressor->commitblock();
compressor->close();
cycle_t compressCycles = timer.elapsedCycles();

Owned<IExpander> expander = handler.getExpander(streq("AES", handler.queryType()) ? aesKey: nullptr);

timer.reset();
size32_t required = expander->init(compressed.bytes());
MemoryBuffer tgt(required);
expander->expand(tgt.bufferBase());
tgt.setWritePos(required);
cycle_t decompressCycles = timer.elapsedCycles();

float ratio = (float)(src.length()) / compressed.length();

DBGLOG("%9s || %21u || %23u || %17.2f [ %u, %u ]", handler.queryType(), (unsigned)cycle_to_millisec(compressCycles), (unsigned)cycle_to_millisec(decompressCycles), ratio, src.length(), compressed.length());

CPPUNIT_ASSERT(tgt.length() >= sz);
CPPUNIT_ASSERT(0 == memcmp(src.bufferBase(), tgt.bufferBase(), sz));
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
throw;
break;
}
case AllRowCompress:
{
compressor->open(compressed, sz);
compressor->startblock();
compressor->write(ptr, sz);
compressor->commitblock();
compressor->close();
break;
}
case BlockCompress:
{
void * target = compressed.reserve(sz);
unsigned written = compressor->compressBlock(sz, target, srcLen, ptr);
compressed.setLength(written);
break;
}
}

cycle_t compressCycles = timer.elapsedCycles();
Owned<IExpander> expander = handler.getExpander(options);

timer.reset();
size32_t required = expander->init(compressed.bytes());
MemoryBuffer tgt(required);
expander->expand(tgt.bufferBase());
tgt.setWritePos(required);
cycle_t decompressCycles = timer.elapsedCycles();

float ratio = (float)(srcLen) / compressed.length();

StringBuffer name(handler.queryType());
if (options)
name.append("(").append(options).append(")");

if (name.length() > 19)
name.setLength(19);

unsigned compressTime = (unsigned)cycle_to_millisec(compressCycles);
unsigned decompressTime = (unsigned)cycle_to_millisec(decompressCycles);
unsigned compressedTime = compressTime + decompressTime;
unsigned copyTime200MBs = transferTimeMs(compressed.length(), 200000000);
unsigned copyTime1GBs = transferTimeMs(compressed.length(), 1000000000);
unsigned copyTime5GBs = transferTimeMs(compressed.length(), 5000000000);
unsigned time200MBs = copyTime200MBs + compressedTime;
unsigned time1GBs = copyTime1GBs + compressedTime;
unsigned time5GBs = copyTime5GBs + compressedTime;
DBGLOG("%19s || %8u || %8u || %4u(%4u,%4u) || %4u(%4u,%4u) || %4u(%4u,%4u) || %5.2f [%u]", name.str(), compressTime, decompressTime,
time200MBs, copyTime200MBs + compressTime, copyTime200MBs + decompressTime,
time1GBs, copyTime1GBs + compressTime, copyTime1GBs + decompressTime,
time5GBs, copyTime5GBs + compressTime, copyTime5GBs + decompressTime,
ratio, compressed.length());

CPPUNIT_ASSERT(tgt.length() >= sz);
CPPUNIT_ASSERT(0 == memcmp(src, tgt.bufferBase(), sz));
}
};

Expand Down

0 comments on commit ab2b917

Please sign in to comment.