Skip to content

Commit

Permalink
Adds annotations tests and unnecessary wakeups in sync/pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Grubov committed Apr 30, 2023
1 parent 1c73ae4 commit 548b876
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 51 deletions.
8 changes: 8 additions & 0 deletions sync/cond.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local fiber = require "fiber"
---@class sync.cond
---@field name string
---@field timeout number? default timeout for :recv()
---@field sent boolean? was data sent through condvar
local cond = {}
cond.__index = cond
cond.__tostring = function (self) return "cond<".. (self.name or 'anon') ..">" end
Expand All @@ -20,6 +21,10 @@ function cond.new(name, timeout)
}, cond)
end

---Sends data to the cond and receiver
---
---Method raises exception if any data was already sent through this cond
---@param data any data you want to send (please do not send nil/box.NULL, it is acceptable but not usable)
function cond:send(data)
if getmetatable( self ) ~= cond then
error("Usage: cond:send(value) (not cond.send(value))", 2)
Expand All @@ -34,6 +39,9 @@ function cond:send(data)
end
end

---Receives data from conditional variable
---@param timeout number? timeout in seconds to wait for data (default self.timeout or infinity)
---@return any|nil, string? error_message # returns `sent data` if was awaited, or nil, "Timed out" otherwise
function cond:recv(timeout)
if getmetatable( self ) ~= cond then
error("Usage: cond:send(value) (not cond.send(value))", 2)
Expand Down
57 changes: 36 additions & 21 deletions sync/latch.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ for _, def in pairs(defs) do
end
end

-- ffi.metatype('', {})

local latch = {
-- WAIT_WINDOW = 1/3,
}
---Latch is bindings to lightweight tarantool locks
---@class sync.latch
---@field name string? name of the latch if was given
---@field obj ffi.ctype* latch object itself
local latch = {}
latch.__index = latch
latch.__tostring = function (self) return "latch<".. (self.name or 'anon') ..">" end
setmetatable(latch, { __call = function (_, name) return _.new(name) end})
Expand All @@ -57,36 +57,45 @@ local function destroy(obj)
C.box_latch_delete(obj)
end

---Creates new latch
---@param name string? name of the latch
---@return sync.latch
function latch.new(name)
if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end
if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end
local obj = C.box_latch_new()
if not obj then error("Failed to create latch") end
ffi.gc(obj, destroy)

return setmetatable({
return setmetatable({
obj = obj;
name = name;
}, latch)
name = name;
}, latch)
end

---Locks latch
---
---this method does not accept timeout, so can be locked infinetely long
function latch:lock()
if getmetatable( self ) ~= latch then
error("Usage: latch:lock() (not latch.lock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:lock() (not latch.lock())", 2)
end
C.box_latch_lock(self.obj)
end

---Fast checks if latch can be locked
---@return boolean success # returns true if latch was captured
function latch:trylock()
if getmetatable( self ) ~= latch then
error("Usage: latch:trylock() (not latch.trylock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:trylock() (not latch.trylock())", 2)
end
return C.box_latch_trylock(self.obj) == 0 -- 0 means success
end

---Releases locked latch
function latch:unlock()
if getmetatable( self ) ~= latch then
error("Usage: latch:unlock() (not latch.unlock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:unlock() (not latch.unlock())", 2)
end
C.box_latch_unlock(self.obj)
end

Expand All @@ -98,10 +107,16 @@ local function tail(self, r, ...)
return ...
end

