Skip to content

Commit

Permalink
Enhance Python host functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
qianlifeng committed Dec 5, 2024
1 parent 1af4b7f commit a21dd8d
Show file tree
Hide file tree
Showing 18 changed files with 1,038 additions and 233 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ __pycache__/
Release/
__debug_bin*
Wox/log/
Wox.Plugin.Python/dist/
Wox.Plugin.Python/wox_plugin.egg-info/
43 changes: 36 additions & 7 deletions Wox.Plugin.Host.Python/__main__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,51 @@
import asyncio
import sys
import uuid
import os

import logger
from host import start_websocket

if len(sys.argv) != 4:
print('Usage: python python-host.pyz <port> <logDirectory>')
print('Usage: python python-host.pyz <port> <logDirectory> <woxPid>')
sys.exit(1)

port = int(sys.argv[1])
log_directory = (sys.argv[2])
log_directory = sys.argv[2]
wox_pid = int(sys.argv[3])

trace_id = f"{uuid.uuid4()}"
trace_id = str(uuid.uuid4())
host_id = f"python-{uuid.uuid4()}"
logger.update_log_directory(log_directory)
logger.info(trace_id, "----------------------------------------")
logger.info(trace_id, f"start python host: {host_id}")
logger.info(trace_id, f"port: {port}")

asyncio.run(start_websocket(port))
def check_wox_process():
"""Check if Wox process is still alive"""
try:
os.kill(wox_pid, 0)
return True
except OSError:
return False

async def monitor_wox_process():
"""Monitor Wox process and exit if it's not alive"""
await logger.info(trace_id, "start monitor wox process")
while True:
if not check_wox_process():
await logger.error(trace_id, "wox process is not alive, exit")
sys.exit(1)
await asyncio.sleep(1)

async def main():
"""Main function"""
# Log startup information
await logger.info(trace_id, "----------------------------------------")
await logger.info(trace_id, f"start python host: {host_id}")
await logger.info(trace_id, f"port: {port}")
await logger.info(trace_id, f"wox pid: {wox_pid}")

# Start tasks
monitor_task = asyncio.create_task(monitor_wox_process())
websocket_task = asyncio.create_task(start_websocket(port))
await asyncio.gather(monitor_task, websocket_task)

