Browse Source

Fixed many async bugs, still need to add handling interrupts

master
BuildTools 11 months ago
parent
commit
d11885754d
  1. 183
      lib/async.lua
  2. 18
      lib/queue.lua

183
lib/async.lua

@ -1,120 +1,88 @@
local coroutine = require("coroutine")
local computer = require("computer")
local event = require("event")
local ll = require("liblua")
local os = require("os")
local table = require("table")
local coroutine = coroutine or require("coroutine")
local computer = computer or require("computer")
local event = event or require("event")
local ll = ll or require("liblua")
local os = os or require("os")
local math = math or require("math")
local queue = queue or require("queue")
local table = table or require("table")
local async = {}
-- An async wrapper for event.pullFiltered
function async.pullFiltered(...)
local start = computer.uptime()
local args = {...}
-- TODO fix this so async can be required in more than 1 file
-- Set global interrupted value if it doesn't exist
if type(async.interrupted) ~= "boolean" then
async.interrupted = false
end
local blocking = nil
-- Check if we're in a coroutine or not
if type(args[#args]) == "boolean" and args[#args] then
blocking = true
else
blocking = false
-- Register an interrupt handler if it doesn't exist
do
local marker = "async_interrupt"
for i, v in ipairs(event.handlers) do
-- Ignore any previous interrupt handlers
if v.marker == marker then
return
end
end
--------------------
-- Break on interrupts
function f(name, ...)
-- if args[1] happens to be "interrupted" already the second won't trigger
if name == args[1] or name == "interrupted" then
return true
local interrupted_id = event.listen("interrupted",
function(...)
async.interrupted = true
end
)
-- Add our marker back
event.handlers[interrupted_id].marker = marker
end
-- Allows async listenting to events
async.listen = event.listen
-- Async pull messages. Must be nonblocking only, otherwise use event.pull
function async.pull(...)
local args = {...}
-- Set timeout if it exists
local timeout = math.huge
if type(args[1]) == "number" then
timeout = args[1]
args = ll.slice(args, 2)
end
--------------------
-- Set name if it exists
local name = ""
if type(args[1]) == "string" then
-- Check if not in a coroutine
if blocking then return event.pullFiltered(f, ...) end
repeat
print("Yielding")
coroutine.yield()
tmp = {event.pullFiltered(.05, f, ...)}
print("Pulled?")
until #tmp > 0
else
local timeout = nil
if type(args[1]) == "number" then
timeout = args[1]
args = ll.slice({...}, 2)
end
-- For speed purposes check up here even though it's more repeated code
if type(args[1]) == "function" then
--------------------
-- Wrap other function so this will handle interrupts
function g(name, ...)
local a = ll.slice(args, 2)
-- Handle interrupt
if name == "interrupted" then
return true
end
-- Otherwise run other function
return args[1](name, ...)
end
--------------------
if timeout ~= nil then
-- Check if not in a coroutine
if blocking then
return event.pullFiltered(timeout, g, table.unpack(args))
end
repeat
coroutine.yield()
tmp = {event.pullFiltered(.05, g, table.unpack(args))}
until #tmp > 0 or computer.uptime() - start >= timeout
else
-- Check if not in a coroutine
if blocking then
return event.pullFiltered(g, ...)
end
repeat
coroutine.yield()
tmp = {event.pullFiltered(.05, g, table.unpack(args))}
until #tmp > 0
end
else
if timeout ~= nil then
-- Check if not in a coroutine
if blocking then
return event.pullFiltered(timeout, f, table.unpack(args))
end
repeat
coroutine.yield()
tmp = {event.pullFiltered(.05, f, table.unpack(args))}
until #tmp > 0 or computer.uptime() - start >= timeout
else
-- Check if not in a coroutine
if blocking then
return event.pullFiltered(f, table.unpack(args))
end
repeat
coroutine.yield()
tmp = {event.pullFiltered(.05, f, table.unpack(args))}
until #tmp > 0
end
end
name = args[1]
end
if tmp == nil then
return nil
-- Initialize event queue
local q = queue.new()
local function callback(name, ...)
-- Add event to the queue when called
q:push({name, ...})
end
return table.unpack(tmp)
end
-- An alias for async.pullFiltered which does all of the logic for both functions
async.pull = async.pullFiltered
-- Register a listener to add to our queue
local id = event.listen(name, callback)
local start = computer.uptime()
-- Infinite loop waiting for an event
local waited
repeat
os.sleep(0.01)
coroutine.yield()
waited = computer.uptime() - start
until (#q > 0 or interrupted or waited > timeout)
-- Cancel the event when done
event.cancel(id)
if interrupted then return nil, "interrupted" end
if waited > timeout then return nil, "timeout exceeded" end
if #q > 0 then
return table.unpack(q:get())
end
end
-- Sleep in a coroutine friendly way
function async.sleep(s, blocking)
@ -126,6 +94,7 @@ function async.sleep(s, blocking)
local start = computer.uptime()
repeat
os.sleep(.01)
coroutine.yield()
until (computer.uptime() - start > s)
end
@ -183,6 +152,8 @@ function async.roundRobin(loop, ignoreError)
if coroutine.status(b.coro) == "dead" then
b.results = results
table.insert(loop.dead, #loop.dead+1, table.remove(loop.live, i))
-- Set i back to the beginning in case it's off now
if i > #loop.live then i = 1 end
else
i = (i % (#loop.live)) + 1
end

18
lib/queue.lua

@ -1,5 +1,7 @@
-- Simple FIFO Queue
local math = math or require("math")
local Queue = {}
Queue.__index = Queue
@ -7,12 +9,16 @@ setmetatable(Queue, {
__call = function (cls)
return cls.new()
end,
__len = function (cls)
return self.length
end,
})
function Queue.new()
local self = setmetatable({}, Queue)
self.first = 0
self.last = -1
self.first = 1
self.last = 0
self.length = 0
return self
end
@ -20,6 +26,7 @@ function Queue:push(e)
local last = self.last + 1
self.last = last
self[last] = e
self.length = self.length + 1
end
function Queue:pop()
@ -30,7 +37,12 @@ function Queue:pop()
local value = self[first]
self[first] = nil
self.first = first + 1
self.length = self.length - 1
return value
end
return Queue
function Queue:get() return self[self.first] end
return Queue
-- wget -f http://mc.bashed.rocks:13699/lib/queue.lua lib/queue.lua
Loading…
Cancel
Save