From ff8eee3f09e2896f35768427c08d6251633ad4fc Mon Sep 17 00:00:00 2001 From: Ben West Date: Wed, 16 Apr 2014 12:21:03 -0700 Subject: [PATCH] refactor/cleanup command line tools Now the code that creates the commandline tools is also re-usable for other tools. Also now all tools use the same framework, so they all behave more or less the same. --- bin/mm-press-key.py | 135 ++++----------- bin/mm-send-comm.py | 312 +---------------------------------- bin/mm-set-suspend.py | 126 +++----------- bin/mm-temp-basals.py | 171 +++++++------------ decocare/commands.py | 1 + decocare/helpers/messages.py | 1 + 6 files changed, 113 insertions(+), 633 deletions(-) diff --git a/bin/mm-press-key.py b/bin/mm-press-key.py index 175577c..0a2d0b7 100755 --- a/bin/mm-press-key.py +++ b/bin/mm-press-key.py @@ -1,119 +1,40 @@ #!/usr/bin/env python # PYTHON_ARGCOMPLETE_OK -import argcomplete, argparse -import sys, os -from decocare import link, stick, session, commands, lib -from pprint import pformat -import logging -log = logging.getLogger( ).getChild(__name__) +from decocare import commands -def parse_env ( ): - return { - "serial": os.environ.get('SERIAL', ''), - "port": os.environ.get('PORT', '') - } +from decocare.helpers import cli -def get_parser ( ): - conf = parse_env( ) - parser = argparse.ArgumentParser( ) - parser.add_argument('--serial', type=str, - dest='serial', - default=conf.get('serial', ''), - help="serial number of pump [default: %(default)s]") - parser.add_argument('--port', type=str, - dest='port', - default=conf.get('port', ''), - help="Path to device [default: %(default)s]") - parser.add_argument('--no-op', - dest='dryrun', - action='store_true', default=False, - help="Dry run, don't do main function" - ) - parser.add_argument('-v', '--verbose', - dest='verbose', - action='append_const', const=1, - help="Verbosity" - ) - parser.add_argument('--init', - dest='init', - action='store_true', default=False, - help="Send power ctrl to initialize RF session." - ) - parser.add_argument('commands', - nargs="+", - #dest='status', - choices=['act', 'esc', 'up', 'down', 'easy' ], - default='act', - help="buttons to press [default: %(default)s)]" - ) - argcomplete.autocomplete(parser) - return parser +class PressKeysApp (cli.CommandApp): + """%(prog)s - Simulate presses on the keypad. -command_map = { - 'easy': commands.KeypadPush.EASY, - 'esc': commands.KeypadPush.ESC, - 'act': commands.KeypadPush.ACT, - 'down': commands.KeypadPush.DOWN, - 'up': commands.KeypadPush.UP, -} - -def exec_request (pump, request): - msg = command_map.get(request) - response = pump.query(msg) - print "response: %s" % response - print "hexdump:" - print "```" - print lib.hexdump(response.data) - print "```" - print "##### decoded:\n```python\n", repr(response.getData( )), "\n```" -def main (args): - print "## press keys on a pump" - print "using", "`", args, "`" + Press keys on the keypad. + """ + def customize_parser (self, parser): + parser.add_argument('commands', + nargs="+", + choices=['act', 'esc', 'up', 'down', 'easy' ], + # default='act', + help="buttons to press [default: %(default)s)]" + ) + return parser - print "```" - uart = stick.Stick(link.Link(args.port, timeout=.400)) - print "```" - print "```" - uart.open( ) - print "```" - print "```" - pump = session.Pump(uart, args.serial) - print "```" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - print "```" - if args.init: - pump.power_control( ) - model = pump.read_model( ) - print "```" - print '### PUMP MODEL: `%s`' % model + def exec_request (self, pump, msg, **kwds): + msg = lookup_command(msg) + super(PressKeysApp, self).exec_request(pump, msg, **kwds) - for flow in args.commands: - print '### ', flow - if args.dryrun: - print "#### dry run, no action taken" - else: - exec_request(pump, flow) +command_map = { + 'easy': commands.PushEASY, + 'esc': commands.PushESC, + 'act': commands.PushACT, + 'down': commands.PushDOWN, + 'up': commands.PushUP +} - print "### end stats" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" +def lookup_command (name): + return command_map.get(name) if __name__ == '__main__': - parser = get_parser( ) - args = parser.parse_args( ) - level = None - if args.verbose > 0: - level = args.verbose > 1 and logging.DEBUG or logging.INFO - logging.basicConfig(stream=sys.stdout, level=level) - main(args) + app = PressKeysApp( ) + app.run(None) diff --git a/bin/mm-send-comm.py b/bin/mm-send-comm.py index adedf29..a285ea5 100755 --- a/bin/mm-send-comm.py +++ b/bin/mm-send-comm.py @@ -1,317 +1,9 @@ #!/usr/bin/env python # PYTHON_ARGCOMPLETE_OK -import argparse -import sys, os -from decocare import link, stick, session, commands, lib, scan -from pprint import pformat -import logging -import time -log = logging.getLogger( ).getChild(__name__) - -def parse_env ( ): - return { - "serial": os.environ.get('SERIAL', ''), - "port": os.environ.get('PORT', '') - } - - -class CommandApp(object): - def __init__(self): - self.env = parse_env( ) - self.parser = self.get_parser( ) - self.autocomplete( ) - - def autocomplete (self): - try: - import argcomplete - argcomplete.autocomplete(self.parser) - except ImportError: - # no auto completion - pass - - def help (self): - return self.__class__.__doc__ - - def get_parser (self): - conf = parse_env( ) - helptext = self.help( ).split("\n") - description = '\n'.join(helptext[0:2]) - epilog = '\n'.join(helptext[2:]) - parser = argparse.ArgumentParser(description=description, epilog=epilog) - parser.add_argument('--serial', type=str, - dest='serial', - default=conf.get('serial', ''), - help="serial number of pump [default: %(default)s]") - parser.add_argument('--port', type=str, - dest='port', - default=conf.get('port', 'scan'), - help="Path to device [default: %(default)s]") - parser.add_argument('--no-op', - dest='dryrun', - action='store_true', default=False, - help="Dry run, don't do main function" - ) - parser.add_argument('--skip-prelude', - dest='no_prelude', - action='store_true', default=False, - help="Don't do the normal prelude." - ) - parser.add_argument('--no-rf-prelude', - dest='no_rf_prelude', - action='store_true', default=False, - help="Do the prelude, but don't query the pump." - ) - parser.add_argument('--skip-postlude', - dest='no_postlude', - action='store_true', default=False, - help="Don't do the normal postlude." - ) - parser.add_argument('-v', '--verbose', - dest='verbose', - action='append_const', const=1, - help="Verbosity" - ) - parser.add_argument('--init', - dest='init', - action='store_true', default=False, - help="Send power ctrl to initialize RF session." - ) - parser = self.customize_parser(parser) - return parser - - def customize_parser (self, parser): - parser.add_argument('commands', - nargs="+", - #dest='status', - choices=['act', 'esc', 'up', 'down', 'easy' ], - default='act', - help="buttons to press [default: %(default)s)]" - ) - return parser - - def configure (self): - args = self.parser.parse_args( ) - level = None - if args.verbose > 0: - level = args.verbose > 1 and logging.DEBUG or logging.INFO - logging.basicConfig(stream=sys.stdout, level=level) - self.args = args - return args - - def run (self, args): - if not args: - args = self.configure( ) - self.prelude(args) - self.main(args) - self.postlude(args) - - def prelude (self, args): - if args.no_prelude: - print "##### skipping prelude" - return - print "## do stuff with an insulin pump over RF" - print "using", "`", args, "`" - - print "```" - if args.port == 'scan' or args.port == "": - args.port = scan.scan( ) - uart = stick.Stick(link.Link(args.port, timeout=.400)) - print "```" - print "```" - uart.open( ) - print "```" - print "```" - pump = session.Pump(uart, args.serial) - print "```" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - self.uart = uart - self.pump = pump - self.model = None - if args.no_rf_prelude: - print "##### skipping normal RF preludes" - return - print "```" - if args.init: - pump.power_control( ) - model = pump.read_model( ) - self.model = model - print "```" - print '### PUMP MODEL: `%s`' % model - - def postlude (self, args): - if args.no_postlude: - print "##### skipping postlude" - return - print "### end stats" - print "```" - stats = self.uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - - def main (self, args): - for flow in args.commands: - print '### ', flow - if args.dryrun: - print "#### dry run, no action taken" - else: - exec_request(self.pump, flow) - -command_map = { - 'easy': commands.KeypadPush.EASY, - 'esc': commands.KeypadPush.ESC, - 'act': commands.KeypadPush.ACT, - 'down': commands.KeypadPush.DOWN, - 'up': commands.KeypadPush.UP, -} - -def exec_request (pump, msg, args={}, - dryrun=False, - save=False, - prefix='', - render_decoded=True, - render_hexdump=True): - if dryrun: - print "skipping query", pump, msg, args - return False - response = pump.query(msg, **args) - print "response: %s" % response - if render_hexdump: - print "hexdump:" - print "```python" - print response.hexdump( ) - print "```" - if render_decoded: - print "#### decoded:" - print "```python" - print repr(response.getData( )) - print "```" - if save: - response.save(prefix=prefix) - return response - -class SendMsgApp(CommandApp): - """ - %(prog)s - send messages to a compatible MM insulin pump - - This tool is intended to help discover protocol behavior. - Under no circumstance is it intended to deliver therapy. - """ - def main (self, args): - if args.prefix: - self.execute_list(args.prefix, args.saveall) - if args.command == "ManualCommand": - kwds = { - 'params': map(int, getattr(args, 'params', [ ])), - 'retries': getattr(args, 'retries', 0), - 'effectTime': getattr(args, 'effectTime'), - 'maxRecords': getattr(args, 'maxRecords'), - 'bytesPerRecord': getattr(args, 'bytesPerRecord'), - 'code': args.code, - 'name': getattr(args, 'name', "ExperimentCode%02x" % args.code), - 'descr': getattr(args, 'descr', "Experimental msg") - } - msg = commands.ManualCommand - resp = exec_request(self.pump, msg, args=kwds, - dryrun=args.dryrun, render_hexdump=False, save=args.save, prefix=args.prefix_path) - if args.command == "sleep": - time.sleep(args.timeout) - if args.command == "tweak": - Other = getattr(commands, args.other) - kwds = commands.TweakAnotherCommand.get_kwds(Other, args) - print Other, kwds - resp = exec_request(self.pump, Other, args=kwds, - dryrun=args.dryrun, save=args.save, prefix=args.prefix_path) - if args.postfix: - self.execute_list(args.postfix, args.saveall) - - def execute_list (self, messages, save=False): - for name in messages: - msg = getattr(commands, name) - print "###### sending `%s`" % getattr(msg, 'name', msg) - resp = exec_request(self.pump, msg, dryrun=self.args.dryrun, - render_hexdump=self.args.verbose>0, - save=save, prefix=self.args.prefix_path) - - def customize_parser (self, parser): - choices = commands.__all__ - parser.add_argument('--prefix-path', - dest="prefix_path", - type=str, - default="", - help="Prefix to store saved files when using --save or --saveall." - ) - parser.add_argument('--saveall', - action="store_true", - default=False, - help="Whether or not to save all responses.", - ) - - parser.add_argument('--prefix', - action="append", - choices=choices, - help="Built-in commands to run before the main one." - ) - - parser.add_argument('--postfix', - action="append", - choices=choices, - help="Built-in commands to run after the main one." - ) - - subparsers = parser.add_subparsers(help="Main thing to do between --prefix and--postfix", dest="command") - sleep_parser = subparsers.add_parser("sleep", help="Just sleep between command sets") - sleep_parser.add_argument('timeout', type=float, default=0, - help="Sleep in between running --prefix and --postfix" - ) - tweaker = subparsers.add_parser("tweak", help="Tweak a builtin command") - tweaker.add_argument('other', - choices=choices, - help="Command to tweak." - ) - commands.TweakAnotherCommand.config_argparse(tweaker) - all_parser = subparsers.add_parser("ManualCommand", help="Customize a command") - all_parser.add_argument('code', type=int, - help="The opcode to send to the pump." - ) - #__fields__ = ['maxRecords', 'code', 'descr', - # 'serial', 'bytesPerRecord', 'retries', 'params'] - all_parser.add_argument('--params', type=str, action="append", - help="parameters to format into sent message", - default=commands.ManualCommand.params - ) - all_parser.add_argument('--descr', type=str, default="Experimental command", - help="Description of command" - ) - all_parser.add_argument('--name', type=str, - help="Proposed name of command" - ) - all_parser.add_argument('--save', action="store_true", default=False, - help="Save response in a file." - ) - all_parser.add_argument('--effectTime', type=float, - help="time to sleep before responding to message, float in seconds", - default=commands.ManualCommand.effectTime - ) - all_parser.add_argument('--maxRecords', type=int, - help="number of frames in a packet composing payload response", - default=commands.ManualCommand.maxRecords - ) - all_parser.add_argument('--bytesPerRecord', type=int, - help="bytes per frame", - default=commands.ManualCommand.bytesPerRecord - ) - - return parser +from decocare.helpers import messages if __name__ == '__main__': - app = SendMsgApp( ) + app = messages.SendMsgApp( ) app.run(None) diff --git a/bin/mm-set-suspend.py b/bin/mm-set-suspend.py index 8501ad0..53ffd3a 100755 --- a/bin/mm-set-suspend.py +++ b/bin/mm-set-suspend.py @@ -1,53 +1,26 @@ #!/usr/bin/env python # PYTHON_ARGCOMPLETE_OK -import argcomplete, argparse -import sys, os -from decocare import link, stick, session, commands, lib -from pprint import pformat -import logging -log = logging.getLogger( ).getChild(__name__) -def parse_env ( ): - return { - "serial": os.environ.get('SERIAL', ''), - "port": os.environ.get('PORT', '') - } +from decocare import commands +from decocare.helpers import cli -def get_parser ( ): - conf = parse_env( ) - parser = argparse.ArgumentParser( ) - parser.add_argument('--serial', type=str, - dest='serial', - default=conf.get('serial', ''), - help="serial number of pump [default: %(default)s]") - parser.add_argument('--port', type=str, - dest='port', - default=conf.get('port', ''), - help="Path to device [default: %(default)s]") - parser.add_argument('--no-op', - dest='dryrun', - action='store_true', default=False, - help="Dry run, don't do main function" - ) - parser.add_argument('-v', '--verbose', - dest='verbose', - action='append_const', const=1, - help="Verbosity" - ) - parser.add_argument('--init', - dest='init', - action='store_true', default=False, - help="Send power ctrl to initialize RF session." - ) - parser.add_argument('commands', - nargs="+", - #dest='status', - choices=['query', 'suspend', 'resume'], - default='query', - help="Set or query pump status [default: %(default)s)]" - ) - argcomplete.autocomplete(parser) - return parser +class SetSuspendResumeApp (cli.CommandApp): + """ %(prog)s - query or set suspend/resume status + + Pause or resume pump. + """ + def customize_parser (self, parser): + parser.add_argument('commands', + nargs="+", + choices=['query', 'suspend', 'resume'], + default='query', + help="Set or query pump status [default: %(default)s)]" + ) + return parser + + def exec_request (self, pump, msg, **kwds): + msg = lookup_command(msg) + super(SetSuspendResumeApp, self).exec_request(pump, msg, **kwds) command_map = { 'query': commands.ReadPumpStatus @@ -55,63 +28,10 @@ def get_parser ( ): , 'resume': commands.PumpResume } -def exec_request (pump, request): - msg = command_map.get(request) - response = pump.query(msg) - print "response: %s" % response - print "hexdump:" - print "```" - print lib.hexdump(response.data) - print "```" - print "##### decoded:\n```python\n", repr(response.getData( )), "\n```" - -def main (args): - print "## query or set suspend/resume status" - print "hi", "`", args, "`" - - print "```" - uart = stick.Stick(link.Link(args.port, timeout=.400)) - print "```" - print "```" - uart.open( ) - print "```" - print "```" - pump = session.Pump(uart, args.serial) - print "```" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - print "```" - if args.init: - pump.power_control( ) - model = pump.read_model( ) - print "```" - print '### PUMP MODEL: `%s`' % model - - for flow in args.commands: - print '### ', flow - if args.dryrun: - print "#### dry run, no action taken" - else: - exec_request(pump, flow) - - print "### end stats" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" +def lookup_command (name): + return command_map.get(name) if __name__ == '__main__': - parser = get_parser( ) - args = parser.parse_args( ) - level = None - if args.verbose > 0: - level = args.verbose > 1 and logging.DEBUG or logging.INFO - logging.basicConfig(stream=sys.stdout, level=level) - main(args) + app = SetSuspendResumeApp( ) + app.run(None) diff --git a/bin/mm-temp-basals.py b/bin/mm-temp-basals.py index 6020f04..23d86d0 100755 --- a/bin/mm-temp-basals.py +++ b/bin/mm-temp-basals.py @@ -1,46 +1,63 @@ #!/usr/bin/env python # PYTHON_ARGCOMPLETE_OK -import argcomplete, argparse -import sys, os -from decocare import link, stick, session, commands, lib -from pprint import pformat -import logging -log = logging.getLogger( ).getChild(__name__) +from decocare import commands +from decocare.helpers import cli -def parse_env ( ): - return { - "serial": os.environ.get('SERIAL', ''), - "port": os.environ.get('PORT', '') - } +class TempBasalApp (cli.CommandApp): + """ %(prog)s - query or set temp basals -def get_parser ( ): - conf = parse_env( ) - parser = argparse.ArgumentParser( ) - parser.add_argument('--serial', type=str, - dest='serial', - default=conf.get('serial', ''), - help="serial number of pump [default: %(default)s]") - parser.add_argument('--port', type=str, - dest='port', - default=conf.get('port', ''), - help="Path to device [default: %(default)s]") - parser.add_argument('-v', '--verbose', - dest='verbose', - action='append_const', const=1, - help="Verbosity" - ) - parser.add_argument('--duration', - dest='duration', - type=int, default=0, - help="Duration of temp rate [default: %(default)s)]" - ) - parser.add_argument('--rate', - dest='rate', - type=float, default=0, - help="Rate of temp basal [default: %(default)s)]" - ) - argcomplete.autocomplete(parser) - return parser + Set or query temp basals. + """ + def customize_parser (self, parser): + parser.add_argument('command', + choices=['query', 'set', ], + default='query', + help="Set or query pump status [default: %(default)s)]" + ) + parser.add_argument('--duration', + dest='duration', + type=int, default=0, + help="Duration of temp rate [default: %(default)s)]" + ) + parser.add_argument('--rate', + dest='rate', + type=float, default=0, + help="Rate of temp basal [default: %(default)s)]" + ) + """ + subparsers = parser.add_subparsers(help="Main thing to do", dest="command") + basals_parser = subparsers.add_parser("set", help="Just basals between command sets") + basals_parser.add_argument('--duration', + dest='duration', + type=int, default=0, + help="Duration of temp rate [default: %(default)s)]" + ) + basals_parser.add_argument('--rate', + dest='rate', + type=float, default=0, + help="Rate of temp basal [default: %(default)s)]" + ) + """ + return parser + def main (self, args): + self.query_temp(args) + if args.command == "query": + return + if args.command == "set": + params = format_params(args) + kwds = dict(params=params) + msg = commands.TempBasal + resp = self.exec_request(self.pump, msg, args=kwds, + dryrun=args.dryrun, render_hexdump=True ) + self.query_temp(args) + + # (serial=pump.serial, params=params) + + def query_temp (self, args): + query = commands.ReadBasalTemp + + resp = self.exec_request(self.pump, query, + dryrun=args.dryrun, render_hexdump=False) def format_params (args): duration = args.duration / 30 @@ -48,79 +65,7 @@ def format_params (args): params = [0x00, rate, duration] return params -def main (args): - print "## set a temporary basal rate" - print "hi", "`", args, "`" - print "```" - uart = stick.Stick(link.Link(args.port, timeout=.400)) - print "```" - print "```" - uart.open( ) - print "```" - print "```" - pump = session.Pump(uart, args.serial) - print "```" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - print "```" - model = pump.read_model( ) - print "```" - print '### PUMP MODEL: `%s`' % model - - print "### existing temp basal" - print "```" - reader = commands.ReadBasalTemp(serial=pump.serial) - pump.execute(reader) - temp_basal = reader.getData( ) - print "```" - print "```javascript" - print pformat(temp_basal) - print "```" - - print "### setting rate" - print "#### sending command" - params = format_params(args) - print "###### params: `%s`" % params - print "```" - #comm = commands.TempBasal(serial=device.serial, params=[ x ] ) - # params = [0x00, 0x01, 0x02] - comm = commands.TempBasal(serial=pump.serial, params=params) - pump.execute(comm) - page = comm.getData( ) - print "```" - print "### result" - log.info("XXX: SET TempBasal!!:\n```\n%s\n```" % lib.hexdump(page)) - - print "```" - reader = commands.ReadBasalTemp(serial=pump.serial) - pump.execute(reader) - temp_basal = reader.getData( ) - print "```" - print "###### results + params" - print "```" - print pformat(args) - print lib.hexdump(params) - print "```" - print "```javascript" - print pformat(temp_basal) - print "```" - print "```" - stats = uart.interface_stats( ) - print "```" - print "```javascript" - print pformat(stats) - print "```" - if __name__ == '__main__': - parser = get_parser( ) - args = parser.parse_args( ) - level = None - if args.verbose > 0: - level = args.verbose > 1 and logging.DEBUG or logging.INFO - logging.basicConfig(stream=sys.stdout, level=level) - main(args) + app = TempBasalApp( ) + app.run(None) diff --git a/decocare/commands.py b/decocare/commands.py index eb9d820..bf1c280 100644 --- a/decocare/commands.py +++ b/decocare/commands.py @@ -728,6 +728,7 @@ class ReadOldTraceAlarm (PumpCommand): # MMPump???/ CMD_????? 36 0x24 ('$') ?? class PumpExperimentSelfCheck_OP36 (PumpCommand): code = 36 + # MMX22/ CMD_WRITE_GLUCOSE_HISTORY_TIMESTAMP 40 0x28 ('(') ?? class WriteGlucoseHistoryTimestamp (PumpCommand): code = 40 diff --git a/decocare/helpers/messages.py b/decocare/helpers/messages.py index cce4cd4..dc5ab41 100644 --- a/decocare/helpers/messages.py +++ b/decocare/helpers/messages.py @@ -47,6 +47,7 @@ def execute_list (self, messages, save=False): def customize_parser (self, parser): choices = commands.__all__ + choices.sort( ) parser.add_argument('--prefix-path', dest="prefix_path", type=str,