asyncio.run(main())
5 changes: 5 additions & 0 deletions Wox.Plugin.Host.Python/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Constants used across the application"""

PLUGIN_JSONRPC_TYPE_REQUEST = "WOX_JSONRPC_REQUEST"
PLUGIN_JSONRPC_TYPE_RESPONSE = "WOX_JSONRPC_RESPONSE"
PLUGIN_JSONRPC_TYPE_SYSTEM_LOG = "WOX_JSONRPC_SYSTEM_LOG"
77 changes: 65 additions & 12 deletions Wox.Plugin.Host.Python/host.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,76 @@
#!/usr/bin/env python

import asyncio
import pkgutil
import json
import uuid
from typing import Dict, Any

import websockets
from loguru import logger
import logger
from wox_plugin import Context, new_context_with_value
from constants import PLUGIN_JSONRPC_TYPE_REQUEST, PLUGIN_JSONRPC_TYPE_RESPONSE
from plugin_manager import waiting_for_response
from jsonrpc import handle_request_from_wox

async def handle_message(ws: websockets.WebSocketServerProtocol, message: str):
"""Handle incoming WebSocket message"""
try:
msg_data = json.loads(message)
trace_id = msg_data.get("TraceId", str(uuid.uuid4()))
ctx = new_context_with_value("traceId", trace_id)

async def handler(websocket):
while True:
message = await websocket.recv()
logger.info(message)
# my_module = importlib.import_module('os.path')
if PLUGIN_JSONRPC_TYPE_RESPONSE in message:
# Handle response from Wox
if msg_data.get("Id") in waiting_for_response:
deferred = waiting_for_response[msg_data["Id"]]
if msg_data.get("Error"):
deferred.reject(msg_data["Error"])
else:
deferred.resolve(msg_data.get("Result"))
del waiting_for_response[msg_data["Id"]]
elif PLUGIN_JSONRPC_TYPE_REQUEST in message:
# Handle request from Wox
try:
result = await handle_request_from_wox(ctx, msg_data, ws)
response = {
"TraceId": trace_id,
"Id": msg_data["Id"],
"Method": msg_data["Method"],
"Type": PLUGIN_JSONRPC_TYPE_RESPONSE,
"Result": result
}
await ws.send(json.dumps(response))
except Exception as e:
error_response = {
"TraceId": trace_id,
"Id": msg_data["Id"],
"Method": msg_data["Method"],
"Type": PLUGIN_JSONRPC_TYPE_RESPONSE,
"Error": str(e)
}
await logger.error(trace_id, f"handle request failed: {str(e)}")
await ws.send(json.dumps(error_response))
else:
await logger.error(trace_id, f"unknown message type: {message}")
except Exception as e:
await logger.error(str(uuid.uuid4()), f"receive and handle msg error: {message}, err: {str(e)}")

async def handler(websocket: websockets.WebSocketServerProtocol):
"""WebSocket connection handler"""
logger.update_websocket(websocket)

try:
async for message in websocket:
await handle_message(websocket, message)
except websockets.exceptions.ConnectionClosed:
await logger.info(str(uuid.uuid4()), "connection closed")
except Exception as e:
await logger.error(str(uuid.uuid4()), f"connection error: {str(e)}")
finally:
logger.update_websocket(None)

async def start_websocket(websocket_port: int):
"""Start WebSocket server"""
await logger.info(str(uuid.uuid4()), "start websocket server")
async with websockets.serve(handler, "", websocket_port):
await asyncio.Future() # run forever


def load_plugin():
pkgutil.iter_modules(['plugins'])
await asyncio.Future() # run forever
204 changes: 204 additions & 0 deletions Wox.Plugin.Host.Python/jsonrpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import json
import importlib.util
import sys
from typing import Any, Dict, Optional
import uuid
import websockets
import logger
from wox_plugin import (
Context,
Plugin,
Query,
QueryType,
Selection,
QueryEnv,
Result,
new_context_with_value,
PluginInitParams
)
from constants import PLUGIN_JSONRPC_TYPE_REQUEST, PLUGIN_JSONRPC_TYPE_RESPONSE
from plugin_manager import plugin_instances, waiting_for_response
from plugin_api import PluginAPI

async def handle_request_from_wox(ctx: Context, request: Dict[str, Any], ws: websockets.WebSocketServerProtocol) -> Any:
"""Handle incoming request from Wox"""
method = request.get("Method")
plugin_name = request.get("PluginName")

await logger.info(ctx["Values"]["traceId"], f"invoke <{plugin_name}> method: {method}")

if method == "loadPlugin":
return await load_plugin(ctx, request)
elif method == "init":
return await init_plugin(ctx, request, ws)
elif method == "query":
return await query(ctx, request)
elif method == "action":
return await action(ctx, request)
elif method == "refresh":
return await refresh(ctx, request)
elif method == "unloadPlugin":
return await unload_plugin(ctx, request)
else:
await logger.info(ctx["Values"]["traceId"], f"unknown method handler: {method}")
raise Exception(f"unknown method handler: {method}")

async def load_plugin(ctx: Context, request: Dict[str, Any]) -> None:
"""Load a plugin"""
plugin_directory = request["Params"]["PluginDirectory"]
entry = request["Params"]["Entry"]
plugin_id = request["PluginId"]
plugin_name = request["PluginName"]

try:
# Add plugin directory to Python path
if plugin_directory not in sys.path:
sys.path.append(plugin_directory)

# Import the plugin module
spec = importlib.util.spec_from_file_location("plugin", entry)
if spec is None or spec.loader is None:
raise ImportError(f"Could not load plugin from {entry}")

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

if not hasattr(module, "plugin"):
raise AttributeError("Plugin module does not have a 'plugin' attribute")

plugin_instances[plugin_id] = {
"module": module,
"plugin": module.plugin,
"directory": plugin_directory,
"entry": entry,
"name": plugin_name,
"api": None
}

await logger.info(ctx["Values"]["traceId"], f"<{plugin_name}> load plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin_name}> load plugin failed: {str(e)}")
raise e

async def init_plugin(ctx: Context, request: Dict[str, Any], ws: websockets.WebSocketServerProtocol) -> None:
"""Initialize a plugin"""
plugin_id = request["PluginId"]
plugin = plugin_instances.get(plugin_id)
if not plugin:
raise Exception(f"plugin not found: {request['PluginName']}, forget to load plugin?")

try:
# Create plugin API instance
api = PluginAPI(ws, plugin_id, plugin["name"])
plugin["api"] = api

# Call plugin's init method if it exists
if hasattr(plugin["plugin"], "init"):
init_params = PluginInitParams(API=api, PluginDirectory=plugin["directory"])
await plugin["plugin"].init(ctx, init_params)

await logger.info(ctx["Values"]["traceId"], f"<{plugin['name']}> init plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> init plugin failed: {str(e)}")
raise e

async def query(ctx: Context, request: Dict[str, Any]) -> list:
"""Handle query request"""
plugin_id = request["PluginId"]
plugin = plugin_instances.get(plugin_id)
if not plugin:
raise Exception(f"plugin not found: {request['PluginName']}, forget to load plugin?")

try:
if not hasattr(plugin["plugin"], "query"):
return []

query_params = Query(
Type=QueryType(request["Params"]["Type"]),
RawQuery=request["Params"]["RawQuery"],
TriggerKeyword=request["Params"]["TriggerKeyword"],
Command=request["Params"]["Command"],
Search=request["Params"]["Search"],
Selection=Selection(**json.loads(request["Params"]["Selection"])),
Env=QueryEnv(**json.loads(request["Params"]["Env"]))
)

results = await plugin["plugin"].query(ctx, query_params)

# Ensure each result has an ID
if results:
for result in results:
if not result.Id:
result.Id = str(uuid.uuid4())
if result.Actions:
for action in result.Actions:
if not action.Id:
action.Id = str(uuid.uuid4())

return [result.__dict__ for result in results] if results else []
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> query failed: {str(e)}")
raise e

async def action(ctx: Context, request: Dict[str, Any]) -> Any:
"""Handle action request"""
plugin_id = request["PluginId"]
plugin = plugin_instances.get(plugin_id)
if not plugin:
raise Exception(f"plugin not found: {request['PluginName']}, forget to load plugin?")

try:
action_id = request["Params"]["ActionId"]
context_data = request["Params"].get("ContextData")

# Find the action in the plugin's results
if hasattr(plugin["plugin"], "handle_action"):
return await plugin["plugin"].handle_action(action_id, context_data)

return None
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> action failed: {str(e)}")
raise e

async def refresh(ctx: Context, request: Dict[str, Any]) -> Any:
"""Handle refresh request"""
plugin_id = request["PluginId"]
plugin = plugin_instances.get(plugin_id)
if not plugin:
raise Exception(f"plugin not found: {request['PluginName']}, forget to load plugin?")

try:
result_id = request["Params"]["ResultId"]

# Find the refresh callback in the plugin's results
if hasattr(plugin["plugin"], "handle_refresh"):
return await plugin["plugin"].handle_refresh(result_id)

return None
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> refresh failed: {str(e)}")
raise e

async def unload_plugin(ctx: Context, request: Dict[str, Any]) -> None:
"""Unload a plugin"""
plugin_id = request["PluginId"]
plugin = plugin_instances.get(plugin_id)
if not plugin:
raise Exception(f"plugin not found: {request['PluginName']}, forget to load plugin?")

try:
# Call plugin's unload method if it exists
if hasattr(plugin["plugin"], "unload"):
await plugin["plugin"].unload()

# Remove plugin from instances
del plugin_instances[plugin_id]

# Remove plugin directory from Python path
if plugin["directory"] in sys.path:
sys.path.remove(plugin["directory"])

await logger.info(ctx["Values"]["traceId"], f"<{plugin['name']}> unload plugin successfully")
except Exception as e:
await logger.error(ctx["Values"]["traceId"], f"<{plugin['name']}> unload plugin failed: {str(e)}")
raise e
Loading

0 comments on commit a21dd8d

Please sign in to comment.