diff --git a/ReleaseNotes.md b/ReleaseNotes.md index e0f1e38d..44659138 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -1,5 +1,11 @@ # DDS Release Notes +## v3.9 (2024-04-23) + +- DDS general + - Modified: compress topology files before broadcasting to agents. Significantly improves performance for big topology activations. (GH-478) + - Modified: Improved performance of the Core transport when transferring binary attachments. (GH-478) + ## v3.8 (2024-01-19) - DDS general diff --git a/dds-agent-cmd/src/main.cpp b/dds-agent-cmd/src/main.cpp index 9fb1845b..f8aa77d6 100644 --- a/dds-agent-cmd/src/main.cpp +++ b/dds-agent-cmd/src/main.cpp @@ -51,7 +51,7 @@ int main(int argc, char* argv[]) LOG(log_stdout) << "Files will be downloaded to \"~/.DDS/sessions/" << sid << "/log/agents\""; break; default: - LOG(log_stderr) << "Uknown command."; + LOG(log_stderr) << "Unknown command."; return EXIT_FAILURE; } diff --git a/dds-agent/src/CommanderChannel.cpp b/dds-agent/src/CommanderChannel.cpp index f38df626..8c758d59 100644 --- a/dds-agent/src/CommanderChannel.cpp +++ b/dds-agent/src/CommanderChannel.cpp @@ -92,10 +92,10 @@ CCommanderChannel::CCommanderChannel(boost::asio::io_context& _service, }); // Check free disk space (GH-392) - // Report avaliable disk space at start + // Report available disk space at start uintmax_t nAvailable{ 0 }; isLowDiskSpace(&nAvailable); - LOG(info) << "Avaliable disk space: " << dds::misc::HumanReadable{ nAvailable }; + LOG(info) << "Available disk space: " << dds::misc::HumanReadable{ nAvailable }; // create async timer m_resourceMonitorTimer = make_unique(_service); startResourceMonitor(_service, chrono::seconds(30)); @@ -166,7 +166,7 @@ bool CCommanderChannel::on_cmdREPLY(SCommandAttachmentImpl::ptr_t _att { if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::OK) { - LOG(info) << "SM: Handshake successfull. PHID: " << this->m_protocolHeaderID; + LOG(info) << "SM: Handshake successful. PHID: " << this->m_protocolHeaderID; return true; } else if (_attachment->m_statusCode == (uint16_t)SReplyCmd::EStatusCode::ERROR) @@ -322,6 +322,18 @@ bool CCommanderChannel::on_cmdBINARY_ATTACHMENT_RECEIVED( fs::rename(_attachment->m_receivedFilePath, destFilePath); LOG(info) << "Received new topology file: " << destFilePath.generic_string(); + // Decompressing the topology file + if (destFilePath.extension() == ".gz") + { + const fs::path gzipPath{ bp::search_path("gzip") }; + stringstream ssCmd; + ssCmd << gzipPath.string() << " -d " << destFilePath; + string output; + execute(ssCmd.str(), chrono::seconds(60), &output); + // remove ".gz" extension + destFilePath = destFilePath.stem(); + } + // Activating new topology CTopoCore::Ptr_t topo{ make_shared() }; // Topology already validated on the commander, no need to validate it again @@ -518,12 +530,12 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImplm_collectionIndex != numeric_limits::max()) ba::replace_all(slot->m_sUsrExe, "%collectionIndex%", to_string(slot->m_collectionIndex)); - // If the user task was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path + // If the user task was transferred, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path fs::path dir(CUserDefaults::instance().getSlotsRootDir()); dir /= to_string(_sender.m_ID); dir += fs::path::preferred_separator; ba::replace_all(slot->m_sUsrExe, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string()); - // If the user custom environment was transfered, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path + // If the user custom environment was transferred, than replace "%DDS_DEFAULT_TASK_PATH%" with the real path ba::replace_all(slot->m_sUsrEnv, "%DDS_DEFAULT_TASK_PATH%", dir.generic_string()); // Revoke drain of the write queue to start accept messages @@ -550,7 +562,7 @@ bool CCommanderChannel::on_cmdASSIGN_USER_TASK(SCommandAttachmentImplm_taskAssets.push_back(pathAsset); break; @@ -753,7 +765,7 @@ bool CCommanderChannel::on_cmdSTOP_USER_TASK(SCommandAttachmentImplm_pid, [this, id]() { - // Once child termionation is finished, send User task "Done" to the commander + // Once child termination is finished, send User task "Done" to the commander pushMsg( SReplyCmd("Done", (uint16_t)SReplyCmd::EStatusCode::OK, 0, cmdSTOP_USER_TASK), id); }); @@ -895,7 +907,7 @@ void CCommanderChannel::onNewUserTask(uint64_t _slotID, pid_t _pid) if (WIFEXITED(status)) { // NOTE: We are using a bash wrapper script for user tasks. - // According to bash, the exist status of child processes can be interpreted in the folloiwing + // According to bash, the exist status of child processes can be interpreted in the following // way: // - For the shell’s purposes, a command which exits with a zero exit status has succeeded. // - A non-zero exit status indicates failure. @@ -1001,8 +1013,8 @@ void CCommanderChannel::terminateChildrenProcesses( enumChildProcesses(mainPid, vecChildren); // the mainPid is never included to the list - // In case of the agent the reseaon is obviouse. - // In case of a task, since it is running via the DDS task wrapper it will exit autoamticlaly once children are out + // In case of the agent the reason is obvious. + // In case of a task, since it is running via the DDS task wrapper it will exit automatically once children are out string sChildren; pidContainer_t pidChildren; for (const auto& i : vecChildren) @@ -1088,7 +1100,7 @@ void CCommanderChannel::terminateChildrenProcesses( else { // kill all child process of tasks if there are any - // We do it before terminating tasks to give parenrt task processes a change to read state of children - + // We do it before terminating tasks to give the parent task processes a chance to read state of children - // otherwise we will get zombies if user tasks don't manage their children properly LOG(info) << "Timeout is reached. Sending unconditional kill signal to all existing child processes..."; for (auto const& pid : _children) @@ -1142,7 +1154,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode) slot->m_pid = 0; slot->m_taskID = 0; - // Drainning the Intercom write queue + // Draining the Intercom write queue m_intercomChannel->drainWriteQueue(true, _slotID); // Notify DDS commander @@ -1151,7 +1163,7 @@ void CCommanderChannel::taskExited(uint64_t _slotID, int _exitCode) catch (exception& _e) { LOG(fatal) << "Failed to remove user task on slot " << _slotID << " from the list of children: " << _e.what(); - LOG(error) << "Can't send TASK_DONE. The coresponding slot is missing"; + LOG(error) << "Can't send TASK_DONE. The corrsponding slot is missing"; } } @@ -1176,7 +1188,7 @@ void CCommanderChannel::stopChannel() terminateChildrenProcesses(0, [&waitCondition]() { waitCondition.notifyAll(); }); // wait for termination to finish - // either it is finsihed or we procceed in 30 sec in anyway + // either it is finished or we proceed in 30 sec in anyway waitCondition.waitUntil(std::chrono::system_clock::now() + std::chrono::seconds(30)); if (m_intercomChannel) @@ -1350,7 +1362,7 @@ bool CCommanderChannel::isLowDiskSpace(uintmax_t* _available) } catch (exception& _e) { - LOG(error) << "Failed getting avaliable disk space: " << _e.what(); + LOG(error) << "Failed to get available disk space: " << _e.what(); } return false; } diff --git a/dds-agent/src/CommanderChannel.h b/dds-agent/src/CommanderChannel.h index d2908ee3..bb1f116d 100644 --- a/dds-agent/src/CommanderChannel.h +++ b/dds-agent/src/CommanderChannel.h @@ -128,7 +128,7 @@ namespace dds /// The function first sends a graceful SIGTERM to all children. After a defined timeout (5 sec) an /// unconditional SIGKILL is sent. /// \param[in] _parentPid The pid of the parent process, which children needs to - /// \param[in] _onCompleteSlot is a callback fucntion. It's called when termination of all child process is + /// \param[in] _onCompleteSlot is a callback function. It's called when termination of all child process is /// finished. void terminateChildrenProcesses(pid_t _parentPid, const terminateChildrenOnComplete_t& _onCompleteSlot); void terminateChildrenProcesses(timerPtr_t& _timer, diff --git a/dds-commander/src/AgentChannel.cpp b/dds-commander/src/AgentChannel.cpp index dc63681a..475d243b 100644 --- a/dds-commander/src/AgentChannel.cpp +++ b/dds-commander/src/AgentChannel.cpp @@ -117,8 +117,8 @@ bool CAgentChannel::on_cmdREPLY_HOST_INFO(SCommandAttachmentImplm_slots << " task slots from " << m_info.m_id; for (size_t i = 0; i < _attachment->m_slots; ++i) { diff --git a/dds-commander/src/ConnectionManager.cpp b/dds-commander/src/ConnectionManager.cpp index 9b5067d3..74ef7a7a 100644 --- a/dds-commander/src/ConnectionManager.cpp +++ b/dds-commander/src/ConnectionManager.cpp @@ -336,7 +336,7 @@ void CConnectionManager::broadcastUpdateTopologyAndWait(weakChannelInfo_t::conta dds::tools_api::SProgressResponseData progress(_cmd, 0, m_updateTopology.m_nofRequests, 0); sendCustomCommandResponse(_channel, progress.toJSON()); - // Broadcast message or binary to agents + // Broadcast a message or a binary to agents size_t index = 0; for (auto& agent : _agents) { @@ -713,10 +713,10 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender, } // MARK: ToolsAPI - onTaskDone - // send task done ToolsAPI event to registred channels. A channel, whcih is expired of filed should be removed from + // send task done ToolsAPI event to registered channels. A channel, which is expired of filed should be removed from // the list. lock_guard lock(m_mtxOnTaskDoneSubscribers); - // The loop always recalclates the end() iterator since we might delete expired elelemts from the list + // The loop always recalculates the end() iterator since we might delete expired elemets from the list for (auto iter = m_onTaskDoneSubscribers.begin(); iter != m_onTaskDoneSubscribers.end(); ++iter) { if (auto ch = iter->first.lock()) @@ -726,7 +726,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender, response.m_taskID = _attachment->m_taskID; response.m_exitCode = (WIFEXITED(_attachment->m_exitCode) ? WEXITSTATUS(_attachment->m_exitCode) : 0); // NOTE: We are using a bash wrapper script for user tasks. - // According to bash, the exist status of child processes can be interpreted in the folloiwing way: + // According to bash, the exist status of child processes can be interpreted in the following way: // - For the shell’s purposes, a command which exits with a zero exit status has succeeded. // - A non-zero exit status indicates failure. // This seemingly counter-intuitive scheme is used so there is one well-defined way to indicate success @@ -744,7 +744,7 @@ void CConnectionManager::on_cmdUSER_TASK_DONE(const SSenderInfo& _sender, } else { - // channel is expiored - removing it from the list + // channel is expired - removing it from the list m_onTaskDoneSubscribers.erase(iter); } } @@ -754,7 +754,7 @@ void CConnectionManager::on_cmdGET_PROP_LIST(const SSenderInfo& /*_sender*/, SCommandAttachmentImpl::ptr_t /*_attachment*/, CAgentChannel::weakConnectionPtr_t /*_channel*/) { - // FIXME: This command desn't work without CKeyValueManager + // FIXME: This command doesn't work without CKeyValueManager } void CConnectionManager::on_cmdGET_PROP_VALUES(const SSenderInfo& /*_sender*/, @@ -823,7 +823,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender, else { LOG(error) << "Failed to deliver. Channel is missing. CUSTOM_CMD senderID: " << _sender.m_ID - << "; attachemnt: " << *_attachment; + << "; attachment: " << *_attachment; } } catch (boost::bad_lexical_cast&) @@ -923,7 +923,7 @@ void CConnectionManager::on_cmdCUSTOM_CMD(const SSenderInfo& _sender, ptr->pushMsg(*_attachment, v.m_protocolHeaderID); } - // Debug msg: there are too many of such messages if tasks intensivly use CC + // Debug msg: there are too many of such messages if tasks intensively use CC /* stringstream ss; ss << "Send custom command to " << channels.size() << " channels." << endl; @@ -988,7 +988,7 @@ void CConnectionManager::processToolsAPIRequests(const SCustomCmdCmd& _cmd, CAge else if (tag == "onTaskDone") { SOnTaskDoneRequestData info(data); - // add the given channel (_channel) to the list, which will be allerted whenever a task is exited + // add the given channel (_channel) to the list, which will be alerted whenever a task is exited lock_guard lock(m_mtxOnTaskDoneSubscribers); m_onTaskDoneSubscribers.push_back({ _channel, info }); } @@ -1036,7 +1036,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData& m_SubmitAgents.m_requestID = _submitInfo.m_requestID; m_SubmitAgents.zeroCounters(); - // Generating a submissin ID + // Generating a submission ID const boost::uuids::uuid submissionID{ boost::uuids::random_generator()() }; const string sSubmissionID{ boost::lexical_cast(submissionID) }; LOG(info) << "Initializing an agent submit request with submissionID = " << sSubmissionID; @@ -1051,7 +1051,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData& // Create / re-pack WN package // Include inline script, if present. - // For ssh plug-in inline script has a higher priorety, than the sxcript provided via the submit command + // For ssh plug-in inline script has a higher priority, than the script provided via the submit command // (--env-config). Only the ssh plug-in supports it. bool bNeedCustomEnv{ false }; string scriptFileName(pathWrkPackageDir.string()); @@ -1089,7 +1089,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData& // pack worker package sendToolsAPIMsg(_channel, _submitInfo.m_requestID, "Creating new worker package...", EMsgSeverity::info); - // Use a lightweightpackage when possible + // Use a lightweight package when possible _createWnPkg(bNeedCustomEnv, (_submitInfo.m_rms == "localhost"), _submitInfo.m_slots, @@ -1130,7 +1130,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData& try { // Execute RMS plug-in and don't wait for it. - // Omnce plug-in is up it will connect back to the commander. + // Once plug-in is up it will connect back to the commander. // We will report to user if it won't connect. execute(ssCmd.str()); } @@ -1258,7 +1258,7 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa // if (removedTasks.size() > 0) { - LOG(info) << "Stoppoing removed tasks"; + LOG(info) << "Stopping removed tasks"; weakChannelInfo_t::container_t agents; // FIXME: Needs to be reviewed for the current architecture // { @@ -1338,8 +1338,46 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa if (allAgents.size() == 0) throw runtime_error("There are no active agents."); + string topFileDestName{ "topology.xml" }; + fs::path copyTopoFile; + try + { + // compressing the topology. + // In some production cases the orig. file was more than 20MB. Sending such size to hundreds of agents + // might be resource consuming. In case of ALICE prod it was 300 agents (175K task slots) and 20 MB topo + // file. + LOG(info) << "Topology file uncompressed size: " + << dds::misc::HumanReadable{ fs::file_size(topologyFile) } << " Compressing topology file..."; + // make a copy of the orig. file + // The copy is located in the commander's working directory + const fs::path wrkDir(user_defaults_api::CUserDefaults::instance().getWrkDir()); + copyTopoFile = wrkDir; + copyTopoFile /= "topology_agent_copy.xml"; + fs::copy_file(topologyFile, copyTopoFile); + // compressing + const fs::path gzipPath{ bp::search_path("gzip") }; + stringstream ssCmd; + ssCmd << gzipPath.string() << " -9 " << copyTopoFile; + string output; + execute(ssCmd.str(), chrono::seconds(60), &output); + copyTopoFile += ".gz"; + topologyFile = copyTopoFile.string(); + LOG(info) << "Topology file compressed size: " + << dds::misc::HumanReadable{ fs::file_size(topologyFile) }; + topFileDestName += ".gz"; + } + catch (exception& e) + { + LOG(error) << "Failed to compress topology file. " << e.what(); + LOG(info) << "Sending uncompressed topology..."; + } + + LOG(info) << "Broadcasting topology update with a file: " << topologyFile; broadcastUpdateTopologyAndWait( - allAgents, _channel, "Updating topology for agents...", topologyFile, "topology.xml"); + allAgents, _channel, "Updating topology for agents...", topologyFile, topFileDestName); + + if (!copyTopoFile.empty()) + fs::remove(copyTopoFile); } // diff --git a/dds-commander/src/Scheduler.cpp b/dds-commander/src/Scheduler.cpp index c9b1348f..ce26c0b6 100644 --- a/dds-commander/src/Scheduler.cpp +++ b/dds-commander/src/Scheduler.cpp @@ -54,7 +54,7 @@ void CScheduler::makeScheduleImpl(CTopoCore& _topology, m_schedule.clear(); size_t nofChannels{ _channels.size() }; - // Map pair to vector of channel indeces. + // Map pair to vector of channel indexes. // This is needed in order to reduce number of regex matches and speed up scheduling. hostToChannelMap_t hostToChannelMap; for (size_t iChannel = 0; iChannel < nofChannels; ++iChannel) diff --git a/dds-commander/src/Scheduler.h b/dds-commander/src/Scheduler.h index a5ca9ec3..02ad13cf 100644 --- a/dds-commander/src/Scheduler.h +++ b/dds-commander/src/Scheduler.h @@ -37,7 +37,7 @@ namespace dds using weakChannelInfoVector_t = std::vector>; private: - // Map tuple to vector of channel indeces. + // Map tuple to vector of channel indexes. using hostToChannelMap_t = std::map, std::vector>; // Map pair to counter. diff --git a/dds-commander/src/main.cpp b/dds-commander/src/main.cpp index 56fde0df..b8f2216a 100644 --- a/dds-commander/src/main.cpp +++ b/dds-commander/src/main.cpp @@ -154,7 +154,7 @@ int main(int argc, char* argv[]) // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< // TODO: A temporary solution to cancel slurm jobs. - // ToolsAPI and DDS Plug-in API is being upgraded to use protobuf. In meantime, we cancel slurm jobs diurectloy + // ToolsAPI and DDS Plug-in API is being upgraded to use protobuf. In meantime, we cancel slurm jobs directly // from commander. // - Loop over all submissions of this session diff --git a/dds-intercom-lib/src/Intercom.cpp b/dds-intercom-lib/src/Intercom.cpp index 9bed381f..ab31ac6d 100644 --- a/dds-intercom-lib/src/Intercom.cpp +++ b/dds-intercom-lib/src/Intercom.cpp @@ -108,7 +108,7 @@ void CCustomCmd::subscribe(signal_t::slot_function_type _subscriber) void CCustomCmd::subscribeOnReply(replySignal_t::slot_function_type _subscriber) { connection_t connection = m_service.m_impl->connectCustomCmdReply(_subscriber); - LOG(info) << "User process is waiting for replys from custom commands."; + LOG(info) << "User process is waiting for a reply from custom commands."; } void CCustomCmd::unsubscribe() diff --git a/dds-intercom-lib/src/Intercom.h b/dds-intercom-lib/src/Intercom.h index cd96f43d..d89e1025 100644 --- a/dds-intercom-lib/src/Intercom.h +++ b/dds-intercom-lib/src/Intercom.h @@ -93,7 +93,7 @@ namespace dds class CCustomCmd { public: - /// \typedef Custom command cllback function + /// \typedef Custom command callback function typedef boost::signals2::signal signal_t; @@ -171,7 +171,7 @@ namespace dds std::string m_id; ///< ID for communication with DDS commander. std::string m_wrkPackagePath; ///< A full path of the agent worker package, which needs to be deployed. std::string m_groupName; ///< Agent group name - std::string m_submissionTag; ///< Submission tag. It can be used by RMS to name dds jobs and direcrtories. + std::string m_submissionTag; ///< Submission tag. It can be used by RMS to name dds jobs and directories. std::string m_inlineConfig; ///< Content of this buffer will be added to the RMS job configuration file. }; diff --git a/dds-intercom-lib/src/IntercomServiceCore.h b/dds-intercom-lib/src/IntercomServiceCore.h index 20bda093..49b9a30f 100644 --- a/dds-intercom-lib/src/IntercomServiceCore.h +++ b/dds-intercom-lib/src/IntercomServiceCore.h @@ -119,7 +119,7 @@ namespace dds boost::asio::io_context m_io_context; ///> boost::asio IO context boost::thread_group m_workerThreads; ///> Thread container CAgentChannel::connectionPtr_t m_channel; ///< TCP channel for communication with DDS commander - CSMAgentChannel::connectionPtr_t m_SMChannel; ///< Shared memory channel for comunication with DDS agent + CSMAgentChannel::connectionPtr_t m_SMChannel; ///< Shared memory channel for communication with DDS agent std::atomic m_started; ///< True if started, False otherwise /// Condition variable used to stop the current thread. diff --git a/dds-misc-lib/src/EnvProp.h b/dds-misc-lib/src/EnvProp.h index 7f00a490..b132fef0 100644 --- a/dds-misc-lib/src/EnvProp.h +++ b/dds-misc-lib/src/EnvProp.h @@ -11,7 +11,7 @@ namespace dds { - /// a list of envioronment properties + /// a list of enviornment properties enum EEnvProp { task_id, ///< associated with $DDS_TASK_ID environment variable. diff --git a/dds-misc-lib/src/Logger.h b/dds-misc-lib/src/Logger.h index d447a427..a40de71a 100644 --- a/dds-misc-lib/src/Logger.h +++ b/dds-misc-lib/src/Logger.h @@ -127,7 +127,7 @@ namespace dds::misc // Default format for logger formatter formatter = - // TODO: std::setw doesn't work for the first collumn of the log (TimeStamp). Investigate! + // TODO: std::setw doesn't work for the first column of the log (TimeStamp). Investigate! expressions::stream << std::left << expressions::format_date_time("TimeStamp", "%Y-%m-%d %H:%M:%S.%f") diff --git a/dds-misc-lib/src/Process.h b/dds-misc-lib/src/Process.h index ebc51048..d56f9acb 100644 --- a/dds-misc-lib/src/Process.h +++ b/dds-misc-lib/src/Process.h @@ -79,7 +79,7 @@ namespace dds::misc _FileName + "\" is still running"); } - // Wrtiting new pidfile + // Writing new pidfile std::ofstream f(m_FileName.c_str()); if (!f.is_open()) throw std::runtime_error("can't create PID file: " + m_FileName); @@ -458,7 +458,7 @@ namespace dds::misc smart_path(&smartCmd); // FIX: A fix for cases when the parent process sets SIG_IGN (if it was created by - // bosot::process::spawn). Restore default handler. If we don't do so, we might fail to waitpid our + // boost::process::spawn). Restore default handler. If we don't do so, we might fail to waitpid our // children. After we started using boost::process we noticed that ::waitpid fails. boost:process either // sets its own handler or there is a call for signal(SIGCHLD, SIG_IGN); std::signal(SIGCHLD, SIG_DFL); @@ -561,7 +561,7 @@ namespace dds::misc watchdog.async_wait( [&](boost::system::error_code ec) { - // Workaround we can't use boost::process::child::wait_for because it's buged in Boost 1.70 and not + // Workaround we can't use boost::process::child::wait_for because it's bugged in Boost 1.70 and not // yet fixed. We therefore use a deadline timer. if (!ec) { diff --git a/dds-misc-lib/src/SSHConfigFile.h b/dds-misc-lib/src/SSHConfigFile.h index e557b01a..6dcb0da9 100644 --- a/dds-misc-lib/src/SSHConfigFile.h +++ b/dds-misc-lib/src/SSHConfigFile.h @@ -5,7 +5,7 @@ #ifndef _DDS_NCF_H_ #define _DDS_NCF_H_ // -// - - - - - = = = DDS NCF (nodes configuration file parcer) = = = - - - - - +// - - - - - = = = DDS NCF (nodes configuration file parser) = = = - - - - - // // a configuration should be a comma-separated values (CSV) with // the following records: diff --git a/dds-misc-lib/src/SysHelper.h b/dds-misc-lib/src/SysHelper.h index e73022c1..266e434f 100644 --- a/dds-misc-lib/src/SysHelper.h +++ b/dds-misc-lib/src/SysHelper.h @@ -179,7 +179,7 @@ namespace dds::misc return tmp; } /** - * @brief The function is used to access the host name (with FCDN) of the current processor. + * @brief The function is used to access the host name (with FQDN) of the current processor. * @param[out] _RetVal - The returned buffer string. Must not be NULL. **/ inline void get_hostname(std::string* _RetVal) diff --git a/dds-pipe-log-engine-lib/src/PipeLogEngine.cpp b/dds-pipe-log-engine-lib/src/PipeLogEngine.cpp index 7d24208f..c22057f1 100644 --- a/dds-pipe-log-engine-lib/src/PipeLogEngine.cpp +++ b/dds-pipe-log-engine-lib/src/PipeLogEngine.cpp @@ -47,7 +47,7 @@ void CLogEngine::start(const string& _pipeFilePath, onLogEvent_t _callback) // Open the pipe for reading m_fd = open(m_pipeName.c_str(), O_RDWR | O_NONBLOCK); if ((-1 == m_fd) && (EEXIST != errno)) - throw runtime_error("Can't opem a named pipe: " + m_pipeName); + throw runtime_error("Can't open a named pipe: " + m_pipeName); // Start the log engine m_thread = new boost::thread(boost::bind(&CLogEngine::thread_worker, this, m_fd, m_pipeName)); @@ -59,7 +59,7 @@ void CLogEngine::stop() if (NULL != m_thread) { m_stopLogEngine = 1; - // send just *one* charecter to wake up the thread. + // send just *one* character to wake up the thread. this->operator()("\0", ""); m_thread->join(); delete m_thread; diff --git a/dds-protocol-lib/src/BaseSMChannelImpl.h b/dds-protocol-lib/src/BaseSMChannelImpl.h index f4774ffc..dc8eb388 100644 --- a/dds-protocol-lib/src/BaseSMChannelImpl.h +++ b/dds-protocol-lib/src/BaseSMChannelImpl.h @@ -412,7 +412,7 @@ namespace dds void start() { - // Check that all message queues were succesfully created + // Check that all message queues were successfully created bool queuesCreated(true); for (const auto& v : m_transportIn) { diff --git a/dds-protocol-lib/src/BinaryAttachmentReceivedCmd.h b/dds-protocol-lib/src/BinaryAttachmentReceivedCmd.h index 3d4389b7..1f2ac31a 100644 --- a/dds-protocol-lib/src/BinaryAttachmentReceivedCmd.h +++ b/dds-protocol-lib/src/BinaryAttachmentReceivedCmd.h @@ -23,7 +23,7 @@ namespace dds std::string m_receivedFilePath; ///< Path to the received file std::string m_requestedFileName; ///< Requested name of the file uint16_t m_srcCommand; ///< Source command which initiated file transport - uint32_t m_receivedFileSize; ///< Number of recieved bytes + uint32_t m_receivedFileSize; ///< Number of received bytes uint32_t m_downloadTime; ///< Time spent to download file [microseconds] }; std::ostream& operator<<(std::ostream& _stream, const SBinaryAttachmentReceivedCmd& _val); diff --git a/dds-protocol-lib/src/ConnectionManagerImpl.h b/dds-protocol-lib/src/ConnectionManagerImpl.h index 16e0fad0..5cdf5350 100644 --- a/dds-protocol-lib/src/ConnectionManagerImpl.h +++ b/dds-protocol-lib/src/ConnectionManagerImpl.h @@ -97,7 +97,7 @@ namespace dds bindPortAndListen(m_acceptor); createClientAndStartAccept(m_acceptor); - // If we use second channel for communication with UI we have to start acceptiing connection on that + // If we use second channel for communication with UI we have to start accepting connection on that // channel. if (m_useUITransport) { @@ -388,10 +388,16 @@ namespace dds for (const auto& v : channels) { - if (v.m_channel.expired()) - continue; - auto ptr = v.m_channel.lock(); - ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID); + // Post each push call, otherwise it will block other pushes until "post" each block of the + // binary file if the file is big. + this->m_ioContext.post( + [&] + { + if (v.m_channel.expired()) + return; + auto ptr = v.m_channel.lock(); + ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID); + }); } } catch (std::bad_weak_ptr& e) @@ -459,7 +465,7 @@ namespace dds A* pThis = static_cast(this); pThis->newClientCreated(newClient); - // Subsribe on lobby member handshake + // Subscribe on task slot handshake newClient->template registerHandler( [this, newClient](const SSenderInfo& _sender) -> void { @@ -472,7 +478,7 @@ namespace dds << "Adding new slot to " << newClient->getId() << " with id " << _sender.m_ID; }); - // Subscribe on dissconnect event + // Subscribe on the disconnect event newClient->template registerHandler( [this, newClient](const SSenderInfo& /*_sender*/) -> void { this->removeClient(newClient.get()); }); @@ -559,7 +565,7 @@ namespace dds typename channelInfo_t::container_t m_channels; channelContainerCache_t m_channelsCache; - /// Used for the main comunication + /// Used for the main communication boost::asio::io_context m_ioContext; asioAcceptorPtr_t m_acceptor; diff --git a/dds-protocol-lib/src/UpdateTopologyCmd.h b/dds-protocol-lib/src/UpdateTopologyCmd.h index 5890b4a5..a865ba49 100644 --- a/dds-protocol-lib/src/UpdateTopologyCmd.h +++ b/dds-protocol-lib/src/UpdateTopologyCmd.h @@ -27,7 +27,7 @@ namespace dds void _convertToData(dds::misc::BYTEVector_t* _data) const; bool operator==(const SUpdateTopologyCmd& val) const; - // when 0 - valiadate, any other value - don't validate + // when 0 - validate, any other value - don't validate uint16_t m_nDisableValidation; // topology file std::string m_sTopologyFile; diff --git a/dds-session/src/Options.h b/dds-session/src/Options.h index f04f03fb..ae523c52 100644 --- a/dds-session/src/Options.h +++ b/dds-session/src/Options.h @@ -143,7 +143,7 @@ namespace dds "Can be used only with the \"remove\" command."); options.add_options()("mixed", bpo::bool_switch(&_options->m_bMixed), - "Use worker package for a mixed environment - workes on Linux and on OS X.\n" + "Use worker package for a mixed environment - agents on Linux and on OS X.\n" "Can be used only with the \"start\" command."); //...positional diff --git a/dds-session/src/Start.cpp b/dds-session/src/Start.cpp index 359087b1..4c061cb2 100644 --- a/dds-session/src/Start.cpp +++ b/dds-session/src/Start.cpp @@ -57,7 +57,7 @@ void CStart::getNewSessionID() { stringstream ss; if (nExitCode != 0) - ss << "dds-comnader prep-session exited with code " << nExitCode << "; "; + ss << "dds-commander prep-session exited with code " << nExitCode << "; "; if (!sErr.empty()) ss << "error: " << sErr; @@ -81,7 +81,7 @@ void CStart::getNewSessionID() bool CStart::checkPrecompiledWNBins(bool _Mixed) { // wn bin name: - // ---.tar.gz + // ---.tar.gz const string sBaseName("dds-wrk-bin"); const string sBaseSufix(".tar.gz"); const string sOSXArch("Darwin"); @@ -198,7 +198,7 @@ void CStart::getPrecompiledWNBins(StringVector_t& _list) // Set Working dir fs::path pathCur(boost::filesystem::current_path()); fs::current_path(pathWnBins); - // Trying to dowlond bins + // Trying to download bins stringstream ssURL; ssURL << " http://dds.gsi.de/releases/add" << "/" << DDS_VERSION_STRING << "/"; @@ -274,7 +274,7 @@ void CStart::checkCommanderStatus() this_thread::sleep_for(chrono::milliseconds(10)); } if (sHost.empty() || sPort.empty()) - throw runtime_error("Can't detect Comander's UI address. Assuming DDS Commander failed to start."); + throw runtime_error("Can't detect commander's UI address. Assuming DDS Commander failed to start."); LOG(log_stdout_clean) << "DDS commander appears online. Testing connection..."; @@ -300,7 +300,7 @@ void CStart::checkCommanderStatus() requestPtr->setResponseCallback( [](const SCommanderInfoResponseData& _info) { - LOG(debug) << "UI agent has recieved pid of the commander server: " << _info.m_pid; + LOG(debug) << "UI agent has received pid of the commander server: " << _info.m_pid; LOG(log_stdout_clean) << "------------------------"; LOG(log_stdout_clean) << "DDS commander server: " << _info.m_pid; LOG(log_stdout_clean) << "------------------------"; diff --git a/dds-session/src/main.cpp b/dds-session/src/main.cpp index 011707dd..ddf4fab3 100644 --- a/dds-session/src/main.cpp +++ b/dds-session/src/main.cpp @@ -76,7 +76,7 @@ void rebuildSessions(vector& _session_dirs, StringVector_t& _sessions) for (auto& dir : boost::make_iterator_range(fs::directory_iterator(pathSessions), {})) { _session_dirs.push_back(dir.path()); - // Workaround: using .leaf().string(), instead of just .leaf(), vecasue we want to avoid double quotes + // Workaround: using .leaf().string(), instead of just .leaf(), because we want to avoid double quotes // in output. _sessions.push_back(dir.path().filename().string()); } diff --git a/dds-submit/src/Options.h b/dds-submit/src/Options.h index 5e88aa91..5fe1785f 100644 --- a/dds-submit/src/Options.h +++ b/dds-submit/src/Options.h @@ -75,7 +75,7 @@ namespace dds "only RMS options. To define custom environment per agent, use --env-config.\n"); options.add_options()("env-config,e", bpo::value(&_options->m_envCfgFilePath), - "A path to a user enironment script. Will be execeuted once per agent (valid for all " + "A path to a user environment script. Will be executed once per agent (valid for all " "task slots of the agent).\n"); options.add_options()( "inline-config", diff --git a/dds-submit/src/main.cpp b/dds-submit/src/main.cpp index 09291b72..08d0e636 100644 --- a/dds-submit/src/main.cpp +++ b/dds-submit/src/main.cpp @@ -29,7 +29,7 @@ int main(int argc, char* argv[]) if (dds::misc::defaultExecReinit(options.m_sid) == EXIT_FAILURE) return EXIT_FAILURE; - // List all avbaliable plug-ins + // List all available plug-ins if (options.m_bListPlugins) { namespace fs = boost::filesystem; @@ -44,7 +44,7 @@ int main(int argc, char* argv[]) if (fs::exists(someDir) && fs::is_directory(someDir)) { - cout << "Avaliable RMS plug-ins:\n"; + cout << "Available RMS plug-ins:\n"; for (fs::directory_iterator dir_iter(someDir); dir_iter != end_iter; ++dir_iter) { if (fs::is_directory(dir_iter->status())) diff --git a/dds-tools-lib/src/Tools.cpp b/dds-tools-lib/src/Tools.cpp index 7ddceb64..a20393cd 100644 --- a/dds-tools-lib/src/Tools.cpp +++ b/dds-tools-lib/src/Tools.cpp @@ -88,7 +88,7 @@ boost::uuids::uuid CSession::create() if (!m_impl->m_sid.is_nil()) throw runtime_error("ToolsAPI: DDS session is already running."); - // Call "dds-session start" to fireup a new session + // Call "dds-session start" to fire up a new session // Get new session ID string sOut; string sErr; diff --git a/dds-tools-lib/src/Tools.h b/dds-tools-lib/src/Tools.h index c5bfa009..4c25febc 100644 --- a/dds-tools-lib/src/Tools.h +++ b/dds-tools-lib/src/Tools.h @@ -22,10 +22,10 @@ namespace dds { /** * - * \brief The main class of the Tools API. It represets a DDS session. + * \brief The main class of the Tools API. It represents a DDS session. * \details * It can be used to create new DDS sessions or attach to existing ones. - * Also, this class can be used to send and recieve Tools commands. + * Also, this class can be used to send and receive Tools commands. * Currently the following commands are \link ToolsProtocol.h \endlink * * Please note, when you send a requests, server will respond with a corresponding reply with a following @@ -54,7 +54,7 @@ namespace dds }); // Subscribe on Done evens. - // Server will send Done when there it has finsihed proccessing a corresponding request. + // Server will send Done when there it has finished processing a corresponding request. submitRequestPtr->setDoneCallback([&session, &start]() { auto end = chrono::high_resolution_clock::now(); chrono::duration elapsed = end - start; @@ -85,7 +85,7 @@ namespace dds }); // Subscribe on Done event. - // Server sends Done when it has finsihed proccessing the request. + // Server sends Done when it has finished processing the request. agentInfoRequestPtr->setDoneCallback([&session]() { session.unblockCurrentThread(); }); @@ -165,7 +165,7 @@ namespace dds * [&nTaskDoneCount](const SOnTaskDoneResponseData& _info) * { * ++nTaskDoneCount; - * cout << "Recieved onTaskDone event. TaskID: " << _info.m_taskID + * cout << "Received onTaskDone event. TaskID: " << _info.m_taskID * << " ; ExitCode: " << _info.m_exitCode * << " ; Signal: " << _info.m_signal; * }); diff --git a/dds-tools-lib/src/ToolsProtocol.cpp b/dds-tools-lib/src/ToolsProtocol.cpp index 5f804bf8..232b6a9e 100644 --- a/dds-tools-lib/src/ToolsProtocol.cpp +++ b/dds-tools-lib/src/ToolsProtocol.cpp @@ -248,7 +248,7 @@ namespace dds { return _os << _data.defaultToString() << "; instances: " << _data.m_instances << "; minInstances: " << _data.m_minInstances << "; slots: " << _data.m_slots - << "; falgs: " << _data.m_flags << "; config: " << _data.m_config << "; rms: " << _data.m_rms + << "; flags: " << _data.m_flags << "; config: " << _data.m_config << "; rms: " << _data.m_rms << "; pluginPath: " << _data.m_pluginPath << "; groupName: " << _data.m_groupName << "; submissionTag: " << _data.m_submissionTag << "; envCfgFilePath: " << _data.m_envCfgFilePath << "; inlineConfig: " << _data.m_inlineConfig; diff --git a/dds-tools-lib/src/ToolsProtocol.h b/dds-tools-lib/src/ToolsProtocol.h index a51dbc5d..53a08a0c 100644 --- a/dds-tools-lib/src/ToolsProtocol.h +++ b/dds-tools-lib/src/ToolsProtocol.h @@ -106,7 +106,7 @@ namespace dds std::string m_pluginPath; ///< Optional. A plug-in's directory search path std::string m_groupName; ///< A group name of agents. std::string m_submissionTag; ///< A Submission Tag - std::string m_envCfgFilePath; ///< A path to a user enironment script. Will be execeuted once per agent + std::string m_envCfgFilePath; ///< A path to a user environment script. Will be executed once per agent ///< (valid for all task slots of the agent) std::string m_inlineConfig; ///< Content of this buffer will be added to the RMS job configuration file. diff --git a/dds-tools-lib/tests/TestSession.cpp b/dds-tools-lib/tests/TestSession.cpp index 67eb67a0..1973d685 100644 --- a/dds-tools-lib/tests/TestSession.cpp +++ b/dds-tools-lib/tests/TestSession.cpp @@ -299,7 +299,7 @@ void runDDS(vector& _sessions) createSessions(_sessions); - // Initital topology + // Initial topology CTopology topo(topoPath.string()); auto numAgents = topo.getRequiredNofAgents(10); size_t requiredCount{ numAgents.first * numAgents.second }; @@ -309,7 +309,7 @@ void runDDS(vector& _sessions) session, topoPath, STopologyRequest::request_t::EUpdateType::ACTIVATE, numAgents, numAgents, requiredCount); } - // Upscaled topology + // Unscaled topology CTopology upTopo(upTopoPath.string()); auto upNumAgents = upTopo.getRequiredNofAgents(10); requiredCount += upNumAgents.first * upNumAgents.second; @@ -323,7 +323,7 @@ void runDDS(vector& _sessions) requiredCount); } - // Downscaled topology + // Downcased topology CTopology downTopo(downTopoPath.string()); auto downNumAgents = downTopo.getRequiredNofAgents(10); requiredCount += downNumAgents.first * downNumAgents.second; @@ -662,7 +662,7 @@ BOOST_AUTO_TEST_CASE(test_dds_tools_unsubscribe) BOOST_CHECK(!sid.is_nil()); // test onTaskDone events - // Subscrube on events + // Subscribe on events SOnTaskDoneRequest::request_t request; SOnTaskDoneRequest::ptr_t requestPtr = SOnTaskDoneRequest::makeRequest(request); diff --git a/dds-topology-lib/src/TopoAsset.h b/dds-topology-lib/src/TopoAsset.h index 1f1ce480..637ee1b4 100644 --- a/dds-topology-lib/src/TopoAsset.h +++ b/dds-topology-lib/src/TopoAsset.h @@ -27,8 +27,8 @@ namespace dds enum class EVisibility { - Task, ///< The asset is visiable only for the task it is assigned to - Global ///< The asset is visiable for all tasks of the given session. + Task, ///< The asset is visible only for the task it is assigned to + Global ///< The asset is visible for all tasks of the given session. }; using Ptr_t = std::shared_ptr; diff --git a/dds-topology-lib/src/TopoCore.cpp b/dds-topology-lib/src/TopoCore.cpp index 37c8f0ba..c1a427c7 100644 --- a/dds-topology-lib/src/TopoCore.cpp +++ b/dds-topology-lib/src/TopoCore.cpp @@ -33,7 +33,7 @@ CTopoCore::CTopoCore() /// copy constructor CTopoCore::CTopoCore(CTopoCore const& _topo) { - // no need to lock this objec because no other thread + // no need to lock this object because no other thread // will be using it until after construction // but we DO need to lock the other object std::unique_lock lock_other(_topo.m_mtxTopoInit); diff --git a/dds-topology-lib/src/TopoCore.h b/dds-topology-lib/src/TopoCore.h index ce98e389..6969c75e 100644 --- a/dds-topology-lib/src/TopoCore.h +++ b/dds-topology-lib/src/TopoCore.h @@ -146,7 +146,7 @@ namespace dds std::string m_currentCollectionIdPath; Id_t m_currentCollectionId{ 0 }; - bool m_bXMLValidationDisabled{ false }; ///< if true than XML will not be validated agains XSD + bool m_bXMLValidationDisabled{ false }; ///< if true than XML will not be validated against XSD std::string m_name; ///< Name of the topology uint32_t m_hash{ 0 }; ///< CRC64 of the topology XML file std::string m_filepath; ///< Path to the XML topology file diff --git a/dds-topology-lib/src/TopoParserXML.h b/dds-topology-lib/src/TopoParserXML.h index 0d94940a..92fc9f1c 100644 --- a/dds-topology-lib/src/TopoParserXML.h +++ b/dds-topology-lib/src/TopoParserXML.h @@ -19,7 +19,7 @@ namespace dds { public: /// \brief Parse topology from specified XML file. - /// \param[out] _pt Output ptoperty tree. + /// \param[out] _pt Output property tree. /// \param[in] _filepath Path to the topology file. /// \param[in] _schemaFilepath Path to the XSD schema file. /// \throw std::runtime_error. @@ -29,7 +29,7 @@ namespace dds std::string* _topologyName = nullptr); /// \brief Parse topology from specified XML input stream. - /// \param[out] _pt Output ptoperty tree. + /// \param[out] _pt Output property tree. /// \param[in] _stream Input stream. /// \param[in] _schemaFilepath Path to the XSD schema file. /// \throw std::runtime_error. diff --git a/dds-topology-lib/src/TopoTask.h b/dds-topology-lib/src/TopoTask.h index eb304a1e..5cf2c10a 100644 --- a/dds-topology-lib/src/TopoTask.h +++ b/dds-topology-lib/src/TopoTask.h @@ -94,7 +94,7 @@ namespace dds private: std::string m_exe; ///< Path to executable - std::string m_env; ///< Path to environmtnt file + std::string m_env; ///< Path to environment file bool m_exeReachable{ true }; ///< If executable is available on the WN bool m_envReachable{ true }; ///< If environment script is available on the WN CTopoProperty::PtrMap_t m_properties; ///< Properties diff --git a/dds-topology-lib/src/TopoVars.h b/dds-topology-lib/src/TopoVars.h index 2127153a..1d155f3e 100644 --- a/dds-topology-lib/src/TopoVars.h +++ b/dds-topology-lib/src/TopoVars.h @@ -34,10 +34,10 @@ namespace dds /// \brief Destructor. virtual ~CTopoVars(); - /// \brief Init the obejct from DDS topology xml + /// \brief Init the object from DDS topology xml void initFromXML(const std::string& _filepath); - /// \brief Serizalize the obejct to DDS topology xml + /// \brief Serizalize the object to DDS topology xml void saveToXML(const std::string& _filepath); /// \brief Inherited from TopoBase diff --git a/dds-topology-lib/tests/TestPerformance.cpp b/dds-topology-lib/tests/TestPerformance.cpp index dfcd722c..cb1338b5 100644 --- a/dds-topology-lib/tests/TestPerformance.cpp +++ b/dds-topology-lib/tests/TestPerformance.cpp @@ -27,6 +27,7 @@ BOOST_AUTO_TEST_CASE(test_dds_topology_performance_1) auto execTime = STimeMeasure<>::execution( [&topology]() { + topology.setXMLValidationDisabled(true); for (size_t i = 0; i < 3; i++) { topology.init("topology_test_8.xml"); diff --git a/dds-topology/src/Options.h b/dds-topology/src/Options.h index e52d7999..0a4e12cc 100644 --- a/dds-topology/src/Options.h +++ b/dds-topology/src/Options.h @@ -66,7 +66,7 @@ namespace dds "Define a topology to update currently active topology."); options.add_options()("disable-validation", bpo::bool_switch(&_options->m_bDisableValidation), - "Disable topology valiadation."); + "Disable topology validation."); options.add_options()("activate", bpo::value(&_options->m_sTopoFile), "Request to activate agents, i.e. distribute and start user tasks."); diff --git a/dds-user-defaults/src/UserDefaults.cpp b/dds-user-defaults/src/UserDefaults.cpp index 0507e323..e0b080fb 100644 --- a/dds-user-defaults/src/UserDefaults.cpp +++ b/dds-user-defaults/src/UserDefaults.cpp @@ -552,7 +552,7 @@ string CUserDefaults::getMainSIDFile() const } /// Returns Session ID full file path (return main SID if exists. If there is no main, it checks for a clone SID. If -/// none of SID exist, the fucntions returns an empty string) +/// none of SID exist, the functions returns an empty string) string CUserDefaults::getSIDFile() const { string sWorkDir(m_options.m_server.m_workDir); diff --git a/dds-user-defaults/src/main.cpp b/dds-user-defaults/src/main.cpp index 99732a47..87071cb9 100644 --- a/dds-user-defaults/src/main.cpp +++ b/dds-user-defaults/src/main.cpp @@ -29,7 +29,7 @@ bool parseCmdLine(int _Argc, char* _Argv[]) bool ignoreDefaultSID(false); // Generic options bpo::options_description visible("Options"); - // WORKAROUND: repeat add_options call to help clang-format, otherwise it produce ureadable output + // WORKAROUND: repeat add_options call to help clang-format, otherwise it produce unreadable output visible.add_options()("help,h", "Produce help message"); visible.add_options()("version,v", "Version information"); visible.add_options()("path,p", "Show DDS user defaults config file path");