Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
GordonSmith committed Dec 7, 2023
2 parents 2a959b9 + c3a5532 commit d1d5219
Show file tree
Hide file tree
Showing 186 changed files with 2,385 additions and 286 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build-assets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ jobs:
- os: ubuntu-20.04
name: LN
ln: true
- os: centos-8
name: LN
cmake_options_extra: ""
ln: true
- os: centos-7
name: LN
cmake_options_extra: "-DVCPKG_TARGET_TRIPLET=x64-centos-7-dynamic"
Expand Down
6 changes: 0 additions & 6 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
[submodule "esp/src/xstyle"]
path = esp/src/xstyle
url = https://github.com/hpcc-systems/xstyle.git
[submodule "esp/src/put-selector"]
path = esp/src/put-selector
url = https://github.com/hpcc-systems/put-selector.git
[submodule "esp/src/dgrid"]
path = esp/src/dgrid
url = https://github.com/hpcc-systems/dgrid.git
Expand Down
2 changes: 1 addition & 1 deletion common/wuanalysis/anaerrorcodes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ typedef enum
ANA_DISTRIB_SKEW_INPUT_ID,
ANA_DISTRIB_SKEW_OUTPUT_ID,
ANA_IOSKEW_RECORDS_ID,
ANA_UNUSED_ID, /* May re-use but don't remove to avoid changing later id's */
ANA_EXECUTE_SKEW_ID,
ANA_KJ_EXCESS_PREFILTER_ID
} AnalyzerErrorCode;

Expand Down
52 changes: 52 additions & 0 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,57 @@ class IoSkewRule : public AActivityRule
const char * category;
};

class LocalExecuteSkewRule : public AActivityRule
{
public:
virtual bool isCandidate(IWuActivity & activity) const override
{
switch (activity.getAttr(WaKind))
{
case TAKfirstn: // skew is expected, so ignore
case TAKtopn:
case TAKsort:
return false;
}
return true;
}

virtual bool check(PerformanceIssue & result, IWuActivity & activity, const IAnalyserOptions & options) override
{
stat_type localExecuteMaxSkew = activity.getStatRaw(StTimeLocalExecute, StSkewMax);
if (localExecuteMaxSkew<options.queryOption(watOptSkewThreshold))
return false;

stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);
stat_type timePenalty = (timeMaxLocalExecute - timeAvgLocalExecute);;
if (timePenalty<options.queryOption(watOptMinInterestingTime))
return false;

bool inputSkewed = false;
for(unsigned edgeNo = 0; IWuEdge *wuInputEdge = activity.queryInput(edgeNo); edgeNo++)
{
if (wuInputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
{
inputSkewed = true;
break;
}
}
bool outputSkewed = false;
IWuEdge *wuOutputEdge = activity.queryOutput(0);
if (wuOutputEdge && (wuOutputEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold)))
outputSkewed = true;

if (inputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven input");
else if (outputSkewed)
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time caused by uneven output");
else
result.set(ANA_EXECUTE_SKEW_ID, timePenalty, "Significant skew in local execute time");
return true;
}
};

class KeyedJoinExcessRejectedRowsRule : public ActivityKindRule
{
public:
Expand Down Expand Up @@ -221,4 +272,5 @@ void gatherRules(CIArrayOf<AActivityRule> & rules)
rules.append(*new IoSkewRule(StTimeDiskWriteIO, "disk write"));
rules.append(*new IoSkewRule(StTimeSpillElapsed, "spill"));
rules.append(*new KeyedJoinExcessRejectedRowsRule);
rules.append(*new LocalExecuteSkewRule);
}
114 changes: 78 additions & 36 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,41 +177,49 @@ static IPropertyTree *getEmptyAttr()
return createPTree("Attr");
}

extern da_decl void calcFileCost(const char * cluster, double sizeGB, double fileAgeDays, __int64 numDiskWrites, __int64 numDiskReads, double & atRestCost, double & accessCost)
static IPropertyTree *getCostPropTree(const char *cluster)
{
Owned<IPropertyTree> plane = getStoragePlane(cluster);
Owned<IPropertyTree> global;
IPropertyTree * costPT = nullptr;

if (plane && plane->hasProp("cost/@storageAtRest"))
{
costPT = plane->queryPropTree("cost");
return plane->getPropTree("cost");
}
else
{
global.setown(getGlobalConfig());
costPT = global->queryPropTree("cost");
return getGlobalConfigSP()->getPropTree("cost");
}
}

extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays)
{
Owned<const IPropertyTree> costPT = getCostPropTree(cluster);

if (costPT==nullptr)
{
atRestCost = 0.0;
accessCost = 0.0;
return;
}
constexpr int accessPriceScalingFactor = 10000; // read/write pricing based on 10,000 operations
return 0.0;
double atRestPrice = costPT->getPropReal("@storageAtRest", 0.0);
double readPrice = costPT->getPropReal("@storageReads", 0.0);
double writePrice = costPT->getPropReal("@storageWrites", 0.0);
double storageCostDaily = atRestPrice * 12 / 365;
atRestCost = storageCostDaily * sizeGB * fileAgeDays;
accessCost = (readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor);
return storageCostDaily * sizeGB * fileAgeDays;
}

extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads)
{
double atRestCost, accessCost;
calcFileCost(cluster, 0, 0, numDiskWrites, numDiskReads, atRestCost, accessCost);
return accessCost;
Owned<const IPropertyTree> costPT = getCostPropTree(cluster);

if (costPT==nullptr)
return 0.0;
constexpr int accessPriceScalingFactor = 10000; // read/write pricing based on 10,000 operations
double readPrice = costPT->getPropReal("@storageReads", 0.0);
double writePrice = costPT->getPropReal("@storageWrites", 0.0);
return (readPrice * numDiskReads / accessPriceScalingFactor) + (writePrice * numDiskWrites / accessPriceScalingFactor);
}

extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads)
{
StringBuffer clusterName;
// Should really specify the cluster number too, but this is the best we can do for now
f->getClusterName(0, clusterName);
return calcFileAccessCost(clusterName, numDiskWrites, numDiskReads);
}

RemoteFilename &constructPartFilename(IGroup *grp,unsigned partno,unsigned partmax,const char *name,const char *partmask,const char *partdir,unsigned copy,ClusterPartDiskMapSpec &mspec,RemoteFilename &rfn)
Expand Down Expand Up @@ -4941,27 +4949,43 @@ protected: friend class CDistributedFilePart;
double fileAgeDays = difftime(time(nullptr), dt.getSimple())/(24*60*60);
double sizeGB = getDiskSize(true, false) / ((double)1024 * 1024 * 1024);
const IPropertyTree *attrs = root->queryPropTree("Attr");
bool doLegacyAccessCostCalc = false;
__int64 numDiskWrites = 0, numDiskReads = 0;
if (attrs)
{
numDiskWrites = attrs->getPropInt64("@numDiskWrites");
numDiskReads = attrs->getPropInt64("@numDiskReads");
if (hasReadWriteCostFields(*attrs))
{
// Newer files have readCost and writeCost attributes
accessCost = cost_type2money(attrs->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + attrs->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost)));
}
else
{
// Costs need to be calculated from numDiskReads and numDiskWrites for legacy files
numDiskWrites = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites));
doLegacyAccessCostCalc = true;
// NB: Costs of index reading can not be reliably estimated based on 'numDiskReads'
if (!isFileKey(*attrs))
numDiskReads = attrs->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads));
}
}
if (isEmptyString(cluster))
{
StringArray clusterNames;
unsigned countClusters = getClusterNames(clusterNames);
for (unsigned i = 0; i < countClusters; i++)
atRestCost += calcFileAtRestCost(clusterNames[i], sizeGB, fileAgeDays);
if (countClusters && doLegacyAccessCostCalc)
{
double tmpAtRestcost, tmpAccessCost;
calcFileCost(clusterNames[i], sizeGB, fileAgeDays, numDiskWrites, numDiskReads, tmpAtRestcost, tmpAccessCost);
atRestCost += tmpAtRestcost;
accessCost += tmpAccessCost;
// NB: numDiskReads/numDiskWrites are stored at the file level, not per cluster.
// So cannot calculate accessCost per cluster, assume cost is based on 1st.
accessCost = calcFileAccessCost(clusterNames[0], numDiskWrites, numDiskReads);
}
}
else
{
calcFileCost(cluster, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
atRestCost += calcFileAtRestCost(cluster, sizeGB, fileAgeDays);
if (doLegacyAccessCostCalc)
accessCost = calcFileAccessCost(cluster, numDiskWrites, numDiskReads);
}
}
};
Expand Down Expand Up @@ -13343,11 +13367,12 @@ IDFProtectedIterator *CDistributedFileDirectory::lookupProtectedFiles(const char
const char* DFUQResultFieldNames[] = { "@name", "@description", "@group", "@kind", "@modified", "@job", "@owner",
"@DFUSFrecordCount", "@recordCount", "@recordSize", "@DFUSFsize", "@size", "@workunit", "@DFUSFcluster", "@numsubfiles",
"@accessed", "@numparts", "@compressedSize", "@directory", "@partmask", "@superowners", "@persistent", "@protect", "@compressed",
"@cost", "@numDiskReads", "@numDiskWrites", "@atRestCost", "@accessCost", "@maxSkew", "@minSkew", "@maxSkewPart", "@minSkewPart" };
"@cost", "@numDiskReads", "@numDiskWrites", "@atRestCost", "@accessCost", "@maxSkew", "@minSkew", "@maxSkewPart", "@minSkewPart",
"@readCost", "@writeCost" };

