Add Macropad asyncio article
This commit is contained in:
parent
95f4f1a616
commit
8cf6983304
5 changed files with 639 additions and 0 deletions
|
@ -0,0 +1,42 @@
|
|||
from adafruit_macropad import MacroPad
|
||||
import usb_cdc, asyncio, random
|
||||
|
||||
macropad = MacroPad()
|
||||
macropad.pixels.brightness = 0.3
|
||||
color = 0xad20b4
|
||||
|
||||
async def get_key(taskslist):
|
||||
while True:
|
||||
key_event = macropad.keys.events.get()
|
||||
if key_event:
|
||||
if key_event.pressed:
|
||||
print("key k:{} t:{}".format(key_event.key_number, len(taskslist)))
|
||||
taskslist.append(asyncio.create_task(blink(key_event.key_number)))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def blink(led):
|
||||
timer = random.random() * 3
|
||||
for _ in range(5):
|
||||
macropad.pixels[led] = color
|
||||
await asyncio.sleep(timer)
|
||||
macropad.pixels[led] = 0x0
|
||||
await asyncio.sleep(timer)
|
||||
|
||||
async def manage_tasks(taskslist):
|
||||
tasks = 0
|
||||
print("Run task manager t:{}".format(len(taskslist)))
|
||||
while True:
|
||||
for task_number in range(0, len(taskslist)):
|
||||
if taskslist[task_number].done():
|
||||
print("Remove task t:{}/{}".format(task_number + 1,len(taskslist)))
|
||||
taskslist.pop(task_number)
|
||||
break
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def main():
|
||||
tasks = []
|
||||
tasks.append(asyncio.create_task(get_key(tasks)))
|
||||
tasks.append(asyncio.create_task(manage_tasks(tasks)))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
asyncio.run(main())
|
|
@ -0,0 +1,48 @@
|
|||
from adafruit_macropad import MacroPad
|
||||
import usb_cdc, asyncio
|
||||
|
||||
macropad = MacroPad()
|
||||
timer = 1
|
||||
color = 0xad20b4
|
||||
led = 0
|
||||
|
||||
def exec_command (data):
|
||||
global timer
|
||||
command,option = data.split()
|
||||
print("cmd:{} opt:{}".format(command,option))
|
||||
if command == 'time':
|
||||
timer = float(option)
|
||||
|
||||
async def blink(led, color):
|
||||
macropad.pixels.brightness = 0.3
|
||||
while True:
|
||||
try:
|
||||
if macropad.pixels[led] == (0,0,0):
|
||||
macropad.pixels[led] = color
|
||||
else:
|
||||
macropad.pixels[led] = 0x0
|
||||
print("l:{} c:{} t:{}".format(led,hex(color),timer))
|
||||
await asyncio.sleep(timer)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def check_serial(task_led):
|
||||
data = bytearray()
|
||||
serial = usb_cdc.data
|
||||
# Avec le timeout, même si la ligne ne contient pas \n, elle
|
||||
# sera pris en compte
|
||||
serial.timeout = 1
|
||||
while True:
|
||||
if serial.in_waiting:
|
||||
data = serial.readline()
|
||||
print("serial data received")
|
||||
exec_command(data.decode("utf-8"))
|
||||
task_led.cancel()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def main():
|
||||
t_led = asyncio.create_task(blink(led, color))
|
||||
t_serial = asyncio.create_task(check_serial(t_led))
|
||||
await asyncio.gather(t_led, t_serial)
|
||||
|
||||
asyncio.run(main())
|
|
@ -0,0 +1,45 @@
|
|||
from adafruit_macropad import MacroPad
|
||||
import usb_cdc, asyncio
|
||||
|
||||
macropad = MacroPad()
|
||||
timer = 1
|
||||
color = 0xad20b4
|
||||
led = 0
|
||||
|
||||
def exec_command (data):
|
||||
global timer
|
||||
command,option = data.split()
|
||||
print("cmd:{} opt:{}".format(command,option))
|
||||
if command == 'time':
|
||||
timer = float(option)
|
||||
|
||||
async def blink(led, color):
|
||||
#global timer
|
||||
macropad.pixels.brightness = 0.3
|
||||
while True:
|
||||
if macropad.pixels[led] == (0,0,0):
|
||||
macropad.pixels[led] = color
|
||||
else:
|
||||
macropad.pixels[led] = 0x0
|
||||
print("l:{} c:{} t:{}".format(led,hex(color),timer))
|
||||
await asyncio.sleep(timer)
|
||||
|
||||
async def check_serial():
|
||||
data = bytearray()
|
||||
serial = usb_cdc.data
|
||||
# Avec le timeout, même si la ligne ne contient pas \n, elle
|
||||
# sera pris en compte
|
||||
serial.timeout = 1
|
||||
while True:
|
||||
if serial.in_waiting:
|
||||
data = serial.readline()
|
||||
print("serial data received")
|
||||
exec_command(data.decode("utf-8"))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def main():
|
||||
t_led = asyncio.create_task(blink(led, color))
|
||||
t_serial = asyncio.create_task(check_serial())
|
||||
await asyncio.gather(t_led, t_serial)
|
||||
|
||||
asyncio.run(main())
|
|
@ -0,0 +1,52 @@
|
|||
from adafruit_macropad import MacroPad
|
||||
import usb_cdc, asyncio, random
|
||||
|
||||
macropad = MacroPad()
|
||||
macropad.pixels.brightness = 0.3
|
||||
run_color = 0xad20b4
|
||||
wait_color = 0x21f312
|
||||
|
||||
# déclaration de notre Mutex
|
||||
blink_mutex = asyncio.Lock()
|
||||
|
||||
async def get_key(taskslist):
|
||||
while True:
|
||||
key_event = macropad.keys.events.get()
|
||||
if key_event:
|
||||
if key_event.pressed:
|
||||
taskslist.append(asyncio.create_task(blink(key_event.key_number)))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def blink(led):
|
||||
timer = random.random() * 3
|
||||
macropad.pixels[led] = wait_color
|
||||
if blink_mutex.locked():
|
||||
print("Wait for mutex l:{}".format(led))
|
||||
|
||||
await blink_mutex.acquire()
|
||||
|
||||
print("Aquire mutex l:{}".format(led))
|
||||
for _ in range(5):
|
||||
macropad.pixels[led] = run_color
|
||||
await asyncio.sleep(timer)
|
||||
macropad.pixels[led] = 0x0
|
||||
await asyncio.sleep(timer)
|
||||
|
||||
blink_mutex.release()
|
||||
|
||||
async def manage_tasks(taskslist):
|
||||
tasks = 0
|
||||
while True:
|
||||
for task_number in range(0, len(taskslist)):
|
||||
if taskslist[task_number].done():
|
||||
taskslist.pop(task_number)
|
||||
break
|
||||
await asyncio.sleep(0)
|
||||
|
||||
async def main():
|
||||
tasks = []
|
||||
tasks.append(asyncio.create_task(get_key(tasks)))
|
||||
tasks.append(asyncio.create_task(manage_tasks(tasks)))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
asyncio.run(main())
|
Loading…
Add table
Add a link
Reference in a new issue