Skip to content

[common] enable redispipeline to only publish after flush #895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 58 additions & 35 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,71 @@ using namespace std;
namespace swss {

ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, false)
{
m_pipeowned = true;
}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
: ProducerStateTable(pipeline, tableName, buffered, false) {}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
, TableName_KeySet(tableName)
, m_flushPub(flushPub)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
reloadRedisScript();

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
{
delete m_pipe;
}
}

void ProducerStateTable::reloadRedisScript()
{
// Set m_flushPub to remove publish from a single lua string and let pipeline do publish once per flush

// However, if m_buffered is false, follow the original one publish per lua design
// Hence we need to check both m_buffered and m_flushPub, and reload the redis script once setBuffered() changes m_buffered

/* 1. Inform the pipeline of what channel to publish, when flushPub feature is enabled */
if (m_buffered && m_flushPub)
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));

/* 2. Setup lua strings: determine whether to attach luaPub after each lua string */

// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);

string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);
"redis.call('DEL', KEYS[3])\n";

string luaBatchedSet =
"local added = 0\n"
Expand All @@ -59,48 +91,39 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
"for i,k in pairs(keys) do\n"
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
m_shaApplyView = m_pipe->loadRedisScript(luaApplyView);
}

ProducerStateTable::~ProducerStateTable()
{
if (m_pipeowned)
if (!m_flushPub || !m_buffered)
{
delete m_pipe;
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}

/* 3. load redis script based on the lua string */
m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
}

void ProducerStateTable::setBuffered(bool buffered)
{
m_buffered = buffered;
reloadRedisScript();
}

void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &values,
Expand Down
4 changes: 4 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
public:
ProducerStateTable(DBConnector *db, const std::string &tableName);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered, bool flushPub);
virtual ~ProducerStateTable();

void setBuffered(bool buffered);
Expand Down Expand Up @@ -51,6 +52,7 @@ class ProducerStateTable : public TableBase, public TableName_KeySet

void apply_temp_view();
private:
bool m_flushPub; // publish per piepeline flush intead of per redis script
bool m_buffered;
bool m_pipeowned;
bool m_tempViewActive;
Expand All @@ -62,6 +64,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;

void reloadRedisScript(); // redis script may change if m_buffered changes
};

}
44 changes: 44 additions & 0 deletions common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include <string>
#include <queue>
#include <unordered_set>
#include <functional>
#include <chrono>
#include <iostream>
#include "redisreply.h"
#include "rediscommand.h"
#include "dbconnector.h"
Expand All @@ -22,9 +25,11 @@ class RedisPipeline {
RedisPipeline(const DBConnector *db, size_t sz = 128)
: COMMAND_MAX(sz)
, m_remaining(0)
, m_shaPub("")
{
m_db = db->newConnector(NEWCONNECTOR_TIMEOUT);
initializeOwnerTid();
lastHeartBeat = std::chrono::steady_clock::now();
}

~RedisPipeline() {
Expand Down Expand Up @@ -113,11 +118,19 @@ class RedisPipeline {

void flush()
{
lastHeartBeat = std::chrono::steady_clock::now();

if (m_remaining == 0) {
return;
}

while(m_remaining)
{
// Construct an object to use its dtor, so that resource is released
RedisReply r(pop());
}

publish();
}

size_t size()
Expand Down Expand Up @@ -145,12 +158,43 @@ class RedisPipeline {
m_ownerTid = gettid();
}

void addChannel(std::string channel)
{
if (m_channels.find(channel) != m_channels.end())
return;

m_channels.insert(channel);
m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');";
m_shaPub = loadRedisScript(m_luaPub);
}

int getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent=std::chrono::steady_clock::now())
{
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
}

void publish() {
if (m_shaPub.empty()) {
return;
}
RedisCommand cmd;
cmd.format(
"EVALSHA %s 0",
m_shaPub.c_str());
RedisReply r(m_db, cmd);
}

private:
DBConnector *m_db;
std::queue<int> m_expectedTypes;
size_t m_remaining;
long int m_ownerTid;

std::string m_luaPub;
std::string m_shaPub;
std::chrono::time_point<std::chrono::steady_clock> lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked
std::unordered_set<std::string> m_channels;

void mayflush()
{
if (m_remaining >= COMMAND_MAX)
Expand Down
56 changes: 56 additions & 0 deletions tests/redis_piped_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,59 @@ TEST(ConsumerStateTable, async_multitable)

cout << endl << "Done." << endl;
}

TEST(ConsumerStateTable, flushPub)
{
clearDB();

/* Prepare producer */
int index = 0;
string tableName = "UT_REDIS_THREAD_" + to_string(index);
DBConnector db(TEST_DB, 0, true);
RedisPipeline pipeline(&db);
ProducerStateTable p(&pipeline, tableName, false, true);
p.setBuffered(true);

string key = "TheKey";
int maxNumOfFields = 2;

/* Set operation */
{
vector<FieldValueTuple> fields;
for (int j = 0; j < maxNumOfFields; j++)
{
FieldValueTuple t(field(j), value(j));
fields.push_back(t);
}
p.set(key, fields);
}

/* Del operation */
p.del(key);
p.flush();

/* Prepare consumer */
ConsumerStateTable c(&db, tableName);
Select cs;
Selectable *selectcs;
cs.addSelectable(&c);

/* First pop operation */
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "DEL");

auto fvs = kfvFieldsValues(kco);
EXPECT_EQ(fvs.size(), 0U);
}

/* Second select operation */
{
int ret = cs.select(&selectcs, 1000);
EXPECT_EQ(ret, Select::TIMEOUT);
}
}
Loading