You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
rough sketch of my idea of a high-level API in pseudo-code:
user-defined module
-- must be done on top level to be available/defined in all threadsffi.cdef[[struct my_flow_state {
uint64_tpacket_counter;
uint64_tbyte_counter;
uint64_tfirst_seen;
uint64_tlast_seen;
uint8_tsome_flags;
uint16_tsome_fancy_data[20];
};
]]localtracker=flowtracker:new{
struct="struct my_flow_state",
ip4Handler="handleIp4Packet",
ip4TimeoutHandler="handleIp4Timeout",
-- default = ffi.new("struct my_flow_state", { other defaults go here })
}
-- this part should be wrapped by flowscope and exposed via CLI argumentslocaldev=device.config{...}
fori=0, 3do-- get from QQ or from a device queuelm.startTask(flowtracker.analyzerTask, tracker, dev:getRxQueue(i))
end-- end wrapped part-- state starts out empty if it doesn't exist yet; buf is whatever the device queue or QQ gives usfunctionhandleIp4Packet(tuple, state, buf, isFirstPacket)
-- implicit lock by TBBstate.packet_counter=state.packet_counter+1state.byte_counter=state.byte_counter+buf:getSize()
ifisFirstPacketthenstate.first_seen=time()
endstate.last_seen=time()
-- can add custom "active timeout" (like ipfix) hereendfunctionhandleIp4Timeout(tuple, state)
print("flow died, state was: %s", state) -- assume state has reasonable __tostringend
flowtracker looks like this
functionflowtracker:new(args)
-- get size of stateType and round up to something-- in C++: force template instantiation of several hashtable types (4,8,16,32,64,....?) bytes value?-- get appropriate hashtables-- check parameters herelocalobj=setmetatable(args, flowtracker)
obj.table4=C.get_table(ffi.sizeof(obj.stateType)),
obj.default=obj.defaultorffi.new(obj.stateType),
lm.startTask("__FLOWTRACKER_SWAPPER", obj)
returnobjendfunctionflowtracker:analyzer(queue)
localhandler4=_G[self.ip4Handler]
assert(handler4)
whilelm.running() dolocalbufs=-- perform the usual DPDK incantations to get packetsforbufinipairs(bufs) do-- also handle IPv4/6/whateverlocaltuple=extractTuple(buf)
-- copy-constructedlocalaccessor, new=self.table4.insertWithDefault(tuple, self.default)
handler4(tuple, accessor:getValue(), buf, new)
accessor:release()
endendend-- usual libmoon threading magic__FLOWTRACKER_ANALYZER=flowtracker.analyzerflowtracker.analyzerTask="__FLOWTRACKER_ANALYZER"-- don't forget the usual magic in __serialize for thread-stuff-- swapper goes here
The text was updated successfully, but these errors were encountered:
rough sketch of my idea of a high-level API in pseudo-code:
user-defined module
flowtracker looks like this
The text was updated successfully, but these errors were encountered: