Skip to content

Commit

Permalink
feat: adapt logcollector to new message format
Browse files Browse the repository at this point in the history
  • Loading branch information
cborla committed Nov 11, 2024
1 parent 5969a99 commit 549d465
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 178 deletions.
19 changes: 9 additions & 10 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ class IMultiTypeQueue
* @param moduleType The type of the module requesting the messages.
* @return Message The next message from the queue.
*/
virtual Message getNext(MessageType type, const std::string module = "",
const std::string moduleType = "") = 0;
virtual Message getNext(MessageType type, const std::string module = "", const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next message from the queue asynchronously.
Expand All @@ -67,10 +66,10 @@ class IMultiTypeQueue
* @param moduleType The type of the module requesting the messages.
* @return boost::asio::awaitable<Message> Awaitable object representing the next message.
*/
virtual boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;
virtual boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next N messages from the queue.
Expand All @@ -81,10 +80,10 @@ class IMultiTypeQueue
* @param moduleType The type of the module requesting the messages.
* @return std::vector<Message> A vector of messages fetched from the queue.
*/
virtual std::vector<Message>
getNextN(MessageType type, int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;
virtual std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Deletes a message from the queue.
Expand Down
3 changes: 2 additions & 1 deletion src/agent/multitype_queue/include/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ enum class MessageType
* module name, the module type and the metadata.
*
*/
class Message {
class Message
{
public:
MessageType type;
nlohmann::json data;
Expand Down
11 changes: 6 additions & 5 deletions src/agent/multitype_queue/include/multitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,16 @@ class MultiTypeQueue : public IMultiTypeQueue
/**
* @copydoc IMultiTypeQueue::getNextNAwaitable(MessageType, int, const std::string)
*/
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;
boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextN(MessageType, int, const std::string)
*/
std::vector<Message> getNextN(MessageType type, int messageQuantity,
std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

Expand Down
12 changes: 8 additions & 4 deletions src/agent/multitype_queue/include/persistence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ class Persistence
* @param moduleName The name of the module.
* @return nlohmann::json The retrieved JSON message.
*/
virtual nlohmann::json Retrieve(int id, const std::string& queueName,
const std::string& moduleName = "", const std::string& moduleType = "") = 0;
virtual nlohmann::json Retrieve(int id,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Retrieve multiple JSON messages from the specified queue.
Expand All @@ -52,8 +54,10 @@ class Persistence
* @param moduleName The name of the module.
* @return nlohmann::json The retrieved JSON messages.
*/
virtual nlohmann::json RetrieveMultiple(int n, const std::string& queueName,
const std::string& moduleName = "", const std::string& moduleType = "") = 0;
virtual nlohmann::json RetrieveMultiple(int n,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Remove a JSON message from the specified queue.
Expand Down
52 changes: 28 additions & 24 deletions src/agent/multitype_queue/src/multitype_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ int MultiTypeQueue::push(Message message, bool shouldWait)
}
else
{
result = m_persistenceDest->Store(
message.data, m_mapMessageTypeName.at(message.type),
message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
}
Expand Down Expand Up @@ -109,10 +111,12 @@ boost::asio::awaitable<int> MultiTypeQueue::pushAwaitable(Message message)
}
else
{
result = m_persistenceDest->Store(
message.data, m_mapMessageTypeName.at(message.type),
message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
}
Expand Down Expand Up @@ -155,11 +159,10 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName,
return result;
}

boost::asio::awaitable<Message>
MultiTypeQueue::getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
boost::asio::awaitable<Message> MultiTypeQueue::getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

Expand Down Expand Up @@ -191,9 +194,9 @@ MultiTypeQueue::getNextNAwaitable(MessageType type,
}

std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
Expand All @@ -202,9 +205,8 @@ std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);
for (auto singleJson : arrayData)
{
result.emplace_back(type, singleJson["data"],
singleJson["moduleName"],
singleJson["moduleType"], singleJson["metadata"]);
result.emplace_back(
type, singleJson["data"], singleJson["moduleName"], singleJson["moduleType"], singleJson["metadata"]);
}
}
else
Expand Down Expand Up @@ -263,11 +265,13 @@ bool MultiTypeQueue::isFull(MessageType type, const std::string moduleName)
{
if (m_mapMessageTypeName.contains(type))
{
return static_cast<size_t>(m_persistenceDest->GetElementCount(
m_mapMessageTypeName.at(type), moduleName)) == m_maxItems;
} else {
// TODO: error handling
LogError("Error didn't find the queue.");
return static_cast<size_t>(m_persistenceDest->GetElementCount(m_mapMessageTypeName.at(type), moduleName)) ==
m_maxItems;
}
else
{
// TODO: error handling
LogError("Error didn't find the queue.");
}
return false;
}
Expand Down
56 changes: 28 additions & 28 deletions src/agent/multitype_queue/src/sqlitestorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ void SQLiteStorage::ReleaseDatabaseAccess()
m_cv.notify_one();
}


int SQLiteStorage::Store(const nlohmann::json& message,
const std::string& tableName,
const std::string& moduleName,
Expand All @@ -74,7 +73,7 @@ int SQLiteStorage::Store(const nlohmann::json& message,
std::string insertQuery;

constexpr std::string_view INSERT_QUERY {
R"(INSERT INTO {} (module_name, module_type, metadata, message) VALUES ("{}", "{}", "{}", ?);)"};
R"(INSERT INTO {} (module_name, module_type, metadata, message) VALUES ("{}", "{}", '{}', ?);)"};
insertQuery = fmt::format(INSERT_QUERY, tableName, moduleName, moduleType, metadata);

int result = 0;
Expand Down Expand Up @@ -115,16 +114,17 @@ int SQLiteStorage::Store(const nlohmann::json& message,
}

// TODO: we shouldn't use rowid outside the table itself
nlohmann::json SQLiteStorage::Retrieve(int id, const std::string& tableName,
const std::string& moduleName, [[maybe_unused]] const std::string& moduleType)
nlohmann::json SQLiteStorage::Retrieve(int id,
const std::string& tableName,
const std::string& moduleName,
[[maybe_unused]] const std::string& moduleType)
{
std::string selectQuery;
if (moduleName.empty())
{
constexpr std::string_view SELECT_QUERY{
"SELECT module_name, module_type, metadata, message FROM {} WHERE "
"rowid = ?;"};
selectQuery = fmt::format(SELECT_QUERY, tableName);
constexpr std::string_view SELECT_QUERY {"SELECT module_name, module_type, metadata, message FROM {} WHERE "
"rowid = ?;"};
selectQuery = fmt::format(SELECT_QUERY, tableName);
}
else
{
Expand All @@ -137,12 +137,12 @@ nlohmann::json SQLiteStorage::Retrieve(int id, const std::string& tableName,
{
SQLite::Statement query(*m_db, selectQuery);
query.bind(1, id);
nlohmann::json outputJson = {{"moduleName", ""},{"moduleType", ""},{"metadata", ""},{"data", {}}};
nlohmann::json outputJson = {{"moduleName", ""}, {"moduleType", ""}, {"metadata", ""}, {"data", {}}};
if (query.executeStep())
{
if (query.getColumnCount() == 4 &&
query.getColumn(3).getType() == SQLite::TEXT && query.getColumn(2).getType() == SQLite::TEXT &&
query.getColumn(1).getType() == SQLite::TEXT && query.getColumn(0).getType() == SQLite::TEXT)
if (query.getColumnCount() == 4 && query.getColumn(3).getType() == SQLite::TEXT &&
query.getColumn(2).getType() == SQLite::TEXT && query.getColumn(1).getType() == SQLite::TEXT &&
query.getColumn(0).getType() == SQLite::TEXT)
{
std::string moduleNameString = query.getColumn(0).getString();
std::string moduleTypeString = query.getColumn(1).getString();
Expand Down Expand Up @@ -181,8 +181,10 @@ nlohmann::json SQLiteStorage::Retrieve(int id, const std::string& tableName,
}
}

nlohmann::json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableName,
const std::string& moduleName, [[maybe_unused]] const std::string& moduleType)
nlohmann::json SQLiteStorage::RetrieveMultiple(int n,
const std::string& tableName,
const std::string& moduleName,
[[maybe_unused]] const std::string& moduleType)
{
std::string selectQuery;
if (moduleName.empty())
Expand All @@ -193,10 +195,10 @@ nlohmann::json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableNa
}
else
{
constexpr std::string_view SELECT_MULTIPLE_QUERY{
"SELECT module_name, module_type, metadata, message FROM {} WHERE "
"module_name LIKE \"{}\" ORDER BY rowid ASC LIMIT ?;"};
selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName, moduleName);
constexpr std::string_view SELECT_MULTIPLE_QUERY {
"SELECT module_name, module_type, metadata, message FROM {} WHERE "
"module_name LIKE \"{}\" ORDER BY rowid ASC LIMIT ?;"};
selectQuery = fmt::format(SELECT_MULTIPLE_QUERY, tableName, moduleName);
}

try
Expand All @@ -206,16 +208,16 @@ nlohmann::json SQLiteStorage::RetrieveMultiple(int n, const std::string& tableNa
nlohmann::json messages = nlohmann::json::array();
while (query.executeStep())
{
if (query.getColumnCount() == 4 &&
query.getColumn(3).getType() == SQLite::TEXT && query.getColumn(2).getType() == SQLite::TEXT &&
query.getColumn(1).getType() == SQLite::TEXT && query.getColumn(0).getType() == SQLite::TEXT)
if (query.getColumnCount() == 4 && query.getColumn(3).getType() == SQLite::TEXT &&
query.getColumn(2).getType() == SQLite::TEXT && query.getColumn(1).getType() == SQLite::TEXT &&
query.getColumn(0).getType() == SQLite::TEXT)
{
std::string moduleNameString = query.getColumn(0).getString();
std::string moduleTypeString = query.getColumn(1).getString();
std::string metadataString = query.getColumn(2).getString();
std::string dataString = query.getColumn(3).getString();

nlohmann::json outputJson = {{"moduleName", ""},{"moduleType", ""},{"metadata", ""},{"data", {}}};
nlohmann::json outputJson = {{"moduleName", ""}, {"moduleType", ""}, {"metadata", ""}, {"data", {}}};

if (!dataString.empty())
{
Expand Down Expand Up @@ -292,12 +294,10 @@ int SQLiteStorage::RemoveMultiple(int n, const std::string& tableName, const std
}
else
{
constexpr std::string_view DELETE_MULTIPLE_QUERY{
"DELETE FROM {} WHERE module_name LIKE \"{}\" AND rowid IN "
"(SELECT rowid FROM {} WHERE module_name LIKE \"{}\" ORDER "
"BY rowid ASC LIMIT ?);"};
deleteQuery = fmt::format(DELETE_MULTIPLE_QUERY, tableName, moduleName,
tableName, moduleName);
constexpr std::string_view DELETE_MULTIPLE_QUERY {"DELETE FROM {} WHERE module_name LIKE \"{}\" AND rowid IN "
"(SELECT rowid FROM {} WHERE module_name LIKE \"{}\" ORDER "
"BY rowid ASC LIMIT ?);"};
deleteQuery = fmt::format(DELETE_MULTIPLE_QUERY, tableName, moduleName, tableName, moduleName);
}

try
Expand Down
12 changes: 8 additions & 4 deletions src/agent/multitype_queue/src/sqlitestorage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ class SQLiteStorage : public Persistence
* @param moduleName The name of the module that created the message.
* @return The retrieved JSON message.
*/
nlohmann::json Retrieve(int id, const std::string& tableName,
const std::string& moduleName = "", const std::string& moduleType = "") override;
nlohmann::json Retrieve(int id,
const std::string& tableName,
const std::string& moduleName = "",
const std::string& moduleType = "") override;

/**
* @brief Retrieve multiple JSON messages.
Expand All @@ -83,8 +85,10 @@ class SQLiteStorage : public Persistence
* @param moduleName The name of the module that created the message.
* @return A vector of retrieved JSON messages.
*/
nlohmann::json RetrieveMultiple(int n, const std::string& tableName,
const std::string& moduleName = "", const std::string& moduleType = "") override;
nlohmann::json RetrieveMultiple(int n,
const std::string& tableName,
const std::string& moduleName = "",
const std::string& moduleType = "") override;

/**
* @brief Remove a JSON message by its ID.
Expand Down
12 changes: 6 additions & 6 deletions src/agent/multitype_queue/tests/multitype_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,17 +571,17 @@ TEST_F(MultiTypeQueueTest, FifoOrderCheck)
multiTypeQueue.getNextN(messageType, 10); // NOLINT(cppcoreguidelines-avoid-magic-numbers)
EXPECT_EQ(messageReceivedVector.size(), 10);

std::for_each(
messageReceivedVector.begin(),
messageReceivedVector.end(),
[i = 0](const auto& singleMessage) mutable
{ EXPECT_EQ(singleMessage.data, (nlohmann::json{{"Data", "for STATEFUL" + std::to_string(++i)}})); });
std::for_each(messageReceivedVector.begin(),
messageReceivedVector.end(),
[i = 0](const auto& singleMessage) mutable {
EXPECT_EQ(singleMessage.data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(++i)}}));
});

// Keep the order of the message: FIFO
for (int i : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
{
auto messageReceived = multiTypeQueue.getNextN(messageType, 1);
EXPECT_EQ(messageReceived[0].data, (nlohmann::json{{"Data", "for STATEFUL" + std::to_string(i)}}));
EXPECT_EQ(messageReceived[0].data, (nlohmann::json {{"Data", "for STATEFUL" + std::to_string(i)}}));
EXPECT_TRUE(multiTypeQueue.pop(messageType));
}
}
Loading

0 comments on commit 549d465

Please sign in to comment.