Skip to content

moonlibs/sync

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

23 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Collection of synchronization primitives for Tarantool fibers

Coverage Status

Conditional Variable (cond)

Current implementation of Tarantool's fiber.cond has unexpected behavior if signal/broadcast was executed before wait (unlike other implementations of conditional variable). So, with sync.cond the following code will not freeze forever.

And unlike fiber cond (which are, in fact, more like signals) it is one time use only.

local cond = sync.cond()

cond:send(data)
local value = cond:recv()
local cond = sync.cond()

fiber.create(function()
    fiber.sleep(1)
    cond:send('some data')
end)

print(cond:recv())

WaitGroup (wg)

Used to wait for finishing of several simultaneous/parallel tasks.

local wg = sync.wg()

for 1..10 do
    wg:start() -- or wg:begin()
    fiber.create(function()
        wg:done() -- or wg:finish()
    end)
end

wg:wait(timeout)

There is a pair of methods: start & finish with a synonims add + done for easy migration from other languages (golang). Sadly we cannod use the pair begin/end, since end is a keyword. One could use mixed combination: start + done or add + finish

start also supports number (like add in Go), but that's not recommended.

There is alternative name sync.cv for sync.wg for compatibility with the previous version.

Mutex (lock) with deadlock detection

Heavyweight mutex, which is assigned to fiber. That allows to implement deadlock detection.

local lock = sync.lock()

for i = 1, 3 do
    fiber.create(function(i)
        lock:acquire()
        fiber.sleep(math.random())
        print(i, "doing work")
        fiber.sleep(math.random())
        lock:release()
    end,i)
end

lock:with(function()
    -- critical section
end)

Latch (lightweight lock)

Binding to tarantool's builtin latch: Latch of cooperative multitasking environment, which preserves strict order of fibers waiting for the latch.

Rather performant, but without any sugar, like deadlock detection

local lock = sync.latch()

for i = 1, 3 do
    fiber.create(function(i)
        lock:acquire()
        fiber.sleep(math.random())
        print(i, "doing work")
        fiber.sleep(math.random())
        lock:release()
    end,i)
end

Pool (fiber pool)

Implementation of fire-and-forget fiber pool.

local http = require 'http.client'
local pool = sync.pool('workers', 4)

for i = 1, 16 do
    pool:send(function(url)
        local r = http.get(url)
        assert(r.status == 200)
        return r.status, r.headers, r.body
    end, {"https://tarantool.io"})
end

pool:wait() -- pool can be awaited
print("pool finished")

sync.pool is usefull in background fibers when you need to parallel networks requests

function job:start()
    self.fiber_f = fiber.create(function()
        local pool = sync.pool('fetches', 4)
        while self.is_active do
            for _, user in box.space.users:pairs() do
                if self.is_active then
                    -- fast exit
                    break
                end
                pool:send(process_user_network, {user})
            end
        end
        pool:terminate()
        if not pool:wait(gracefull_timeout) then
            -- terminate pool with force
            log.warn("forcefully terminating pool")
            pool:terminate(true)
            pool:wait()
        end
    end)
end

Rate (token bucket rateimiter)

Implements classic Token Bucket algorithm limited with burst (integer non-negative value) and rps (floating value).

If you need to limit your requests to resource with rps, you might want to create Token Bucket with burst=1 and rps=rps.

To limit your requests you better use common method rate:wait() it awaits only single token but not limited with timeout.

When you need to wait rate-limiter at most timeout seconds then you can specify that as first argument: rate:wait(timeout).

Note, that rate-limiter returns immediately if token can't be awaited within provided timeout (noyield response).

local rate = sync.rate.new("rlimit", 10/60, 1) -- 10 requests / 60 seconds, with burst=1
assert(rate:wait()) -- infinitely wait for ratelimit

Read more about Token Bucket https://en.wikipedia.org/wiki/Token_bucket

Inspired by golang's time/rate https://pkg.go.dev/golang.org/x/time/rate

More plans and ideas to implement

There are several ideas may be implemented. PR's or proposals are welcome

  • Named wait groups — names instead of counters
  • fiber.select — ability to wait for something waitable (like in go)
  • "Normal" joinable fiber (like coro)
    • able to "return"
    • able to rethrow
    • zombie status: no tombstones in fiber pool
  • Channel+luafun