Skip to content

Commit

Permalink
[orchagent] implement ring buffer feature with a flag
Browse files Browse the repository at this point in the history
  • Loading branch information
a114j0y committed Jul 25, 2024
1 parent 2367bca commit 7f23081
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 19 deletions.
15 changes: 13 additions & 2 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ extern size_t gMaxBulkSize;
#define DEFAULT_BATCH_SIZE 128
extern int gBatchSize;

bool gRingMode = false;

bool gSyncMode = false;
sai_redis_communication_mode_t gRedisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
string gAsicInstance;
Expand All @@ -72,7 +74,7 @@ bool gTraditionalFlexCounter = false;

void usage()
{
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode]" << endl;
cout << "usage: orchagent [-h] [-r record_type] [-d record_location] [-f swss_rec_filename] [-j sairedis_rec_filename] [-b batch_size] [-m MAC] [-i INST_ID] [-s] [-z mode] [-k bulk_size] [-q zmq_server_address] [-c mode] [-R]" << endl;
cout << " -h: display this message" << endl;
cout << " -r record_type: record orchagent logs with type (default 3)" << endl;
cout << " Bit 0: sairedis.rec, Bit 1: swss.rec, Bit 2: responsepublisher.rec. For example:" << endl;
Expand All @@ -92,6 +94,7 @@ void usage()
cout << " -k max bulk size in bulk mode (default 1000)" << endl;
cout << " -q zmq_server_address: ZMQ server address (default disable ZMQ)" << endl;
cout << " -c counter mode (traditional|asic_db), default: asic_db" << endl;
cout << " -R: enable the ring buffer mode" << endl;
}

void sighup_handler(int signo)
Expand Down Expand Up @@ -346,7 +349,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = Recorder::RESPPUB_FNAME;
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:c:R")) != -1)
{
switch (opt)
{
Expand Down Expand Up @@ -437,6 +440,9 @@ int main(int argc, char **argv)
enable_zmq = true;
}
break;
case 'R':
gRingMode = true;
break;
default: /* '?' */
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -782,6 +788,11 @@ int main(int argc, char **argv)
orchDaemon = make_shared<FabricOrchDaemon>(&appl_db, &config_db, &state_db, chassis_app_db.get(), zmq_server.get());
}

if (gRingMode) {
/* Initialize the ring before OrchDaemon initializing Orchs */
orchDaemon->enableRingBuffer();
}

if (!orchDaemon->init())
{
SWSS_LOG_ERROR("Failed to initialize orchestration daemon");
Expand Down
108 changes: 99 additions & 9 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
#include "zmqconsumerstatetable.h"
#include "sai_serialize.h"

#define PRINT_ALL 1
#define VERBOSE true
#define LARGE_TRAFFIC 10000

using namespace swss;

int gBatchSize = 0;

OrchRing* Orch::gRingBuffer = nullptr;
OrchRing* Executor::gRingBuffer = nullptr;

Orch::Orch(DBConnector *db, const string tableName, int pri)
{
addConsumer(db, tableName, pri);
Expand Down Expand Up @@ -155,6 +162,10 @@ size_t ConsumerBase::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries
return entries.size();
}

size_t ConsumerBase::addToSync(std::shared_ptr<std::deque<swss::KeyOpFieldsValuesTuple>> entries) {
return addToSync(*entries);
}

// TODO: Table should be const
size_t ConsumerBase::refillToSync(Table* table)
{
Expand Down Expand Up @@ -241,17 +252,86 @@ void Consumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
SWSS_LOG_ENTER();
if (!gRingBuffer) {
size_t update_size = 0;
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);

size_t update_size = 0;
auto table = static_cast<swss::ConsumerTableBase *>(getSelectable());
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);
drain();
} else if (gRingBuffer->Serves(getName())) {
ring_execute();
} else {
pushRingBuffer(
[=]() {
std::deque<KeyOpFieldsValuesTuple> entries;

getConsumerTable()->pops(entries);
addToSync(entries);
drain();
}
);
}
}

void Consumer::ring_execute()
{
SWSS_LOG_ENTER();
static swss::PerformanceTimer timer("POPS", PRINT_ALL, VERBOSE);
size_t total = 0;
while (true) {
timer.start();
auto entries = std::make_shared<std::deque<KeyOpFieldsValuesTuple>>();
getConsumerTable()->pops(*entries);
// number of entries popped
size_t count = entries->size();
total += count;
pushRingBuffer([=](){
addToSync(entries);
}, this);
timer.stop();
timer.inc((int)count);
if (total >= LARGE_TRAFFIC) {
pushRingBuffer([=](){
drain();
}, this);
total = 0;
}
if (!gBatchSize || count * 10 <= (size_t)gBatchSize) {
// some program doesn't initialize gBatchSize
// but use TableConsumable::DEFAULT_POP_BATCH_SIZE
break;
}
}
pushRingBuffer([=](){
drain();
}, this);
}

drain();
void Executor::pushRingBuffer(AnyTask&& func, Executor* e)
{
if (!gRingBuffer || !gRingBuffer->Started) {
func();
} else {
if (e && gRingBuffer->Serves(e->getName())) {
while (!gRingBuffer->push(func)) {
SWSS_LOG_WARN("fail to push..ring is full...");
}
gRingBuffer->cv.notify_one();

}
else
{
while (!gRingBuffer->IsEmpty() || !gRingBuffer->Idle) {
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MSECONDS));
}
func();
}
}
}

void Consumer::drain()
Expand Down Expand Up @@ -542,6 +622,16 @@ void Orch::doTask()
}
}

void Orch::doTask(std::string excluded_table)
{
for (auto &it : m_consumerMap) {
if (gRingBuffer && it.second->getName() == excluded_table) {
continue;
}
it.second->drain();
}
}

void Orch::dumpPendingTasks(vector<string> &ts)
{
for (auto &it : m_consumerMap)
Expand Down
Loading

0 comments on commit 7f23081

Please sign in to comment.