It’s a thing I just wrote for reading from multiple microcontrollers on one machine.
Writing to all devices is possible, but disabled by default.
Requires python 3.6+ as well as curio
.
I used the default ports for the machines I need it on.
Takes optional args --port
, --baudrate
, and --writer
.
If only one baudrate is specified, it will use it for all the ports.
Please share if you have the time to review it.
#!/usr/bin/env python
import curio
import itertools
import logging
import os
import serial
import signal
import sys
import threading
def os_default_ports():
"""Returns default serial port name for OS"""
sysname = os.uname().sysname.lower()
if sysname == 'darwin':
return '/dev/cu.SLAB_USBtoUART'
elif sysname == 'linux':
return '/dev/ttyUSB0'
def add_args(*flags, default=None):
"""Sorts args based on flags passed and attempt; prefer int"""
args = []
for i, arg in enumerate(sys.argv):
if (
arg.strip('"').strip("'").lower() in flags
and len(sys.argv) > i
):
args.append(
int(sys.argv[i + 1]) if (sys.argv[i + 1]).isdigit()
else sys.argv[i + 1]
)
if not args and default is not None:
args.append(default)
return args
def parse_serial_args():
"""Setter for serial-specific args"""
ports = add_args('-p', '--port', default=os_default_ports())
baudrates = add_args('-b', '--baud', '--baudrate', default=921600)
(writer,) = add_args('-w', '--write', '--writer', default=False)
return ports, baudrates, writer
def create_serial_instances(ports, baudrates):
"""Creates serial objects for each port and baudrate arg passed"""
instances = []
for port, baudrate in itertools.zip_longest(
ports, baudrates, fillvalue=baudrates[-1]
):
try:
ser = serial.Serial(
port=port, baudrate=baudrate,
bytesize=serial.EIGHTBITS, timeout=1
)
except Exception as e:
logging.exception(e)
else:
instances.append(ser)
return instances
def process_line(line, port=None, func=print):
"""Processes received lines"""
device = port.rpartition('/')[-1]
func((f'{device}: ' if device else '') + line)
async def async_process_line(line, func=print):
"""Processes received lines"""
func(line)
await curio.sleep(0)
def read_serial_device(instance):
"""Read and print incoming lines from a serial device"""
try:
while True:
line = instance.readline()
if line:
try:
line = line.decode('utf-8').rstrip()
except:
pass
else:
process_line(line, port=instance.port)
except KeyboardInterrupt:
logging.info('Interrupted')
logging.info(f'Stopping read of {instance.port}')
async def async_read_serial_device(instance):
"""Read and print incoming lines from a serial device"""
async with curio.SignalQueue(signal.SIGHUP) as interrupt:
try:
while True:
line = instance.readline()
if line:
try:
line = line.decode('utf-8').rstrip()
except:
pass
else:
await process_line(line)
await curio.sleep(0)
await interrupt.get()
except curio.CancelledError:
logging.info('Interrupted')
logging.info(f'Stopping read of {instance.port}')
async def write_to_serial(instance, payload, escape='\n'):
"""Encodes and writes a message to a serial device"""
message = b''.join((
bytes(payload.encode('utf-8')),
(bytes(escape.encode('utf-8')) if escape else b'')
))
instance.write(message)
async def write_to_all(*instances):
"""
Asynchronously writes a message
from stdin to all connected serial ports
"""
async with curio.SignalQueue(signal.SIGHUP) as interrupt:
try:
while True:
await interrupt.join()
line = input().rstrip()
if line:
async with curio.TaskGroup() as writer:
for instance in instances:
logging.info(
f'Writing to {instance.port}: {line}'
)
await writer.spawn(
write_to_serial, instance, line
)
await writer.join()
except curio.TaskCancelled:
await interrupt.join()
finally:
logging.info('Closing writer')
async def async_read_serial(ports, baudrates, writer=False):
"""Asynchronously read all incoming serial messages"""
async with curio.TaskGroup(wait=any) as group:
instances = create_serial_instances(ports, baudrates)
try:
if writer:
await group.spawn(write_to_all, *instances)
for instance in instances:
await group.spawn(async_read_serial_device, instance)
await group.join()
except curio.TaskGroupError:
await group.cancel_remaining()
finally:
logging.info('Closing serial connections')
for instance in instances:
logging.info(f'Closing {instance.port}')
instance.close()
logging.info(f'{instance.port} closed')
logging.info('Serial connections closed')
def read_serial(ports, baudrates, writer=False):
curio.run(async_read_serial, ports, baudrates, writer)
def threaded_read_serial(ports, baudrates, writer=False):
instances = create_serial_instances(ports, baudrates)
try:
threads = tuple(
threading.Thread(
target=read_serial_device,
args=(instance,)
)
for instance in instances
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
finally:
for instance in instances:
instance.close()
if __name__ == '__main__':
logging.basicConfig(level=1)
ports, baudrates, writer = parse_serial_args()
# read_serial(ports, baudrates, writer)
threaded_read_serial(ports, baudrates, writer)