---Wrapper lock()/unlock() which executes `f`
---
---latch will be released whether function raised exception or not
---@param f fun(...:any) callable object, usually function of the critical section
---@param ... any? arguments of the function
---@return ... # return result of the function or raises exception
function latch:with(f, ...)
if getmetatable( self ) ~= latch then
error("Usage: latch:with(fn) (not latch.with(fn))", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:with(fn) (not latch.with(fn))", 2)
end
self:lock()
return tail(self, pcall(f, ...))
end
Expand Down
40 changes: 28 additions & 12 deletions sync/lock.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
local fiber = require "fiber"

---@class sync.lock
---@field name string name of the lock
---@field locked false|number false when no one acquired lock, `fiber_id` otherwise
---@field _lock fiber.cond
local lock = {
WAIT_WINDOW = 1/3,
}
Expand All @@ -10,11 +14,9 @@ setmetatable(lock, { __call = function (_, name) return _.new(name) end })

local FIBER_STORE = 'sync.lock'

--- debug routine
-- local function print(...)
-- return _G.print(fiber.id(), ...)
-- end

---Creates new lock
---@param name string? name of the lock
---@return sync.lock
function lock.new(name)
if name == lock then error("Usage: lock.new([name]) or lock([name]) (not lock:new())", 2) end
return setmetatable({
Expand All @@ -32,6 +34,15 @@ function lock:_self_check()
end
end

---Tries to acquire lock
---
---Returns `true` on success, returns `false` on timed out
---
---Raises exception when attempting to acquire same lock twice in the same fiber
---
---Raises exception when deadlock is discovered
---@param timeout? number timeout in seconds, default timeout is infinity
---@return boolean # `true` if lock was successfully captured, `false` whe timed out
function lock:aquire(timeout)
if getmetatable( self ) ~= lock then
error("Usage: lock:aquire() (not lock.aquire())", 2)
Expand Down Expand Up @@ -102,16 +113,15 @@ function lock:aquire(timeout)
end

self.locked = fiber.id()
-- print("[".. tostring(self) .. "] + aquired by ", fiber.id());
return true

-- local deadline
-- if timeout then
-- deadline = fiber.time() + timeout;
-- end
end
lock.lock = lock.aquire

---Releases lock
---
---Raises exception when lock is acquired by noone
---
---Does not check lock ownership
function lock:release()
if getmetatable( self ) ~= lock then
error("Usage: lock:release() (not lock.release())", 2)
Expand All @@ -120,7 +130,6 @@ function lock:release()
error("lock:release called on not aquired lock", 2)
end
self.locked = false
-- print("[".. tostring(self) .. "] - released by ", fiber.id());
if self._lock then
self._lock:signal()
end
Expand All @@ -135,6 +144,13 @@ local function tail(self, r, ...)
return ...
end

---Executes given function under lock
---
---:with() reraises exception if function raised exception
---lock will be automatically released regardless success of function execution
---@param f fun() function of the critical section
---@param ... any arguments for the function
---@return ... # returns result of the function
function lock:with(f, ...)
if getmetatable( self ) ~= lock then
error("Usage: lock:with(fn) (not lock.with(fn))", 2)
Expand Down
14 changes: 4 additions & 10 deletions sync/pool.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ local function worker_main_loop(p)
---@type sync.pool.task?
local t
repeat
t = p.chan:get(0.01) --[[@as sync.pool.task?]]
t = p.chan:get() --[[@as sync.pool.task?]]
until (not should_live()) or t

if type(t) ~= 'table' then
Expand Down Expand Up @@ -133,9 +133,8 @@ local function worker_main_loop(p)
end
if p.chan then
if p.debug then
log.info("closing channel")
log.info("removing channel")
end
p.chan:close()
p.chan = nil
end
end
Expand Down Expand Up @@ -238,11 +237,6 @@ function task_mt:on_finish(on_finish_cb, on_finish_ctx)
self.on_finish_cb = on_finish_cb
end

function task_mt:__gc()
self.terminate_cb = nil
self.cb = nil
end

---Executes given function with arguments on the pool
---
---`pool:send()` raises exception if pool has been terminated
Expand Down Expand Up @@ -323,7 +317,7 @@ end
--- end
--- end
---
---@param func fun()|table function to execute
---@param func fun(...:any)|table function to execute
---@param args? any[] arguments for the call will passed as arguments to given function
---@param opts? sync.pool.runOpts options for the task (async is true by default)
---@return sync.pool.task|boolean|nil maybe_task, string? error_message # (async) false will be returned only when task was not scheduled
Expand Down Expand Up @@ -472,10 +466,10 @@ function pool:terminate(force)
if type(self.terminate_cb) == 'table' and not self.terminate_cb.sent then
self.terminate_cb:send(true)
end
self.chan:close()
if not force then return end
end

self.chan:close()
self.chan = nil

log.warn("terminating workers: force=%s", not not force)
Expand Down
10 changes: 10 additions & 0 deletions sync/wg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ local fiber = require "fiber"
---@class sync.wg
---@field name string name of the waitgroup
---@field timeout number? default wait timeout
---@field active number
---@field completed boolean?
local wg = {}
wg.__index = wg
wg.__tostring = function (self) return "wg<".. (self.name or 'anon') ..">" end
Expand All @@ -23,6 +25,8 @@ function wg.new(name, timeout)
}, wg)
end

---Increments counter by n (default 1)
---@param n number? increment (1 if not given)
function wg:start(n)
if getmetatable( self ) ~= wg then
error("Usage: wg:start() (not wg.start())", 2)
Expand All @@ -32,6 +36,9 @@ function wg:start(n)
end
wg.add = wg.start

---Decrements counter by 1
---
---Notifies waiters when counter==0.
function wg:finish()
if getmetatable( self ) ~= wg then
error("Usage: wg:finish() (not wg.finish())", 2)
Expand All @@ -49,6 +56,9 @@ function wg:finish()
end
wg.done = wg.finish

---Awaits wait group
---@param timeout number? timeout in seconds (default self.timeout or infinety)
---@return true|nil awaited, string? error_message # returns `true` when wg was awaited, and `nil` when "Timed out"
function wg:wait(timeout)
if getmetatable( self ) ~= wg then
error("Usage: wg:wait([timeout]) (not wg.wait([timeout]))", 2)
Expand Down
10 changes: 3 additions & 7 deletions test/04-latch.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ end, 3, 'concurrent locks')

