Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-478 #479

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# DDS Release Notes

## v3.9 (2024-04-23)

- DDS general
- Modified: Compress topology files before broadcasting to agents. This significantly improves performance for big topology activations. (GH-478)
- Modified: Improved performance of the Core transport when transferring binary attachments. (GH-478)

## v3.8 (2024-01-19)

- DDS general
Expand Down
2 changes: 1 addition & 1 deletion dds-agent-cmd/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int main(int argc, char* argv[])
LOG(log_stdout) << "Files will be downloaded to \"~/.DDS/sessions/" << sid << "/log/agents\"";
break;
default:
LOG(log_stderr) << "Uknown command.";
LOG(log_stderr) << "Unknown command.";
return EXIT_FAILURE;
}

Expand Down
42 changes: 27 additions & 15 deletions dds-agent/src/CommanderChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ CCommanderChannel::CCommanderChannel(boost::asio::io_context& _service,
});

// Check free disk space (GH-392)
// Report avaliable disk space at start
// Report available disk space at start
uintmax_t nAvailable{ 0 };
isLowDiskSpace(&nAvailable);
LOG(info) << "Avaliable disk space: " << dds::misc::HumanReadable{ nAvailable };
LOG(info) << "Available disk space: " << dds::misc::HumanReadable{ nAvailable };
// create async timer
m_resourceMonitorTimer = make_unique<timer_t>(_service);
startResourceMonitor(_service, chrono::seconds(30));
Expand Down Expand Up @@ -166,7 +166,7 @@ bool CCommanderChannel::on_cmdREPLY(SCommandAttachmentImpl<cmdREPLY>::ptr_t _att
{
if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::OK)
{
LOG(info) << "SM: Handshake successfull. PHID: " << this->m_protocolHeaderID;
LOG(info) << "SM: Handshake successful. PHID: " << this->m_protocolHeaderID;
return true;
}
else if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::ERROR)
Expand Down Expand Up @@ -322,6 +322,18 @@ bool CCommanderChannel::on_cmdBINARY_ATTACHMENT_RECEIVED(
fs::rename(_attachment->m_receivedFilePath, destFilePath);
LOG(info) << "Received new topology file: " << destFilePath.generic_string();

// Decompressing the topology file
if (destFilePath.extension() == ".gz")
{
const fs::path gzipPath{ bp::search_path("gzip") };
stringstream ssCmd;
ssCmd << gzipPath.string() << " -d " << destFilePath;
string output;
execute(ssCmd.str(), chrono::seconds(60), &output);
// remove ".gz" extension
destFilePath = destFilePath.stem();
}

// Activating new topology
CTopoCore::Ptr_t topo{ make_shared<CTopoCore>() };
// Topology already validated on the commander, no need to validate it again
Expand Down Expand Up @@ -518,12 +530,12 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImpl<cmdASSIGN_
if (slot->m_collectionIndex != numeric_limits<uint32_t>::max())
ba::replace_all(slot->m_sUsrExe, "%collectionIndex%", to_string(slot->m_collectionIndex));

// If the user task was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
// If the user task was transferred, then replace "%DDS_DEFAULT_TASK_PATH%" with the real path
fs::path dir(CUserDefaults::instance().getSlotsRootDir());
dir /= to_string(_sender.m_ID);
dir += fs::path::preferred_separator;
ba::replace_all(slot->m_sUsrExe, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string());
// If the user custom environment was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path
// If the user custom environment was transferred, then replace "%DDS_DEFAULT_TASK_PATH%" with the real path
ba::replace_all(slot->m_sUsrEnv, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string());

// Revoke drain of the write queue to start accept messages
Expand All @@ -550,7 +562,7 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImpl<cmdASSIGN_
pathAsset = dir;
pathAsset /= assetFileName.str();

// If local asset exists, we will overwrite it. No skiping.
// If local asset exists, we will overwrite it. No skipping.

slot->m_taskAssets.push_back(pathAsset);
break;
Expand Down Expand Up @@ -753,7 +765,7 @@ bool CCommanderChannel::on_cmdSTOP_USER_TASK(SCommandAttachmentImpl<cmdSTOP_USER
slot->m_pid,
[this, id]()
{
// Once child termionation is finished, send User task "Done" to the commander
// Once child termination is finished, send User task "Done" to the commander
pushMsg<cmdREPLY>(
SReplyCmd("Done", (uint16_t)SReplyCmd::EStatusCode::OK, 0, cmdSTOP_USER_TASK), id);
});
Expand Down Expand Up @@ -895,7 +907,7 @@ void CCommanderChannel::onNewUserTask(uint64_t _slotID, pid_t _pid)
if (WIFEXITED(status))
{
// NOTE: We are using a bash wrapper script for user tasks.
// According to bash, the exist status of child processes can be interpreted in the folloiwing
// According to bash, the exit status of child processes can be interpreted in the following
// way:
// - For the shell’s purposes, a command which exits with a zero exit status has succeeded.
// - A non-zero exit status indicates failure.
Expand Down Expand Up @@ -1001,8 +1013,8 @@ void CCommanderChannel::terminateChildrenProcesses(
enumChildProcesses(mainPid, vecChildren);

// the mainPid is never included to the list
// In case of the agent the reseaon is obviouse.
// In case of a task, since it is running via the DDS task wrapper it will exit autoamticlaly once children are out
// In case of the agent the reason is obvious.
// In case of a task, since it is running via the DDS task wrapper it will exit automatically once children are out
string sChildren;
pidContainer_t pidChildren;
for (const auto& i : vecChildren)
Expand Down Expand Up @@ -1088,7 +1100,7 @@ void CCommanderChannel::terminateChildrenProcesses(
else
{
// kill all child process of tasks if there are any
// We do it before terminating tasks to give parenrt task processes a change to read state of children -
// We do it before terminating tasks to give the parent task processes a chance to read state of children -
// otherwise we will get zombies if user tasks don't manage their children properly
LOG(info) << "Timeout is reached. Sending unconditional kill signal to all existing child processes...";
for (auto const& pid : _children)
Expand Down Expand Up @@ -1142,7 +1154,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode)
slot->m_pid = 0;
slot->m_taskID = 0;

// Drainning the Intercom write queue
// Draining the Intercom write queue
m_intercomChannel->drainWriteQueue(true, _slotID);

// Notify DDS commander
Expand All @@ -1151,7 +1163,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode)
catch (exception& _e)
{
LOG(fatal) << "Failed to remove user task on slot " << _slotID << " from the list of children: " << _e.what();
LOG(error) << "Can't send TASK_DONE. The coresponding slot is missing";
LOG(error) << "Can't send TASK_DONE. The corrsponding slot is missing";
}
}

Expand All @@ -1176,7 +1188,7 @@ void CCommanderChannel::stopChannel()
terminateChildrenProcesses(0, [&waitCondition]() { waitCondition.notifyAll(); });

// wait for termination to finish
// either it is finsihed or we procceed in 30 sec in anyway
// either it is finished or we proceed after 30 sec anyway
waitCondition.waitUntil(std::chrono::system_clock::now() + std::chrono::seconds(30));

if (m_intercomChannel)
Expand Down Expand Up @@ -1350,7 +1362,7 @@ bool CCommanderChannel::isLowDiskSpace(uintmax_t* _available)
}
catch (exception& _e)
{
LOG(error) << "Failed getting avaliable disk space: " << _e.what();
LOG(error) << "Failed to get available disk space: " << _e.what();
}
return false;
}
2 changes: 1 addition & 1 deletion dds-agent/src/CommanderChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ namespace dds
/// The function first sends a graceful SIGTERM to all children. After a defined timeout (5 sec) an
/// unconditional SIGKILL is sent.
/// \param[in] _parentPid The pid of the parent process whose children need to be terminated.
/// \param[in] _onCompleteSlot is a callback fucntion. It's called when termination of all child process is
/// \param[in] _onCompleteSlot is a callback function. It's called when termination of all child process is
/// finished.
void terminateChildrenProcesses(pid_t _parentPid, const terminateChildrenOnComplete_t& _onCompleteSlot);
void terminateChildrenProcesses(timerPtr_t& _timer,
Expand Down
4 changes: 2 additions & 2 deletions dds-commander/src/AgentChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ bool CAgentChannel::on_cmdREPLY_HOST_INFO(SCommandAttachmentImpl<cmdREPLY_HOST_I
<< "] has successfully connected. Startup time: " << m_info.m_startUpTime.count() << " ms.";

// Request agent to add Task Slots
// We get the number of slots from the agent. On submit each agent is assiugned to a fixed number of slots. Then
// when agent is up, we requast tyhe agent to actully active each slot.
// We get the number of slots from the agent. On submit each agent is assigned to a fixed number of slots. Then
// when agent is up, we request the agent to actually activate each slot.
LOG(info) << "Requesting " << _attachment->m_slots << " task slots from " << m_info.m_id;
for (size_t i = 0; i < _attachment->m_slots; ++i)
{
Expand Down
68 changes: 53 additions & 15 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ void CConnectionManager::broadcastUpdateTopologyAndWait(weakChannelInfo_t::conta
dds::tools_api::SProgressResponseData progress(_cmd, 0, m_updateTopology.m_nofRequests, 0);
sendCustomCommandResponse(_channel, progress.toJSON());

// Broadcast message or binary to agents
// Broadcast a message or a binary to agents
size_t index = 0;
for (auto& agent : _agents)
{
Expand Down Expand Up @@ -713,10 +713,10 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
}

// MARK: ToolsAPI - onTaskDone
// send task done ToolsAPI event to registred channels. A channel, whcih is expired of filed should be removed from
// send task done ToolsAPI event to registered channels. A channel which is expired or failed should be removed from
// the list.
lock_guard<mutex> lock(m_mtxOnTaskDoneSubscribers);
// The loop always recalclates the end() iterator since we might delete expired elelemts from the list
// The loop always recalculates the end() iterator since we might delete expired elements from the list
for (auto iter = m_onTaskDoneSubscribers.begin(); iter != m_onTaskDoneSubscribers.end(); ++iter)
{
if (auto ch = iter->first.lock())
Expand All @@ -726,7 +726,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
response.m_taskID = _attachment->m_taskID;
response.m_exitCode = (WIFEXITED(_attachment->m_exitCode) ? WEXITSTATUS(_attachment->m_exitCode) : 0);
// NOTE: We are using a bash wrapper script for user tasks.
// According to bash, the exist status of child processes can be interpreted in the folloiwing way:
// According to bash, the exit status of child processes can be interpreted in the following way:
// - For the shell’s purposes, a command which exits with a zero exit status has succeeded.
// - A non-zero exit status indicates failure.
// This seemingly counter-intuitive scheme is used so there is one well-defined way to indicate success
Expand All @@ -744,7 +744,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender,
}
else
{
// channel is expiored - removing it from the list
// channel is expired - removing it from the list
m_onTaskDoneSubscribers.erase(iter);
}
}
Expand All @@ -754,7 +754,7 @@ void CConnectionManager::on_cmdGET_PROP_LIST(const SSenderInfo& /*_sender*/,
SCommandAttachmentImpl<cmdGET_PROP_LIST>::ptr_t /*_attachment*/,
CAgentChannel::weakConnectionPtr_t /*_channel*/)
{
// FIXME: This command desn't work without CKeyValueManager
// FIXME: This command doesn't work without CKeyValueManager
}

void CConnectionManager::on_cmdGET_PROP_VALUES(const SSenderInfo& /*_sender*/,
Expand Down Expand Up @@ -823,7 +823,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender,
else
{
LOG(error) << "Failed to deliver. Channel is missing. CUSTOM_CMD senderID: " << _sender.m_ID
<< "; attachemnt: " << *_attachment;
<< "; attachment: " << *_attachment;
}
}
catch (boost::bad_lexical_cast&)
Expand Down Expand Up @@ -923,7 +923,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender,
ptr->pushMsg<cmdCUSTOM_CMD>(*_attachment, v.m_protocolHeaderID);
}

// Debug msg: there are too many of such messages if tasks intensivly use CC
// Debug msg: there are too many of such messages if tasks intensively use CC
/* stringstream ss;
ss << "Send custom command to " << channels.size() << " channels." << endl;

Expand Down Expand Up @@ -988,7 +988,7 @@ void CConnectionManager::processToolsAPIRequests(const SCustomCmdCmd& _cmd, CAge
else if (tag == "onTaskDone")
{
SOnTaskDoneRequestData info(data);
// add the given channel (_channel) to the list, which will be allerted whenever a task is exited
// add the given channel (_channel) to the list, which will be alerted whenever a task is exited
lock_guard<mutex> lock(m_mtxOnTaskDoneSubscribers);
m_onTaskDoneSubscribers.push_back({ _channel, info });
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
m_SubmitAgents.m_requestID = _submitInfo.m_requestID;
m_SubmitAgents.zeroCounters();

// Generating a submissin ID
// Generating a submission ID
const boost::uuids::uuid submissionID{ boost::uuids::random_generator()() };
const string sSubmissionID{ boost::lexical_cast<std::string>(submissionID) };
LOG(info) << "Initializing an agent submit request with submissionID = " << sSubmissionID;
Expand All @@ -1051,7 +1051,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&

// Create / re-pack WN package
// Include inline script, if present.
// For ssh plug-in inline script has a higher priorety, than the sxcript provided via the submit command
// For ssh plug-in inline script has a higher priority, than the script provided via the submit command
// (--env-config). Only the ssh plug-in supports it.
bool bNeedCustomEnv{ false };
string scriptFileName(pathWrkPackageDir.string());
Expand Down Expand Up @@ -1089,7 +1089,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
// pack worker package
sendToolsAPIMsg(_channel, _submitInfo.m_requestID, "Creating new worker package...", EMsgSeverity::info);

// Use a lightweightpackage when possible
// Use a lightweight package when possible
_createWnPkg(bNeedCustomEnv,
(_submitInfo.m_rms == "localhost"),
_submitInfo.m_slots,
Expand Down Expand Up @@ -1130,7 +1130,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
try
{
// Execute RMS plug-in and don't wait for it.
// Omnce plug-in is up it will connect back to the commander.
// Once plug-in is up it will connect back to the commander.
// We will report to user if it won't connect.
execute(ssCmd.str());
}
Expand Down Expand Up @@ -1258,7 +1258,7 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
//
if (removedTasks.size() > 0)
{
LOG(info) << "Stoppoing removed tasks";
LOG(info) << "Stopping removed tasks";
weakChannelInfo_t::container_t agents;
// FIXME: Needs to be reviewed for the current architecture
// {
Expand Down Expand Up @@ -1338,8 +1338,46 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
if (allAgents.size() == 0)
throw runtime_error("There are no active agents.");

string topFileDestName{ "topology.xml" };
fs::path copyTopoFile;
try
{
// compressing the topology.
// In some production cases the orig. file was more than 20MB. Sending such size to hundreds of agents
// might be resource consuming. In case of ALICE prod it was 300 agents (175K task slots) and 20 MB topo
// file.
LOG(info) << "Topology file uncompressed size: "
<< dds::misc::HumanReadable{ fs::file_size(topologyFile) } << " Compressing topology file...";
// make a copy of the orig. file
// The copy is located in the commander's working directory
const fs::path wrkDir(user_defaults_api::CUserDefaults::instance().getWrkDir());
copyTopoFile = wrkDir;
copyTopoFile /= "topology_agent_copy.xml";
fs::copy_file(topologyFile, copyTopoFile);
// compressing
const fs::path gzipPath{ bp::search_path("gzip") };
stringstream ssCmd;
ssCmd << gzipPath.string() << " -9 " << copyTopoFile;
string output;
execute(ssCmd.str(), chrono::seconds(60), &output);
copyTopoFile += ".gz";
topologyFile = copyTopoFile.string();
LOG(info) << "Topology file compressed size: "
<< dds::misc::HumanReadable{ fs::file_size(topologyFile) };
topFileDestName += ".gz";
}
catch (exception& e)
{
LOG(error) << "Failed to compress topology file. " << e.what();
LOG(info) << "Sending uncompressed topology...";
}

LOG(info) << "Broadcasting topology update with a file: " << topologyFile;
broadcastUpdateTopologyAndWait<cmdUPDATE_TOPOLOGY>(
allAgents, _channel, "Updating topology for agents...", topologyFile, "topology.xml");
allAgents, _channel, "Updating topology for agents...", topologyFile, topFileDestName);

if (!copyTopoFile.empty())
fs::remove(copyTopoFile);
}

//
Expand Down
2 changes: 1 addition & 1 deletion dds-commander/src/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void CScheduler::makeScheduleImpl(CTopoCore& _topology,
m_schedule.clear();

size_t nofChannels{ _channels.size() };
// Map pair<host name, worker id> to vector of channel indeces.
// Map pair<host name, worker id> to vector of channel indexes.
// This is needed in order to reduce number of regex matches and speed up scheduling.
hostToChannelMap_t hostToChannelMap;
for (size_t iChannel = 0; iChannel < nofChannels; ++iChannel)
Expand Down
Loading