extern da_decl const char* getDFUQResultFieldName(DFUQResultField feild)
extern da_decl const char* getDFUQResultFieldName(DFUQResultField field)
{
return DFUQResultFieldNames[feild];
return DFUQResultFieldNames[field];
}

IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned numFiles, DFUQResultField* localFilters, const char* localFilterBuf)
Expand Down Expand Up @@ -13414,6 +13439,7 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu

void setCost(IPropertyTree* file, const char *nodeGroup)
{
// Set the following dynamic fields: atRestCost, accessCost, cost and for legacy files: readCost, writeCost
StringBuffer str;
double fileAgeDays = 0.0;
if (file->getProp(getDFUQResultFieldName(DFUQRFtimemodified), str))
Expand All @@ -13428,13 +13454,29 @@ IPropertyTreeIterator *deserializeFileAttrIterator(MemoryBuffer& mb, unsigned nu
else
sizeDiskSize = file->getPropInt64(getDFUQResultFieldName(DFUQRForigsize), 0);
double sizeGB = sizeDiskSize / ((double)1024 * 1024 * 1024);
__int64 numDiskWrites = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
__int64 numDiskReads = file->getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
double atRestCost, accessCost;
calcFileCost(nodeGroup, sizeGB, fileAgeDays, numDiskWrites, numDiskReads, atRestCost, accessCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFcost), atRestCost+accessCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost);
file->setPropReal(getDFUQResultFieldName(DFUQRFaccessCost), accessCost);
cost_type atRestCost = money2cost_type(calcFileAtRestCost(nodeGroup, sizeGB, fileAgeDays));
file->setPropInt64(getDFUQResultFieldName(DFUQRFatRestCost), atRestCost);

// Dyamically calc and set the access cost field and for legacy files set read/write cost fields
cost_type accessCost = 0;
if (hasReadWriteCostFields(*file))
{
accessCost = file->getPropInt64(getDFUQResultFieldName(DFUQRFreadCost)) + file->getPropInt64(getDFUQResultFieldName(DFUQRFwriteCost));
}
else // Calc access cost from numDiskRead & numDiskWrites for Legacy files
{
cost_type legacyReadCost = getLegacyReadCost(*file, nodeGroup);
file->setPropInt64(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost);

cost_type legacyWriteCost = getLegacyWriteCost(*file, nodeGroup);
file->setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), legacyWriteCost);

accessCost = legacyReadCost + legacyWriteCost;
}
file->setPropInt64(getDFUQResultFieldName(DFUQRFaccessCost), accessCost);

// Dymically calc and set the total cost field
file->setPropInt64(getDFUQResultFieldName(DFUQRFcost), atRestCost + accessCost);
}

IPropertyTree *deserializeFileAttr(MemoryBuffer &mb, StringArray& nodeGroupFilter)
Expand Down
44 changes: 39 additions & 5 deletions dali/base/dadfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,17 @@ enum DFUQResultField
DFUQRFminSkew = 30,
DFUQRFmaxSkewPart = 31,
DFUQRFminSkewPart = 32,
DFUQRFterm = 33,
DFUQRFreadCost = 33,
DFUQRFwriteCost = 34,
DFUQRFterm = 35, // must be last in list
DFUQRFreverse = 256,
DFUQRFnocase = 512,
DFUQRFnumeric = 1024,
DFUQRFfloat = 2048
};

extern da_decl const char* getDFUQFilterFieldName(DFUQFilterField feild);
extern da_decl const char* getDFUQResultFieldName(DFUQResultField feild);
extern da_decl const char* getDFUQFilterFieldName(DFUQFilterField field);
extern da_decl const char* getDFUQResultFieldName(DFUQResultField field);

