diff --git a/src/Makefile b/src/Makefile index 4fc6913c24..4e3c38dcff 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ RMSRC = $(shell find . -name '*.md' -not -regex './obj.*' -printf '%P ') PROGRAM = $(shell find program -regex '^[^/]+/[^/]+' -type d -printf '%P ') # sort to eliminate potential duplicate of programs.inc INCSRC = $(sort $(shell find . -regex '[^\#]*\.inc' -printf '%P ') programs.inc) +EVTSRC = $(sort $(shell find . -regex '[^\#]*\.events' -printf '%P ')) YANGSRC= $(shell find . -regex '[^\#]*\.yang' -printf '%P ') LUAOBJ := $(patsubst %.lua,obj/%_lua.o,$(LUASRC)) @@ -35,6 +36,7 @@ JITOBJS:= $(patsubst %,obj/jit_%.o,$(JITSRC)) EXTRAOBJS := obj/jit_tprof.o obj/jit_vmprof.o obj/strict.o RMOBJS := $(patsubst %,obj/%,$(RMSRC)) INCOBJ := $(patsubst %.inc,obj/%_inc.o, $(INCSRC)) +EVTOBJ := $(patsubst %.events,obj/%_events.o, $(EVTSRC)) YANGOBJ:= $(patsubst %.yang,obj/%_yang.o, $(YANGSRC)) EXE := bin/snabb $(patsubst %,bin/%,$(PROGRAM)) @@ -54,7 +56,7 @@ TESTSCRIPTS = $(shell find . -name "selftest.*" -executable | xargs) PATH := ../lib/luajit/src:$(PATH) -snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(YANGOBJ) $(LUAJIT_A) +snabb: $(LUAOBJ) $(PFLUAOBJ) $(HOBJ) $(COBJ) $(ASMOBJ) $(PFLUAASMOBJ) $(INCOBJ) $(EVTOBJ) $(YANGOBJ) $(LUAJIT_A) $(E) "GEN obj/version.lua.gen" $(Q) ../generate-version-lua.sh > obj/version.lua.gen $(E) "LUA obj/version.lua" @@ -77,6 +79,7 @@ $(EXE): snabb bin $(Q) upx -f --brute -o$@ snabb @echo -n "BINARY " @ls -sh $@ + markdown: $(RMOBJS) test: $(TESTMODS) $(TESTSCRIPTS) @@ -168,6 +171,13 @@ $(INCOBJ): obj/%_inc.o: %.inc Makefile | $(OBJDIR) echo "]=============]") > $(basename $@).luainc $(Q) raptorjit -bg -n $(subst /,.,$*)_inc $(basename $@).luainc $@ +$(EVTOBJ): obj/%_events.o: %.events Makefile | $(OBJDIR) + $(E) "EVENTS $@" + @(echo -n "return [=============["; \ + cat $<; \ + echo "]=============]") > $(basename $@).luainc + $(Q) raptorjit -bg -n $(subst /,.,$*)_events $(basename $@).luainc $@ + $(YANGOBJ): obj/%_yang.o: %.yang Makefile | $(OBJDIR) $(E) "YANG $@" @(echo -n "return [=============["; \ diff --git a/src/apps/interlink/freelist_instrument.lua b/src/apps/interlink/freelist_instrument.lua deleted file mode 100644 index 4070a1063d..0000000000 --- a/src/apps/interlink/freelist_instrument.lua +++ /dev/null @@ -1,40 +0,0 @@ --- Use of this source code is governed by the Apache 2.0 license; see COPYING. 
- -module(...,package.seeall) - -local histogram = require("core.histogram") -local tsc = require("lib.tsc") - -function instrument_freelist () - local ts = tsc.new() - local rebalance_latency = histogram.create('engine/rebalance_latency.histogram', 1, 100e6) - local reclaim_latency = histogram.create('engine/reclaim_latency.histogram', 1, 100e6) - - local rebalance_step, reclaim_step = packet.rebalance_step, packet.reclaim_step - packet.rebalance_step = function () - local start = ts:stamp() - rebalance_step() - rebalance_latency:add(tonumber(ts:to_ns(ts:stamp()-start))) - end - packet.reclaim_step = function () - local start = ts:stamp() - reclaim_step() - reclaim_latency:add(tonumber(ts:to_ns(ts:stamp()-start))) - end - - return rebalance_latency, reclaim_latency -end - -function histogram_csv_header (out) - out = out or io.stdout - out:write("histogram,lo,hi,count\n") -end - -function histogram_csv (histogram, name, out) - out = out or io.stdout - name = name or 'untitled' - for count, lo, hi in histogram:iterate() do - out:write(("%s,%f,%f,%d\n"):format(name, lo, hi, tonumber(count))) - out:flush() - end -end \ No newline at end of file diff --git a/src/apps/interlink/test_sink.lua b/src/apps/interlink/test_sink.lua index d72f072f5a..c6911599ca 100644 --- a/src/apps/interlink/test_sink.lua +++ b/src/apps/interlink/test_sink.lua @@ -13,26 +13,10 @@ function configure (c, name) config.link(c, name..".output -> sink.input") end -function start (name, duration) +function start (name, duration, core) + numa.bind_to_cpu(core, 'skip') local c = config.new() configure(c, name) engine.configure(c) engine.main{duration=duration} end - -local instr = require("apps.interlink.freelist_instrument") - -function start_instrument (name, duration, core) - numa.bind_to_cpu(core, 'skip') - local rebalance_latency = instr.instrument_freelist() - start(name, duration) - instr.histogram_csv(rebalance_latency, "rebalance") - local min, avg, max = rebalance_latency:summarize() - io.stderr:write(("(%d) rebalance latency (ns) min:%16s avg:%16s max:%16s\n") - :format(core, - lib.comma_value(math.floor(min)), - lib.comma_value(math.floor(avg)), - lib.comma_value(math.floor(max)))) - io.stderr:flush() -end - diff --git a/src/apps/interlink/test_source.lua b/src/apps/interlink/test_source.lua index ecfdd664fc..3ae31f3d86 100644 --- a/src/apps/interlink/test_source.lua +++ b/src/apps/interlink/test_source.lua @@ -20,13 +20,19 @@ function start (name, duration) engine.main{duration=duration} end -function startn (name, duration, n) +function startn (name, duration, n, core) + numa.bind_to_cpu(core, 'skip') local c = config.new() for i=1,n do configure(c, name..i) end engine.configure(c) engine.main{duration=duration} + local txpackets = txpackets() + engine.main{duration=1, no_report=true} + io.stderr:write(("%.3f Mpps\n"):format(txpackets / 1e6 / duration)) + io.stderr:flush() + --engine.report_links() end function txpackets () @@ -36,24 +42,3 @@ function txpackets () end return txpackets end - -local instr = require("apps.interlink.freelist_instrument") - -function startn_instrument (name, duration, n, core) - numa.bind_to_cpu(core, 'skip') - local _, reclaim_latency = instr.instrument_freelist() - startn(name, duration, n) - local txpackets = txpackets() - instr.histogram_csv(reclaim_latency, "reclaim") - local min, avg, max = reclaim_latency:summarize() - engine.main{duration=1, no_report=true} - io.stderr:write(("(%d) reclaim latency (ns) min:%16s avg:%16s max:%16s\n") - :format(core, - 
lib.comma_value(math.floor(min)), - lib.comma_value(math.floor(avg)), - lib.comma_value(math.floor(max)))) - io.stderr:write(("%.3f Mpps\n"):format(txpackets / 1e6 / duration)) - io.stderr:flush() - - --engine.report_links() -end \ No newline at end of file diff --git a/src/apps/interlink/wait_test.snabb b/src/apps/interlink/wait_test.snabb index ebe0cc0d94..1aebea3bf2 100755 --- a/src/apps/interlink/wait_test.snabb +++ b/src/apps/interlink/wait_test.snabb @@ -17,15 +17,12 @@ for core in pairs(CPUS) do table.sort(cores) end -require("apps.interlink.freelist_instrument").histogram_csv_header() -io.stdout:flush() - for i=1,NCONSUMERS do - worker.start("sink"..i, ([[require("apps.interlink.test_sink").start_instrument(%q, %d, %s)]]) + worker.start("sink"..i, ([[require("apps.interlink.test_sink").start(%q, %d, %s)]]) :format("test"..i, DURATION, cores[1+i])) end -worker.start("source", ([[require("apps.interlink.test_source").startn_instrument(%q, %d, %d, %s)]]) +worker.start("source", ([[require("apps.interlink.test_source").startn(%q, %d, %d, %s)]]) :format("test", DURATION, NCONSUMERS, assert(cores[1]))) engine.main{done = function () diff --git a/src/apps/rss/metadata.lua b/src/apps/rss/metadata.lua index 96ab01fc49..22ecec54cd 100644 --- a/src/apps/rss/metadata.lua +++ b/src/apps/rss/metadata.lua @@ -117,11 +117,6 @@ local ipv6_ext_hdr_fns = { local payload_len = ext_hdr.length return payload_len * 4 - 2, next_header end, - [59] = - -- No next header - function(ptr) - return 0, 255 - end, [60] = -- Destination ipv6_generic_ext_hdr, diff --git a/src/core/app.events b/src/core/app.events new file mode 100644 index 0000000000..be3e6d4986 --- /dev/null +++ b/src/core/app.events @@ -0,0 +1,35 @@ +1,9|started: +The app has been started. (Returned from new() callback.) + +1,9|linked: +The app has been linked. (Returned from link() callback.) + +1,9|unlinked: +The app has been unlinked. (Returned from unlink() callback.) + +1,9|reconfigured: +The app has been reconfigured. (Returned from reconfig() callback.) + +1,9|stopped: +The app has been stopped. (Returned from stop() callback.) + + +3,3|pull: +Entering app pull() callback. + +3,3|pulled: +Returned from app pull() callback. + + +3,3|push: +Entering app push() callback. + +3,3|pushed: +Returned from app push() callback. + + +3,5|tick: +Entering app tick() callback. + +3,5|ticked: +Returned from app tick() callback. 
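
Each entry above follows the form "digit,digit|name: args" plus a prose description; the second digit is the per-event log rate that mkevent() in core/timeline.dasl (added below) compares against the currently enabled rate. A minimal sketch of how these app events are loaded and logged, mirroring the core/app.lua changes below -- the app name "demo" is illustrative, and SNABB_TIMELINE_ENABLE must be set for anything to actually be recorded:

-- Load the app.events definitions, tagged with an (assumed) app name.
local timeline = require("core.timeline")
local tl = engine.timeline()   -- per-process log, created on demand (core/app.lua)
local e  = timeline.load_events(tl, "core.app", {app="demo"})

e.pull()     -- "Entering app pull() callback."
-- ... app:pull() would run here ...
e.pulled()   -- "Returned from app pull() callback."
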
diff --git a/src/core/app.lua b/src/core/app.lua index 53a3ea33b9..19dbe39a30 100644 --- a/src/core/app.lua +++ b/src/core/app.lua @@ -2,18 +2,20 @@ module(...,package.seeall) -local packet = require("core.packet") -local lib = require("core.lib") -local link = require("core.link") -local config = require("core.config") -local timer = require("core.timer") -local shm = require("core.shm") -local histogram = require('core.histogram') -local counter = require("core.counter") -local jit = require("jit") -local S = require("syscall") -local ffi = require("ffi") -local C = ffi.C +local packet = require("core.packet") +local lib = require("core.lib") +local link = require("core.link") +local config = require("core.config") +local timer = require("core.timer") +local shm = require("core.shm") +local histogram = require('core.histogram') +local counter = require("core.counter") +local timeline_mod = require("core.timeline") -- avoid collision with timeline +local jit = require("jit") +local S = require("syscall") +local ffi = require("ffi") +local C = ffi.C + require("core.packet_h") -- Packet per pull @@ -37,8 +39,44 @@ function enable_auditlog () auditlog_enabled = true end +-- Timeline event log +local timeline_log, events -- initialized on demand +function timeline () + if timeline_log == nil then + timeline_log = timeline_mod.new("events.timeline") + timeline_mod.rate(timeline_log, 9) -- initially log events with rate >= 9 + events = timeline_mod.load_events(timeline_log, "core.engine") + end + return timeline_log +end + +function randomize_log_rate () + -- Bail if timeline logging is not enabled. + if not timeline_log then return end + -- Randomize the log rate. Enable each rate in 5x more breaths + -- than the rate below by randomly picking from log5() distribution. + -- Goal is ballpark 1000 messages per second (~15min for 1M entries.) + -- + -- Could be better to reduce the log rate over time to "stretch" + -- logs for long running processes? Improvements possible :-). + -- + -- We use rates 0-9 where 9 means "log always", and 0 means "log never." + local rate = math.max(1, math.ceil(math.log(math.random(5^9))/math.log(5))) + timeline_mod.rate(timeline_log, rate) +end + +-- Breath latency histogram +local latency -- initialized on demand +function enable_latency_histogram () + if latency == nil then + latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) + end +end + -- The set of all active apps and links in the system, indexed by name. 
app_table, link_table = {}, {} +-- Timeline events specific to app instances +app_events = setmetatable({}, { __mode = 'k' }) configuration = config.new() @@ -289,6 +327,7 @@ function compute_config_actions (old, new) end end + events.config_actions_computed() return actions end @@ -314,16 +353,22 @@ function apply_config_actions (actions) local link = app.output[linkname] app.output[linkname] = nil remove_link_from_array(app.output, link) - if app.unlink then app:unlink('output', linkname) - elseif app.link then app:link('output', linkname) end + if app.unlink then + app:unlink('output', linkname) app_events[app].unlinked() + elseif app.link then + app:link('output', linkname) app_events[app].linked() + end end function ops.unlink_input (appname, linkname) local app = app_table[appname] local link = app.input[linkname] app.input[linkname] = nil remove_link_from_array(app.input, link) - if app.unlink then app:unlink('input', linkname) - elseif app.link then app:link('input', linkname) end + if app.unlink then + app:unlink('input', linkname) app_events[app].unlinked() + elseif app.link then + app:link('input', linkname) app_events[app].linked() + end end function ops.free_link (linkspec) link.free(link_table[linkspec], linkspec) @@ -341,7 +386,9 @@ function apply_config_actions (actions) appname..": duplicate output link "..linkname) app.output[linkname] = link table.insert(app.output, link) - if app.link then app:link('output', linkname) end + if app.link then + app:link('output', linkname) app_events[app].linked() + end end function ops.link_input (appname, linkname, linkspec) local app = app_table[appname] @@ -351,13 +398,14 @@ function apply_config_actions (actions) app.input[linkname] = link table.insert(app.input, link) if app.link then - app:link('input', linkname) + app:link('input', linkname) app_events[app].linked() end end function ops.stop_app (name) local app = app_table[name] - if app.stop then app:stop() end + if app.stop then app:stop() app_events[app].stopped() end if app.shm then shm.delete_frame(app.shm) end + app_events[app] = nil app_table[name] = nil configuration.apps[name] = nil end @@ -368,6 +416,8 @@ function apply_config_actions (actions) name, tostring(app))) end local zone = app.zone or rawget(getfenv(class.new), '_NAME') or name + app_events[app] = + timeline_mod.load_events(timeline(), "core.app", {app=name}) app.appname = name app.output = {} app.input = {} @@ -388,21 +438,25 @@ function apply_config_actions (actions) end end configuration.apps[name] = { class = class, arg = arg } + app_events[app].started() end function ops.reconfig_app (name, class, arg) local app = app_table[name] - app:reconfig(arg) + app:reconfig(arg) app_events[app].reconfigured() configuration.apps[name].arg = arg end + events.configure(counter.read(configs) + 1) -- Dispatch actions. 
for _, action in ipairs(actions) do local name, args = unpack(action) if log then io.write("engine: ", name, " ", args[1], "\n") end assert(ops[name], name)(unpack(args)) end + events.config_applied() compute_breathe_order () + events.breathe_order_computed() end -- Sort the NODES topologically according to SUCCESSORS via @@ -526,26 +580,34 @@ function main (options) enable_auditlog() end - -- Setup vmprofile - setvmprofile("engine") + -- Ensure timeline is created and initialized + timeline() + -- Enable latency histogram unless explicitly disabled local breathe = breathe if options.measure_latency or options.measure_latency == nil then - local latency = histogram.create('engine/latency.histogram', 1e-6, 1e0) + enable_latency_histogram() breathe = latency:wrap_thunk(breathe, now) end + -- Setup vmprofile + setvmprofile("engine") + -- Enable tick enable_tick() + events.engine_started() + monotonic_now = C.get_monotonic_time() repeat breathe() - if not no_timers then timer.run() end + if not no_timers then timer.run() events.polled_timers() end if not busywait then pace_breathing() end + randomize_log_rate() -- roll random log rate until done and done() counter.commit() if not options.no_report then report(options.report) end + events.engine_stopped() -- Switch to catch-all profile setvmprofile("program") @@ -561,14 +623,18 @@ function pace_breathing () nextbreath = nextbreath or monotonic_now local sleep = tonumber(nextbreath - monotonic_now) if sleep > 1e-6 then + events.sleep_Hz(Hz, math.round(sleep*1e6)) C.usleep(sleep * 1e6) monotonic_now = C.get_monotonic_time() + events.wakeup_from_sleep() end nextbreath = math.max(nextbreath + 1/Hz, monotonic_now) else if lastfrees == counter.read(frees) then sleep = math.min(sleep + 1, maxsleep) + events.sleep_on_idle(sleep) C.usleep(sleep) + events.wakeup_from_sleep() else sleep = math.floor(sleep/2) end @@ -578,9 +644,19 @@ function pace_breathing () end end +local function enginestats () + local breaths = counter.read(breaths) + local frees = counter.read(frees) + local freebytes = counter.read(freebytes) + local freebits = counter.read(freebits) + return breaths, frees, freebytes, freebits +end + function breathe () + events.breath_start(enginestats()) running = true monotonic_now = C.get_monotonic_time() + events.got_monotonic_time(C.get_time_ns()) -- Inhale: pull work into the app network local i = 1 ::PULL_LOOP:: @@ -588,12 +664,15 @@ function breathe () if i > #breathe_pull_order then goto PULL_EXIT else local app = breathe_pull_order[i] setvmprofile(app.zone) + app_events[app].pull() app:pull() + app_events[app].pulled() end i = i+1 goto PULL_LOOP end ::PULL_EXIT:: + events.breath_pulled() -- Exhale: push work out through the app network i = 1 ::PUSH_LOOP:: @@ -602,24 +681,32 @@ function breathe () local spec = breathe_push_order[i] local app, push, link = spec.app, spec.push, spec.link setvmprofile(app.zone) + app_events[app].push() push(app, link) + app_events[app].pushed() end i = i+1 goto PUSH_LOOP end ::PUSH_EXIT:: + events.breath_pushed() -- Tick: call tick() methods at tick_Hz frequency if tick() then for _, app in ipairs(breathe_ticks) do setvmprofile(app.zone) + app_events[app].tick() app:tick() + app_events[app].ticked() end + events.breath_ticked() end setvmprofile("engine") + events.breath_end(enginestats()) counter.add(breaths) -- Commit counters at a reasonable frequency if counter.read(breaths) % 100 == 0 then counter.commit() + events.commited_counters() end running = false end @@ -704,7 +791,6 @@ function report_apps () 
          app:report()
       end
    end
-   setvmprofile("engine")
 end
 
 function selftest ()
@@ -784,7 +870,7 @@ function selftest ()
    engine.main{duration=t}
    local expected_ticks = t * tick_Hz
    local ratio = app_table.app_tick.ticks / expected_ticks
-   assert(ratio >= 0.9 and ratio <= 1.1)
+   assert(ratio >= 0.8 and ratio <= 1.1)
    print("ticks: actual/expected = "..ratio)
 
    -- Test link() 3.0
diff --git a/src/core/engine.events b/src/core/engine.events
new file mode 100644
index 0000000000..c9c59e2cd6
--- /dev/null
+++ b/src/core/engine.events
@@ -0,0 +1,78 @@
+0,6|engine_started:
+The engine starts the traffic processing loop.
+
+
+1,5|breath_start: breath totalpackets totalbytes totaletherbits
+The engine starts an iteration of the packet-processing event loop (a
+"breath".)
+
+The total counts of packets, bytes, and bits (including layer-1
+ethernet overhead) that the engine has processed are included. These
+can be used to track the rate of traffic.
+
+
+2,3|got_monotonic_time: unixnanos
+The engine has completed initialization for the breath: synchronized
+the current time and handled any pending error recovery.
+
+'unixnanos' is the current wall-clock time in nanoseconds since the epoch.
+This can be used to synchronize the cycle timestamps with wall-clock time.
+
+
+2,4|breath_pulled:
+The engine has "pulled" new packets into the event loop for processing.
+
+2,4|breath_pushed:
+The engine has "pushed" packets one step through the processing network.
+
+2,4|breath_ticked:
+The engine has executed "tick" methods.
+
+
+1,5|breath_end: breath totalpackets totalbytes totaletherbits
+The engine completes an iteration of the event loop (a "breath".)
+
+The total counts of packets, bytes, and bits (including layer-1
+ethernet overhead) that the engine has processed are included. These
+can be used to track the rate of traffic.
+
+
+1,5|commited_counters:
+The engine commits the latest counter values to externally visible shared
+memory.
+
+1,4|polled_timers:
+The engine has polled its timers and executed any that had expired.
+
+
+1,4|sleep_Hz: usec Hz
+The engine requests that the kernel suspend this process for a period of
+microseconds in order to reduce CPU utilization and achieve a fixed
+frequency of breaths per second (Hz).
+
+1,4|sleep_on_idle: usec
+The engine requests that the kernel suspend this process for a period
+of microseconds in order to reduce CPU utilization because idleness
+has been detected (a breath in which no packets were processed.)
+
+1,4|wakeup_from_sleep:
+The engine resumes operation after sleeping voluntarily.
+
+
+0,6|engine_stopped:
+The engine stops the traffic processing loop.
+
+
+0,9|config_actions_computed:
+The engine has computed the actions required for applying a new configuration.
+
+0,9|configure: config
+The engine begins to apply a new configuration.
+
+'config' is the number of this configuration.
+
+0,9|config_applied:
+The engine has applied a new configuration.
+
+0,9|breathe_order_computed:
+The engine has computed the breath order of a new configuration.
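
The breath_start/breath_end arguments are cumulative totals, so traffic rates fall out of the difference between two logged entries. A small sketch against the struct timeline_entry layout defined in core/timeline.dasl further below: argument columns map in declaration order (arg0=breath, arg1=totalpackets, arg2=totalbytes, arg3=totaletherbits), and the TSC frequency tsc_hz is an assumed input because the log records raw cycle counts.

-- Given two decoded breath_end entries e1, e2 (e2 logged later than e1):
local function breath_rate (e1, e2, tsc_hz)
   local seconds = (e2.tsc - e1.tsc) / tsc_hz   -- tsc is stored as a double cycle count
   local packets = e2.arg1 - e1.arg1            -- totalpackets delta
   local bits    = e2.arg3 - e1.arg3            -- totaletherbits delta
   return packets / seconds, bits / seconds     -- packets/s and layer-1 bits/s
end
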
diff --git a/src/core/group_freelist.lua b/src/core/group_freelist.lua index 2d233a99b3..bec68c7201 100644 --- a/src/core/group_freelist.lua +++ b/src/core/group_freelist.lua @@ -7,7 +7,6 @@ local shm = require("core.shm") local lib = require("core.lib") local ffi = require("ffi") -local waitfor, compiler_barrier = lib.waitfor, lib.compiler_barrier local band = bit.band -- Group freelist: lock-free multi-producer multi-consumer ring buffer @@ -15,7 +14,6 @@ local band = bit.band -- -- https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue -- --- NB: assumes 32-bit wide loads/stores are atomic (as is the fact on x86_64)! -- Group freelist holds up to n chunks of chunksize packets each chunksize = 2048 @@ -24,21 +22,22 @@ chunksize = 2048 local default_size = 1024 -- must be a power of two local CACHELINE = 64 -- XXX - make dynamic -local INT = ffi.sizeof("uint32_t") +local QWORD = ffi.sizeof("uint64_t") ffi.cdef([[ struct group_freelist_chunk { - uint32_t sequence[1], nfree; + uint64_t sequence[1]; + uint32_t nfree, pad; struct packet *list[]]..chunksize..[[]; } __attribute__((packed))]]) ffi.cdef([[ struct group_freelist { - uint32_t enqueue_pos[1], enqueue_mask; - uint8_t pad_enqueue_pos[]]..CACHELINE-2*INT..[[]; + uint64_t enqueue_pos[1], enqueue_mask; + uint8_t pad_enqueue_pos[]]..CACHELINE-2*QWORD..[[]; - uint32_t dequeue_pos[1], dequeue_mask; - uint8_t pad_dequeue_pos[]]..CACHELINE-2*INT..[[]; + uint64_t dequeue_pos[1], dequeue_mask; + uint8_t pad_dequeue_pos[]]..CACHELINE-2*QWORD..[[]; uint32_t size, state[1]; @@ -70,63 +69,57 @@ end function freelist_open (name, readonly) local fl = shm.open(name, "struct group_freelist", 'read-only', 1) - waitfor(function () return fl.state[0] == READY end) + lib.waitfor(function () return sync.load(fl.state) == READY end) local size = fl.size shm.unmap(fl) return shm.open(name, "struct group_freelist", readonly, size) end function start_add (fl) - local pos = fl.enqueue_pos[0] + local pos = sync.load64(fl.enqueue_pos) local mask = fl.enqueue_mask while true do local chunk = fl.chunk[band(pos, mask)] - local seq = chunk.sequence[0] - local dif = seq - pos + local seq = sync.load64(chunk.sequence) + local dif = ffi.cast("int64_t", seq) - ffi.cast("int64_t", pos) if dif == 0 then - if sync.cas(fl.enqueue_pos, pos, pos+1) then + if sync.cas64(fl.enqueue_pos, pos, pos+1) then return chunk, pos+1 end elseif dif < 0 then return else - compiler_barrier() -- ensure fresh load of enqueue_pos - pos = fl.enqueue_pos[0] + pos = sync.load64(fl.enqueue_pos) end end end function start_remove (fl) - local pos = fl.dequeue_pos[0] + local pos = sync.load64(fl.dequeue_pos) local mask = fl.dequeue_mask while true do local chunk = fl.chunk[band(pos, mask)] - local seq = chunk.sequence[0] - local dif = seq - (pos+1) + local seq = sync.load64(chunk.sequence) + local dif = ffi.cast("int64_t", seq) - ffi.cast("int64_t", pos+1) if dif == 0 then - if sync.cas(fl.dequeue_pos, pos, pos+1) then + if sync.cas64(fl.dequeue_pos, pos, pos+1) then return chunk, pos+mask+1 end elseif dif < 0 then return else - compiler_barrier() -- ensure fresh load of dequeue_pos - pos = fl.dequeue_pos[0] + pos = sync.load64(fl.dequeue_pos) end end end function finish (chunk, seq) - chunk.sequence[0] = seq + assert(sync.cas64(chunk.sequence, chunk.sequence[0], seq)) end local function occupied_chunks (fl) local enqueue, dequeue = fl.enqueue_pos[0], fl.dequeue_pos[0] - if dequeue > enqueue then - return enqueue + fl.size - dequeue - else - return enqueue - dequeue - end + 
return tonumber(enqueue - dequeue) end -- Register struct group_freelist as an abstract SHM object type so that diff --git a/src/core/main.lua b/src/core/main.lua index c6ca9f00e8..b7764a066d 100644 --- a/src/core/main.lua +++ b/src/core/main.lua @@ -28,7 +28,7 @@ end -- Reserve names that we want to use for global module. -- (This way we avoid errors from the 'strict' module.) -_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, +_G.config, _G.engine, _G.memory, _G.link, _G.packet, _G.timer, _G.timeline, _G.main = nil ffi.cdef[[ @@ -164,6 +164,7 @@ function initialize () _G.link = require("core.link") _G.packet = require("core.packet"); _G.packet.initialize() _G.timer = require("core.timer") + _G.timeline = require("core.timeline") _G.main = getfenv() end diff --git a/src/core/packet.events b/src/core/packet.events new file mode 100644 index 0000000000..8f502e61cb --- /dev/null +++ b/src/core/packet.events @@ -0,0 +1,30 @@ +9,9|packets_preallocated: packets +DMA memory for packets has been preallocated from the operating system. + +'packets' is the number of packets for which space has been reserved. + +9,1|packet_allocated: +A packet has been allocated from the packet freelist. + +9,1|packet_freed: length +A packet has been freed to the packet freelist. + +'length' is the byte size of the packet. + + +9,4|freelist_empty: +The process is trying to allocate a packet but the freelist is empty. + +9,4|freelist_need_rebalance: +The process freed a packet. There are now too many packets on the +local freelist which need to be expelled to the group freelist. + +9,4|group_freelist_released: packets +The packet freelist was rebalanced with the group freelist. + +'packets' is the number of packets released to the group freelist. + +9,4|group_freelist_reclaimed: packets +The packet freelist was refilled from the group freelist. + +'packets' is the number of packets reclaimed from the group freelist. diff --git a/src/core/packet.lua b/src/core/packet.lua index 7c9a7e34ca..523200c311 100644 --- a/src/core/packet.lua +++ b/src/core/packet.lua @@ -12,6 +12,7 @@ local lib = require("core.lib") local memory = require("core.memory") local shm = require("core.shm") local counter = require("core.counter") +local timeline = require("core.timeline") require("core.packet_h") @@ -105,7 +106,7 @@ end local packet_allocation_step = 1000 local packets_allocated = 0 -- Initialized on demand. -local packets_fl, group_fl +local packets_fl, group_fl, events -- Call to ensure packet freelist is enabled. function initialize (max_packets) @@ -115,6 +116,10 @@ function initialize (max_packets) shm.unlink("engine/packets.freelist") end packets_fl = freelist_create("engine/packets.freelist", max_packets) + + if not events then + events = timeline.load_events(engine.timeline(), "core.packet") + end end -- Call to ensure group freelist is enabled. @@ -130,7 +135,7 @@ end local group_fl_chunksize = group_freelist.chunksize -- Return borrowed packets to group freelist. -function rebalance_step () +local function rebalance_step () local chunk, seq = group_freelist.start_add(group_fl) if chunk then chunk.nfree = group_fl_chunksize @@ -141,14 +146,15 @@ function rebalance_step () else error("group freelist overflow") end + events.group_freelist_released(group_fl_chunksize) end -function need_rebalance () +local function need_rebalance () return freelist_nfree(packets_fl) >= (packets_allocated + group_fl_chunksize) end -- Reclaim packets from group freelist. 
-function reclaim_step () +local function reclaim_step () local chunk, seq = group_freelist.start_remove(group_fl) if chunk then for i=0, chunk.nfree-1 do @@ -156,6 +162,7 @@ function reclaim_step () end group_freelist.finish(chunk, seq) end + events.group_freelist_reclaimed(group_fl_chunksize) end -- Register struct freelist as an abstract SHM object type so that the @@ -168,6 +175,7 @@ end}) -- Return an empty packet. function allocate () if freelist_nfree(packets_fl) == 0 then + events.freelist_empty() if group_fl then reclaim_step() end @@ -175,6 +183,7 @@ function allocate () preallocate_step() end end + events.packet_allocated() return freelist_remove(packets_fl) end @@ -299,9 +308,11 @@ end local free_internal, account_free = free_internal, account_free function free (p) + events.packet_freed(p.length) account_free(p) free_internal(p) if group_fl and need_rebalance() then + events.freelist_need_rebalance() rebalance_step() end end @@ -324,6 +335,7 @@ function preallocate_step() end packets_allocated = packets_allocated + packet_allocation_step packet_allocation_step = 2 * packet_allocation_step + events.packets_preallocated(packet_allocation_step) end function selftest () diff --git a/src/core/sync.dasl b/src/core/sync.dasl index ac3823c5d6..8f839c926e 100644 --- a/src/core/sync.dasl +++ b/src/core/sync.dasl @@ -9,20 +9,46 @@ local ffi = require("ffi") | .actionlist actions | .globalnames globalnames --- This module happens to use 32-bit arguments only. -|.define arg1, edi -|.define arg2, esi -|.define arg3, edx +|.define darg1, edi +|.define darg2, esi +|.define darg3, edx +|.define qarg1, rdi +|.define qarg2, rsi +|.define qarg3, rdx + +-- load(src) -> uint32_t +-- Load integer from src. +local load_t = "uint32_t (*) (uint32_t *)" +local function load (Dst) + | mov eax, [qarg1] + | ret +end + +-- load64(src) -> uint64_t +local load64_t = "uint64_t (*) (uint64_t *)" +local function load64 (Dst) + | mov rax, [qarg1] + | ret +end -- cas(dst, old, new) -> true|false -- Atomic compare-and-swap; compare old with value pointed to by dst. If -- equal, stores new at dst and returns true. Else, returns false. local cas_t = "bool (*) (int *, int, int)" local function cas (Dst) - | mov eax, arg2 - | lock; cmpxchg [arg1], arg3 -- compare-and-swap; sets ZF flag on success - | mov eax, 0 -- clear eax for return value - | setz al -- set eax to 1 (true) if ZF is set + | mov eax, darg2 + | lock; cmpxchg [qarg1], darg3 -- compare-and-swap; sets ZF flag on success + | mov eax, 0 -- clear eax for return value + | setz al -- set eax to 1 (true) if ZF is set + | ret +end + +local cas64_t = "bool (*) (uint64_t *, uint64_t, uint64_t)" +local function cas64 (Dst) + | mov rax, qarg2 + | lock; cmpxchg [qarg1], qarg3 -- compare-and-swap; sets ZF flag on success + | mov eax, 0 -- clear eax for return value + | setz al -- set eax to 1 (true) if ZF is set | ret end @@ -34,29 +60,38 @@ local lock_t = "void (*) (int *)" local function lock (Dst) -- attempt to acquire | mov eax, 1 - | xchg eax, [arg1] + | xchg eax, [qarg1] | test eax, eax -- was it 0 (unlocked)? | jnz >1 -- no, go spin | ret -- spin |1: | pause - | cmp dword [arg1], 1 -- does it look locked? + | cmp dword [qarg1], 1 -- does it look locked? 
| je <1 -- spin if it does | jmp ->lock -- otherwise try to acquire end local unlock_t = "void (*) (int *)" local function unlock (Dst) - | mov dword [arg1], 0 + | mov dword [qarg1], 0 | ret end local function generate (Dst) Dst:growpc(16) | .align 16 + |->load: + || load(Dst) + | .align 16 + |->load64: + || load64(Dst) + | .align 16 |->cas: || cas(Dst) | .align 16 + |->cas64: + || cas64(Dst) + | .align 16 |->lock: || lock(Dst) | .align 16 @@ -75,12 +110,38 @@ end local entry = dasm.globals(globals, globalnames) local sync = { + load = ffi.cast(load_t, entry.load), + load64 = ffi.cast(load64_t, entry.load64), cas = ffi.cast(cas_t, entry.cas), + cas64 = ffi.cast(cas64_t, entry.cas64), lock = ffi.cast(lock_t, entry.lock), unlock = ffi.cast(unlock_t, entry.unlock) } sync.selftest = function () + -- load + local box = ffi.new( + "struct { uint32_t pad1, state[1], pad2; } __attribute__((packed))" + ) + local state = sync.load(box.state) + assert(state == 0) + for i=1,100 do + box.state[0] = state + 1 + state = sync.load(box.state) + assert(state == i) + end + local box = ffi.new( + "struct { uint64_t pad1, state[1], pad2; } __attribute__((packed))" + ) + local state = sync.load64(box.state) + assert(state == 0) + for i=1,100 do + box.state[0] = state + 1 + state = sync.load64(box.state) + assert(state == i) + end + box.state[0] = 0x5555555555555555 + assert(sync.load64(box.state) == 0x5555555555555555) -- cas local box = ffi.new( "struct { unsigned int pad1, state[1], pad2; } __attribute__((packed))" @@ -92,6 +153,16 @@ sync.selftest = function () and box.state[0] == 2147483648 and box.pad1 == 0 and box.pad2 == 0) + local box = ffi.new( + "struct { uint64_t pad1, state[1], pad2; } __attribute__((packed))" + ) + assert(sync.cas64(box.state, 0, 1) and box.state[0] == 1) + assert(not sync.cas64(box.state, 0, 2) and box.state[0] == 1) + assert(sync.cas64(box.state, 1, 2) and box.state[0] == 2) + assert(sync.cas64(box.state, 2, 0x5555555555555555) + and box.state[0] == 0x5555555555555555 + and box.pad1 == 0 + and box.pad2 == 0) -- lock / unlock local spinlock = ffi.new("int[1]") sync.lock(spinlock) diff --git a/src/core/timeline.dasl b/src/core/timeline.dasl new file mode 100644 index 0000000000..b384131a93 --- /dev/null +++ b/src/core/timeline.dasl @@ -0,0 +1,317 @@ +-- timeline: high-resolution event log using in-memory ring buffer +-- Use of this source code is governed by the Apache 2.0 license; see COPYING. + +module(..., package.seeall) + +local dasm = require("dasm") +local ffi = require("ffi") +local C = ffi.C +local S = require("syscall") +local shm = require("core.shm") +local lib = require("core.lib") + +-- Set to true to enable timeline logging +enabled = (lib.getenv("SNABB_TIMELINE_ENABLE") and true) or false + +-- Load a set of events for logging onto a timeline. +-- Returns a set of logging functions. +-- +-- For example: +-- e = load_events(engine.timeline, "core.app", {name="myapp", class="intel10g"}) +-- Loads the events defined in src/core/app.events and tags each event +-- with the name of the app and class. Events can then be logged: +-- e:app_pulled(inpackets, inbytes, outpackets, outbytes) +function load_events (tl, eventmodule, extra) + local category = eventmodule:match("[^.]+$") -- "core.engine" -> "engine" + -- Convert extra into " key1=value1 key2=value2 ..." attributes string. 
+ local spec = require(eventmodule.."_events") + return load_events_from_string(tl, spec, category, extra) +end + +-- (Helper function) +function load_events_from_string (tl, spec, category, extra) + local events = {} + -- Insert a delimiter character (\a "alarm") between log messages. + spec = spec:gsub("\n(%d,%d|)", "\n\a%1") + for message in spec:gmatch("[^\a]+") do + message = message:gsub("(.-)%s*$", "%1") -- trim trailing spaces + local event = message:match("([%w_]+):") + events[event] = mkevent(tl, category, message, extra) + end + -- Return the set of functions in an efficient-to-call FFI object. + local mt = {__index = events} + return ffi.new(ffi.metatype(ffi.typeof("struct{}"), mt)) +end + +------------------------------------------------------------ +-- Binary data structures + +ffi.cdef[[ + // 64B file header + struct timeline_header { + uint64_t magic; + uint16_t major_version; + uint16_t minor_version; + uint32_t log_bytes; + uint32_t strings_bytes; + uint8_t reserved[44]; + }; + + // 64B log entry + struct timeline_entry { + double tsc; // CPU timestamp (note: assumed to be first elem below) + uint16_t msgid; // msgid*16 is index into string table + uint16_t core_numa; // TSC_AUX: core (bits 0-7) + numa (12-15) + uint32_t reserved; // (available for future use) + double arg0, arg1, arg2, arg3, arg4, arg5; // message arguments + }; + + // Private local state for updating the log + struct timeline_state { + // state for the entries ring buffer + struct timeline_entry *entries; + uint32_t rate; + uint32_t next_entry; + uint32_t num_entries; + // state for the string table + char *stringtable; + int stringtable_size; + int next_string; + }; +]] + +-- Header of the log file +local magic = 0xa3ff7223441d0001ULL +local major, minor = 2, 1 + +------------------------------------------------------------ +-- API + +-- Create a new timeline under the given shared memory path. 
+function new (shmpath, num_entries, size_stringtable) + if not enabled then return false end + num_entries = num_entries or 1e6 + size_stringtable = size_stringtable or 1e6 + -- Calculate size based on number of log entries + local size_header = ffi.sizeof("struct timeline_header") + local size_entries = num_entries * ffi.sizeof("struct timeline_entry") + local size = size_header + size_entries + size_stringtable + -- Allocate one shm object with memory for all data structures + local memory = shm.create(shmpath, ffi.typeof("char["..size.."]")) + local header = ffi.cast("struct timeline_header *", memory) + local ring = ffi.cast("struct timeline_entry *", memory + size_header) + local stringtable = ffi.cast("char*", memory + size_header + size_entries) + -- Fill in header values + header.magic = 0xa3ff7223441d0001ULL + header.major_version = 3 + header.minor_version = 0 + header.log_bytes = size_entries + header.strings_bytes = size_stringtable + -- Private state + local state = ffi.new("struct timeline_state") + state.entries = ring + state.rate = 0 + state.next_entry = 0 + state.num_entries = num_entries + state.stringtable = stringtable + state.stringtable_size = size_stringtable + state.next_string = 0 + -- Return an object + return state +end + +function mkevent (timeline, category, message, attrs) + if not message:match("^%d,%d|([^:]+):") then + error(("event syntax error: %q"):format(message)) + end + -- Extract the sampling rate for the message + local rate = tonumber(message:match("^%d,(%d)|")) + -- Insert the category ("0,3|event:" -> "0,3|category.event:") + message = message:gsub("|", "|"..category..".", 1) + -- Insert the additional attributes. + -- e.g. "1|foo: arg" with {a1="x",a2="y"} becomes "1|foo a1=x a2=y: arg" + for k,v in pairs(attrs or {}) do + message = message:gsub(":", (" %s=%s:"):format(k, v), 1) + end + -- Count the number of arguments. + -- (See http://stackoverflow.com/a/11158158/1523491) + local _, n = (message:match(":([^\n]*)")):gsub("[^%s]+","") + assert(n >= 0 and n <= 6, "illegal number of arguments: "..n) + if not enabled then return function () end end + local id = intern(timeline, message) + local event = event -- move asm function into local scope + local log = timeline + if n==0 then return function () event(log,rate,id,0,0,0,0,0,0) end end + if n==1 then return function (a) event(log,rate,id,a,0,0,0,0,0) end end + if n==2 then return function (a,b) event(log,rate,id,a,b,0,0,0,0) end end + if n==3 then return function (a,b,c) event(log,rate,id,a,b,c,0,0,0) end end + if n==4 then return function (a,b,c,d) event(log,rate,id,a,b,c,d,0,0) end end + if n==5 then return function (a,b,c,d,e) event(log,rate,id,a,b,c,d,e,0) end end + if n==6 then return function (a,b,c,d,e,f) event(log,rate,id,a,b,c,d,e,f) end end +end + +-- Get or set the current timeline log rate. +function rate (timeline, rate) + if not enabled then return 1/0 end + if rate then timeline.rate = rate end + return timeline.rate +end + +------------------------------------------------------------ +-- Defining log message formats + +-- Intern a string in the timeline stringtable. +-- Return a unique ID (16-bit offset in 16-byte words) or 0xFFFF if +-- the table is full. + +-- Cache known strings in a weak table keyed on timeline object. +-- (Timeline object is an FFI struct that can't contain a Lua tables.) 
+local known = setmetatable({}, {__mode='k'}) + +function intern (timeline, str) + known[timeline] = known[timeline] or {} + if known[timeline][str] then + return known[timeline][str] + end + local len = #str+1 -- count null terminator + if timeline.next_string + len >= timeline.stringtable_size then + return 0xFFFF -- overflow + else + local position = timeline.next_string + ffi.copy(timeline.stringtable + position, str) + timeline.next_string = lib.align(position + len, 16) + local id = position/16 + assert(id == math.floor(id), "timeline string alignment error") + known[timeline][str] = id + return id + end +end + +------------------------------------------------------------ +-- Logging messages + +|.arch x64 +|.actionlist actions +|.globalnames globalnames + + +-- Registers holding function parameters for x86-64 calling convention. +|.define p0, rdi +|.define p1, rsi +|.define p2, rdx +|.define p3, rcx +|.define p4, r8 +|.define p5, r9 + +|.type log, struct timeline_state +|.type msg, struct timeline_entry +-- void log(timeline, rate, msg, arg0, ..., arg5) +local function asmlog (Dst) + |->log: + -- Check that the enabled log rate is >= the event log rate + | mov eax, log:p0->rate + | cmp p1, rax + | jge >1 + | ret + |1: + -- Load index to write into r11 + | mov r11d, log:p0->next_entry + -- Increment next index and check for wrap-around + | mov eax, r11d + | add eax, 1 + | xor ecx, ecx + | cmp eax, log:p0->num_entries + | cmove eax, ecx + | mov log:p0->next_entry, eax + -- Convert log entry number to pointer + | shl r11, 6 -- 64B element number -> byte index + | mov r10, log:p0->entries + | add r10, r11 + -- Log the arguments from register parameters + | mov msg:r10->msgid, dx + | movsd qword msg:r10->arg0, xmm0 + | movsd qword msg:r10->arg1, xmm1 + | movsd qword msg:r10->arg2, xmm2 + | movsd qword msg:r10->arg3, xmm3 + | movsd qword msg:r10->arg4, xmm4 + | movsd qword msg:r10->arg5, xmm5 + -- Log the timestamp and core/numa aux info + | rdtscp + | mov msg:r10->core_numa, cx + -- Convert TSC in EAX:EDX to double + | shl rdx, 32 + | or rax, rdx + | cvtsi2sd xmm0, rax + | movsd qword msg:r10->tsc, xmm0 + + | ret +end + +local Dst, globals = dasm.new(actions, nil, nil, 1 + #globalnames) +asmlog(Dst) +local mcode, size = Dst:build() +local entry = dasm.globals(globals, globalnames) + +event = ffi.cast("void(*)(struct timeline_state *, int, int, double, double, double, double, double, double)", entry.log) + +_anchor = mcode + +--dasm.dump(mcode, size) + +local test_events = [[ +0,6|six: +event with rate 6 (0 args) + +0,5|five: a b c +event with rate 5 (3 args) + +0,4|four: a b c d e f +event with rate 4 (6 args) + +0,3|three: +event with rate 3 (0 args) +]] + +-- selftest is designed mostly to check that timeline logging does not +-- crash the snabb process e.g. including overflow of the log entries +-- and the string table. it does not verify the contents of the log +-- messages. 
+function selftest () + print("selftest: timeline") + + enabled = true -- enable timeline + + local tl = new("selftest/timeline") + local e = load_events_from_string(tl, test_events, "selftest", + {module="timeline", func="selftest"}) + rate(tl, 4) -- won't log event three + + print("check logging individual messages") + -- First check that log entries are created + assert(tl.next_entry == 0) + e.six() assert(tl.next_entry == 1) + e.five(1, 2, 3) assert(tl.next_entry == 2) + e.four(1, 2, 3, 4, 5, 6) assert(tl.next_entry == 3) + e.three() assert(tl.next_entry == 3) -- skipped + + local n = tl.num_entries*10 + print("check wrap-around on "..lib.comma_value(n).." events") + for i = 1, n do + e.six() + e.five(1, 2, 3) + e.four(1, 2, 3, 4, 5, 6) + e.three() + end + -- overflow the string table + print("overflowing string table") + for i = 1, 1e5 do + mkevent(tl, "selftest", "0,9|dummy_event_definition:", {i=i}) + end + -- report median logging time + local sample = {} + for i = 1, 1000 do sample[i] = tl.entries[i].tsc - tl.entries[i-1].tsc end + table.sort(sample) + print("median time delta for sample:", tonumber(sample[500]).." cycles") + print("selftest: ok") +end + diff --git a/src/dasm_x86.lua b/src/dasm_x86.lua index 0e4f874add..3714187dd7 100644 --- a/src/dasm_x86.lua +++ b/src/dasm_x86.lua @@ -1233,6 +1233,7 @@ local map_op = { shrd_3 = "mriqdw:0FACRmU|mrC/qq:0FADRm|mrC/dd:|mrC/ww:", rdtsc_0 = "0F31", -- P1+ + rdtscp_0 = "0F01F9",-- P6+ rdpmc_0 = "0F33", -- P6+ cpuid_0 = "0FA2", -- P1+ diff --git a/src/lib/interlink.lua b/src/lib/interlink.lua index 1dc26fb0db..62ace97390 100644 --- a/src/lib/interlink.lua +++ b/src/lib/interlink.lua @@ -298,7 +298,7 @@ local function describe (r) [DOWN] = "deallocating" })[r.state[0]] end - return ("%d/%d (%s)"):format(queue_fill(r), size - 1, status(r)) + return ("%d/%d (%s)"):format(queue_fill(r), r.size, status(r)) end ffi.metatype("struct interlink", {__tostring=describe}) diff --git a/src/lib/poptrie.lua b/src/lib/poptrie.lua index 2367eca46e..109a89072c 100644 --- a/src/lib/poptrie.lua +++ b/src/lib/poptrie.lua @@ -103,17 +103,23 @@ end -- Extract bits at offset -- key=uint8_t[?] 
function extract (key, offset, length) - local bits, read = 0, 0 - local byte = math.floor(offset/8) - while read < length do - offset = math.max(offset - byte*8, 0) - local nbits = math.min(length - read, 8 - offset) - local x = band(rshift(key[byte], offset), lshift(1, nbits) - 1) - bits = bor(bits, lshift(x, read)) - read = read + nbits - byte = math.min(byte + 1, ffi.sizeof(key) - 1) - end - return bits + local bits = 0 + local skip = math.floor(offset/8) + offset = offset - skip*8 + for i = skip, 15 do + for j = 7, 0, -1 do + if offset == 0 then + local bit = rshift(band(key[i], lshift(1, j)), j) + length = length - 1 + bits = bor(bits, lshift(bit, length)) + if length == 0 then + return bits + end + else + offset = offset - 1 + end + end + end end -- Add key/value pair to RIB (intermediary binary trie) @@ -153,6 +159,7 @@ end -- Map f over keys of length in RIB function Poptrie:rib_map (f, length, root) + local bit = lshift(1, length-1) local function map (node, offset, key, value) value = (node and node.value) or value local left, right = node and node.left, node and node.right @@ -160,7 +167,7 @@ function Poptrie:rib_map (f, length, root) f(key, value, (left or right) and node) else map(left, offset + 1, key, value) - map(right, offset + 1, bor(key, lshift(1, offset)), value) + map(right, offset + 1, bor(key, rshift(bit, offset)), value) end end return map(root or self.rib, 0, 0) @@ -384,80 +391,250 @@ function selftest () for i = 1, 16 do bs[i] = math.random(256) - 1 end return s(unpack(bs)) end - -- To test direct pointing: direct_pointing = true - local t = new{} - -- Tets building empty RIB - t:build() - -- Test RIB - t:add(s(0x00), 8, 1) -- 00000000 - t:add(s(0x0F), 8, 2) -- 00001111 - t:add(s(0x07), 4, 3) -- 0111 - t:add(s(0xFF), 8, 4) -- 11111111 - t:add(s(0xFF), 5, 5) -- 11111 - -- 111111111111111111111111111111111111111111111111111111111111111111110000 - t:add(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F), 72, 6) - local v, n = t:rib_lookup(s(0x0), 1) - assert(not v and n.left and not n.right) - local v, n = t:rib_lookup(s(0x00), 8) - assert(v == 1 and not n) - local v, n = t:rib_lookup(s(0x07), 3) - assert(not v and (n.left and n.right)) - local v, n = t:rib_lookup(s(0x0), 1, n) - assert(v == 3 and not n) - local v, n = t:rib_lookup(s(0xFF), 5) - assert(v == 5 and (not n.left) and n.right) - local v, n = t:rib_lookup(s(0x0F), 3, n) - assert(v == 4 and not n) - local v, n = t:rib_lookup(s(0x3F), 8) - assert(v == 5 and not n) - local v, n = t:rib_lookup(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F), 72) - assert(v == 6 and not n) - -- Test FIB - t:build() - if debug then t:fib_info() end - assert(t:lookup(s(0x00)) == 1) -- 00000000 - assert(t:lookup(s(0x03)) == 0) -- 00000011 - assert(t:lookup(s(0x07)) == 3) -- 00000111 - assert(t:lookup(s(0x0F)) == 2) -- 00001111 - assert(t:lookup(s(0x1F)) == 5) -- 00011111 - assert(t:lookup(s(0x3F)) == 5) -- 00111111 - assert(t:lookup(s(0xFF)) == 4) -- 11111111 - assert(t:lookup(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F)) == 6) - assert(t:lookup32(s(0x00)) == 1) - assert(t:lookup32(s(0x03)) == 0) - assert(t:lookup32(s(0x07)) == 3) - assert(t:lookup32(s(0x0F)) == 2) - assert(t:lookup32(s(0x1F)) == 5) - assert(t:lookup32(s(0x3F)) == 5) - assert(t:lookup32(s(0xFF)) == 4) - assert(t:lookup64(s(0x00)) == 1) - assert(t:lookup64(s(0x03)) == 0) - assert(t:lookup64(s(0x07)) == 3) - assert(t:lookup64(s(0x0F)) == 2) - assert(t:lookup64(s(0x1F)) == 5) - assert(t:lookup64(s(0x3F)) == 5) - assert(t:lookup64(s(0xFF)) == 4) - 
assert(t:lookup128(s(0x00)) == 1) - assert(t:lookup128(s(0x03)) == 0) - assert(t:lookup128(s(0x07)) == 3) - assert(t:lookup128(s(0x0F)) == 2) - assert(t:lookup128(s(0x1F)) == 5) - assert(t:lookup128(s(0x3F)) == 5) - assert(t:lookup128(s(0xFF)) == 4) - assert(t:lookup128(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F)) == 6) - -- Test 32-bit leaves - local t = new{direct_pointing=true, s=8, leaf_t=ffi.typeof("uint32_t")} - t:add(s(0xff,0x00), 9, 0xffffffff) - t:add(s(0xff,0x01), 9, 0xfffffffe) - t:build() - assert(t:lookup(s(0xff,0x00)) == 0xffffffff) - assert(t:lookup(s(0xff,0x01)) == 0xfffffffe) - assert(t:lookup32(s(0xff,0x00)) == 0xffffffff) - assert(t:lookup32(s(0xff,0x01)) == 0xfffffffe) - assert(t:lookup64(s(0xff,0x00)) == 0xffffffff) - assert(t:lookup64(s(0xff,0x01)) == 0xfffffffe) - assert(t:lookup128(s(0xff,0x00)) == 0xffffffff) - assert(t:lookup128(s(0xff,0x01)) == 0xfffffffe) + + for _, leaf_t in ipairs{"uint16_t", "uint32_t"} do + for _, direct_pointing in ipairs{false, true} do + print("unit tests with:") + print("", "direct_pointing", direct_pointing) + print("", "leaf_t", leaf_t) + + local t = new{direct_pointing=direct_pointing, leaf_t=ffi.typeof(leaf_t)} + t:add(s(128,178,0,0), 15, 559) + local v, n = t:rib_lookup(s(128,178,0,1), 32) + assert(t:rib_lookup(s(128,178,0,1), 32) == 559) + assert(t:rib_lookup(s(128,179,0,1), 32) == 559) + t:build() + assert(t:lookup(s(128,178,0,1)) == 559) + assert(t:lookup(s(128,179,0,1)) == 559) + assert(t:lookup32(s(128,178,0,1)) == 559) + assert(t:lookup32(s(128,179,0,1)) == 559) + assert(t:lookup(s(128,178,0,1)) == 559) + assert(t:lookup(s(128,179,0,1)) == 559) + assert(t:lookup64(s(128,178,0,1)) == 559) + assert(t:lookup64(s(128,179,0,1)) == 559) + assert(t:lookup(s(128,178,0,1)) == 559) + assert(t:lookup(s(128,179,0,1)) == 559) + assert(t:lookup128(s(128,178,0,1)) == 559) + assert(t:lookup128(s(128,179,0,1)) == 559) + + local t = new{direct_pointing=direct_pointing, leaf_t=ffi.typeof(leaf_t)} + t:add(s(128,0,0,0), 7, 7) + t:add(s(128,178,0,0), 15, 15) + t:add(s(128,178,154,160), 27, 27) + t:add(s(128,178,154,196), 31, 31) + t:build() + -- /7 + assert(t:rib_lookup(s(128,0,0,0), 32) == 7) + assert(t:rib_lookup(s(128,0,0,1), 32) == 7) + assert(t:rib_lookup(s(129,0,0,1), 32) == 7) + assert(t:rib_lookup(s(127,0,0,1), 32) == nil) + assert(t:rib_lookup(s(130,0,0,1), 32) == nil) + assert(t:lookup(s(128,0,0,0)) == 7) + assert(t:lookup(s(128,0,0,1)) == 7) + assert(t:lookup(s(129,0,0,1)) == 7) + assert(t:lookup(s(127,0,0,1)) == 0) + assert(t:lookup(s(130,0,0,1)) == 0) + assert(t:lookup32(s(128,0,0,0)) == 7) + assert(t:lookup32(s(128,0,0,1)) == 7) + assert(t:lookup32(s(129,0,0,1)) == 7) + assert(t:lookup32(s(127,0,0,1)) == 0) + assert(t:lookup32(s(130,0,0,1)) == 0) + assert(t:lookup64(s(128,0,0,0)) == 7) + assert(t:lookup64(s(128,0,0,1)) == 7) + assert(t:lookup64(s(129,0,0,1)) == 7) + assert(t:lookup64(s(127,0,0,1)) == 0) + assert(t:lookup64(s(130,0,0,1)) == 0) + assert(t:lookup128(s(128,0,0,0)) == 7) + assert(t:lookup128(s(128,0,0,1)) == 7) + assert(t:lookup128(s(129,0,0,1)) == 7) + assert(t:lookup128(s(127,0,0,1)) == 0) + assert(t:lookup128(s(130,0,0,1)) == 0) + -- /15 + assert(t:rib_lookup(s(128,178,0,0), 32) == 15) + assert(t:rib_lookup(s(128,178,0,1), 32) == 15) + assert(t:rib_lookup(s(128,179,0,1), 32) == 15) + assert(t:rib_lookup(s(128,177,0,1), 32) == 7) + assert(t:rib_lookup(s(128,180,0,1), 32) == 7) + assert(t:lookup(s(128,178,0,0)) == 15) + assert(t:lookup(s(128,178,0,1)) == 15) + assert(t:lookup(s(128,179,0,1)) == 15) + 
assert(t:lookup(s(128,177,0,1)) == 7) + assert(t:lookup(s(128,180,0,1)) == 7) + assert(t:lookup32(s(128,178,0,0)) == 15) + assert(t:lookup32(s(128,178,0,1)) == 15) + assert(t:lookup32(s(128,179,0,1)) == 15) + assert(t:lookup32(s(128,177,0,1)) == 7) + assert(t:lookup32(s(128,180,0,1)) == 7) + assert(t:lookup64(s(128,178,0,0)) == 15) + assert(t:lookup64(s(128,178,0,1)) == 15) + assert(t:lookup64(s(128,179,0,1)) == 15) + assert(t:lookup64(s(128,177,0,1)) == 7) + assert(t:lookup64(s(128,180,0,1)) == 7) + assert(t:lookup128(s(128,178,0,0)) == 15) + assert(t:lookup128(s(128,178,0,1)) == 15) + assert(t:lookup128(s(128,179,0,1)) == 15) + assert(t:lookup128(s(128,177,0,1)) == 7) + assert(t:lookup128(s(128,180,0,1)) == 7) + -- /27 + assert(t:rib_lookup(s(128,178,154,160), 32) == 27) + assert(t:rib_lookup(s(128,178,154,161), 32) == 27) + assert(t:rib_lookup(s(128,178,154,191), 32) == 27) + assert(t:rib_lookup(s(128,179,154,160), 32) == 15) + assert(t:rib_lookup(s(128,178,154,159), 32) == 15) + assert(t:lookup(s(128,178,154,160)) == 27) + assert(t:lookup(s(128,178,154,161)) == 27) + assert(t:lookup(s(128,178,154,191)) == 27) + assert(t:lookup(s(128,179,154,160)) == 15) + assert(t:lookup(s(128,178,154,159)) == 15) + assert(t:lookup32(s(128,178,154,160)) == 27) + assert(t:lookup32(s(128,178,154,161)) == 27) + assert(t:lookup32(s(128,178,154,191)) == 27) + assert(t:lookup32(s(128,179,154,160)) == 15) + assert(t:lookup32(s(128,178,154,159)) == 15) + assert(t:lookup64(s(128,178,154,160)) == 27) + assert(t:lookup64(s(128,178,154,161)) == 27) + assert(t:lookup64(s(128,178,154,191)) == 27) + assert(t:lookup64(s(128,179,154,160)) == 15) + assert(t:lookup64(s(128,178,154,159)) == 15) + assert(t:lookup128(s(128,178,154,160)) == 27) + assert(t:lookup128(s(128,178,154,161)) == 27) + assert(t:lookup128(s(128,178,154,191)) == 27) + assert(t:lookup128(s(128,179,154,160)) == 15) + assert(t:lookup128(s(128,178,154,159)) == 15) + -- /31 + assert(t:rib_lookup(s(128,178,154,196), 32) == 31) + assert(t:rib_lookup(s(128,178,154,197), 32) == 31) + assert(t:rib_lookup(s(128,178,154,180), 32) == 27) + assert(t:rib_lookup(s(128,178,154,198), 32) == 15) + assert(t:lookup(s(128,178,154,196)) == 31) + assert(t:lookup(s(128,178,154,197)) == 31) + assert(t:lookup(s(128,178,154,180)) == 27) + assert(t:lookup(s(128,178,154,198)) == 15) + assert(t:lookup32(s(128,178,154,196)) == 31) + assert(t:lookup32(s(128,178,154,197)) == 31) + assert(t:lookup32(s(128,178,154,180)) == 27) + assert(t:lookup32(s(128,178,154,198)) == 15) + assert(t:lookup64(s(128,178,154,196)) == 31) + assert(t:lookup64(s(128,178,154,197)) == 31) + assert(t:lookup64(s(128,178,154,180)) == 27) + assert(t:lookup64(s(128,178,154,198)) == 15) + assert(t:lookup128(s(128,178,154,196)) == 31) + assert(t:lookup128(s(128,178,154,197)) == 31) + assert(t:lookup128(s(128,178,154,180)) == 27) + assert(t:lookup128(s(128,178,154,198)) == 15) + + local t = new{direct_pointing=direct_pointing, leaf_t=ffi.typeof(leaf_t)} + t:add(s(128,0,0,0,1,224,0,0), 43, 43) + t:add(s(128,0,0,0,1,224,178,196), 63, 63) + t:build() + -- /43 + assert(t:rib_lookup(s(128,0,0,0,1,224,0,0), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,1,224,0,1), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,1,225,0,0), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,1,254,100,12), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,1,200,100,12), 64) == nil) + assert(t:rib_lookup(s(128,0,0,0,2,224,0,0), 64) == nil) + assert(t:lookup(s(128,0,0,0,1,224,0,0)) == 43) + assert(t:lookup(s(128,0,0,0,1,224,0,1)) == 43) + 
assert(t:lookup(s(128,0,0,0,1,225,0,0)) == 43) + assert(t:lookup(s(128,0,0,0,1,254,100,12)) == 43) + assert(t:lookup(s(128,0,0,0,1,200,100,12)) == 0) + assert(t:lookup(s(128,0,0,0,2,224,0,0)) == 0) + assert(t:lookup64(s(128,0,0,0,1,224,0,0)) == 43) + assert(t:lookup64(s(128,0,0,0,1,224,0,1)) == 43) + assert(t:lookup64(s(128,0,0,0,1,225,0,0)) == 43) + assert(t:lookup64(s(128,0,0,0,1,254,100,12)) == 43) + assert(t:lookup64(s(128,0,0,0,1,200,100,12)) == 0) + assert(t:lookup64(s(128,0,0,0,2,224,0,0)) == 0) + assert(t:lookup128(s(128,0,0,0,1,224,0,0)) == 43) + assert(t:lookup128(s(128,0,0,0,1,224,0,1)) == 43) + assert(t:lookup128(s(128,0,0,0,1,225,0,0)) == 43) + assert(t:lookup128(s(128,0,0,0,1,254,100,12)) == 43) + assert(t:lookup128(s(128,0,0,0,1,200,100,12)) == 0) + assert(t:lookup128(s(128,0,0,0,2,224,0,0)) == 0) + -- /63 + assert(t:rib_lookup(s(128,0,0,0,1,224,178,196), 64) == 63) + assert(t:rib_lookup(s(128,0,0,0,1,224,178,197), 64) == 63) + assert(t:rib_lookup(s(128,0,0,0,1,225,178,197), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,1,224,179,196), 64) == 43) + assert(t:rib_lookup(s(128,0,0,0,2,225,178,197), 64) == nil) + assert(t:lookup(s(128,0,0,0,1,224,178,196)) == 63) + assert(t:lookup(s(128,0,0,0,1,224,178,197)) == 63) + assert(t:lookup(s(128,0,0,0,1,225,178,197)) == 43) + assert(t:lookup(s(128,0,0,0,1,224,179,196)) == 43) + assert(t:lookup(s(128,0,0,0,2,225,178,197)) == 0) + assert(t:lookup64(s(128,0,0,0,1,224,178,196)) == 63) + assert(t:lookup64(s(128,0,0,0,1,224,178,197)) == 63) + assert(t:lookup64(s(128,0,0,0,1,225,178,197)) == 43) + assert(t:lookup64(s(128,0,0,0,1,224,179,196)) == 43) + assert(t:lookup64(s(128,0,0,0,2,225,178,197)) == 0) + assert(t:lookup128(s(128,0,0,0,1,224,178,196)) == 63) + assert(t:lookup128(s(128,0,0,0,1,224,178,197)) == 63) + assert(t:lookup128(s(128,0,0,0,1,225,178,197)) == 43) + assert(t:lookup128(s(128,0,0,0,1,224,179,196)) == 43) + assert(t:lookup128(s(128,0,0,0,2,225,178,197)) == 0) + + local t = new{direct_pointing=direct_pointing, leaf_t=ffi.typeof(leaf_t)} + -- Tets building empty RIB + t:build() + -- Test RIB + t:add(s(0x00), 8, 1) -- 00000000 + t:add(s(0xF0), 8, 2) -- 11110000 + t:add(s(0xE0), 4, 3) -- 11100000 + t:add(s(0xFF), 8, 4) -- 11111111 + t:add(s(0xFF), 5, 5) -- 11111000 + t:add(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F), 72, 6) + local v, n = t:rib_lookup(s(0x0), 1) + assert(not v and n.left and not n.right) + local v, n = t:rib_lookup(s(0x00), 8) + assert(v == 1 and not n) + local v, n = t:rib_lookup(s(0xE0), 3) + assert(not v and (n.left and n.right)) + local v, n = t:rib_lookup(s(0x0), 1, n) + assert(v == 3 and not n) + local v, n = t:rib_lookup(s(0xFF), 5) + assert(v == 5 and (not n.left) and n.right) + local v, n = t:rib_lookup(s(0xF0), 3, n) + assert(v == 4 and not n) + local v, n = t:rib_lookup(s(0xF8), 8) + assert(v == 5 and not n) + local v, n = t:rib_lookup(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F), 72) + assert(v == 6 and not n) + -- Test FIB + t:build() + if debug then t:fib_info() end + assert(t:lookup(s(0x00)) == 1) -- 00000000 + assert(t:lookup(s(0xC0)) == 0) -- 11000000 + assert(t:lookup(s(0xE0)) == 3) -- 11100000 + assert(t:lookup(s(0xF0)) == 2) -- 11110000 + assert(t:lookup(s(0xF8)) == 5) -- 11111000 + assert(t:lookup(s(0xFC)) == 5) -- 11111100 + assert(t:lookup(s(0xFF)) == 4) -- 11111111 + assert(t:lookup(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F)) == 6) + assert(t:lookup32(s(0x00)) == 1) + assert(t:lookup32(s(0xC0)) == 0) + assert(t:lookup32(s(0xE0)) == 3) + assert(t:lookup32(s(0xF0)) == 2) + 
+      assert(t:lookup32(s(0xF8)) == 5)
+      assert(t:lookup32(s(0xFC)) == 5)
+      assert(t:lookup32(s(0xFF)) == 4)
+      assert(t:lookup64(s(0x00)) == 1)
+      assert(t:lookup64(s(0xC0)) == 0)
+      assert(t:lookup64(s(0xE0)) == 3)
+      assert(t:lookup64(s(0xF0)) == 2)
+      assert(t:lookup64(s(0xF8)) == 5)
+      assert(t:lookup64(s(0xFC)) == 5)
+      assert(t:lookup64(s(0xFF)) == 4)
+      assert(t:lookup128(s(0x00)) == 1)
+      assert(t:lookup128(s(0xC0)) == 0)
+      assert(t:lookup128(s(0xE0)) == 3)
+      assert(t:lookup128(s(0xF0)) == 2)
+      assert(t:lookup128(s(0xF8)) == 5)
+      assert(t:lookup128(s(0xFC)) == 5)
+      assert(t:lookup128(s(0xFF)) == 4)
+      assert(t:lookup128(s(0xF0,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0xFF,0x0F)) == 6)
+   end
+end

 -- Random testing
 local function reproduce (cases, config)
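The added selftest block above pins down one invariant: for keys short enough to be represented at a given width, the specialized lookup32/lookup64/lookup128 routines must return the same leaf as the generic lookup. A minimal restatement of that property as a helper, assuming the poptrie instance t and the key constructor s used by the selftest (illustrative sketch, not part of the patch):

-- Check that the keysize-specialized lookups agree with the generic one for
-- a key that fits all of them (e.g. an IPv4-sized, 4-byte key).
local function check_agreement (t, key)
   local expected = t:lookup(key)
   assert(t:lookup32(key) == expected)
   assert(t:lookup64(key) == expected)
   assert(t:lookup128(key) == expected)
   return expected
end
-- For example, check_agreement(t, s(128,178,154,160)) should return 27
-- according to the /27 assertions above.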
diff --git a/src/lib/poptrie_lookup.dasl b/src/lib/poptrie_lookup.dasl
index 6cc1d529ec..1aea871025 100644
--- a/src/lib/poptrie_lookup.dasl
+++ b/src/lib/poptrie_lookup.dasl
@@ -60,23 +60,39 @@ local BMI2 = (assert(lib.readfile("/proc/cpuinfo", "*a"),
 |.define leaves, rdi -- pointer to leaves array
 |.define nodes, rsi -- pointer to nodes array
 |.define key, rdx -- key to look up
+|.define key_dw, edx -- (key as dword)
 |.define dmap, rcx -- pointer to directmap
 |.define index, r8d -- index into node array
 |.define node, r8 -- pointer into node array
-|.define offset, r9d -- offset into key
-|.define offsetx, r9 -- (offset as qword)
+|.define key_x, r9 -- key extension (for 128 bit keys)
 |.define v, r10 -- k or s bits extracted from key
 |.define v_dw, r10d -- (v as dword)
 |.define vec, r11 -- 64-bit vector or leafvec

 -- lookup(leaf_t *leaves, node_t *nodes, key) -> leaf_t
 function lookup (Dst, Poptrie, keysize)
+   if keysize == 32 then
+      | mov key_dw, dword [key]
+      | bswap key
+   elseif keysize == 64 then
+      | mov key, [key]
+      | bswap key
+   elseif keysize == 128 then
+      | mov key_x, [key+8]
+      | bswap key_x
+      | mov key, [key]
+      | bswap key
+   else error("NYI") end
    if Poptrie.direct_pointing then
       -- v = extract(key, 0, Poptrie.s)
-      local direct_mask = bit.lshift(1ULL, Poptrie.s) - 1
-      -- v = band(key, direct_mask)
-      | mov v_dw, dword [key]
-      | and v, direct_mask
+      | mov v, key
+      | shr v, (64 - Poptrie.s)
+      if keysize <= 64 then
+         | shl key, Poptrie.s
+      else
+         | shld key, key_x, Poptrie.s
+         | shl key_x, Poptrie.s
+      end
       -- index = dmap[v]
       | mov index, dword [dmap+v*4]
       -- eax = band(index, leaf_tag - 1) (tag inverted)
@@ -89,43 +105,22 @@ function lookup (Dst, Poptrie, keysize)
       |1:
       | imul index, 24 -- multiply by node size
       | lea node, [nodes+index]
-      -- offset = s
-      | mov offset, Poptrie.s
    else
-      -- index, node, offset = 0, nodes[index], 0
+      -- index, node = 0, nodes[index]
       | xor index, index
       | lea node, [nodes+0] -- nodes[0]
-      | xor offset, offset
    end
    -- while band(vec, lshift(1ULL, v)) ~= 0
    |2:
    -- v = extract(key, offset, k=6)
-   if keysize == 32 then
-      if BMI2 then
-         | shrx v_dw, dword [key], offset
-      else
-         | mov ecx, offset
-         | mov v_dw, dword [key]
-         | shr v, cl
-      end
-   elseif keysize == 64 then
-      if BMI2 then
-         | shrx v, [key], offsetx
-      else
-         | mov ecx, offset
-         | mov v, [key]
-         | shr v, cl
-      end
-   elseif keysize == 128 then
-      | mov ecx, offset
-      | mov v, [key]
-      | mov vec, [key+8]
-      | test cl, 64
-      | cmovnz v, vec
-      | shrd v, vec, cl
-   else error("NYI") end
-   -- v = band(v, lshift(1, k=6) - 1)
-   | and v_dw, 0x3F
+   | mov v, key
+   | shr v, (64 - Poptrie.k)
+   if keysize <= 64 then
+      | shl key, Poptrie.k
+   else
+      | shld key, key_x, Poptrie.k
+      | shl key_x, Poptrie.k
+   end
    -- vec = nodes[index].vector
    | mov vec, qword [node+8]
    -- is bit v set in vec?
@@ -151,8 +146,6 @@ function lookup (Dst, Poptrie, keysize)
    -- node = nodes[index]
    | imul index, 24 -- multiply by node size
    | lea node, [nodes+index]
-   -- offset = offset + k
-   | add offset, 6
    | jmp <2 -- loop
    -- end while
    |4:
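The rewritten generator above loads the key once, byte-swaps it, and then consumes Poptrie.s (and subsequently Poptrie.k) bits from the most-significant end by shifting them out, which is why the offset/offsetx registers disappear. A rough Lua model of one such extraction step, using a plain 32-bit key for simplicity (illustrative sketch of the idea, not the generated code; next_chunk is a hypothetical name):

local bit = require("bit")

-- Return the top nbits of a (byte-swapped) 32-bit key plus the remaining key
-- with those bits shifted out; shift counts are assumed to be in 1..31.
local function next_chunk (key, nbits)
   local chunk = bit.rshift(key, 32 - nbits) -- extract the leading bits
   local rest = bit.lshift(key, nbits)       -- consume them (modulo 2^32)
   return chunk, rest
end

-- Consuming a 6-bit chunk (k=6), as the inner loop does:
local v = next_chunk(0xF0000000, 6)
assert(v == 0x3C) -- the top six bits of 0xF0000000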
diff --git a/src/lib/ptree/worker.lua b/src/lib/ptree/worker.lua
index bd201c5ffe..4e280121bd 100644
--- a/src/lib/ptree/worker.lua
+++ b/src/lib/ptree/worker.lua
@@ -14,6 +14,8 @@ local alarms = require("lib.yang.alarms")
 local channel = require("lib.ptree.channel")
 local action_codec = require("lib.ptree.action_codec")
 local ptree_alarms = require("lib.ptree.alarms")
+local timeline = require("core.timeline")
+local events = timeline.load_events(engine.timeline(), "core.engine")

 local Worker = {}

@@ -103,6 +105,8 @@ function Worker:main ()
    if not engine.auditlog_enabled then engine.enable_auditlog() end

+   engine.timeline()
+   engine.enable_tick()
    engine.setvmprofile("engine")

@@ -110,10 +114,14 @@ function Worker:main ()
       self.breathe()
       if next_time < engine.now() then
          next_time = engine.now() + self.period
+         events.engine_stopped()
          self:handle_actions_from_manager()
+         events.engine_started()
          timer.run()
+         events.polled_timers()
       end
       if not engine.busywait then engine.pace_breathing() end
+      engine.randomize_log_rate()
    until stop < engine.now()
    counter.commit()
    if not self.no_report then engine.report(self.report) end
diff --git a/src/lib/scheduling.lua b/src/lib/scheduling.lua
index e3a4769e01..27d18997c5 100644
--- a/src/lib/scheduling.lua
+++ b/src/lib/scheduling.lua
@@ -6,6 +6,7 @@ local S = require("syscall")
 local lib = require("core.lib")
 local numa = require("lib.numa")
 local ingress_drop_monitor = require("lib.timers.ingress_drop_monitor")
+local timeline = require("core.timeline")

 local function fatal (msg)
    print(msg)
@@ -22,9 +23,11 @@ local scheduling_opts = {
    cpu = {},                  -- CPU index (integer).
    real_time = {},            -- Boolean.
    max_packets = {},          -- Positive integer.
+   group_freelist_size = {},
    ingress_drop_monitor = {}, -- Action string: one of 'flush' or 'warn'.
    profile = {default=true},  -- Boolean.
    busywait = {default=true}, -- Boolean.
+   timeline = {},             -- Boolean. Enable timeline logging?
    enable_xdp = {},           -- Enable Snabb XDP mode (see apps.xdp.xdp).
    eval = {}                  -- String.
 }
@@ -59,10 +62,18 @@ function sched_apply.max_packets (max_packets)
    packet.initialize(max_packets)
 end

+function sched_apply.group_freelist_size (nchunks)
+   packet.enable_group_freelist(nchunks)
+end
+
 function sched_apply.busywait (busywait)
    engine.busywait = busywait
 end

+function sched_apply.timeline (enabled)
+   timeline.enabled = enabled
+end
+
 function sched_apply.enable_xdp (opt)
    if opt then require('apps.xdp.xdp').snabb_enable_xdp(opt) end
 end
diff --git a/src/lib/yang/list.lua b/src/lib/yang/list.lua
index b949ad4846..03aa02274f 100644
--- a/src/lib/yang/list.lua
+++ b/src/lib/yang/list.lua
@@ -829,16 +829,14 @@ function List:remove_obsolete_nodes (k, r, d, s, h)
       self:free_node(r)
       return self:remove_obsolete_nodes(k, node.parent, d, s, h)
    elseif band(node.occupied, node.occupied-1) == 0 then
-      -- Node has only one child, move it to parent.
+      -- Node has only one child, move to parent if it is a leaf.
       local index = msb_set(node.occupied)
-      parent.children[parent_index] = node.children[index]
       if self:node_leaf(node, index) then
+         parent.children[parent_index] = node.children[index]
          self:node_leaf(parent, parent_index, true)
-      else
-         self:node(node.children[index]).parent = node.parent
+         self:free_node(r)
+         return self:remove_obsolete_nodes(k, node.parent, d, s, h)
       end
-      self:free_node(r)
-      return self:remove_obsolete_nodes(k, node.parent, d, s, h)
    end
 end
diff --git a/src/lib/yang/snabb-snabbflow-v1.yang b/src/lib/yang/snabb-snabbflow-v1.yang
index 2465475409..87e75d0949 100644
--- a/src/lib/yang/snabb-snabbflow-v1.yang
+++ b/src/lib/yang/snabb-snabbflow-v1.yang
@@ -11,6 +11,11 @@ module snabb-snabbflow-v1 {
   description
     "Configuration for the Snabbflow IPFIX exporter.";

+  revision 2023-03-15 {
+    description
+      "Added interlink and group freelist configuration options.";
+  }
+
   revision 2022-04-27 {
     description
       "Initial draft.";
@@ -161,6 +166,22 @@ module snabb-snabbflow-v1 {
           }
         }
       }
+
+      leaf group-freelist-size {
+        type uint32 { range 1024|2048|4096|8192; }
+        default 2048;
+        description
+          "Number of chunks allocated for the group freelist.
+           Each chunk holds up to 2048 packets. Must be a power of two.";
+      }
+
+      leaf interlink-size {
+        type uint32 { range 1024|2048|4096|8192|16384|32768|65536|131072|262144; }
+        default 65536;
+        description
+          "Capacity of inter-process packet queues in number of packets.
+           Must be a power of two.";
+      }
     }
   }
diff --git a/src/program/ipfix/lib.lua b/src/program/ipfix/lib.lua
index 4a19eefef3..95a2e4831e 100644
--- a/src/program/ipfix/lib.lua
+++ b/src/program/ipfix/lib.lua
@@ -65,26 +65,28 @@ end

 local function configure_interlink_input (config, in_graph)
    config = lib.parse(config, {
-      name={required=true}
+      name={required=true},
+      size={required=true}
    })

    local graph = in_graph or app_graph.new()
    local in_name = config.name

-   app_graph.app(graph, in_name, Receiver)
+   app_graph.app(graph, in_name, Receiver, { size = config.size })

    return graph, {name=in_name, output='output'}
 end

 local function configure_interlink_output (config, in_graph)
    config = lib.parse(config, {
-      name={required=true}
+      name={required=true},
+      size={required=true}
    })

    local graph = in_graph or app_graph.new()
    local out_name = config.name

-   app_graph.app(graph, out_name, Transmitter)
+   app_graph.app(graph, out_name, Transmitter, { size = config.size })

    return graph, {name=out_name, input='input'}
 end

@@ -175,9 +177,9 @@ local function configure_ipfix_tap_instance (config, in_graph)
    return graph, ipfix
 end

-function configure_interlink_ipfix_tap_instance (in_name, config)
+function configure_interlink_ipfix_tap_instance (in_name, link_size, config)
    local graph = app_graph.new()
-   local _, receiver = configure_interlink_input({name=in_name}, graph)
+   local _, receiver = configure_interlink_input({name=in_name, size=link_size}, graph)
    local _, ipfix = configure_ipfix_tap_instance(config, graph)

    link(graph, receiver, ipfix)
@@ -230,7 +232,7 @@ local function configure_rss_tap_instances (config, outputs, rss_group, in_graph
          -- Keys
          --   link_name  name of the link
          local _, transmitter = configure_interlink_output(
-            {name=output.link_name}, graph
+            {name=output.link_name, size=output.link_size}, graph
          )
          link(graph, rss, transmitter)
       else
@@ -303,4 +305,4 @@ function configure_mlx_controller (devices)
       need_ctrl = true
    end
    return ctrl_graph, need_ctrl
-end
\ No newline at end of file
+end
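With the queue size now passed through configure_interlink_input and configure_interlink_output, the RSS worker and the IPFIX exporter it feeds attach to rings of the same, explicitly configured capacity (interlink-size in the YANG model above). A back-of-the-envelope helper for choosing that value (illustrative sketch; the function name and the 5 Mpps example rate are made up):

-- Time, in milliseconds, that a full interlink ring of `size` packets can
-- absorb a burst at `mpps` million packets per second.
local function buffering_ms (size, mpps)
   return size / (mpps * 1e6) * 1e3
end

print(("interlink-size 65536 buffers ~%.1f ms at 5 Mpps"):format(buffering_ms(65536, 5)))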
diff --git a/src/program/ipfix/probe/probe.lua b/src/program/ipfix/probe/probe.lua
index b09e43e118..e29911a8e2 100644
--- a/src/program/ipfix/probe/probe.lua
+++ b/src/program/ipfix/probe/probe.lua
@@ -67,6 +67,17 @@ local function update_cpuset (cpu_pool)
    end
 end

+local probe_group_freelist_size
+
+local function update_group_freelist_size (nchunks)
+   if not probe_group_freelist_size then
+      probe_group_freelist_size = nchunks
+   elseif probe_group_freelist_size ~= nchunks then
+      error("Can not change group-freelist-size after probe has started.")
+   end
+   return probe_group_freelist_size
+end
+
 local function warn (msg, ...)
    io.stderr:write("Warning: "..msg:format(...).."\n")
    io.stderr:flush()
@@ -86,6 +97,9 @@ function start (name, confpath)
       busywait = busywait,
       real_time = real_time,
       profile = profile,
+      group_freelist_size = update_group_freelist_size(
+         conf.snabbflow_config.rss.software_scaling.group_freelist_size
+      ),
       jit_opt = {
          sizemcode=256,
          maxmcode=8192,
@@ -122,6 +136,8 @@ function setup_workers (config)
    local flow_director = config.snabbflow_config.flow_director
    local ipfix = config.snabbflow_config.ipfix

+   update_group_freelist_size(rss.software_scaling.group_freelist_size)
+
    local collector_pools = {}
    for name, p in pairs(ipfix.collector_pool) do
       local collectors = {}
@@ -298,10 +314,14 @@ function setup_workers (config)
            args = iconfig
         }
      else
-        output = { type = "interlink", link_name = rss_link }
+        output = {
+           type = "interlink",
+           link_name = rss_link,
+           link_size = rss.software_scaling.interlink_size
+        }
         workers[rss_link] = probe.configure_interlink_ipfix_tap_instance(
-           rss_link, iconfig
+           rss_link, output.link_size, iconfig
         )
         -- Dedicated exporter processes are restartable
         worker_opts[rss_link] = {