Skip to content

Commit

Permalink
Merge commit 'refs/pull/17663/head' of github.com:hpcc-systems/HPCC-P…
Browse files Browse the repository at this point in the history
…latform
  • Loading branch information
AttilaVamos committed Aug 10, 2023
2 parents 5314d6e + 7a286bc commit defa04b
Show file tree
Hide file tree
Showing 20 changed files with 698 additions and 100 deletions.
36 changes: 27 additions & 9 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8500,7 +8500,9 @@ void CLocalWorkUnit::copyWorkUnit(IConstWorkUnit *cached, bool copyStats, bool a
bool CLocalWorkUnit::hasDebugValue(const char *propname) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
return p->hasProp(prop.append(lower));
Expand All @@ -8509,7 +8511,9 @@ bool CLocalWorkUnit::hasDebugValue(const char *propname) const
IStringVal& CLocalWorkUnit::getDebugValue(const char *propname, IStringVal &str) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
str.set(p->queryProp(prop.append(lower).str()));
Expand All @@ -8528,7 +8532,9 @@ IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const
if (prop)
{
StringBuffer lower;
lower.append(prop).toLowerCase();
lower.append(prop);
if (!strchr(lower, ':'))
lower.toLowerCase();
path.append(lower);
}
else
Expand All @@ -8539,7 +8545,9 @@ IStringIterator& CLocalWorkUnit::getDebugValues(const char *prop) const
int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8549,7 +8557,9 @@ int CLocalWorkUnit::getDebugValueInt(const char *propname, int defVal) const
__int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8559,7 +8569,9 @@ __int64 CLocalWorkUnit::getDebugValueInt64(const char *propname, __int64 defVal)
double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8569,7 +8581,9 @@ double CLocalWorkUnit::getDebugValueReal(const char *propname, double defVal) co
bool CLocalWorkUnit::getDebugValueBool(const char * propname, bool defVal) const
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand Down Expand Up @@ -8642,7 +8656,9 @@ void CLocalWorkUnit::addProcess(const char *type, const char *instance, unsigned
void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool overwrite)
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand All @@ -8657,7 +8673,9 @@ void CLocalWorkUnit::setDebugValue(const char *propname, const char *value, bool
void CLocalWorkUnit::setDebugValueInt(const char *propname, int value, bool overwrite)
{
StringBuffer lower;
lower.append(propname).toLowerCase();
lower.append(propname);
if (!strchr(lower, ':'))
lower.toLowerCase();
CriticalBlock block(crit);
StringBuffer prop("Debug/");
prop.append(lower);
Expand Down
7 changes: 3 additions & 4 deletions configuration/configmgr/configmgrlib/EnvironmentNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,17 @@ void EnvironmentNode::addChild(std::shared_ptr<EnvironmentNode> pNode)

bool EnvironmentNode::removeChild(const std::shared_ptr<EnvironmentNode> pNode, std::vector<std::string> &removedNodeIds)
{
bool removed = false;
for (auto it=m_children.begin(); it!= m_children.end() && !removed; ++it)
for (auto it=m_children.begin(); it!= m_children.end(); ++it)
{
if (pNode == it->second)
{
pNode->removeAllChildren(removedNodeIds);
removedNodeIds.emplace_back(pNode->getId());
m_children.erase(it);
removed = true;
return true;
}
}
return removed;
return false;
}


Expand Down
16 changes: 16 additions & 0 deletions devdoc/roxie.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,19 @@ Should the scope of the blacklist be different? Possible scopes are:

Options 2 and 4 above would allow all aspects of the blacklisting behaviour to be specified by options on the SOAPCALL. We could control whether or not the
blacklister is to be used at all via a SOAPCALL option with any of the above...

Some notes on LocalAgent mode
=============================

In localAgent mode, the global queueManager object (normally a RoxieUdpSocketQueueManager) is replaced by a RoxieLocalQueueManager. Outbound packets are added directly to target queue, inbound are packed into memorybuffers (rather than DataBuffers) which may be a bad idea.

There is also "local optimizations" mode where any index operation reading a one-part file (does the same apply to one-part disk files?) just reads it directly on the server (regardless of localAgent setting). Typically still injected into receiver code though as otherwise handling exception cases, limits etc would all be duplicated/messy. Rows created in localOptimization mode are created directly in the caller's row manager, and are injected in serialized format.

Why are inbound not created directly in the desired destination's allocator and then marked as serialized? Some lifespan issues... are they insurmountable?
Alternatively, should we pack into dataBuffers rather than MemoryBuffers, avoiding a need to copy the data before the receiver can use it? Downside is that large rows get split (but we could set dataBufferSize to be bigger in localAgent mode...)

What is the lifespan issue? In-flight queries may be abandoned when a server-side query fails, times out, or no longer needs the data. Using DataBuffer does not have this issue as they are attached to the query's memory manager/allocation once read. Or we could bypass the agent queue altogether, but rather more refactoring needed for that (might almost be easier to extent the "local optimization" mode to use multiple threads at that point)

abortPending, replyPending, and abortPendingData methods are unimplemented, which may lead to some inefficiencies?

LocalMessagePacker currently derives from DummyMessagePacker, but they may be used for different things... DummyMessagePacker is used in parallel aggregate activities to intercept the output from each parallel processor so that they can be combined and resent. These don't really have the lifespan issues, and might be better off using unserialized rows in the parent activity's RowManager. Perhaps the first step is to make LocalMessagePaker independent of DummyMessagePacker by copying the functions into the former.
24 changes: 23 additions & 1 deletion esp/scm/ws_topology.ecm
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,28 @@ ESPresponse [nil_remove, exceptions_inline] TpDropZoneQueryResponse
ESParray<ESPstruct TpDropZone> TpDropZones;
};

ESPservice [auth_feature("DEFERRED"), noforms, version("1.31"), cache_group("ESPWsTP"), exceptions_inline("./smc_xslt/exceptions.xslt")] WsTopology
ESPrequest TpListLogFilesRequest
{
string NetworkAddress;
string Path;
};

ESPStruct LogFileStruct
{
string Name;
string Path;
string Host;
bool IsDir;
int64 FileSize;
string Modifiedtime;
};

ESPresponse [exceptions_inline] TpListLogFilesResponse
{
ESParray<ESPStruct LogFileStruct> Files;
};

ESPservice [auth_feature("DEFERRED"), noforms, version("1.32"), cache_group("ESPWsTP"), exceptions_inline("./smc_xslt/exceptions.xslt")] WsTopology
{
ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/targetclusters.xslt")] TpTargetClusterQuery(TpTargetClusterQueryRequest, TpTargetClusterQueryResponse);
ESPmethod [cache_seconds(180), cache_global(1), resp_xsl_default("/esp/xslt/topology.xslt")] TpClusterQuery(TpClusterQueryRequest, TpClusterQueryResponse);
Expand All @@ -655,6 +676,7 @@ ESPservice [auth_feature("DEFERRED"), noforms, version("1.31"), cache_group("ESP
ESPmethod [cache_seconds(180), cache_global(1)] TpGetServicePlugins(TpGetServicePluginsRequest, TpGetServicePluginsResponse);
ESPmethod [cache_seconds(180), cache_global(1)] TpListTargetClusters(TpListTargetClustersRequest, TpListTargetClustersResponse);
ESPmethod [cache_seconds(180), cache_global(1), min_ver(1.25)] TpMachineInfo(TpMachineInfoRequest, TpMachineInfoResponse);
ESPmethod [cache_seconds(180), cache_global(1), min_ver(1.32)] TpListLogFiles(TpListLogFilesRequest, TpListLogFilesResponse);

ESPmethod SystemLog(SystemLogRequest, SystemLogResponse);
};
Expand Down
14 changes: 14 additions & 0 deletions esp/services/ws_topology/ws_topologyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1978,3 +1978,17 @@ bool CWsTopologyEx::onTpDropZoneQuery(IEspContext &context, IEspTpDropZoneQueryR
}
return false;
}

bool CWsTopologyEx::onTpListLogFiles(IEspContext &context, IEspTpListLogFilesRequest &req, IEspTpListLogFilesResponse &resp)
{
try
{
context.ensureFeatureAccess(FEATURE_URL, SecAccess_Read, ECLWATCH_TOPOLOGY_ACCESS_DENIED, "WsTopology::onTpListLogFile: Permission denied.");
m_TpWrapper.listLogFiles(req.getNetworkAddress(), req.getPath(), resp.getFiles());
}
catch(IException* e)
{
FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
}
return false;
}
2 changes: 2 additions & 0 deletions esp/services/ws_topology/ws_topologyService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class CWsTopologyEx : public CWsTopology

bool onTpSwapNode(IEspContext &context,IEspTpSwapNodeRequest &req, IEspTpSwapNodeResponse &resp);

bool onTpListLogFiles(IEspContext &context, IEspTpListLogFilesRequest &req, IEspTpListLogFilesResponse &resp);

bool onTpXMLFile(IEspContext &context,IEspTpXMLFileRequest &req, IEspTpXMLFileResponse &resp);

bool onTpLogFile(IEspContext &context,IEspTpLogFileRequest &req, IEspTpLogFileResponse &resp);
Expand Down
5 changes: 5 additions & 0 deletions esp/smc/SMCLib/TpContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ void CTpWrapper::getMachineList(double clientVersion, const char* MachineType, c
IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::getMachineList)");
}

void CTpWrapper::listLogFiles(const char* host, const char* path, IArrayOf<IConstLogFileStruct>& files)
{
IWARNLOG("UNIMPLEMENTED: CONTAINERIZED(CTpWrapper::listLogFiles)");
}

const char* CTpWrapper::getNodeNameTag(const char* MachineType)
{
if (strcmp(MachineType,"Computer")==0)
Expand Down
48 changes: 48 additions & 0 deletions esp/smc/SMCLib/TpWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2193,3 +2193,51 @@ StringArray & getRoxieDirectAccessPlanes(StringArray & planes, StringBuffer &def
planes.append(defaultPlane);
return planes;
}

void CTpWrapper::listLogFiles(const char * host, const char * path, IArrayOf<IConstLogFileStruct> & files)
{
if (isEmptyString(host))
throw makeStringException(ECLWATCH_INVALID_INPUT, "Network Address must be specified.");
Owned<IEnvironmentFactory> factory = getEnvironmentFactory(true);
Owned<IConstEnvironment> env = factory->openEnvironment();
Owned<IConstMachineInfo> machine = env->getMachineByAddress(host);
if (!machine)
throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "Invalid Network Address %s", host);

if (isEmptyString(path))
throw makeStringException(ECLWATCH_INVALID_INPUT, "Path must be specified.");
if (containsRelPaths(path)) //Detect a path like: /var/log/HPCCSystems/myesp/../../../
throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "Invalid path %s", path);
if (!validateConfigurationDirectory(nullptr, "log", nullptr, nullptr, path))
throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "Invalid path %s", path);

RemoteFilename rfn;
SocketEndpoint ep(host);
rfn.setPath(ep, path);
Owned<IFile> f = createIFile(rfn);
if (f->isDirectory() != fileBool::foundYes)
throw makeStringExceptionV(ECLWATCH_INVALID_DIRECTORY, "%s is not a directory.", path);

Owned<IDirectoryIterator> di = f->directoryFiles("*.log", false, true);
ForEach(*di)
{
StringBuffer fileName;
di->getName(fileName);

Owned<IEspLogFileStruct> lfs = createLogFileStruct();
lfs->setName(fileName);
lfs->setPath(path);
lfs->setHost(host);
lfs->setIsDir(di->isDir());
lfs->setFileSize(di->getFileSize());

StringBuffer s;
CDateTime modtime;
di->getModifiedTime(modtime);
modtime.getString(s);
s.setCharAt(10, ' ');
lfs->setModifiedtime(s);

files.append(*lfs.getLink());
}
}
1 change: 1 addition & 0 deletions esp/smc/SMCLib/TpWrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class TPWRAPPER_API CTpWrapper : public CInterface
void getTargetClusterList(IArrayOf<IEspTpLogicalCluster>& clusters, const char* clusterType = NULL, const char* clusterName = NULL);
void queryTargetClusterProcess(double version, const char* processName, const char* clusterType, IArrayOf<IConstTpCluster>& list);
void getServices(double version, const char* serviceType, const char* serviceName, IArrayOf<IConstHPCCService>& list);
void listLogFiles(const char* host, const char* path, IArrayOf<IConstLogFileStruct>& files);

};

