Skip to content

Commit

Permalink
Merge pull request #44 from fechan/concurrency-fix
Browse files Browse the repository at this point in the history
Fix concurrency issues again
  • Loading branch information
fechan authored Nov 18, 2024
2 parents 9b1aa9a + 039dbac commit 931e69b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
2 changes: 2 additions & 0 deletions computercraft/sigils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local Pipe = require('sigils.pipe')
local WebSocket = require('sigils.websocket')
local Utils = require('sigils.utils')
local Logging = require('sigils.logging')
local Concurrent = require('sigils.concurrent')

local DEFAULT_SERVER_URL = 'wss://sigils.fredchan.org'

Expand Down Expand Up @@ -81,6 +82,7 @@ local function init ()
function () WebSocket.doWebSocket(wsContext) end,
function () Controller.listenForCcpipesEvents(wsContext, factory) end,
function () Pipe.processAllPipesForever(factory) end,
function () Concurrent.default_runner.run_forever() end,
function () waitForQuitKey(wsContext) end,
function () while true do os.sleep(0.05) end end -- forces the OS not to lock up
)
Expand Down
9 changes: 6 additions & 3 deletions computercraft/sigils/ItemDetailAndLimitCache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function ItemDetailAndLimitCache.new (missingPeriphs, initialMap)
---@param groups Group[] List of groups to fulfill item limits and details for
function o:Fulfill(groups)
local runner = Concurrent.default_runner
local parallelTasks = {}

for _, group in pairs(groups) do
for _, slot in pairs(group.slots) do
Expand All @@ -41,7 +42,8 @@ function ItemDetailAndLimitCache.new (missingPeriphs, initialMap)
end

-- fulfill itemDetail
runner.spawn(
table.insert(
parallelTasks,
function ()
local periph = peripheral.wrap(slot.periphId)
if periph and o.map[slotId].itemDetail == nil then
Expand All @@ -52,7 +54,8 @@ function ItemDetailAndLimitCache.new (missingPeriphs, initialMap)
)

-- fulfill itemLimit
runner.spawn(
table.insert(
parallelTasks,
function ()
local periph = peripheral.wrap(slot.periphId)
if periph and o.map[slotId].itemLimit == nil then
Expand All @@ -65,7 +68,7 @@ function ItemDetailAndLimitCache.new (missingPeriphs, initialMap)
end
end

runner.run_until_done()
runner.await_batch_tasks(parallelTasks)
end

---Get the item limit of the given Slot
Expand Down
17 changes: 17 additions & 0 deletions computercraft/sigils/concurrent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,30 @@ local function create_runner(max_size)
end
end

---Run an array of functions in parallel and wait for all of them to finish
---@param fns function[] List of functions to wait for the completion of
function await_batch_tasks(fns)
local awaits = {}
for _, task in pairs(fns) do
local resolve, await = create_future()
table.insert(awaits, await)
spawn(function ()
task()
resolve()
end)
end

parallel.waitForAll(table.unpack(awaits))
end

--- A coroutine executor.
-- @type Runner
return {
spawn = spawn,
has_work = has_work,
run_until_done = run_until_done,
run_forever = run_forever,
await_all_tasks = await_all_tasks,
}
end

Expand Down

0 comments on commit 931e69b

Please sign in to comment.