CircuitPythonのasyncioを使った協調型マルチタスク2 応用編

AdafruitのLearning Guideの非公式日本語訳です。
英語独特の言い回しを可能な限り日本語的な表現に直していますが、不自然に感じる部分も残っていますことを何卒ご容赦ください。

タスク間のコミュニケーション

前の投稿で説明した2つのLチカのタスクはお互いの状況を知りません。これは非常に重要なポイントです。タスクがそれぞれ独立して実行されていても、asyncio.sleep()を使って交代で実行するため、期待通りのタイミングを保つことが可能なんです。

1個のLEDの点滅を制御する

今度はLEDの動作に影響を与える情報をLチカタスクに渡したい場合を考えてみます。例えば、ボタンを押した回数によって、点滅速度を変更したいとします。

Lチカタスクの中でボタンを監視することもできますが、それではLチカタスクが複雑になってしまいます。
この例では、ボタンを監視する別のタスクを作成します。このタスクは、共有オブジェクトの値を変更し、Lチカタスクに何をすべきかを指示します。

import asyncio
import board
import digitalio
import keypad


class Interval:
    """Simple class to hold an interval value. Use .value to to read or write."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def monitor_interval_buttons(pin_slower, pin_faster, interval):
    """Monitor two buttons: one lengthens the interval, the other shortens it.
    Change interval.value as appropriate.
    """
    # Assume buttons are active low.
    with keypad.Keys(
        (pin_slower, pin_faster), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                if key_event.key_number == 0:
                    # Lengthen the interval.
                    interval.value += 0.1
                else:
                    # Shorten the interval.
                    interval.value = max(0.1, interval.value - 0.1)
                print("interval is now", interval.value)
            # Let another task run.
            await asyncio.sleep(0)


async def blink(pin, interval):
    """Blink the given pin forever.
    The blinking rate is controlled by the supplied Interval object.
    """
    with digitalio.DigitalInOut(pin) as led:
        led.switch_to_output()
        while True:
            led.value = not led.value
            await asyncio.sleep(interval.value)


async def main():
    # Start blinking 0.5 sec on, 0.5 sec off.
    interval = Interval(0.5)

    led_task = asyncio.create_task(blink(board.D1, interval))
    interval_task = asyncio.create_task(
        monitor_interval_buttons(board.D3, board.D4, interval)
    )
    # This will run forever, because neither task ever exits.
    await asyncio.gather(led_task, interval_task)


asyncio.run(main())

上のプログラムでは、led_task と interval_task は interval オブジェクトを共有していますが、それ以外はお互いについて知りません。一方を変更しても、もう一方は変更する必要がありません。

ここで、新たに興味深いことが一つあります。monitor_interval_buttons()では、無限ループでキー入力を待ちます。何が起こるかに関係なく、ループを回るたびに、await asyncio.sleep(0) を実行し、タスクスケジューラにコントロールを返します。これはasyncioの標準的な方法で、「私はもう十分長く走ったから、他のタスクを走らせなさい」と言うものです。もし他に実行可能なタスクがなければ、スケジューラはスリープ時間が0であるため、すぐに制御を取り戻します。

2個のLEDの点滅を制御する

それでは2つのLEDを異なるボタンで制御したいとします。これは非常に簡単で、単にタスクを追加すればよいのです。以下は、その例です。
main()の部分だけが変更されています。残りのプログラムは同じままです。

import asyncio
import board
import digitalio
import keypad


class Interval:
    """Simple class to hold an interval value. Use .value to to read or write."""

    def __init__(self, initial_interval):
        self.value = initial_interval


async def monitor_interval_buttons(pin_slower, pin_faster, interval):
    """Monitor two buttons: one lengthens the interval, the other shortens it.
    Change interval.value as appropriate.
    """
    # Assume buttons are active low.
    with keypad.Keys(
        (pin_slower, pin_faster), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                if key_event.key_number == 0:
                    # Lengthen the interval.
                    interval.value += 0.1
                else:
                    # Shorten the interval.
                    interval.value = max(0.1, interval.value - 0.1)
                print("interval is now", interval.value)
            # Let another task run.
            await asyncio.sleep(0)


async def blink(pin, interval):
    """Blink the given pin forever.
    The blinking rate is controlled by the supplied Interval object.
    """
    with digitalio.DigitalInOut(pin) as led:
        led.switch_to_output()
        while True:
            led.value = not led.value
            await asyncio.sleep(interval.value)


async def main():
    interval1 = Interval(0.5)
    interval2 = Interval(1.0)

    led1_task = asyncio.create_task(blink(board.D1, interval1))
    led2_task = asyncio.create_task(blink(board.D2, interval2))
    interval1_task = asyncio.create_task(
        monitor_interval_buttons(board.D3, board.D4, interval1)
    )
    interval2_task = asyncio.create_task(
        monitor_interval_buttons(board.D5, board.D6, interval2)
    )

    await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)


asyncio.run(main())

競合状態にはならない

このように共有データを使う手法では、あるタスクが書き込んだデータを他のタスクが読み込んだときに、整合性のない状態になってしまうという競合状態が発生しないかと心配されるかもしれません。
awaitを使ってスケジューラに制御を委ねる前に、タスクがデータの一貫性を確保する限り、このようなことは起こりません。タスクは協調して交代しているので、あるタスクが他のタスクの実行を中断することはできません。

専門用語で言うと、タスク内の2つのawait文の間のコードは、「クリティカルセクション」のようなもので、他のタスクに割り込まれないということです。

NeoPixelを制御する

タスクを使ってNeoPixelアニメーションの方向と速度を制御する、より興味深い例を紹介します。
この例は、Adafruit QT Py RP2040でテストされています。24個のNeoPixelリングがboard.A0に接続されています。3つのタクトスイッチが board.A1, board.A2, board.A3 に接続されています。ボタンの反対側はGNDに接続されているので、ボタンを押すとピンはローレベルになります。

ボタンA1を押すと、アニメーションの向きを反転させることができます。A2 と A3 を押すと、アニメーションのサイクル間の遅延を変えることにより、アニメーションを遅くしたり速くしたりすることができます。

上記「タスク間のコミュニケーション」の例と同様に、値を使うタスクと値を設定するタスクの間のコミュニケーションに、共有オブジェクトが使用されています。

import asyncio
import board
import keypad
import neopixel
from rainbowio import colorwheel

pixel_pin = board.A0
num_pixels = 24

pixels = neopixel.NeoPixel(pixel_pin, num_pixels, brightness=0.03, auto_write=False)


class Controls:
    def __init__(self):
        self.reverse = False
        self.wait = 0.0


async def rainbow_cycle(controls):
    while True:
        # Increment by 2 instead of 1 to speed the cycle up a bit.
        for j in range(255, -1, -2) if controls.reverse else range(0, 256, 2):
            for i in range(num_pixels):
                rc_index = (i * 256 // num_pixels) + j
                pixels[i] = colorwheel(rc_index & 255)
            pixels.show()
            await asyncio.sleep(controls.wait)


async def monitor_buttons(reverse_pin, slower_pin, faster_pin, controls):
    """Monitor buttons that reverse direction and change animation speed.
    Assume buttons are active low.
    """
    with keypad.Keys(
        (reverse_pin, slower_pin, faster_pin), value_when_pressed=False, pull=True
    ) as keys:
        while True:
            key_event = keys.events.get()
            if key_event and key_event.pressed:
                key_number = key_event.key_number
                if key_number == 0:
                    controls.reverse = not controls.reverse
                elif key_number == 1:
                    # Lengthen the interval.
                    controls.wait = controls.wait + 0.001
                elif key_number == 2:
                    # Shorten the interval.
                    controls.wait = max(0.0, controls.wait - 0.001)
            # Let another task run.
            await asyncio.sleep(0)


async def main():
    controls = Controls()

    buttons_task = asyncio.create_task(
        monitor_buttons(board.A1, board.A2, board.A3, controls)
    )
    animation_task = asyncio.create_task(rainbow_cycle(controls))

    # This will run forever, because no tasks ever finish.
    await asyncio.gather(buttons_task, animation_task)


asyncio.run(main())

Follow me on Twitter