diff --git a/hsflowd.spec b/hsflowd.spec index 1a9d2112..1596a0db 100644 --- a/hsflowd.spec +++ b/hsflowd.spec @@ -1,6 +1,6 @@ Summary: host sFlow daemon Name: hsflowd -Version: 2.0.27 +Version: 2.0.28 Release: 1 License: http://sflow.net/license.html Group: Applications/Internet diff --git a/src/Linux/hsflowd.h b/src/Linux/hsflowd.h index 04a12a8e..8c9d9cdb 100644 --- a/src/Linux/hsflowd.h +++ b/src/Linux/hsflowd.h @@ -358,6 +358,7 @@ extern "C" { // The generic start,tick,tock,final,end events are defined in evbus.h #define HSPEVENT_HOST_COUNTER_SAMPLE "csample" // (csample *) building counter-sample +#define HSPEVENT_INTF_COUNTER_SAMPLE "icsample" // (csample *) building intf counter-sample #define HSPEVENT_FLOW_SAMPLE "flow_sample" // (HSPPendingSample *) building flow-sample #define HSPEVENT_CONFIG_START "config_start" // begin config lines #define HSPEVENT_CONFIG_LINE "config_line" // (line)...next config line @@ -389,8 +390,15 @@ extern "C" { bool localTest:1; bool localSrc:1; bool localDst:1; + bool suppress:1; } HSPPendingSample; + typedef struct _HSPPendingCSample { + SFL_COUNTERS_SAMPLE_TYPE *cs; + SFLPoller *poller; + bool suppress:1; + } HSPPendingCSample; + typedef enum { HSP_TELEMETRY_FLOW_SAMPLES=0, HSP_TELEMETRY_COUNTER_SAMPLES, @@ -398,6 +406,8 @@ extern "C" { HSP_TELEMETRY_RTFLOW_SAMPLES, HSP_TELEMETRY_DATAGRAMS, HSP_TELEMETRY_DROPPED_SAMPLES, + HSP_TELEMETRY_FLOW_SAMPLES_SUPPRESSED, + HSP_TELEMETRY_COUNTER_SAMPLES_SUPPRESSED, HSP_TELEMETRY_NUM_COUNTERS } EnumHSPTelemetry; @@ -408,7 +418,9 @@ extern "C" { "rtmetric_samples", "rtflow_samples", "datagrams", - "dropped_samples" + "dropped_samples", + "flow_samples_suppressed", + "counter_samples_suppressed" }; #endif diff --git a/src/Linux/mod_sonic.c b/src/Linux/mod_sonic.c index c8079eb8..e6149e56 100644 --- a/src/Linux/mod_sonic.c +++ b/src/Linux/mod_sonic.c @@ -16,11 +16,14 @@ extern "C" { #define HSP_SONIC_DB_APPL_NAME "APPL_DB" #define HSP_SONIC_DB_COUNTERS_NAME "COUNTERS_DB" #define HSP_SONIC_DB_CONFIG_NAME "CONFIG_DB" +#define HSP_SONIC_DB_STATE_NAME "STATE_DB" #define HSP_SONIC_DB_EVENT_SUFFIX "_HSFLOWD_EVENTS" #define HSP_SONIC_FIELD_MAC "mac" #define HSP_SONIC_FIELD_LOCALAS "bgp_asn" #define HSP_SONIC_FIELD_IFINDEX "index" +#define HSP_SONIC_FIELD_IFINDEX_OS "ifindex" +#define HSP_SONIC_IFINDEX_UNDEFINED 0xFFFFFFFF #define HSP_SONIC_FIELD_IFSPEED "speed" #define HSP_SONIC_FIELD_IFSPEED_UNITS 1000000LL #define HSP_SONIC_FIELD_IFALIAS "alias" @@ -77,6 +80,8 @@ extern "C" { bool operUp:1; bool adminUp:1; uint32_t ifIndex; + uint32_t osIndex; + uint32_t osIndex_expected; uint64_t ifSpeed; char *ifAlias; SFLHost_nio_counters ctrs; @@ -109,10 +114,13 @@ extern "C" { typedef struct _HSP_mod_SONIC { EnumSonicState state; EVBus *pollBus; + EVBus *packetBus; UTHash *dbInstances; UTHash *dbTables; UTHash *portsByName; + UTHash *portsByOsIndex; UTArray *newPorts; + UTArray *unmappedPorts; bool changedSwitchPorts:1; u_char actorSystemMAC[8]; uint32_t localAS; @@ -127,8 +135,10 @@ extern "C" { } HSP_mod_SONIC; static void db_ping(EVMod *mod, HSPSonicDBClient *db); - static void discoverNewPorts(EVMod *mod); - static void discoverNewCollectors(EVMod *mod); + static bool mapPorts(EVMod *mod); + static bool discoverNewPorts(EVMod *mod); + static void signalCounterDiscontinuity(EVMod *mod, HSPSonicPort *prt); + static bool discoverNewCollectors(EVMod *mod); static void syncConfig(EVMod *mod); static void dbEvt_subscribe(EVMod *mod); @@ -222,6 +232,9 @@ extern "C" { && create) { prt = (HSPSonicPort *)my_calloc(sizeof(HSPSonicPort)); prt->portName = my_strdup(portName); + prt->ifIndex = HSP_SONIC_IFINDEX_UNDEFINED; + prt->osIndex = HSP_SONIC_IFINDEX_UNDEFINED; + prt->osIndex_expected = HSP_SONIC_IFINDEX_UNDEFINED; UTHashAdd(mdata->portsByName, prt); } return prt; @@ -777,6 +790,101 @@ extern "C" { } } + /*_________________---------------------------__________________ + _________________ db_getifIndexMap __________________ + -----------------___________________________------------------ + */ + + static void db_ifIndexMapCB(redisAsyncContext *ctx, void *magic, void *req_magic) + { + HSPSonicDBClient *db = (HSPSonicDBClient *)ctx->ev.data; + EVMod *mod = db->mod; + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + redisReply *reply = (redisReply *)magic; + HSPSonicPort *prt = (HSPSonicPort *)req_magic; + myDebug(1, "sonic db_ifIndexMapCB: reply=%s", db_replyStr(reply, db->replyBuf, YES)); + if(reply == NULL) + return; + if(reply->type == REDIS_REPLY_ARRAY + && reply->elements > 0 + && ISEVEN(reply->elements)) { + for(int ii = 0; ii < reply->elements; ii += 2) { + redisReply *c_name = reply->element[ii]; + redisReply *c_val = reply->element[ii + 1]; + if(c_name->type == REDIS_REPLY_STRING) { + myDebug(1, "sonic db_ifIndexMapCB: %s=%s", c_name->str, db_replyStr(c_val, db->replyBuf, YES)); + if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFINDEX)) { + uint32_t idx = db_getU32(c_val); + if(prt->ifIndex != idx) { + if(prt->ifIndex != HSP_SONIC_IFINDEX_UNDEFINED) { + myDebug(1, "sonic ifIndex for port %s changed from %u to %u", + prt->portName, + prt->ifIndex, + idx); + signalCounterDiscontinuity(mod, prt); + } + prt->ifIndex = idx; + } + } + if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFINDEX_OS)) { + uint32_t idx = db_getU32(c_val); + if(prt->osIndex != idx) { + if(prt->osIndex != HSP_SONIC_IFINDEX_UNDEFINED) { + myDebug(1, "sonic osIndex for port %s changed from %u to %u", + prt->portName, + prt->osIndex, + idx); + signalCounterDiscontinuity(mod, prt); + UTHashDel(mdata->portsByOsIndex, prt); + } + prt->osIndex = idx; + UTHashAdd(mdata->portsByOsIndex, prt); + if(prt->osIndex != prt->osIndex_expected) { + // either this port not found in readInterfaces() discovery, + // or we found it, but with a different linux ifIndex. + if(prt->osIndex_expected == HSP_SONIC_IFINDEX_UNDEFINED) + myDebug(1, "sonic osIndex for port %s == %u : not expected from adaptor discovery", + prt->portName, + prt->osIndex); + else + myDebug(1, "sonic osIndex for port %s expected %u, but got %u", + prt->portName, + prt->osIndex_expected, + prt->osIndex); + } + } + } + } + } + } + // there may be more to map + mapPorts(mod); + } + + static void db_getIfIndexMap(EVMod *mod, HSPSonicPort *prt) { + HSPSonicDBTable *dbTab = getDBTable(mod, HSP_SONIC_DB_STATE_NAME); + if(db_selectTab(dbTab)) { + myDebug(1, "sonic db_getIfIndexMap()"); + int status = redisAsyncCommand(dbTab->dbClient->ctx, + db_ifIndexMapCB, + prt, + "HGETALL PORT_INDEX_TABLE%s%s", dbTab->separator, prt->portName); + myDebug(1, "sonic db_getIfIndexMap() returned %d", status); + } + } + + static bool mapPorts(EVMod *mod) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + // kick off just one - starts a chain reaction if there are more. + // Gets the ifIndex and Linux (OS) ifIndex + HSPSonicPort *prt = UTArrayPop(mdata->unmappedPorts); + if(prt) { + db_getIfIndexMap(mod, prt); + return YES; + } + return NO; + } + /*_________________---------------------------__________________ _________________ db_getPortNames __________________ -----------------___________________________------------------ @@ -824,6 +932,10 @@ extern "C" { setStr(&prt->oid, p_oid->str); signalCounterDiscontinuity(mod, prt); } + if(prt->osIndex == HSP_SONIC_IFINDEX_UNDEFINED) { + // queue it for ifIndex discovery + UTArrayPush(mdata->unmappedPorts, prt); + } prt->mark = NO; } } @@ -866,8 +978,7 @@ extern "C" { redisReply *c_val = reply->element[ii + 1]; if(c_name->type == REDIS_REPLY_STRING) { myDebug(1, "sonic db_portStateCB: %s=%s", c_name->str, db_replyStr(c_val, db->replyBuf, YES)); - if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFINDEX)) - prt->ifIndex = db_getU32(c_val); + // This "index" field is neither ifIndex nor osIndex, so ignore it. if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFSPEED)) prt->ifSpeed = db_getU64(c_val) * HSP_SONIC_FIELD_IFSPEED_UNITS; if(my_strequal(c_name->str, HSP_SONIC_FIELD_IFALIAS)) @@ -893,18 +1004,10 @@ extern "C" { #endif if(adaptor) { - - // see if ifIndex matches - if(adaptor->ifIndex != prt->ifIndex) { - myDebug(1, "warning: port=%s adaptor->ifIndex(%d) != prt->ifIndex(%d)", - prt->portName, - adaptor->ifIndex, - prt->ifIndex); - // let the adaptor one win - hopefully this mismatch will not - // happen on a physical switch. Only on Sonic VS? - prt->ifIndex = adaptor->ifIndex; - } - + // The adaptor ifIndex will probably match the osIndex, + // Record it here so we can cross-check. + prt->osIndex_expected = adaptor->ifIndex; + // TODO: readVlans // TODO: read bond state (may need the nio->bond flag right away) HSPAdaptorNIO *nio = ADAPTOR_NIO(adaptor); @@ -931,13 +1034,16 @@ extern "C" { } } - static void discoverNewPorts(EVMod *mod) { + static bool discoverNewPorts(EVMod *mod) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; // kick off just one - starts a chain reaction if there are more. // Gets the state (index, speed etc.) so we can add it as an adaptor. HSPSonicPort *prt = UTArrayPop(mdata->newPorts); - if(prt) + if(prt) { db_getPortState(mod, prt); + return YES; + } + return NO; } @@ -1083,7 +1189,7 @@ extern "C" { int status = redisAsyncCommand(dbTab->dbClient->ctx, db_getLagInfoCB, dbTab->separator, - "KEYS PORTCHANNEL_MEMBER|*"); + "KEYS PORTCHANNEL_MEMBER%s*", dbTab->separator); myDebug(1, "sonic getLagInfo() returned %d", status); } } @@ -1234,12 +1340,15 @@ extern "C" { } } - static void discoverNewCollectors(EVMod *mod) { + static bool discoverNewCollectors(EVMod *mod) { HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; // kick off just one - starts a chain reaction if there are more. HSPSonicCollector *coll = UTArrayPop(mdata->newCollectors); - if(coll) + if(coll) { db_getCollectorInfo(mod, coll); + return YES; + } + return NO; } /*_________________---------------------------__________________ @@ -1548,7 +1657,8 @@ extern "C" { break; case HSP_SONIC_STATE_RUN: // check for new ports - discoverNewPorts(mod); + if(!discoverNewPorts(mod)) + mapPorts(mod); syncSwitchPorts(mod); discoverNewCollectors(mod); break; @@ -1567,6 +1677,110 @@ extern "C" { return; } + /*_________________---------------------------__________________ + _________________ port by osIndex __________________ + -----------------___________________________------------------ + */ + + static HSPSonicPort *getPortByOsIndex(EVMod *mod, uint32_t osIndex) { + HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; + HSPSonicPort search = { .osIndex = osIndex }; + return UTHashGet(mdata->portsByOsIndex, &search); + } + + /*_________________---------------------------__________________ + _________________ evt_flow_sample __________________ + -----------------___________________________------------------ + packet bus + */ + + static void evt_flow_sample(EVMod *mod, EVEvent *evt, void *data, size_t dataLen) { + HSPPendingSample *ps = (HSPPendingSample *)data; + // find and translate all ifIndex fields from the OS (Linux) ifIndex namespace to + // the SONiC ifIndex namespace. For packet samples that means: + // 1. sampler ds_index + // 2. flow_sample input, output ports + // If a mapping is missing for the sampler we have to block the sample. + // If a mapping is missing for in/out ports we have to zero it out (0 == unknown) + uint32_t osIndex = SFL_DS_INDEX(ps->sampler->dsi); + HSPSonicPort *prt = getPortByOsIndex(mod, osIndex); + if(prt == NULL || prt->ifIndex == HSP_SONIC_IFINDEX_UNDEFINED) { + // block this sample from being sent out. + // TODO: if sample_pool is maintained upstream then it + // may need to be adjusted here. + ps->suppress = YES; + } + else { + // fix datasource + sfl_sampler_set_dsAlias(ps->sampler, prt->ifIndex); + // fix in/out + if(ps->fs->input + && ps->fs->input != SFL_INTERNAL_INTERFACE) { + // translate, or mark unknown + HSPSonicPort *in = getPortByOsIndex(mod, ps->fs->input); + ps->fs->input = (in && in->ifIndex != HSP_SONIC_IFINDEX_UNDEFINED) + ? in->ifIndex + : 0; + } + if(ps->fs->output + && ps->fs->output != SFL_INTERNAL_INTERFACE + && (ps->fs->output & 0x80000000) == 0) { + // translate, or mark unknown + HSPSonicPort *out = getPortByOsIndex(mod, ps->fs->output); + ps->fs->output = (out && out->ifIndex != HSP_SONIC_IFINDEX_UNDEFINED) + ? out->ifIndex + : 0; + } + } + } + + /*_________________---------------------------__________________ + _________________ evt_cntr_sample __________________ + -----------------___________________________------------------ + */ + + static void evt_cntr_sample(EVMod *mod, EVEvent *evt, void *data, size_t dataLen) { + HSPPendingCSample *ps = (HSPPendingCSample *)data; + // find and translate all ifIndex fields from the OS (Linux) ifIndex namespace to + // the SONiC ifIndex namespace. For interface counter samples that means: + // 1. poller ds_index + // 2. generic counters ifIndex + // 3. LAG stucture aggregationID + // 4. entity structures? + // 5. Optical structures? + // 6. Parent structure? + uint32_t osIndex = SFL_DS_INDEX(ps->poller->dsi); + HSPSonicPort *prt = getPortByOsIndex(mod, osIndex); + if(prt == NULL || prt->ifIndex == HSP_SONIC_IFINDEX_UNDEFINED) { + // block this sample from being sent out + ps->suppress = YES; + } + else { + // fix datasource + sfl_poller_set_dsAlias(ps->poller, prt->ifIndex); + // look through counter structures + for(SFLCounters_sample_element *elem = ps->cs->elements; + elem != NULL; + elem = elem->nxt) { + if(elem->tag == SFLCOUNTERS_GENERIC) { + // fix generic ifIndex + elem->counterBlock.generic.ifIndex = prt->ifIndex; + } + else if(elem->tag == SFLCOUNTERS_LACP) { + // fix LAG aggregation ID + uint32_t aggID = elem->counterBlock.lacp.attachedAggID; + if(aggID) { + // translate, or mark unknown + HSPSonicPort *lagPort = getPortByOsIndex(mod, aggID); + elem->counterBlock.lacp.attachedAggID = (lagPort && lagPort->ifIndex != HSP_SONIC_IFINDEX_UNDEFINED) + ? lagPort->ifIndex + : 0; + } + } + } + } + } + /*_________________---------------------------__________________ _________________ module init __________________ -----------------___________________________------------------ @@ -1577,11 +1791,14 @@ extern "C" { mod->data = my_calloc(sizeof(HSP_mod_SONIC)); HSP_mod_SONIC *mdata = (HSP_mod_SONIC *)mod->data; mdata->pollBus = EVGetBus(mod, HSPBUS_POLL, YES); + mdata->packetBus = EVGetBus(mod, HSPBUS_PACKET, YES); mdata->dbInstances = UTHASH_NEW(HSPSonicDBClient, dbInstance, UTHASH_SKEY); mdata->dbTables = UTHASH_NEW(HSPSonicDBTable, dbTable, UTHASH_SKEY); mdata->portsByName = UTHASH_NEW(HSPSonicPort, portName, UTHASH_SKEY); + mdata->portsByOsIndex = UTHASH_NEW(HSPSonicPort, osIndex, UTHASH_DFLT); mdata->collectors = UTHASH_NEW(HSPSonicCollector, collectorName, UTHASH_SKEY); mdata->newPorts = UTArrayNew(UTARRAY_DFLT); + mdata->unmappedPorts = UTArrayNew(UTARRAY_DFLT); mdata->newCollectors = UTArrayNew(UTARRAY_DFLT); // retainRootRequest(mod, "Needed to call out to OPX scripts (PYTHONPATH)"); @@ -1618,6 +1835,9 @@ extern "C" { mdata->configEvent = EVGetEvent(mdata->pollBus, HSPEVENT_CONFIG_LINE); mdata->configEndEvent = EVGetEvent(mdata->pollBus, HSPEVENT_CONFIG_END); + // intercept samples before they go out so we can rewrite ifindex numbers + EVEventRx(mod, EVGetEvent(mdata->packetBus, HSPEVENT_FLOW_SAMPLE), evt_flow_sample); + EVEventRx(mod, EVGetEvent(mdata->pollBus, HSPEVENT_INTF_COUNTER_SAMPLE), evt_cntr_sample); } diff --git a/src/Linux/readPackets.c b/src/Linux/readPackets.c index 2817c8eb..6823d00c 100644 --- a/src/Linux/readPackets.c +++ b/src/Linux/readPackets.c @@ -156,10 +156,26 @@ extern "C" { SFLADD_ELEMENT(cs, &sfp_elem); } - SEMLOCK_DO(sp->sync_agent) { - sfl_poller_writeCountersSample(poller, cs); - sp->counterSampleQueued = YES; - sp->telemetry[HSP_TELEMETRY_COUNTER_SAMPLES]++; + // circulate the cs to be annotated by other modules before it is sent out. + // This differs from the packet-sample treatment in that everything is + // on the stack. If we ever wanted to delay counter samples until additional + // lookups were performed then this would all have to shift onto the heap. + HSPPendingCSample ps = { .poller = poller, .cs = cs }; + EVEvent *evt_intf_cs = EVGetEvent(sp->pollBus, HSPEVENT_INTF_COUNTER_SAMPLE); + // TODO: can we specify pollBus only? Receiving this on another bus would + // be a disaster as we would not copy the whole structure here. + EVEventTx(sp->rootModule, evt_intf_cs, &ps, sizeof(ps)); + // TODO: use HSPPendingCSample for HSPEVENT_HOST_COUNTER_SAMPLE too? + // (might be useful to consumers to get pointer to the poller too). + if(ps.suppress) { + sp->telemetry[HSP_TELEMETRY_COUNTER_SAMPLES_SUPPRESSED]++; + } + else { + SEMLOCK_DO(sp->sync_agent) { + sfl_poller_writeCountersSample(poller, cs); + sp->counterSampleQueued = YES; + sp->telemetry[HSP_TELEMETRY_COUNTER_SAMPLES]++; + } } } } @@ -255,13 +271,19 @@ extern "C" { { if(--ps->refCount == 0) { EVBus *bus = EVCurrentBus(); - SEMLOCK_DO(sp->sync_agent) { - sfl_agent_set_now(ps->sampler->agent, bus->now.tv_sec, bus->now.tv_nsec); - sfl_sampler_writeFlowSample(ps->sampler, ps->fs); - sp->telemetry[HSP_TELEMETRY_FLOW_SAMPLES]++; + if(ps->suppress) { + sp->telemetry[HSP_TELEMETRY_FLOW_SAMPLES_SUPPRESSED]++; + } + else { + SEMLOCK_DO(sp->sync_agent) { + sfl_agent_set_now(ps->sampler->agent, bus->now.tv_sec, bus->now.tv_nsec); + sfl_sampler_writeFlowSample(ps->sampler, ps->fs); + sp->telemetry[HSP_TELEMETRY_FLOW_SAMPLES]++; + } } void *ptr; - UTARRAY_WALK(ps->ptrsToFree, ptr) my_free(ptr); + UTARRAY_WALK(ps->ptrsToFree, ptr) + my_free(ptr); UTArrayFree(ps->ptrsToFree); my_free(ps->fs); my_free(ps); diff --git a/src/sflow/sflow_api.h b/src/sflow/sflow_api.h index 34df09df..2829d7ce 100644 --- a/src/sflow/sflow_api.h +++ b/src/sflow/sflow_api.h @@ -74,7 +74,7 @@ typedef struct _SFLDataSource_instance { #define SFL_COUNTERS_SAMPLE_TYPE SFLCounters_sample /* if index numbers are not going to use all 32 bits, then we can use the more compact encoding, with the dataSource class and index merged */ -#define SFL_DS_DATASOURCE(dsi) (((dsi).ds_class << 24) + (dsi).ds_index) +#define SFL_DS_SOURCEID(cls,idx) ((cls) << 24) + (idx) #endif #define SFL_DS_INSTANCE(dsi) (dsi).ds_instance @@ -140,6 +140,8 @@ typedef struct _SFLSampler { uint32_t samplesThisTick; uint32_t samplesLastTick; uint32_t backoffThreshold; + /* optional alias datasource index */ + uint32_t ds_alias; } SFLSampler; /* declare */ @@ -165,6 +167,8 @@ typedef struct _SFLPoller { SFLReceiver *myReceiver; time_t countersCountdown; uint32_t countersSampleSeqNo; + /* optional alias datasource index */ + uint32_t ds_alias; } SFLPoller; typedef void *(*allocFn_t)(void *magic, /* callback to allocate space on heap */ @@ -312,6 +316,10 @@ void sfl_agent_set_now(SFLAgent *agent, time_t now_S, time_t now_nS); /* call this to change the designated sflow-agent-address */ void sfl_agent_set_address(SFLAgent *agent, SFLAddress *ip); +/* use this to remap datasource index numbers on export */ +void sfl_sampler_set_dsAlias(SFLSampler *sampler, uint32_t ds_alias); +void sfl_poller_set_dsAlias(SFLPoller *poller, uint32_t ds_alias); + /* convert stored "now" to mS since bootTime */ uint32_t sfl_agent_uptime_mS(SFLAgent *agent); diff --git a/src/sflow/sflow_poller.c b/src/sflow/sflow_poller.c index 339334f7..13697434 100644 --- a/src/sflow/sflow_poller.c +++ b/src/sflow/sflow_poller.c @@ -95,6 +95,13 @@ so that the sflow collector will know to ignore the next delta. */ void sfl_poller_resetCountersSeqNo(SFLPoller *poller) { poller->countersSampleSeqNo = 0; } +/*_________________---------------------------------__________________ + _________________ datasource alias __________________ + -----------------_________________________________------------------ +Used where we want to export a remapped namespace for datasource index +*/ +void sfl_poller_set_dsAlias(SFLPoller *poller, uint32_t ds_alias) { poller->ds_alias = ds_alias; } + /*_________________---------------------------__________________ _________________ sfl_poller_tick __________________ -----------------___________________________------------------ @@ -128,11 +135,13 @@ void sfl_poller_writeCountersSample(SFLPoller *poller, SFL_COUNTERS_SAMPLE_TYPE { /* fill in the rest of the header fields, and send to the receiver */ cs->sequence_number = ++poller->countersSampleSeqNo; + uint32_t ds_class = SFL_DS_CLASS(poller->dsi); + uint32_t ds_index = poller->ds_alias ?: SFL_DS_INDEX(poller->dsi); #ifdef SFL_USE_32BIT_INDEX - cs->ds_class = SFL_DS_CLASS(poller->dsi); - cs->ds_index = SFL_DS_INDEX(poller->dsi); + cs->ds_class = ds_class; + cs->ds_index = ds_index; #else - cs->source_id = SFL_DS_DATASOURCE(poller->dsi); + cs->source_id = SFL_DS_SOURCEID(ds_class, ds_index); #endif /* sent to my receiver */ if(poller->myReceiver) sfl_receiver_writeCountersSample(poller->myReceiver, cs); diff --git a/src/sflow/sflow_sampler.c b/src/sflow/sflow_sampler.c index 0aa8f84f..d89d8e9e 100644 --- a/src/sflow/sflow_sampler.c +++ b/src/sflow/sflow_sampler.c @@ -109,6 +109,12 @@ so that the sflow collector will know to ignore the next delta. */ void sfl_sampler_resetFlowSeqNo(SFLSampler *sampler) { sampler->flowSampleSeqNo = 0; } +/*_________________---------------------------------__________________ + _________________ datasource alias __________________ + -----------------_________________________________------------------ +Used where we want to export a remapped namespace for datasource index +*/ +void sfl_sampler_set_dsAlias(SFLSampler *sampler, uint32_t ds_alias) { sampler->ds_alias = ds_alias; } /*_________________---------------------------__________________ _________________ sfl_sampler_tick __________________ @@ -141,11 +147,13 @@ void sfl_sampler_writeFlowSample(SFLSampler *sampler, SFL_FLOW_SAMPLE_TYPE *fs) /* increment the sequence number */ fs->sequence_number = ++sampler->flowSampleSeqNo; /* copy the other header fields in */ + uint32_t ds_class = SFL_DS_CLASS(sampler->dsi); + uint32_t ds_index = sampler->ds_alias ?: SFL_DS_INDEX(sampler->dsi); #ifdef SFL_USE_32BIT_INDEX - fs->ds_class = SFL_DS_CLASS(sampler->dsi); - fs->ds_index = SFL_DS_INDEX(sampler->dsi); + fs->ds_class = ds_class; + fs->ds_index = ds_index; #else - fs->source_id = SFL_DS_DATASOURCE(sampler->dsi); + fs->source_id = SFL_DS_SOURCEID(ds_class, ds_index); #endif /* the sampling rate may have been set already. */ if(fs->sampling_rate == 0) fs->sampling_rate = sampler->sFlowFsPacketSamplingRate;