test:deadline(function()
local lock = sync.latch()
local completed = 0
local N = 10
local value = 0

local fibers = {}

Expand Down Expand Up @@ -127,11 +125,9 @@ test:deadline(function()
lock:unlock()
end, 3, 'trylock no yield')

do
test:deadline(function()
test:diag("Test from synchronized")

local fiber = require('fiber')

local in_action = 0
local function criticalsection(id)
test:diag("call to crit section %d", id)
Expand All @@ -141,7 +137,7 @@ do
test:is(in_action, 1, "in action only 1 fiber in "..id)
in_action = in_action - 1
end
local lock = sync.lock('somekey')
local lock = sync.latch('somekey')

local N = 3
local ch = fiber.channel(N)
Expand All @@ -155,6 +151,6 @@ do
for _=1, N do
ch:get()
end
end
end, 3, 'latch 3 fibers')

test:done_testing()
32 changes: 31 additions & 1 deletion test/05-pool.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ test:deadline(function()
test:ok(ok, "result is awaited")
test:is(result, value, "received result is the same as value")

test:ok(waited <= 0.105, "result was awaited not for too long")
test:ok(waited <= 0.2, "result was awaited not for too long")

test:noyield(function()
ok, result = task:wait()
Expand Down Expand Up @@ -323,4 +323,34 @@ test:deadline(function()
test:is(fins[2], true, "second task finished ok")
end, 1, "spawn/despawn does not cancel scheduled tasks")

test:deadline(function()
local pool = sync.pool.new('pool', 2)
pool:send(function()
fiber.sleep(0.1)
end)

local executed = false
local task = assert(pool:send(function(...) executed = true fiber.sleep(...) end, {1}))
test:ok(task, "task must be returned")
test:is(task.scheduled_at, nil, "task must not be scheduled yet")
local err, res = task:wait(0, true)

test:is(err, nil, "task:wait(0, true) must return nil")
test:is(task.cancelled, true, "task must be cancelled")
test:is(res, "task await timed out", "task:wait must return await timed out")

pool:terminate()
pool:wait()
test:is(task.scheduled_at, nil, "task must not be scheduled")
fiber.sleep(0.1)
test:is(executed, false, "task must not be executed")

local ft = fiber.time()
local err2, res2 = task:wait(1)
local waited = fiber.time()-ft
test:is(err2, nil, "double task wait on cancelled")
test:is(res2, "task was cancelled", "second wait must return task was cancelled")
test:is(waited, 0, "task:wait on cancelled task must return instantly")
end, 1, "task autocancelled when wait_timeout is too low")

test:done_testing()

0 comments on commit 548b876

Please sign in to comment.