Cancel input/stdin.readline with serial input?

I've got a python script running with a touch display connected via serial. I have several prompts in it, which I can answer either:
1. In a shell
2. With display buttons
I'd like to be able to do either, without rewriting the functions each time, with whichever is first overriding the other.

Lately, my approach has been to find and kill the pid of the stdin input process ("lineread") when a button is pressed, though I'm positive that there's a better way that I'm unaware of. Plus, it doesn't work, anyway. Pressing the display button twice will advance the script (ignoring errors), but then the input won't actually work when Enter is pressed.

"displayRead" just reads the display for input and puts the action in a queue.

I thought maybe I could do something with killing procs via a joinable queue, but I'm unsure how (or what I was thinking, exactly).

Anyone have any thoughts or know how to do this?

Messy code warning (I'm new)...

import serial
import signal
import sys
from time import sleep
import os
from multiprocessing import Queue,JoinableQueue,Process

debug = True

def inputOverride(q,interrupt):
    while True:
        standby = q
        output = standby.get()
        if output:
            killpid = interrupt.get()
            if debug:
                print("Killing:\t\t", killpid)
            try:
                os.kill(killpid, signal.SIGTERM)
            except Exception as e:
                pass

def waitForInput(q,interrupt):
    if debug:
        print("Input PID:\t\t", os.getpid())
    interrupt.put(os.getpid())
    try:
        reply = input().lower()
        if reply in ["n", "no"]:
            q.put("n")
        else:
            q.put("y")
            return
    except Exception as e:
        if debug:
            print(e)
        return

def firstReply():
    if __name__ == '__main__':
        answer = Queue()
        interrupt = JoinableQueue()
        display = Process(target=displayRead, kwargs={"out":answer, "yn":True}, daemon=True)
        override = Process(target=inputOverride, args=(answer,interrupt), daemon=True)
        lineread = Process(target=waitForInput, args=(answer,interrupt))
        procs = [display,override,lineread]
        for p in procs:
            p.start()
        output = None
        while True:
            if output:
                ([p.terminate() for p in procs])
                if debug:
                    print(output)
                break
            elif not output:
                if not answer.empty():
                    output = answer.get()
                    continue
                    # There used to be more crap here but it wasn't working right
        return output

Nevermind. Got it. :grinning:

Will post code later.

1 Like

The answer was to use select.select(); based on what was posted here.

import select
import sys
import serial
from multiprocessing import Process,Queue

def firstInput(default=True):
    if __name__ == '__main__':
        inRead = [sys.stdin]
        answer = Queue()
        display = Process(target=displayRead, kwargs={"out":answer, "yn":True}, daemon=True)
        procs = [display]
        for p in procs:
            p.start()
        while inRead:
            ready = select.select(inRead, [], [], 0)[0]
            if not ready:
                pass
            else:
                for i in ready:
                    line = i.readline()
                    if not line:
                        inRead.remove(i)
                    elif line.rstrip().lower() in ["n","no"]:
                        return "n"
                    elif line and default:
                        return "y"
                    elif line and not default:
                        return "n"
            if not answer.empty():
                output = answer.get()
                ([p.terminate() for p in procs])
                return output
        return output

# Require yes/no input for prompts
def ynPrompt(promptText,yn=True):
    prompt = promptText+" (y/n)? "
    if yn:
        print(prompt)
    else:
        print(promptText)
    displayWrite("status.txt",prompt)
    reply = firstInput()
    while reply:
        if reply in ["y", "yes", buttonYes]:
            return True
        elif reply in ["n", "no", buttonNo]:
            return False
        elif not reply:
            pass
        else:
            displayWrite("status.txt","Invalid input")
            print("Invalid input")
        sleep(0.001)
1 Like

Good job !

2 Likes