From 6c19459a913d7ee27ad85fb10bc5416000fc2d18 Mon Sep 17 00:00:00 2001 From: dapplegatecp <59579399+dapplegatecp@users.noreply.github.com> Date: Thu, 14 Mar 2024 19:30:21 -0600 Subject: [PATCH] fix timeout and add interactive for csterm.py (#116) --- cli_sample/csterm.py | 102 +++++++++++++++++++++++++++++++++++++---- cli_sample/package.ini | 2 +- 2 files changed, 94 insertions(+), 10 deletions(-) diff --git a/cli_sample/csterm.py b/cli_sample/csterm.py index f24024d7..391dfd6b 100644 --- a/cli_sample/csterm.py +++ b/cli_sample/csterm.py @@ -1,6 +1,45 @@ import random import time import re +import sys + + +class Terminal: + def inkey(self, timeout=0): + i = b'' + c, *_ = select.select([sys.stdin],[],[],timeout) + if c: + i = os.read(self.fd, 4) + if len(i) > 1: + while True: + c, *_ = select.select( [sys.stdin], [], [], timeout ) + if not c: + break + i += os.read(self.fd, 1024) + return i.decode() + + def cbreak(self): + return self + + def __enter__(self): + self.fd = sys.stdin.fileno() + self.old_settings = termios.tcgetattr(self.fd) + tty.setcbreak(self.fd) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings) + if exc_type is not None: + raise + + +try: + from blessed import Terminal +except ImportError: + # NOTE: blessed not installed, using fallback terminal (pip3 install blessed) + import select + import tty, termios + import os class CSTerm: @@ -32,14 +71,14 @@ def exec(self, cmds, clean=True): """ cmds = cmds if isinstance(cmds, list) else [cmds] cmds = [c + '\n' for c in cmds] - timeout = self.timeout * (1 / self.INTERVAL) - soft_timeout = self.soft_timeout * (1 / self.INTERVAL) + timeout = self.timeout + soft_timeout = self.soft_timeout k = iter(cmds) kp = next(k) r = '' - while timeout > 0: - + start = time.time() + while True: self.c.put("/control/csterm/%s" % self.s_id, self._k(kp)) rp = self.c.get("/control/csterm/%s" % self.s_id) @@ -50,13 +89,16 @@ def exec(self, cmds, clean=True): break kp = next(k, None) or "" - timeout-=1 + time.sleep(self.INTERVAL) - if timeout < soft_timeout: + elapsed = time.time() - start + if elapsed > timeout: + break + if elapsed > soft_timeout: self.c.put("/control/csterm/%s" % self.s_id, self._k('\x03')) rp = self.c.get("/control/csterm/%s" % self.s_id) r += rp['k'] - soft_timeout = 0 + soft_timeout = sys.maxsize # remove the prompt and any terminal escape sequences if clean: @@ -68,16 +110,58 @@ def exec(self, cmds, clean=True): return r + def interactive(self): + initial = self._k("") + print(f"Connecting (ctrl+d to quit)...") + self.c.put("/control/csterm/%s" % self.s_id, initial) + r = self.c.get("/control/csterm/%s" % self.s_id) + + term = Terminal() + + with term.cbreak(): + print(r['k'], end='', flush=True) + interrupt = False + while True: + kill = False + + c = "" + try: + while True: + if interrupt: + c = '\x03' + interrupt = False + break + i = term.inkey(timeout=0.01) + if '\x04' in i: + kill = True + break + if not i: + break + c += i + if kill: + break + self.c.put("/control/csterm/%s" % self.s_id, self._k(c)) + r = self.c.get("/control/csterm/%s" % self.s_id) + if r['k']: + print(r['k'], end='', flush=True) + except KeyboardInterrupt: + interrupt = True + print("\nExiting...") + def _k(self, v): r = {"k": v} if self.user: r["u"] = self.user return r + if __name__ == '__main__': - import sys from csclient import EventingCSClient c = EventingCSClient('cli_sample') ct = CSTerm(c, user="admin") - print(ct.exec(sys.argv[1:])) + + if len(sys.argv) == 1: + ct.interactive() + else: + print(ct.exec(" ".join(sys.argv[1:]))) diff --git a/cli_sample/package.ini b/cli_sample/package.ini index 21a67c95..d0f5be05 100644 --- a/cli_sample/package.ini +++ b/cli_sample/package.ini @@ -9,4 +9,4 @@ reboot = true auto_start = true version_major = 1 version_minor = 0 -version_patch = 1 \ No newline at end of file +version_patch = 3 \ No newline at end of file