This is a pure Python concurrency implementation for the async/await language syntax.
while True
, redefined.
- lets you schedule tasks that may take a long time without making special state machines for them.
- supports non-blocking sleep() for tasks.
- calls
time.sleep()
for a microcontroller-friendly lower power sleep when there's nothing to do.
Your tasks could look like:
- Set
reading_sensor
to True- Send a message over I2C to a sensor telling it to do an environment sample and sleep 1 second (takes 1 second to read)
- Update in-memory state of your current sensor reading and sleep for 10 seconds.
- Update "loading" beach ball displayio animation 10 times per second while
reading_sensor
- Read status updates from a 3d printer 5 times per second.
- Read a rotary button 100 times per second.
Using tasko.managed_resource.ManagedResource
you can share an SPI bus between concurrent tasks without explicit
coordination.
def setup_spi():
from tasko.managed_resource import ManagedResource
import digitalio
import board
# Configure the hardware
spi = board.SPI()
sensor_cs = digitalio.DigitalInOut(board.D4)
sensor_cs.direction = digitalio.Direction.OUTPUT
sdcard_cs = digitalio.DigitalInOut(board.D5)
sdcard_cs.direction = digitalio.Direction.OUTPUT
# Set up acquire/release workflow for the SPI bus
def set_active(pin):
pin.value = True
def set_inactive(pin):
pin.value = False
# Configure the physical spi as a managed resource with callbacks that manage the CS pin
managed_spi = ManagedResource(spi, on_acquire=set_active, on_release=set_inactive)
# Get awaitable handles for each CS using this SPI bus
sensor_handle = managed_spi.handle(pin=sensor_cs)
sdcard_handle = managed_spi.handle(pin=sdcard_cs)
return sensor_handle, sdcard_handle
And with these configured resource handles you can use them without checking whether anything is busy. Things will efficiently wait when they have to, and charge right on through when there's nothing using the bus currently.
async def read_sensor(sensor_handle):
async with sensor_handle as bus:
# Or, if you're a library author consider taking the handle as an arg
# so you can acquire, send, release, --> sleep <--, acquire, read, release
# rather than holding the spi for the whole operation
await send_and_read_from_sensor(bus)
async def log_to_sdcard(sdcard_handle):
async with sdcard_handle as bus:
await write_to_sdcard(bus)
sensor_handle, sdcard_handle = setup_spi()
tasko.schedule(hz=1, read_sensor, sensor_handle)
tasko.schedule(hz=1/10, log_to_sdcard, sdcard_handle)
tasko.run()
Any time you are faced with needing to wait around for something you might be tempted to add state to some class and inspect it up in your main loop(), or "pulse" through your app every loop() to move state forward.
You could instead consider await
ing the condition you need to be fulfilled; be it time or something else.
Uses this library for the rotary button
import tasko
from cpy_rotary import RotaryButton
# Some state. Global state is not super cool but whatevs
reading_sensor = False
# Define the top-level workflows (you would have to write this stuff no matter what)
async def read_sensor():
global reading_sensor
reading_sensor = True
try:
i2c.send(payload)
await tasko.sleep(1) # Don't block your loading beach ball while the sensor is sensing.
i2c.read(payload) # if you have some buffered i2c thing
finally:
reading_sensor = False
async def animate_beach_ball():
global reading_sensor
if reading_sensor:
set_animation_state() # hopefully this is quick - if not, maybe there's something inside to `await`
async def read_from_3d_printer():
pass
rotary = RotaryButton()
# ---------- Tasko wiring begins here ---------- #
# Schedule the workflows at whatever frequency makes sense
tasko.schedule(hz=10, coroutine_function=read_sensor)
tasko.schedule(hz=10, coroutine_function=animate_beach_ball)
tasko.schedule(hz=5, corouting_function=read_from_3d_printer)
tasko.schedule(hz=100, coroutine_function=rotary.loop)
# And let tasko do while True
tasko.run()
# ---------- Tasko wiring ends here ---------- #
Cool! The async
and await
keywords are supported in Circuitpython 6.0. Unless you are from the future you will
need to update your microcontroller. Also, it may be unavailable on your m0 microcontroller because of flash space.