/**
* File operations can be included in a transaction to ensure that multiple
Expand Down Expand Up @@ -861,7 +863,7 @@ extern da_decl GroupType translateGroupType(const char *groupType);

// Useful property query functions

inline bool isFileKey(IPropertyTree &pt) { const char *kind = pt.queryProp("@kind"); return kind&&strieq(kind,"key"); }
inline bool isFileKey(const IPropertyTree &pt) { const char *kind = pt.queryProp("@kind"); return kind&&strieq(kind,"key"); }
inline bool isFileKey(IDistributedFile *f) { return isFileKey(f->queryAttributes()); }
inline bool isFileKey(IFileDescriptor *f) { return isFileKey(f->queryProperties()); }

Expand All @@ -886,11 +888,43 @@ inline const char *queryFileKind(IFileDescriptor *f) { return queryFileKind(f->q
extern da_decl void ensureFileScope(const CDfsLogicalFileName &dlfn, unsigned timeoutms=INFINITE);

extern da_decl bool checkLogicalName(const char *lfn,IUserDescriptor *user,bool readreq,bool createreq,bool allowquery,const char *specialnotallowedmsg);
extern da_decl void calcFileCost(const char * cluster, double sizeGB, double fileAgeDays, __int64 numDiskWrites, __int64 numDiskReads, double & atRestCost, double & accessCost);

extern da_decl double calcFileAtRestCost(const char * cluster, double sizeGB, double fileAgeDays);
extern da_decl double calcFileAccessCost(const char * cluster, __int64 numDiskWrites, __int64 numDiskReads);
extern da_decl double calcFileAccessCost(IDistributedFile *f, __int64 numDiskWrites, __int64 numDiskReads);
constexpr bool defaultPrivilegedUser = true;
constexpr bool defaultNonPrivilegedUser = false;

extern da_decl void configurePreferredPlanes();
inline bool hasReadWriteCostFields(const IPropertyTree & fileAttr)
{
return fileAttr.hasProp(getDFUQResultFieldName(DFUQRFreadCost)) || fileAttr.hasProp(getDFUQResultFieldName(DFUQRFwriteCost));
}

template<typename Source>
inline cost_type getLegacyReadCost(const IPropertyTree & fileAttr, Source source)
{
// Legacy files do not have @readCost attribute, so calculate from numDiskRead
// NB: Costs of index reading can not be reliably estimated based on 'numDiskReads'
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskReads))
&& !isFileKey(fileAttr))
{
stat_type prevDiskReads = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskReads), 0);
return money2cost_type(calcFileAccessCost(source, 0, prevDiskReads));
}
else
return 0;
}
template<typename Source>
inline cost_type getLegacyWriteCost(const IPropertyTree & fileAttr, Source source)
{
// Legacy files do not have @writeCost attribute, so calculate from numDiskWrites
if (!hasReadWriteCostFields(fileAttr) && fileAttr.hasProp(getDFUQResultFieldName(DFUQRFnumDiskWrites)))
{
stat_type prevDiskWrites = fileAttr.getPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), 0);
return money2cost_type(calcFileAccessCost(source, prevDiskWrites, 0));
}
else
return 0;
}
#endif
2 changes: 2 additions & 0 deletions dali/dfuplus/dfuplus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,8 @@ int CDfuPlusHelper::copy()

if(globals->hasProp("expireDays"))
req->setExpireDays(globals->getPropInt("expireDays"));
if(globals->hasProp("ensure"))
req->setEnsure(globals->getPropBool("ensure"));

Owned<IClientCopyResponse> result = sprayclient->Copy(req);
const char* wuid = result->getResult();
Expand Down
1 change: 1 addition & 0 deletions dali/dfuplus/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ void handleSyntax()
out.append(" diffkeydst=<old-key-name> -- use keydiff/keypatch (dst old name)\n");
out.append(" multicopy=0|1 -- each destination part gets whole file\n");
out.append(" preservecompression=1|0 -- optional, default is 1 (preserve)\n");
out.append(" ensure=0|1 -- optional, only copy file parts if not copied\n");
out.append(" remove options:\n");
out.append(" name=<logical-name>\n");
out.append(" names=<multiple-logical-names-separated-by-comma>\n");
Expand Down
13 changes: 11 additions & 2 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3592,7 +3592,10 @@ void FileSprayer::updateTargetProperties()

DistributedFilePropertyLock lock(distributedTarget);
IPropertyTree &curProps = lock.queryAttributes();
curProps.setPropInt64("@numDiskWrites", totalNumWrites);
cost_type writeCost = money2cost_type(calcFileAccessCost(distributedTarget, totalNumWrites, 0));
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFwriteCost), writeCost);
curProps.setPropInt64(getDFUQResultFieldName(DFUQRFnumDiskWrites), totalNumWrites);

if (calcCRC())
curProps.setPropInt(FAcrc, totalCRC.get());
curProps.setPropInt64(FAsize, totalLength);
Expand Down Expand Up @@ -3771,7 +3774,13 @@ void FileSprayer::updateTargetProperties()
if (distributedSource)
{
if (distributedSource->querySuperFile()==nullptr)
distributedSource->addAttrValue("@numDiskReads", totalNumReads);
{
IPropertyTree & fileAttr = distributedSource->queryAttributes();
cost_type legacyReadCost = getLegacyReadCost(fileAttr, distributedSource);
cost_type curReadCost = money2cost_type(calcFileAccessCost(distributedSource, 0, totalNumReads));
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFreadCost), legacyReadCost+curReadCost);
distributedSource->addAttrValue(getDFUQResultFieldName(DFUQRFnumDiskReads), totalNumReads);
}
}
if (error)
throw error.getClear();
Expand Down
Loading

0 comments on commit d1d5219

Please sign in to comment.