From bd33da92c936b2e8f78ee420d9a47a2c93b06e07 Mon Sep 17 00:00:00 2001 From: Kamil Cudnik Date: Wed, 17 Jul 2019 03:50:59 +0200 Subject: [PATCH] Process flex counters requests in separate thread (#483) --- syncd/syncd.cpp | 93 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index 2b9ae2a6a1..a24f79bf5e 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -38,6 +38,9 @@ std::shared_ptr g_redisClient; std::shared_ptr getResponse; std::shared_ptr notifications; +std::shared_ptr g_processFlexCounterEventThread; +volatile bool g_processFlexCounterEventThreadRun = true; + /* * TODO: Those are hard coded values for mlnx integration for v1.0.1 they need * to be updated. @@ -2954,25 +2957,98 @@ void processFlexCounterGroupEvent( } } +std::queue g_flexCounterEventQueue; + +bool tryPopFlexCounterEvent( + _Out_ swss::KeyOpFieldsValuesTuple& kco) +{ + SWSS_LOG_ENTER(); + + std::lock_guard lock(g_mutex); + + if (g_flexCounterEventQueue.empty()) + { + return false; + } + + kco = g_flexCounterEventQueue.front(); + + g_flexCounterEventQueue.pop(); + + return true; +} + +void pushFlexCounterEvent( + _In_ const swss::KeyOpFieldsValuesTuple& kco) +{ + SWSS_LOG_ENTER(); + + std::lock_guard lock(g_mutex); + + g_flexCounterEventQueue.push(kco); +} + +bool processFlexCounterEvent( + _In_ const swss::KeyOpFieldsValuesTuple kco); + +void processFlexCounterEventThread() +{ + SWSS_LOG_ENTER(); + + while (g_processFlexCounterEventThreadRun) + { + swss::KeyOpFieldsValuesTuple kco; + + if (tryPopFlexCounterEvent(kco)) + { + if (!processFlexCounterEvent(kco)) + { + // event was not successfully processed, put it again to the queue + + pushFlexCounterEvent(kco); + } + } + + sleep(1); + } +} + void processFlexCounterEvent( _In_ swss::ConsumerTable &consumer) { SWSS_LOG_ENTER(); swss::KeyOpFieldsValuesTuple kco; + { std::lock_guard lock(g_mutex); consumer.pop(kco); } + // because flex counter event can arrive independently (on RIF interface) + // it may happen that it will be picked up from the select api before + // actual interface will be created, and subscription for counters will + // fail, so let's process each request in the thread and use queue for + // arriving events, and failed events will be put back to the queue until + // they will be processed + + pushFlexCounterEvent(kco); +} + +bool processFlexCounterEvent( + _In_ const swss::KeyOpFieldsValuesTuple kco) +{ + SWSS_LOG_ENTER(); + const auto &key = kfvKey(kco); - std::string &op = kfvOp(kco); + const std::string &op = kfvOp(kco); std::size_t delimiter = key.find_first_of(":"); if (delimiter == std::string::npos) { SWSS_LOG_ERROR("Failed to parse the key %s", key.c_str()); - return; + + return true; // if key is invalid there is no need to process this event again } const auto groupName = key.substr(0, delimiter); @@ -2987,7 +3063,7 @@ void processFlexCounterEvent( SWSS_LOG_WARN("port VID %s, was not found (probably port was removed/splitted) and will remove from counters now", sai_serialize_object_id(vid).c_str()); - op = DEL_COMMAND; + return false; } sai_object_type_t objectType = redis_sai_object_type_query(vid); // VID and RID will have the same object type @@ -3132,6 +3208,8 @@ void processFlexCounterEvent( FlexCounter::setBufferPoolCounterList(vid, rid, groupName, bufferPoolCounterIds, statsMode); } + + return true; } void printUsage() @@ -3846,6 +3924,11 @@ int syncd_main(int argc, char **argv) twd.setCallback(timerWatchdogCallback); + g_processFlexCounterEventThreadRun = true; + + g_processFlexCounterEventThread = std::make_shared(processFlexCounterEventThread); + + while(runMainLoop) { try @@ -4029,6 +4112,10 @@ int syncd_main(int argc, char **argv) #endif + g_processFlexCounterEventThreadRun = false; + + g_processFlexCounterEventThread->join(); + FlexCounter::removeAllCounters(); {