Expand Down
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,11 @@
"default": 0,
"description": "Specify an IONICE value for the background copy thread, if backgroundCopyClass set to best-effort."
},
"blockedLocalAgent": {
"type": "boolean",
"default": true,
"description": "Used DataBuffer blocks to return agent data in localAgent mode."
},
"callbackRetries": {
"type": "integer",
"default": 3,
Expand Down
2 changes: 2 additions & 0 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ extern IPropertyTree *topology;
extern MapStringTo<int> *preferredClusters;
extern StringArray allQuerySetNames;

extern bool blockedLocalAgent;
extern bool acknowledgeAllRequests;
extern bool alwaysTrustFormatCrcs;
extern bool allFilesDynamic;
Expand Down Expand Up @@ -402,6 +403,7 @@ extern bool defaultExecuteDependenciesSequentially;
extern bool defaultStartInputsSequentially;
extern bool oneShotRoxie;
extern bool traceStrands;
extern unsigned minPayloadSize;

extern int backgroundCopyClass;
extern int backgroundCopyPrio;
Expand Down
3 changes: 1 addition & 2 deletions roxie/ccd/ccddebug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,10 +1219,9 @@ class CProxyDebugContext : public CInterface
if (header->activityId == ROXIE_EXCEPTION)
throwRemoteException(mu);
assertex(header->activityId == ROXIE_DEBUGREQUEST);
RecordLengthType *rowlen = (RecordLengthType *) mu->getNext(sizeof(RecordLengthType));
RecordLengthType *rowlen = (RecordLengthType *) mu->getNextLength();
assertex(rowlen);
RecordLengthType len = *rowlen;
ReleaseRoxieRow(rowlen);
const char * reply = (const char *) mu->getNext(len);
if (output)
{
Expand Down
Loading

0 comments on commit defa04b

Please sign in to comment.