Skip to content

Commit

Permalink
redispipeline publish at flush
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Aug 25, 2024
1 parent aba0f66 commit 81ee88f
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 143 deletions.
3 changes: 2 additions & 1 deletion common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ common_libswsscommon_la_SOURCES = \
common/zmqclient.cpp \
common/zmqserver.cpp \
common/asyncdbupdater.cpp \
common/redis_table_waiter.cpp
common/redis_table_waiter.cpp \
common/redispipeline.cpp

common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS)
common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS)
Expand Down
43 changes: 24 additions & 19 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sstream>
#include <utility>
#include <algorithm>
#include "redispipeline.h"
#include "redisreply.h"
#include "table.h"
#include "redisapi.h"
Expand All @@ -13,40 +14,37 @@ using namespace std;

namespace swss {

ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false)
ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName, bool flushPub)
: ProducerStateTable(new RedisPipeline(db, 1), tableName, false, flushPub)
{
m_pipeowned = true;
}

ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub)
: TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
, TableName_KeySet(tableName)
, m_flushPub(flushPub)
, m_buffered(buffered)
, m_pipeowned(false)
, m_tempViewActive(false)
, m_pipe(pipeline)
{
if (m_flushPub) {
m_pipe->addChannel(getChannelName(m_pipe->getDbId()));
}

// num in luaSet and luaDel means number of elements that were added to the key set,
// not including all the elements already present into the set.
string luaSet =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"for i = 0, #KEYS - 3 do\n"
" redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
"end\n"
" if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaSet = m_pipe->loadRedisScript(luaSet);

string luaDel =
"local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
"redis.call('SADD', KEYS[4], ARGV[2])\n"
"redis.call('DEL', KEYS[3])\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);

string luaBatchedSet =
"local added = 0\n"
Expand All @@ -60,10 +58,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
Expand All @@ -72,10 +66,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
Expand All @@ -84,6 +74,21 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
" redis.call('DEL', k)\n"
"end\n"
"redis.call('DEL', KEYS[3])\n";

if (!m_flushPub) {
string luaPub =
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
luaSet += luaPub;
luaDel += luaPub;
luaBatchedSet += luaPub;
luaBatchedDel += luaPub;
}
m_shaSet = m_pipe->loadRedisScript(luaSet);
m_shaDel = m_pipe->loadRedisScript(luaDel);
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);
m_shaClear = m_pipe->loadRedisScript(luaClear);

string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua");
Expand Down
4 changes: 2 additions & 2 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace swss {
class ProducerStateTable : public TableBase, public TableName_KeySet
{
public:
ProducerStateTable(DBConnector *db, const std::string &tableName);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false);
ProducerStateTable(DBConnector *db, const std::string &tableName, bool flushPub = false);
ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false, bool flushPub = false);
virtual ~ProducerStateTable();

void setBuffered(bool buffered);
Expand Down
147 changes: 147 additions & 0 deletions common/redispipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include "redispipeline.h"

namespace swss {

RedisPipeline::~RedisPipeline()
{
flush();
delete m_db;
}

redisReply *RedisPipeline::push(const RedisCommand& command, int expectedType)
{
switch (expectedType)
{
case REDIS_REPLY_NIL:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
{
int rc = redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
if (rc != REDIS_OK)
{
// The only reason of error is REDIS_ERR_OOM (Out of memory)
// ref: https://github.com/redis/hiredis/blob/master/hiredis.c
throw std::bad_alloc();
}
m_expectedTypes.push(expectedType);
m_remaining++;
mayflush();
return NULL;
}
default:
{
flush();
RedisReply r(m_db, command, expectedType);
return r.release();
}
}
}

redisReply *RedisPipeline::push(const RedisCommand& command)
{
flush();
RedisReply r(m_db, command);
return r.release();
}

std::string RedisPipeline::loadRedisScript(const std::string& script)
{
RedisCommand loadcmd;
loadcmd.format("SCRIPT LOAD %s", script.c_str());
RedisReply r = push(loadcmd, REDIS_REPLY_STRING);

std::string sha = r.getReply<std::string>();
return sha;
}

void RedisPipeline::flush() {

lastHeartBeat = std::chrono::steady_clock::now();

if (m_remaining == 0) {
return;
}

while(m_remaining)
{
// Construct an object to use its dtor, so that resource is released
RedisReply r(pop());

}
publish();

}

int RedisPipeline::getIdleTime(std::chrono::time_point<std::chrono::steady_clock> tcurrent)
{
return static_cast<int>(std::chrono::duration_cast<std::chrono::milliseconds>(tcurrent - lastHeartBeat).count());
}

size_t RedisPipeline::size()
{
return m_remaining;
}

int RedisPipeline::getDbId()
{
return m_db->getDbId();
}

std::string RedisPipeline::getDbName()
{
return m_db->getDbName();
}

DBConnector *RedisPipeline::getDBConnector()
{
return m_db;
}

void RedisPipeline::addChannel(std::string channel) {
m_luaPub +=
"redis.call('PUBLISH', '" + channel + "', 'G');";

m_shaPub = loadRedisScript(m_luaPub);
}

redisReply *RedisPipeline::pop()
{
if (m_remaining == 0) return NULL;

redisReply *reply;
int rc = redisGetReply(m_db->getContext(), (void**)&reply);
if (rc != REDIS_OK)
{
throw RedisError("Failed to redisGetReply in RedisPipeline::pop", m_db->getContext());
}
RedisReply r(reply);
m_remaining--;

int expectedType = m_expectedTypes.front();
m_expectedTypes.pop();
r.checkReplyType(expectedType);
if (expectedType == REDIS_REPLY_STATUS)
{
r.checkStatusOK();
}
return r.release();
}

void RedisPipeline::mayflush()
{
if (m_remaining >= COMMAND_MAX)
flush();
}

void RedisPipeline::publish() {
if (m_shaPub == "") {
return;
}
RedisCommand cmd;
cmd.format(
"EVALSHA %s 0",
m_shaPub.c_str());
RedisReply r(m_db, cmd);
}

} // namespace swss
Loading

0 comments on commit 81ee88f

Please sign in to comment.