Skip to content

Commit

Permalink
[orchagent] implement ring buffer feature with a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Aug 6, 2024
1 parent 465391d commit 3d3cde6
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 22 deletions.
15 changes: 13 additions & 2 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
#define DEFAULT_BATCH_SIZE 128
extern int gBatchSize;

bool gRingMode = false;

bool gSyncMode = false;
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
string gAsicInstance;
Expand All @@ -72,7 +74,7 @@ bool gTraditionalFlexCounter = false;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode]" << endl;
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-R]" << endl;
cout << " -h: display this message" << endl;
cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
Expand All @@ -92,6 +94,7 @@ void usage()
cout << " -k max bulk size in bulk mode (default 1000)" << endl;
cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
cout << " -R: enable the ring buffer mode" << endl;
}

void sighup_handler(int signo)
Expand Down Expand Up @@ -346,7 +349,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:R")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -437,6 +440,9 @@ int main(int argc, char **argv)
enable_zmq = true;
}
break;
case 'R':
gRingMode = true;
break;
default: /* '?' */
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -782,6 +788,11 @@ int main(int argc, char **argv)
orchDaemon = make_shared<FabricOrchDaemon>(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
}

if (gRingMode) {
/* Initialize the ring before OrchDaemon initializing Orchs */
orchDaemon->enableRingBuffer();
}

if (!orchDaemon->init())
{
SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
Expand Down
109 changes: 98 additions & 11 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@
#include "zmqconsumerstatetable.h"
#include "sai_serialize.h"

#define PRINT_ALL 1
#define VERBOSE true

using namespace swss;

int gBatchSize = 0;
// no need for further pops if entries popped in the iteration fewer than gBatchSize divided by POPS_SCALE
#define POPS_SCALE 10
// if #entries popped exceeds LARGE_TRAFFIC, drain them immediately instead of waiting for more pops
#define LARGE_TRAFFIC 10000

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
Expand Down Expand Up @@ -155,6 +165,10 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
return entries.size();
}

size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
return addToSync(*entries);
}

// TODO: Table should be const
size_t ConsumerBase::refillToSync(Table* table)
{
Expand Down Expand Up @@ -239,25 +253,88 @@ void ConsumerBase::dumpPendingTasks(vector<string> &ts)

void Consumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
static swss::PerformanceTimer timer("POPS", PRINT_ALL, VERBOSE);

SWSS_LOG_ENTER();

size_t update_size = 0;
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);
size_t popped_size = 0; // number of entries popped per iteration
swss::ConsumerTableBase *table = getConsumerTable();

size_t drain_size = 0; // number of entries popped, batched for a drain

do {

if (gRingBuffer && gRingBuffer->Serves(getName())) {
timer.start();
}

auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
table->pops(*entries);

drain();
popped_size = entries->size();
drain_size += popped_size;

pushRingBuffer([=](){
addToSync(entries);
});

if (gRingBuffer && gRingBuffer->Serves(getName())) {
timer.stop();
timer.inc((int)popped_size);
}

if (drain_size >= LARGE_TRAFFIC) {
pushRingBuffer([=](){
drain();
});
drain_size = 0;
}

} while (gBatchSize && popped_size * POPS_SCALE > (size_t)gBatchSize);

pushRingBuffer([=](){
drain();
});
}

void Executor::pushRingBuffer(AnyTask&& func)
{
if (!gRingBuffer || !gRingBuffer->Started)
{
func();
}
else if (!gRingBuffer->Serves(getName()))
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
func();
} else {
while (!gRingBuffer->push(func)) {
SWSS_LOG_WARN("fail to push..ring is full...");
}
gRingBuffer->cv.notify_one();
}
}

void Consumer::drain()
{
if (!m_toSync.empty())
if (m_toSync.empty())
return;

if (getName() == APP_ROUTE_TABLE_NAME) {
static swss::PerformanceTimer timer("DRAIN", PRINT_ALL, VERBOSE);
size_t before = m_toSync.size();
timer.start();
((Orch *)m_orch)->doTask((Consumer&)*this);
timer.stop();
size_t after = m_toSync.size();
timer.inc(before - after);
}
else
{
((Orch *)m_orch)->doTask((Consumer&)*this);
}
}

size_t Orch::addExistingData(const string& tableName)
Expand Down Expand Up @@ -542,6 +619,16 @@ void Orch::doTask()
}
}

void Orch::doTask(std::string excluded_table)
{
for (auto &it : m_consumerMap) {
if (gRingBuffer && it.second->getName() == excluded_table) {
continue;
}
it.second->drain();
}
}

void Orch::dumpPendingTasks(vector<string> &ts)
{
for (auto &it : m_consumerMap)
Expand Down
Loading

0 comments on commit 3d3cde6

Please sign in to comment.