Command line utility for async multi-device serial read/write

It’s a thing I just wrote for reading from multiple microcontrollers on one machine.
Writing to all devices is possible, but disabled by default.
Requires python 3.6+ as well as curio.
I used the default ports for the machines I need it on.

Takes optional args --port, --baudrate, and --writer.
If only one baudrate is specified, it will use it for all the ports.

Please share if you have the time to review it.

#!/usr/bin/env python

import curio
import itertools
import logging
import os
import serial
import signal
import sys
import threading


def os_default_ports():
    """Returns default serial port name for OS"""
    sysname = os.uname().sysname.lower()
    if sysname == 'darwin':
        return '/dev/cu.SLAB_USBtoUART'
    elif sysname == 'linux':
        return '/dev/ttyUSB0'


def add_args(*flags, default=None):
    """Sorts args based on flags passed and attempt; prefer int"""
    args = []
    for i, arg in enumerate(sys.argv):
        if (
                arg.strip('"').strip("'").lower() in flags
                and len(sys.argv) > i
            ):
            args.append(
                    int(sys.argv[i + 1]) if (sys.argv[i + 1]).isdigit()
                    else sys.argv[i + 1]
                )
    if not args and default is not None:
        args.append(default)
    return args


def parse_serial_args():
    """Setter for serial-specific args"""
    ports = add_args('-p', '--port', default=os_default_ports())
    baudrates = add_args('-b', '--baud', '--baudrate', default=921600)
    (writer,) = add_args('-w', '--write', '--writer', default=False)
    return ports, baudrates, writer


def create_serial_instances(ports, baudrates):
    """Creates serial objects for each port and baudrate arg passed"""
    instances = []
    for port, baudrate in itertools.zip_longest(
            ports, baudrates, fillvalue=baudrates[-1]
        ):
        try:
            ser = serial.Serial(
                    port=port, baudrate=baudrate,
                    bytesize=serial.EIGHTBITS, timeout=1
                )
        except Exception as e:
            logging.exception(e)
        else:
            instances.append(ser)
    return instances


def process_line(line, port=None, func=print):
    """Processes received lines"""
    device = port.rpartition('/')[-1]
    func((f'{device}: ' if device else '') + line)


async def async_process_line(line, func=print):
    """Processes received lines"""
    func(line)
    await curio.sleep(0)


def read_serial_device(instance):
    """Read and print incoming lines from a serial device"""
    try:
        while True:
            line = instance.readline()
            if line:
                try:
                    line = line.decode('utf-8').rstrip()
                except:
                    pass
                else:
                    process_line(line, port=instance.port)
    except KeyboardInterrupt:
        logging.info('Interrupted')
        logging.info(f'Stopping read of {instance.port}')


async def async_read_serial_device(instance):
    """Read and print incoming lines from a serial device"""
    async with curio.SignalQueue(signal.SIGHUP) as interrupt:
        try:
            while True:
                line = instance.readline()
                if line:
                    try:
                        line = line.decode('utf-8').rstrip()
                    except:
                        pass
                    else:
                        await process_line(line)
                await curio.sleep(0)
                await interrupt.get()
        except curio.CancelledError:
            logging.info('Interrupted')
            logging.info(f'Stopping read of {instance.port}')


async def write_to_serial(instance, payload, escape='\n'):
    """Encodes and writes a message to a serial device"""
    message = b''.join((
            bytes(payload.encode('utf-8')),
            (bytes(escape.encode('utf-8')) if escape else b'')
        ))
    instance.write(message)


async def write_to_all(*instances):
    """
    Asynchronously writes a message
    from stdin to all connected serial ports
    """
    async with curio.SignalQueue(signal.SIGHUP) as interrupt:
        try:
            while True:
                await interrupt.join()
                line = input().rstrip()
                if line:
                    async with curio.TaskGroup() as writer:
                        for instance in instances:
                            logging.info(
                                    f'Writing to {instance.port}: {line}'
                                )
                            await writer.spawn(
                                    write_to_serial, instance, line
                                )
                        await writer.join()
        except curio.TaskCancelled:
            await interrupt.join()
        finally:
            logging.info('Closing writer')


async def async_read_serial(ports, baudrates, writer=False):
    """Asynchronously read all incoming serial messages"""
    async with curio.TaskGroup(wait=any) as group:
        instances = create_serial_instances(ports, baudrates)
        try:
            if writer:
                await group.spawn(write_to_all, *instances)
            for instance in instances:
                await group.spawn(async_read_serial_device, instance)
            await group.join()
        except curio.TaskGroupError:
            await group.cancel_remaining()
        finally:
            logging.info('Closing serial connections')
            for instance in instances:
                logging.info(f'Closing {instance.port}')
                instance.close()
                logging.info(f'{instance.port} closed')
            logging.info('Serial connections closed')


def read_serial(ports, baudrates, writer=False):
    curio.run(async_read_serial, ports, baudrates, writer)


def threaded_read_serial(ports, baudrates, writer=False):
    instances = create_serial_instances(ports, baudrates)
    try:
        threads = tuple(
                threading.Thread(
                        target=read_serial_device,
                        args=(instance,)
                    )
                for instance in instances
            )
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    finally:
        for instance in instances:
            instance.close()


if __name__ == '__main__':
    logging.basicConfig(level=1)
    ports, baudrates, writer = parse_serial_args()
    # read_serial(ports, baudrates, writer)
    threaded_read_serial(ports, baudrates, writer)

3 Likes

some updates that probably don’t belong in the OP as edits:

  1. and now actually I find it isn’t really working, dunno why. was working earlier, I went crazy with the writer and arg parsing and now it maybe works?? anyway will probably package it on github and link it later. try it out if you want, let me know if it works for you.

  2. ok, fixed it temporarily with threading. maybe it’s better this way, whatever, it can read at least

1 Like