Skip to content

Commit

Permalink
Merge pull request #269 from boschresearch/python_client_sub_unsub
Browse files Browse the repository at this point in the history
Subscribe/Unsubscribe for Python Client
  • Loading branch information
SebastianSchildt authored May 7, 2022
2 parents 11f30ea + a642c29 commit bf6f492
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 5 deletions.
2 changes: 2 additions & 0 deletions kuksa_viss_client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
log_*
venv
25 changes: 24 additions & 1 deletion kuksa_viss_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,30 @@ def subscribe(self, path, callback, timeout = 5):
res = self._sendReceiveMsg(req, timeout)
resJson = json.loads(res)
if "subscriptionId" in resJson:
self.subscriptionCallbacks[resJson["subscriptionId"]] = callback;
self.subscriptionCallbacks[resJson["subscriptionId"]] = callback
return res

# Unsubscribe value changes of to a given path.
# The subscription id from the response of the corresponding subscription request will be required
def unsubscribe(self, id, timeout = 5):
req = {}
req["action"]= "unsubscribe"
req["subscriptionId"] = id

res = {}
# Check if the subscription id exists
if id in self.subscriptionCallbacks.keys():
# No matter what happens, remove the callback
del(self.subscriptionCallbacks[id])
res = self._sendReceiveMsg(req, timeout)
else:
errMsg = {}
errMsg["number"] = "404"
errMsg["message"] = "Could not unsubscribe"
errMsg["reason"] = "Subscription ID does not exist"
res["error"] = errMsg
res = json.dumps(res)

return res


Expand Down
59 changes: 55 additions & 4 deletions kuksa_viss_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
# SPDX-License-Identifier: EPL-2.0
########################################################################
import argparse, json, sys
from re import sub
from typing import Dict, List
import queue, time, os
from pygments import highlight, lexers, formatters
from cmd2 import Cmd, with_argparser, with_category, Cmd2ArgumentParser, CompletionItem
from cmd2.utils import CompletionError, basic_complete
import functools
import functools, subprocess
DEFAULT_SERVER_ADDR = "127.0.0.1"
DEFAULT_SERVER_PORT = 8090

Expand Down Expand Up @@ -83,6 +84,16 @@ def path_completer(self, text, line, begidx, endidx):

return basic_complete(text, line, begidx, endidx, self.pathCompletionItems)

def subscribeCallback(self, path, resp):
self.subscribeFileDesc[path].write(resp + "\n")
self.subscribeFileDesc[path].flush()

def subscriptionIdCompleter(self, text, line, begidx, endidx):
self.pathCompletionItems = []
for id in self.subscribeIdToPath.keys():
self.pathCompletionItems.append(CompletionItem(id))
return basic_complete(text, line, begidx, endidx, self.pathCompletionItems)

COMM_SETUP_COMMANDS = "Communication Set-up Commands"
VISS_COMMANDS = "Kuksa Interaction Commands"
INFO_COMMANDS = "Info Commands"
Expand All @@ -104,7 +115,14 @@ def path_completer(self, text, line, begidx, endidx):
ap_setValue.add_argument("Value", help="Value to be set")

ap_getValue = argparse.ArgumentParser()
ap_getValue.add_argument("Path", help="Path whose metadata is to be read", completer_method=path_completer)
ap_getValue.add_argument("Path", help="Path to be read", completer_method=path_completer)

ap_subscribe = argparse.ArgumentParser()
ap_subscribe.add_argument("Path", help="Path to be subscribed", completer_method=path_completer)

ap_unsubscribe = argparse.ArgumentParser()
ap_unsubscribe.add_argument("SubscribeId", help="Corresponding subscription Id", completer_method=subscriptionIdCompleter)

ap_getMetaData = argparse.ArgumentParser()
ap_getMetaData.add_argument("Path", help="Path whose metadata is to be read", completer_method=path_completer)
ap_updateMetaData = argparse.ArgumentParser()
Expand All @@ -127,6 +145,8 @@ def __init__(self):
self.serverPort = DEFAULT_SERVER_PORT
self.vssTree = {}
self.pathCompletionItems = []
self.subscribeFileDesc = {}
self.subscribeIdToPath = {}

print("Welcome to kuksa viss client version " + str(__version__))
print()
Expand All @@ -137,7 +157,6 @@ def __init__(self):
print()
self.connect()


@with_category(COMM_SETUP_COMMANDS)
@with_argparser(ap_authorize)
def do_authorize(self, args):
Expand All @@ -164,6 +183,39 @@ def do_getValue(self, args):
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []

@with_category(VISS_COMMANDS)
@with_argparser(ap_subscribe)
def do_subscribe(self, args):
"""Subscribe the value of a path"""
if self.checkConnection():
resp = self.commThread.subscribe(args.Path, lambda msg: self.subscribeCallback(args.Path, msg))
resJson = json.loads(resp)
if "subscriptionId" in resJson:
fileName = os.getcwd() + "/log_"+args.Path.replace("/", ".")+"_"+str(time.time())
self.subscribeFileDesc[args.Path] = open(fileName, "w")
self.subscribeIdToPath[resJson["subscriptionId"]] = args.Path
print("Subscription log available at " + fileName)
print("Execute tail -f " + fileName + " on another Terminal instance")
from shutil import which
if which("xterm") != None:
subprocess.Popen(["xterm", "-e", "/bin/bash", "-l", "-c", "tail -f " + fileName])
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
self.pathCompletionItems = []

@with_category(VISS_COMMANDS)
@with_argparser(ap_unsubscribe)
def do_unsubscribe(self, args):
"""Unsubscribe an existing subscription"""
if self.checkConnection():
resp = self.commThread.unsubscribe(args.SubscribeId)
print(highlight(resp, lexers.JsonLexer(), formatters.TerminalFormatter()))
if args.SubscribeId in self.subscribeIdToPath.keys():
path = self.subscribeIdToPath[args.SubscribeId]
if path in self.subscribeFileDesc:
self.subscribeFileDesc[path].close()
del(self.subscribeFileDesc[path])
del(self.subscribeIdToPath[args.SubscribeId])
self.pathCompletionItems = []

def do_quit(self, args):
if hasattr(self, "commThread"):
Expand Down Expand Up @@ -220,7 +272,6 @@ def checkConnection(self):
self.connect()
return self.commThread.wsConnected


def connect(self, insecure=False):
"""Connect to the VISS Server"""
if hasattr(self, "commThread"):
Expand Down

0 comments on commit bf6f492

Please sign in to comment.