Skip to content

Commit

Permalink
Merge pull request #6 from moonlibs/fix-readme
Browse files Browse the repository at this point in the history
Fixes codestyle, markdown, annotations and more tests for the sync
  • Loading branch information
Mons authored May 3, 2023
2 parents 65553d0 + 548b876 commit f036ab3
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 88 deletions.
71 changes: 34 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![Coverage Status](https://coveralls.io/repos/github/moonlibs/sync/badge.svg?branch=master)](https://coveralls.io/github/moonlibs/sync?branch=master)

# Collection of synchronization primitives for Tarantool fibers

[![Coverage Status](https://coveralls.io/repos/github/moonlibs/sync/badge.svg?branch=master)](https://coveralls.io/github/moonlibs/sync?branch=master)

## Conditional Variable (cond)

Current implementation of Tarantool's `fiber.cond` has unexpected behavior if signal/broadcast was executed before wait (unlike other implementations of conditional variable). So, with `sync.cond` the following code will not freeze forever.
Expand All @@ -19,8 +19,8 @@ local value = cond:recv()
local cond = sync.cond()

fiber.create(function()
fiber.sleep(1)
cond:send('some data')
fiber.sleep(1)
cond:send('some data')
end)

print(cond:recv())
Expand All @@ -34,10 +34,10 @@ Used to wait for finishing of several simultaneous/parallel tasks.
local wg = sync.wg()

for 1..10 do
wg:start() -- or wg:begin()
fiber.create(function()
wg:done() -- or wg:finish()
end)
wg:start() -- or wg:begin()
fiber.create(function()
wg:done() -- or wg:finish()
end)
end

wg:wait(timeout)
Expand All @@ -50,7 +50,6 @@ Sadly we cannod use the pair `begin`/`end`, since `end` is a keyword. One could

There is alternative name `sync.cv` for `sync.wg` for compatibility with the previous version.


## Mutex (lock) with deadlock detection

Heavyweight mutex, which is assigned to fiber. That allows to implement deadlock detection.
Expand All @@ -59,17 +58,17 @@ Heavyweight mutex, which is assigned to fiber. That allows to implement deadlock
local lock = sync.lock()

for i = 1, 3 do
fiber.create(function(i)
lock:acquire()
fiber.sleep(math.random())
print(i, "doing work")
fiber.sleep(math.random())
lock:release()
end,i)
fiber.create(function(i)
lock:acquire()
fiber.sleep(math.random())
print(i, "doing work")
fiber.sleep(math.random())
lock:release()
end,i)
end

lock:with(function()
-- critical section
-- critical section
end)
```

Expand All @@ -83,13 +82,13 @@ Rather performant, but without any sugar, like deadlock detection
local lock = sync.latch()

for i = 1, 3 do
fiber.create(function(i)
lock:acquire()
fiber.sleep(math.random())
print(i, "doing work")
fiber.sleep(math.random())
lock:release()
end,i)
fiber.create(function(i)
lock:acquire()
fiber.sleep(math.random())
print(i, "doing work")
fiber.sleep(math.random())
lock:release()
end,i)
end
```

Expand All @@ -102,20 +101,20 @@ local http = require 'http.client'
local pool = sync.pool('workers', 4)

for i = 1, 16 do
pool:send(function(url)
local r = http.get(url)
assert(r.status == 200)
return r.status, r.headers, r.body
end, {"https://tarantool.io"})
pool:send(function(url)
local r = http.get(url)
assert(r.status == 200)
return r.status, r.headers, r.body
end, {"https://tarantool.io"})
end

pool:wait() -- pool can be awaited
print("pool finished")
```

sync.pool is usefull in background fibers when you need to parallel networks requests
```lua

```lua
function job:start()
self.fiber_f = fiber.create(function()
local pool = sync.pool('fetches', 4)
Expand All @@ -137,7 +136,6 @@ function job:start()
end
end)
end

```

## More plans and ideas to implement
Expand All @@ -146,11 +144,10 @@ There are several ideas may be implemented. PR's or proposals are welcome

* Named wait groups — names instead of counters
* fiber.select — ability to wait for something waitable (like in go)
* fiber.select or wait_any/wait_all
* https://github.com/tarantool/tarantool/issues/5635
* Joinable fiber pool
* fiber.select or wait_any/wait_all
* <https://github.com/tarantool/tarantool/issues/5635>
* "Normal" joinable fiber (like coro)
* able to "return"
* able to rethrow
* zombie status: no tombstones in fiber pool
* able to "return"
* able to rethrow
* zombie status: no tombstones in fiber pool
* Channel+luafun
8 changes: 8 additions & 0 deletions sync/cond.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local fiber = require "fiber"
---@class sync.cond
---@field name string
---@field timeout number? default timeout for :recv()
---@field sent boolean? was data sent through condvar
local cond = {}
cond.__index = cond
cond.__tostring = function (self) return "cond<".. (self.name or 'anon') ..">" end
Expand All @@ -20,6 +21,10 @@ function cond.new(name, timeout)
}, cond)
end

---Sends data to the cond and receiver
---
---Method raises exception if any data was already sent through this cond
---@param data any data you want to send (please do not send nil/box.NULL, it is acceptable but not usable)
function cond:send(data)
if getmetatable( self ) ~= cond then
error("Usage: cond:send(value) (not cond.send(value))", 2)
Expand All @@ -34,6 +39,9 @@ function cond:send(data)
end
end

---Receives data from conditional variable
---@param timeout number? timeout in seconds to wait for data (default self.timeout or infinity)
---@return any|nil, string? error_message # returns `sent data` if was awaited, or nil, "Timed out" otherwise
function cond:recv(timeout)
if getmetatable( self ) ~= cond then
error("Usage: cond:send(value) (not cond.send(value))", 2)
Expand Down
57 changes: 36 additions & 21 deletions sync/latch.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ for _, def in pairs(defs) do
end
end

-- ffi.metatype('', {})

local latch = {
-- WAIT_WINDOW = 1/3,
}
---Latch is bindings to lightweight tarantool locks
---@class sync.latch
---@field name string? name of the latch if was given
---@field obj ffi.ctype* latch object itself
local latch = {}
latch.__index = latch
latch.__tostring = function (self) return "latch<".. (self.name or 'anon') ..">" end
setmetatable(latch, { __call = function (_, name) return _.new(name) end})
Expand All @@ -57,36 +57,45 @@ local function destroy(obj)
C.box_latch_delete(obj)
end

---Creates new latch
---@param name string? name of the latch
---@return sync.latch
function latch.new(name)
if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end
if name == latch then error("Usage: latch.new([name]) or latch([name]) (not latch:new())", 2) end
local obj = C.box_latch_new()
if not obj then error("Failed to create latch") end
ffi.gc(obj, destroy)

return setmetatable({
return setmetatable({
obj = obj;
name = name;
}, latch)
name = name;
}, latch)
end

---Locks latch
---
---this method does not accept timeout, so can be locked infinetely long
function latch:lock()
if getmetatable( self ) ~= latch then
error("Usage: latch:lock() (not latch.lock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:lock() (not latch.lock())", 2)
end
C.box_latch_lock(self.obj)
end

---Fast checks if latch can be locked
---@return boolean success # returns true if latch was captured
function latch:trylock()
if getmetatable( self ) ~= latch then
error("Usage: latch:trylock() (not latch.trylock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:trylock() (not latch.trylock())", 2)
end
return C.box_latch_trylock(self.obj) == 0 -- 0 means success
end

---Releases locked latch
function latch:unlock()
if getmetatable( self ) ~= latch then
error("Usage: latch:unlock() (not latch.unlock())", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:unlock() (not latch.unlock())", 2)
end
C.box_latch_unlock(self.obj)
end

Expand All @@ -98,10 +107,16 @@ local function tail(self, r, ...)
return ...
end

---Wrapper lock()/unlock() which executes `f`
---
---latch will be released whether function raised exception or not
---@param f fun(...:any) callable object, usually function of the critical section
---@param ... any? arguments of the function
---@return ... # return result of the function or raises exception
function latch:with(f, ...)
if getmetatable( self ) ~= latch then
error("Usage: latch:with(fn) (not latch.with(fn))", 2)
end
if getmetatable( self ) ~= latch then
error("Usage: latch:with(fn) (not latch.with(fn))", 2)
end
self:lock()
return tail(self, pcall(f, ...))
end
Expand Down
40 changes: 28 additions & 12 deletions sync/lock.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
local fiber = require "fiber"

---@class sync.lock
---@field name string name of the lock
---@field locked false|number false when no one acquired lock, `fiber_id` otherwise
---@field _lock fiber.cond
local lock = {
WAIT_WINDOW = 1/3,
}
Expand All @@ -10,11 +14,9 @@ setmetatable(lock, { __call = function (_, name) return _.new(name) end })

local FIBER_STORE = 'sync.lock'

--- debug routine
-- local function print(...)
-- return _G.print(fiber.id(), ...)
-- end

---Creates new lock
---@param name string? name of the lock
---@return sync.lock
function lock.new(name)
if name == lock then error("Usage: lock.new([name]) or lock([name]) (not lock:new())", 2) end
return setmetatable({
Expand All @@ -32,6 +34,15 @@ function lock:_self_check()
end
end

---Tries to acquire lock
---
---Returns `true` on success, returns `false` on timed out
---
---Raises exception when attempting to acquire same lock twice in the same fiber
---
---Raises exception when deadlock is discovered
---@param timeout? number timeout in seconds, default timeout is infinity
---@return boolean # `true` if lock was successfully captured, `false` whe timed out
function lock:aquire(timeout)
if getmetatable( self ) ~= lock then
error("Usage: lock:aquire() (not lock.aquire())", 2)
Expand Down Expand Up @@ -102,16 +113,15 @@ function lock:aquire(timeout)
end

self.locked = fiber.id()
-- print("[".. tostring(self) .. "] + aquired by ", fiber.id());
return true

-- local deadline
-- if timeout then
-- deadline = fiber.time() + timeout;
-- end
end
lock.lock = lock.aquire

---Releases lock
---
---Raises exception when lock is acquired by noone
---
---Does not check lock ownership
function lock:release()
if getmetatable( self ) ~= lock then
error("Usage: lock:release() (not lock.release())", 2)
Expand All @@ -120,7 +130,6 @@ function lock:release()
error("lock:release called on not aquired lock", 2)
end
self.locked = false
-- print("[".. tostring(self) .. "] - released by ", fiber.id());
if self._lock then
self._lock:signal()
end
Expand All @@ -135,6 +144,13 @@ local function tail(self, r, ...)
return ...
end

---Executes given function under lock
---
---:with() reraises exception if function raised exception
---lock will be automatically released regardless success of function execution
---@param f fun() function of the critical section
---@param ... any arguments for the function
---@return ... # returns result of the function
function lock:with(f, ...)
if getmetatable( self ) ~= lock then
error("Usage: lock:with(fn) (not lock.with(fn))", 2)
Expand Down
Loading

0 comments on commit f036ab3

Please sign in to comment.