Skip to content

Commit

Permalink
Add batch support in ZmqProducerStateTable. (#803)
Browse files Browse the repository at this point in the history
* Add batch support in zmq.

Change-Id: Iec83d86c8541598467a7eefaa37cc232326570f4

* Trigger test

Change-Id: I6cdf31582814724792e2046a49592be13725a653
  • Loading branch information
mint570 authored Nov 1, 2023
1 parent 1c18502 commit a57cf9e
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 149 deletions.
1 change: 0 additions & 1 deletion common/asyncdbupdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "dbconnector.h"
#include "table.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)
Expand Down
84 changes: 72 additions & 12 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include "common/armhelper.h"

#include <string>

using namespace std;

namespace swss {
Expand All @@ -12,27 +14,35 @@ class BinarySerializer {
static size_t serializeBuffer(
const char* buffer,
const size_t size,
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName)
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
auto tmpSerializer = BinarySerializer(buffer, size);

// Set the first pair as DB name and table name.
tmpSerializer.setKeyAndValue(
dbName.c_str(), dbName.length(),
tableName.c_str(), tableName.length());
tmpSerializer.setKeyAndValue(
key.c_str(), key.length(),
command.c_str(), command.length());
for (auto& kvp : values)
for (auto& kco : kcos)
{
auto& field = fvField(kvp);
auto& value = fvValue(kvp);
auto& key = kfvKey(kco);
auto& fvs = kfvFieldsValues(kco);
std::string fvs_len = std::to_string(fvs.size());
// For each request, the first pair is the key and the number of attributes,
// followed by the attribute pairs.
// The operation is not set, when there is no attribute, it is a DEL request.
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
key.c_str(), key.length(),
fvs_len.c_str(), fvs_len.length());
for (auto& fv : fvs)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
}
}

return tmpSerializer.finalize();
Expand Down Expand Up @@ -88,6 +98,56 @@ class BinarySerializer {
}
}

static void deserializeBuffer(
const char* buffer,
const size_t size,
std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
std::vector<FieldValueTuple> values;
deserializeBuffer(buffer, size, values);
int fvs_size = -1;
KeyOpFieldsValuesTuple kco;
auto& key = kfvKey(kco);
auto& op = kfvOp(kco);
auto& fvs = kfvFieldsValues(kco);
for (auto& fv : values)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
// The first pair is the DB name and the table name.
if (fvs_size < 0)
{
dbName = field;
tableName = value;
fvs_size = 0;
continue;
}
// This is the beginning of a request.
// The first pair is the key and the number of attributes.
// If the attribute count is zero, it is a DEL request.
if (fvs_size == 0)
{
key = field;
fvs_size = std::stoi(value);
op = (fvs_size == 0) ? DEL_COMMAND : SET_COMMAND;
fvs.clear();
}
// This is an attribut pair.
else
{
fvs.push_back(fv);
--fvs_size;
}
// We got the last attribut pair. This is the end of a request.
if (fvs_size == 0)
{
kcos.push_back(std::make_shared<KeyOpFieldsValuesTuple>(kco));
}
}
}

private:
const char* m_buffer;
const size_t m_buffer_size;
Expand Down
17 changes: 10 additions & 7 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,24 @@ void ZmqClient::connect()
}

void ZmqClient::sendMsg(
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
sendbuffer.data(),
sendbuffer.size(),
key,
values,
command,
dbName,
tableName);
tableName,
kcos);

if (serializedlen >= MQ_RESPONSE_MAX_COUNT)
{
SWSS_LOG_THROW("ZmqClient sendMsg message was too big (buffer size %d bytes, got %d), reduce the message size, message DROPPED",
MQ_RESPONSE_MAX_COUNT,
serializedlen);
}

SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
Expand Down
6 changes: 2 additions & 4 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ class ZmqClient

void connect();

void sendMsg(const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
private:
void initialize(const std::string& endpoint);
Expand Down
32 changes: 17 additions & 15 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,28 @@ ZmqConsumerStateTable::ZmqConsumerStateTable(DBConnector *db, const std::string
SWSS_LOG_DEBUG("ZmqConsumerStateTable ctor tableName: %s", tableName.c_str());
}

void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_asyncDBUpdater != nullptr)
for (auto kco : kcos)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*pkco);
}

{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(pkco);
}
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_asyncDBUpdater != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*kco);
}

m_selectableEvent.notify(); // will release epoll
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(kco);
}

if (m_asyncDBUpdater != nullptr)
{
m_asyncDBUpdater->update(clone);
if (m_asyncDBUpdater != nullptr)
{
m_asyncDBUpdater->update(clone);
}
}
m_selectableEvent.notify(); // will release epoll
}

/* Get multiple pop elements */
Expand Down
2 changes: 1 addition & 1 deletion common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
size_t dbUpdaterQueueSize();

private:
void handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);
void handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos);

std::mutex m_receivedQueueMutex;

Expand Down
62 changes: 39 additions & 23 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ void ZmqProducerStateTable::set(
const string &op /*= SET_COMMAND*/,
const string &prefix)
{
std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{
KeyOpFieldsValuesTuple{key, op, values}
};
m_zmqClient.sendMsg(
key,
values,
op,
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);

if (m_asyncDBUpdater != nullptr)
Expand All @@ -86,12 +87,13 @@ void ZmqProducerStateTable::del(
const string &op /*= DEL_COMMAND*/,
const string &prefix)
{
std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{
KeyOpFieldsValuesTuple{key, op, std::vector<FieldValueTuple>{}}
};
m_zmqClient.sendMsg(
key,
vector<FieldValueTuple>(),
op,
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);

if (m_asyncDBUpdater != nullptr)
Expand All @@ -107,16 +109,11 @@ void ZmqProducerStateTable::del(

void ZmqProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple> &values)
{
for (const auto &value : values)
{
m_zmqClient.sendMsg(
kfvKey(value),
kfvFieldsValues(value),
SET_COMMAND,
m_dbName,
m_tableNameStr,
m_sendbuffer);
}
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
values,
m_sendbuffer);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -131,16 +128,16 @@ void ZmqProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple> &value

void ZmqProducerStateTable::del(const std::vector<std::string> &keys)
{
std::vector<KeyOpFieldsValuesTuple> kcos;
for (const auto &key : keys)
{
m_zmqClient.sendMsg(
key,
vector<FieldValueTuple>(),
DEL_COMMAND,
m_dbName,
m_tableNameStr,
m_sendbuffer);
kcos.push_back(KeyOpFieldsValuesTuple{key, DEL_COMMAND, std::vector<FieldValueTuple>{}});
}
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);

if (m_asyncDBUpdater != nullptr)
{
Expand All @@ -155,6 +152,25 @@ void ZmqProducerStateTable::del(const std::vector<std::string> &keys)
}
}

void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos)
{
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);

if (m_asyncDBUpdater != nullptr)
{
for (const auto &value : kcos)
{
// async write need keep data till write to DB
std::shared_ptr<KeyOpFieldsValuesTuple> clone = std::make_shared<KeyOpFieldsValuesTuple>(value);
m_asyncDBUpdater->update(clone);
}
}
}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
3 changes: 3 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class ZmqProducerStateTable : public ProducerStateTable

virtual void del(const std::vector<std::string> &keys);

// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
Loading

0 comments on commit a57cf9e

Please sign in to comment.