Merge branch 'debug-group-freelist' into max-next
eugeneia committed Apr 6, 2023
2 parents cf292b2 + f3905b3 commit 7cc72ae
Showing 2 changed files with 80 additions and 37 deletions.
45 changes: 20 additions & 25 deletions src/core/group_freelist.lua
@@ -7,15 +7,13 @@ local shm = require("core.shm")
local lib = require("core.lib")
local ffi = require("ffi")

-local waitfor, compiler_barrier = lib.waitfor, lib.compiler_barrier
local band = bit.band

-- Group freelist: lock-free multi-producer multi-consumer ring buffer
-- (mpmc queue)
--
-- https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
--
--- NB: assumes 32-bit wide loads/stores are atomic (as is the fact on x86_64)!

-- Group freelist holds up to n chunks of chunksize packets each
chunksize = 2048
@@ -24,21 +22,22 @@ chunksize = 2048
local default_size = 1024 -- must be a power of two

local CACHELINE = 64 -- XXX - make dynamic
-local INT = ffi.sizeof("uint32_t")
+local QWORD = ffi.sizeof("uint64_t")

ffi.cdef([[
struct group_freelist_chunk {
-uint32_t sequence[1], nfree;
+uint64_t sequence[1];
+uint32_t nfree, pad;
struct packet *list[]]..chunksize..[[];
} __attribute__((packed))]])

ffi.cdef([[
struct group_freelist {
-uint32_t enqueue_pos[1], enqueue_mask;
-uint8_t pad_enqueue_pos[]]..CACHELINE-2*INT..[[];
+uint64_t enqueue_pos[1], enqueue_mask;
+uint8_t pad_enqueue_pos[]]..CACHELINE-2*QWORD..[[];
-uint32_t dequeue_pos[1], dequeue_mask;
-uint8_t pad_dequeue_pos[]]..CACHELINE-2*INT..[[];
+uint64_t dequeue_pos[1], dequeue_mask;
+uint8_t pad_dequeue_pos[]]..CACHELINE-2*QWORD..[[];
uint32_t size, state[1];
@@ -70,61 +69,57 @@ end

function freelist_open (name, readonly)
local fl = shm.open(name, "struct group_freelist", 'read-only', 1)
-waitfor(function () return sync.load(fl.state) == READY end)
+lib.waitfor(function () return sync.load(fl.state) == READY end)
local size = fl.size
shm.unmap(fl)
return shm.open(name, "struct group_freelist", readonly, size)
end

function start_add (fl)
-local pos = sync.load(fl.enqueue_pos)
+local pos = sync.load64(fl.enqueue_pos)
local mask = fl.enqueue_mask
while true do
local chunk = fl.chunk[band(pos, mask)]
-local seq = sync.load(chunk.sequence)
-local dif = seq - pos
+local seq = sync.load64(chunk.sequence)
+local dif = ffi.cast("int64_t", seq) - ffi.cast("int64_t", pos)
if dif == 0 then
-if sync.cas(fl.enqueue_pos, pos, pos+1) then
+if sync.cas64(fl.enqueue_pos, pos, pos+1) then
return chunk, pos+1
end
elseif dif < 0 then
return
else
-pos = sync.load(fl.enqueue_pos)
+pos = sync.load64(fl.enqueue_pos)
end
end
end

function start_remove (fl)
-local pos = sync.load(fl.dequeue_pos)
+local pos = sync.load64(fl.dequeue_pos)
local mask = fl.dequeue_mask
while true do
local chunk = fl.chunk[band(pos, mask)]
-local seq = sync.load(chunk.sequence)
-local dif = seq - (pos+1)
+local seq = sync.load64(chunk.sequence)
+local dif = ffi.cast("int64_t", seq) - ffi.cast("int64_t", pos+1)
if dif == 0 then
-if sync.cas(fl.dequeue_pos, pos, pos+1) then
+if sync.cas64(fl.dequeue_pos, pos, pos+1) then
return chunk, pos+mask+1
end
elseif dif < 0 then
return
else
-pos = sync.load(fl.dequeue_pos)
+pos = sync.load64(fl.dequeue_pos)
end
end
end

function finish (chunk, seq)
-chunk.sequence[0] = seq
+assert(sync.cas64(chunk.sequence, chunk.sequence[0], seq))
end

local function occupied_chunks (fl)
local enqueue, dequeue = fl.enqueue_pos[0], fl.dequeue_pos[0]
-if dequeue > enqueue then
-return enqueue + fl.size - dequeue
-else
-return enqueue - dequeue
-end
+return tonumber(enqueue - dequeue)
end

-- Register struct group_freelist as an abstract SHM object type so that
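As a reading aid, here is a minimal sketch of how the producer/consumer API touched above (freelist_open, start_add, start_remove, finish) is typically driven. It is an illustration under assumptions, not code from this commit: the require path, the shm name "group/freelist", and the surrounding Snabb process setup are assumed, and full/empty handling is reduced to a nil check.

local group_freelist = require("core.group_freelist")

-- Open the shared freelist created by another process (name is an assumption).
local fl = group_freelist.freelist_open("group/freelist")

-- Producer side: reserve a chunk slot, fill it, then publish it with finish().
local chunk, seq = group_freelist.start_add(fl)
if chunk then -- nil when the ring is full
   -- fill chunk.list[0..chunksize-1] and set chunk.nfree here
   group_freelist.finish(chunk, seq)
end

-- Consumer side: claim the oldest published chunk, drain it, then release the
-- slot with finish() so it can be reused.
local chunk, seq = group_freelist.start_remove(fl)
if chunk then -- nil when the ring is empty
   -- consume chunk.list[0..chunk.nfree-1] here
   group_freelist.finish(chunk, seq)
end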
72 changes: 60 additions & 12 deletions src/core/sync.dasl
@@ -9,16 +9,25 @@ local ffi = require("ffi")
| .actionlist actions
| .globalnames globalnames

--- This module happens to use 32-bit arguments only.
-|.define arg1, edi
-|.define arg2, esi
-|.define arg3, edx
+|.define darg1, edi
+|.define darg2, esi
+|.define darg3, edx
+|.define qarg1, rdi
+|.define qarg2, rsi
+|.define qarg3, rdx

-- load(src) -> uint32_t
-- Load integer from src.
local load_t = "uint32_t (*) (uint32_t *)"
local function load (Dst)
-| mov eax, [arg1]
+| mov eax, [qarg1]
| ret
end

+-- load64(src) -> uint64_t
+local load64_t = "uint64_t (*) (uint64_t *)"
+local function load64 (Dst)
+| mov rax, [qarg1]
+| ret
+end

@@ -27,10 +36,19 @@ end
-- equal, stores new at dst and returns true. Else, returns false.
local cas_t = "bool (*) (int *, int, int)"
local function cas (Dst)
-| mov eax, arg2
-| lock; cmpxchg [arg1], arg3 -- compare-and-swap; sets ZF flag on success
-| mov eax, 0 -- clear eax for return value
-| setz al -- set eax to 1 (true) if ZF is set
+| mov eax, darg2
+| lock; cmpxchg [qarg1], darg3 -- compare-and-swap; sets ZF flag on success
+| mov eax, 0 -- clear eax for return value
+| setz al -- set eax to 1 (true) if ZF is set
| ret
end

+local cas64_t = "bool (*) (uint64_t *, uint64_t, uint64_t)"
+local function cas64 (Dst)
+| mov rax, qarg2
+| lock; cmpxchg [qarg1], qarg3 -- compare-and-swap; sets ZF flag on success
+| mov eax, 0 -- clear eax for return value
+| setz al -- set eax to 1 (true) if ZF is set
+| ret
+end

@@ -42,20 +60,20 @@ local lock_t = "void (*) (int *)"
local function lock (Dst)
-- attempt to acquire
| mov eax, 1
-| xchg eax, [arg1]
+| xchg eax, [qarg1]
| test eax, eax -- was it 0 (unlocked)?
| jnz >1 -- no, go spin
| ret
-- spin
|1:
| pause
-| cmp dword [arg1], 1 -- does it look locked?
+| cmp dword [qarg1], 1 -- does it look locked?
| je <1 -- spin if it does
| jmp ->lock -- otherwise try to acquire
end
local unlock_t = "void (*) (int *)"
local function unlock (Dst)
-| mov dword [arg1], 0
+| mov dword [qarg1], 0
| ret
end

@@ -65,9 +83,15 @@ local function generate (Dst)
|->load:
|| load(Dst)
| .align 16
+|->load64:
+|| load64(Dst)
+| .align 16
|->cas:
|| cas(Dst)
| .align 16
+|->cas64:
+|| cas64(Dst)
+| .align 16
|->lock:
|| lock(Dst)
| .align 16
@@ -87,7 +111,9 @@ local entry = dasm.globals(globals, globalnames)

local sync = {
load = ffi.cast(load_t, entry.load),
+load64 = ffi.cast(load64_t, entry.load64),
cas = ffi.cast(cas_t, entry.cas),
+cas64 = ffi.cast(cas64_t, entry.cas64),
lock = ffi.cast(lock_t, entry.lock),
unlock = ffi.cast(unlock_t, entry.unlock)
}
@@ -104,6 +130,18 @@ sync.selftest = function ()
state = sync.load(box.state)
assert(state == i)
end
+local box = ffi.new(
+"struct { uint64_t pad1, state[1], pad2; } __attribute__((packed))"
+)
+local state = sync.load64(box.state)
+assert(state == 0)
+for i=1,100 do
+box.state[0] = state + 1
+state = sync.load64(box.state)
+assert(state == i)
+end
+box.state[0] = 0x5555555555555555
+assert(sync.load64(box.state) == 0x5555555555555555)
-- cas
local box = ffi.new(
"struct { unsigned int pad1, state[1], pad2; } __attribute__((packed))"
@@ -115,6 +153,16 @@ sync.selftest = function ()
and box.state[0] == 2147483648
and box.pad1 == 0
and box.pad2 == 0)
+local box = ffi.new(
+"struct { uint64_t pad1, state[1], pad2; } __attribute__((packed))"
+)
+assert(sync.cas64(box.state, 0, 1) and box.state[0] == 1)
+assert(not sync.cas64(box.state, 0, 2) and box.state[0] == 1)
+assert(sync.cas64(box.state, 1, 2) and box.state[0] == 2)
+assert(sync.cas64(box.state, 2, 0x5555555555555555)
+and box.state[0] == 0x5555555555555555
+and box.pad1 == 0
+and box.pad2 == 0)
-- lock / unlock
local spinlock = ffi.new("int[1]")
sync.lock(spinlock)
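For readers unfamiliar with the sync primitives, the following is a minimal single-threaded reference model of what the new load64 and cas64 entry points are expected to compute, written in plain LuaJIT FFI. It deliberately omits the atomicity that the LOCK CMPXCHG implementation above provides, and the model_* names are invented for illustration.

local ffi = require("ffi")

-- model_load64(src): what sync.load64 returns, the 64-bit value at src[0].
local function model_load64 (src)
   return src[0]
end

-- model_cas64(dst, old, new): what sync.cas64 does, minus atomicity; store new
-- at dst[0] if and only if dst[0] == old, and report whether the swap happened.
local function model_cas64 (dst, old, new)
   if dst[0] == old then
      dst[0] = new
      return true
   else
      return false
   end
end

local box = ffi.new("uint64_t[1]")
assert(model_cas64(box, 0, 1) and model_load64(box) == 1)
assert(not model_cas64(box, 0, 2) and model_load64(box) == 1)
assert(model_cas64(box, 1, 0x5555555555555555ULL)
   and model_load64(box) == 0x5555555555555555ULL)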
