diff --git a/README.md b/README.md index fa04a33..58731aa 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ -[![Coverage Status](https://coveralls.io/repos/github/moonlibs/sync/badge.svg?branch=master)](https://coveralls.io/github/moonlibs/sync?branch=master) - # Collection of synchronization primitives for Tarantool fibers +[![Coverage Status](https://coveralls.io/repos/github/moonlibs/sync/badge.svg?branch=master)](https://coveralls.io/github/moonlibs/sync?branch=master) + ## Conditional Variable (cond) Current implementation of Tarantool's `fiber.cond` has unexpected behavior if signal/broadcast was executed before wait (unlike other implementations of conditional variable). So, with `sync.cond` the following code will not freeze forever. @@ -19,8 +19,8 @@ local value = cond:recv() local cond = sync.cond() fiber.create(function() - fiber.sleep(1) - cond:send('some data') + fiber.sleep(1) + cond:send('some data') end) print(cond:recv()) @@ -34,10 +34,10 @@ Used to wait for finishing of several simultaneous/parallel tasks. local wg = sync.wg() for 1..10 do - wg:start() -- or wg:begin() - fiber.create(function() - wg:done() -- or wg:finish() - end) + wg:start() -- or wg:begin() + fiber.create(function() + wg:done() -- or wg:finish() + end) end wg:wait(timeout) @@ -50,7 +50,6 @@ Sadly we cannod use the pair `begin`/`end`, since `end` is a keyword. One could There is alternative name `sync.cv` for `sync.wg` for compatibility with the previous version. - ## Mutex (lock) with deadlock detection Heavyweight mutex, which is assigned to fiber. That allows to implement deadlock detection. @@ -59,17 +58,17 @@ Heavyweight mutex, which is assigned to fiber. That allows to implement deadlock local lock = sync.lock() for i = 1, 3 do - fiber.create(function(i) - lock:acquire() - fiber.sleep(math.random()) - print(i, "doing work") - fiber.sleep(math.random()) - lock:release() - end,i) + fiber.create(function(i) + lock:acquire() + fiber.sleep(math.random()) + print(i, "doing work") + fiber.sleep(math.random()) + lock:release() + end,i) end lock:with(function() - -- critical section + -- critical section end) ``` @@ -83,13 +82,13 @@ Rather performant, but without any sugar, like deadlock detection local lock = sync.latch() for i = 1, 3 do - fiber.create(function(i) - lock:acquire() - fiber.sleep(math.random()) - print(i, "doing work") - fiber.sleep(math.random()) - lock:release() - end,i) + fiber.create(function(i) + lock:acquire() + fiber.sleep(math.random()) + print(i, "doing work") + fiber.sleep(math.random()) + lock:release() + end,i) end ``` @@ -102,11 +101,11 @@ local http = require 'http.client' local pool = sync.pool('workers', 4) for i = 1, 16 do - pool:send(function(url) - local r = http.get(url) - assert(r.status == 200) - return r.status, r.headers, r.body - end, {"https://tarantool.io"}) + pool:send(function(url) + local r = http.get(url) + assert(r.status == 200) + return r.status, r.headers, r.body + end, {"https://tarantool.io"}) end pool:wait() -- pool can be awaited @@ -114,8 +113,8 @@ print("pool finished") ``` sync.pool is usefull in background fibers when you need to parallel networks requests -```lua +```lua function job:start() self.fiber_f = fiber.create(function() local pool = sync.pool('fetches', 4) @@ -137,7 +136,6 @@ function job:start() end end) end - ``` ## More plans and ideas to implement @@ -146,11 +144,10 @@ There are several ideas may be implemented. PR's or proposals are welcome * Named wait groups — names instead of counters * fiber.select — ability to wait for something waitable (like in go) - * fiber.select or wait_any/wait_all - * https://github.com/tarantool/tarantool/issues/5635 -* Joinable fiber pool + * fiber.select or wait_any/wait_all + * * "Normal" joinable fiber (like coro) - * able to "return" - * able to rethrow - * zombie status: no tombstones in fiber pool + * able to "return" + * able to rethrow + * zombie status: no tombstones in fiber pool * Channel+luafun diff --git a/sync/cond.lua b/sync/cond.lua index 2598148..5c3120b 100644 --- a/sync/cond.lua +++ b/sync/cond.lua @@ -3,6 +3,7 @@ local fiber = require "fiber" ---@class sync.cond ---@field name string ---@field timeout number? default timeout for :recv() +---@field sent boolean? was data sent through condvar local cond = {} cond.__index = cond cond.__tostring = function (self) return "cond<".. (self.name or 'anon') ..">" end @@ -20,6 +21,10 @@ function cond.new(name, timeout) }, cond) end +---Sends data to the cond and receiver +--- +---Method raises exception if any data was already sent through this cond +---@param data any data you want to send (please do not send nil/box.NULL, it is acceptable but not usable) function cond:send(data) if getmetatable( self ) ~= cond then error("Usage: cond:send(value) (not cond.send(value))", 2) @@ -34,6 +39,9 @@ function cond:send(data) end end +---Receives data from conditional variable +---@param timeout number? timeout in seconds to wait for data (default self.timeout or infinity) +---@return any|nil, string? error_message # returns `sent data` if was awaited, or nil, "Timed out" otherwise function cond:recv(timeout) if getmetatable( self ) ~= cond then error("Usage: cond:send(value) (not cond.send(value))", 2) diff --git a/sync/latch.lua b/sync/latch.lua index aabc18e..160d1c9 100644 --- a/sync/latch.lua +++ b/sync/latch.lua @@ -43,11 +43,11 @@ for _, def in pairs(defs) do end end --- ffi.metatype('', {}) - -local latch = { - -- WAIT_WINDOW = 1/3, -} +---Latch is bindings to lightweight tarantool locks +---@class sync.latch +---@field name string? name of the latch if was given +---@field obj ffi.ctype* latch object itself +local latch = {} latch.__index = latch latch.__tostring = function (self) return "latch<".. (self.name or 'anon') ..">" end setmetatable(latch, { __call = function (_, name) return _.new(name) end}) @@ -57,36 +57,45 @@ local function destroy(obj) C.box_latch_delete(obj) end +---Creates new latch +---@param name string? name of the latch +---@return sync.latch function latch.new(name) - if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end + if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end local obj = C.box_latch_new() if not obj then error("Failed to create latch") end ffi.gc(obj, destroy) - return setmetatable({ + return setmetatable({ obj = obj; - name = name; - }, latch) + name = name; + }, latch) end +---Locks latch +--- +---this method does not accept timeout, so can be locked infinetely long function latch:lock() - if getmetatable( self ) ~= latch then - error("Usage: latch:lock() (not latch.lock())", 2) - end + if getmetatable( self ) ~= latch then + error("Usage: latch:lock() (not latch.lock())", 2) + end C.box_latch_lock(self.obj) end +---Fast checks if latch can be locked +---@return boolean success # returns true if latch was captured function latch:trylock() - if getmetatable( self ) ~= latch then - error("Usage: latch:trylock() (not latch.trylock())", 2) - end + if getmetatable( self ) ~= latch then + error("Usage: latch:trylock() (not latch.trylock())", 2) + end return C.box_latch_trylock(self.obj) == 0 -- 0 means success end +---Releases locked latch function latch:unlock() - if getmetatable( self ) ~= latch then - error("Usage: latch:unlock() (not latch.unlock())", 2) - end + if getmetatable( self ) ~= latch then + error("Usage: latch:unlock() (not latch.unlock())", 2) + end C.box_latch_unlock(self.obj) end @@ -98,10 +107,16 @@ local function tail(self, r, ...) return ... end +---Wrapper lock()/unlock() which executes `f` +--- +---latch will be released whether function raised exception or not +---@param f fun(...:any) callable object, usually function of the critical section +---@param ... any? arguments of the function +---@return ... # return result of the function or raises exception function latch:with(f, ...) - if getmetatable( self ) ~= latch then - error("Usage: latch:with(fn) (not latch.with(fn))", 2) - end + if getmetatable( self ) ~= latch then + error("Usage: latch:with(fn) (not latch.with(fn))", 2) + end self:lock() return tail(self, pcall(f, ...)) end diff --git a/sync/lock.lua b/sync/lock.lua index a57b57e..7429f9d 100644 --- a/sync/lock.lua +++ b/sync/lock.lua @@ -1,5 +1,9 @@ local fiber = require "fiber" +---@class sync.lock +---@field name string name of the lock +---@field locked false|number false when no one acquired lock, `fiber_id` otherwise +---@field _lock fiber.cond local lock = { WAIT_WINDOW = 1/3, } @@ -10,11 +14,9 @@ setmetatable(lock, { __call = function (_, name) return _.new(name) end }) local FIBER_STORE = 'sync.lock' ---- debug routine --- local function print(...) --- return _G.print(fiber.id(), ...) --- end - +---Creates new lock +---@param name string? name of the lock +---@return sync.lock function lock.new(name) if name == lock then error("Usage: lock.new([name]) or lock([name]) (not lock:new())", 2) end return setmetatable({ @@ -32,6 +34,15 @@ function lock:_self_check() end end +---Tries to acquire lock +--- +---Returns `true` on success, returns `false` on timed out +--- +---Raises exception when attempting to acquire same lock twice in the same fiber +--- +---Raises exception when deadlock is discovered +---@param timeout? number timeout in seconds, default timeout is infinity +---@return boolean # `true` if lock was successfully captured, `false` whe timed out function lock:aquire(timeout) if getmetatable( self ) ~= lock then error("Usage: lock:aquire() (not lock.aquire())", 2) @@ -102,16 +113,15 @@ function lock:aquire(timeout) end self.locked = fiber.id() - -- print("[".. tostring(self) .. "] + aquired by ", fiber.id()); return true - - -- local deadline - -- if timeout then - -- deadline = fiber.time() + timeout; - -- end end lock.lock = lock.aquire +---Releases lock +--- +---Raises exception when lock is acquired by noone +--- +---Does not check lock ownership function lock:release() if getmetatable( self ) ~= lock then error("Usage: lock:release() (not lock.release())", 2) @@ -120,7 +130,6 @@ function lock:release() error("lock:release called on not aquired lock", 2) end self.locked = false - -- print("[".. tostring(self) .. "] - released by ", fiber.id()); if self._lock then self._lock:signal() end @@ -135,6 +144,13 @@ local function tail(self, r, ...) return ... end +---Executes given function under lock +--- +---:with() reraises exception if function raised exception +---lock will be automatically released regardless success of function execution +---@param f fun() function of the critical section +---@param ... any arguments for the function +---@return ... # returns result of the function function lock:with(f, ...) if getmetatable( self ) ~= lock then error("Usage: lock:with(fn) (not lock.with(fn))", 2) diff --git a/sync/pool.lua b/sync/pool.lua index 6e0d6a0..d50874b 100644 --- a/sync/pool.lua +++ b/sync/pool.lua @@ -85,7 +85,7 @@ local function worker_main_loop(p) ---@type sync.pool.task? local t repeat - t = p.chan:get(0.01) --[[@as sync.pool.task?]] + t = p.chan:get() --[[@as sync.pool.task?]] until (not should_live()) or t if type(t) ~= 'table' then @@ -133,9 +133,8 @@ local function worker_main_loop(p) end if p.chan then if p.debug then - log.info("closing channel") + log.info("removing channel") end - p.chan:close() p.chan = nil end end @@ -238,11 +237,6 @@ function task_mt:on_finish(on_finish_cb, on_finish_ctx) self.on_finish_cb = on_finish_cb end -function task_mt:__gc() - self.terminate_cb = nil - self.cb = nil -end - ---Executes given function with arguments on the pool --- ---`pool:send()` raises exception if pool has been terminated @@ -323,7 +317,7 @@ end --- end --- end --- ----@param func fun()|table function to execute +---@param func fun(...:any)|table function to execute ---@param args? any[] arguments for the call will passed as arguments to given function ---@param opts? sync.pool.runOpts options for the task (async is true by default) ---@return sync.pool.task|boolean|nil maybe_task, string? error_message # (async) false will be returned only when task was not scheduled @@ -472,10 +466,10 @@ function pool:terminate(force) if type(self.terminate_cb) == 'table' and not self.terminate_cb.sent then self.terminate_cb:send(true) end + self.chan:close() if not force then return end end - self.chan:close() self.chan = nil log.warn("terminating workers: force=%s", not not force) diff --git a/sync/wg.lua b/sync/wg.lua index 485decb..d0b7aef 100644 --- a/sync/wg.lua +++ b/sync/wg.lua @@ -3,6 +3,8 @@ local fiber = require "fiber" ---@class sync.wg ---@field name string name of the waitgroup ---@field timeout number? default wait timeout +---@field active number +---@field completed boolean? local wg = {} wg.__index = wg wg.__tostring = function (self) return "wg<".. (self.name or 'anon') ..">" end @@ -23,6 +25,8 @@ function wg.new(name, timeout) }, wg) end +---Increments counter by n (default 1) +---@param n number? increment (1 if not given) function wg:start(n) if getmetatable( self ) ~= wg then error("Usage: wg:start() (not wg.start())", 2) @@ -32,6 +36,9 @@ function wg:start(n) end wg.add = wg.start +---Decrements counter by 1 +--- +---Notifies waiters when counter==0. function wg:finish() if getmetatable( self ) ~= wg then error("Usage: wg:finish() (not wg.finish())", 2) @@ -49,6 +56,9 @@ function wg:finish() end wg.done = wg.finish +---Awaits wait group +---@param timeout number? timeout in seconds (default self.timeout or infinety) +---@return true|nil awaited, string? error_message # returns `true` when wg was awaited, and `nil` when "Timed out" function wg:wait(timeout) if getmetatable( self ) ~= wg then error("Usage: wg:wait([timeout]) (not wg.wait([timeout]))", 2) diff --git a/test/04-latch.test.lua b/test/04-latch.test.lua index 6690d7c..809cabb 100644 --- a/test/04-latch.test.lua +++ b/test/04-latch.test.lua @@ -76,9 +76,7 @@ end, 3, 'concurrent locks') test:deadline(function() local lock = sync.latch() - local completed = 0 local N = 10 - local value = 0 local fibers = {} @@ -127,11 +125,9 @@ test:deadline(function() lock:unlock() end, 3, 'trylock no yield') -do +test:deadline(function() test:diag("Test from synchronized") - local fiber = require('fiber') - local in_action = 0 local function criticalsection(id) test:diag("call to crit section %d", id) @@ -141,7 +137,7 @@ do test:is(in_action, 1, "in action only 1 fiber in "..id) in_action = in_action - 1 end - local lock = sync.lock('somekey') + local lock = sync.latch('somekey') local N = 3 local ch = fiber.channel(N) @@ -155,6 +151,6 @@ do for _=1, N do ch:get() end -end +end, 3, 'latch 3 fibers') test:done_testing() diff --git a/test/05-pool.test.lua b/test/05-pool.test.lua index df9b7e1..74528f8 100644 --- a/test/05-pool.test.lua +++ b/test/05-pool.test.lua @@ -131,7 +131,7 @@ test:deadline(function() test:ok(ok, "result is awaited") test:is(result, value, "received result is the same as value") - test:ok(waited <= 0.105, "result was awaited not for too long") + test:ok(waited <= 0.2, "result was awaited not for too long") test:noyield(function() ok, result = task:wait() @@ -323,4 +323,34 @@ test:deadline(function() test:is(fins[2], true, "second task finished ok") end, 1, "spawn/despawn does not cancel scheduled tasks") +test:deadline(function() + local pool = sync.pool.new('pool', 2) + pool:send(function() + fiber.sleep(0.1) + end) + + local executed = false + local task = assert(pool:send(function(...) executed = true fiber.sleep(...) end, {1})) + test:ok(task, "task must be returned") + test:is(task.scheduled_at, nil, "task must not be scheduled yet") + local err, res = task:wait(0, true) + + test:is(err, nil, "task:wait(0, true) must return nil") + test:is(task.cancelled, true, "task must be cancelled") + test:is(res, "task await timed out", "task:wait must return await timed out") + + pool:terminate() + pool:wait() + test:is(task.scheduled_at, nil, "task must not be scheduled") + fiber.sleep(0.1) + test:is(executed, false, "task must not be executed") + + local ft = fiber.time() + local err2, res2 = task:wait(1) + local waited = fiber.time()-ft + test:is(err2, nil, "double task wait on cancelled") + test:is(res2, "task was cancelled", "second wait must return task was cancelled") + test:is(waited, 0, "task:wait on cancelled task must return instantly") +end, 1, "task autocancelled when wait_timeout is too low") + test:done_testing()