-
-
Notifications
You must be signed in to change notification settings - Fork 430
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
power: add support for uhubctl devices
Signed-off-by: Eric Callahan <[email protected]>
- Loading branch information
Showing
1 changed file
with
108 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
# Raspberry Pi Power Control | ||
# Power Switch Control | ||
# | ||
# Copyright (C) 2024 Eric Callahan <[email protected]> | ||
# Copyright (C) 2020 Jordan Ruthe <[email protected]> | ||
# | ||
# This file may be distributed under the terms of the GNU GPLv3 license. | ||
|
@@ -10,6 +11,8 @@ | |
import socket | ||
import asyncio | ||
import time | ||
import re | ||
import shutil | ||
from urllib.parse import quote, urlencode | ||
from ..utils import json_wrapper as jsonw | ||
from ..common import RequestType, KlippyState | ||
|
@@ -35,6 +38,7 @@ | |
from .mqtt import MQTTClient | ||
from .http_client import HttpClient | ||
from .klippy_connection import KlippyConnection | ||
from .shell_command import ShellCommandFactory as ShellCommand | ||
|
||
class PrinterPower: | ||
def __init__(self, config: ConfigHelper) -> None: | ||
|
@@ -56,6 +60,7 @@ def __init__(self, config: ConfigHelper) -> None: | |
"smartthings": SmartThings, | ||
"hue": HueDevice, | ||
"http": GenericHTTP, | ||
"uhubctl": UHubCtl | ||
} | ||
|
||
for section in prefix_sections: | ||
|
@@ -1465,6 +1470,108 @@ async def _send_power_request(self, state: str) -> str: | |
async def _send_status_request(self) -> str: | ||
return await self._send_generic_request("status") | ||
|
||
|
||
HUB_STATE_PATTERN = r""" | ||
(?:Port\s(?P<port>[0-9]+):) | ||
(?:\s(?P<bits>[0-9a-f]{4})) | ||
(?:\s(?P<pstate>power|off)) | ||
(?P<flags>(?:\s[0-9a-z]+)+)? | ||
(?:\s\[(?P<desc>.+)\])? | ||
""" | ||
|
||
class UHubCtl(PowerDevice): | ||
_uhubctrl_regex = re.compile( | ||
r"^\s*" + HUB_STATE_PATTERN + r"\s*$", | ||
re.VERBOSE | re.IGNORECASE | ||
) | ||
def __init__(self, config: ConfigHelper) -> None: | ||
super().__init__(config) | ||
self.scmd: ShellCommand = self.server.load_component(config, "shell_command") | ||
self.location = config.get("location") | ||
self.port = config.getint("port") | ||
ret = shutil.which("uhubctl") | ||
if ret is None: | ||
raise config.error( | ||
f"[{config.get_name()}]: failed to locate 'uhubctl' binary. " | ||
"Make sure uhubctl is correctly installed on the host machine." | ||
) | ||
|
||
async def init_state(self) -> None: | ||
async with self.request_lock: | ||
await self.refresh_status() | ||
cur_state = True if self.state == "on" else False | ||
if self.initial_state is not None and cur_state != self.initial_state: | ||
await self.set_power("on" if self.initial_state else "off") | ||
|
||
async def refresh_status(self) -> None: | ||
try: | ||
result = await self._run_uhubctl("info") | ||
except self.server.error as e: | ||
self.state = "error" | ||
output = f"\n{e}" | ||
if isinstance(e, self.scmd.error): | ||
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}" | ||
logging.info(f"Power Device {self.name}: Refresh Error{output}") | ||
return | ||
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}") | ||
self.state = result["state"] | ||
|
||
async def set_power(self, state: str) -> None: | ||
try: | ||
result = await self._run_uhubctl(state) | ||
except self.server.error as e: | ||
self.state = "error" | ||
msg = f"Power Device {self.name}: Error turning device {state}" | ||
output = f"\n{e}" | ||
if isinstance(e, self.scmd.error): | ||
output += f"\nuhubctrl output: {e.stderr.decode(errors='ignore')}" | ||
logging.info(f"{msg}{output}") | ||
raise self.server.error(msg) from None | ||
logging.debug(f"Power Device {self.name}: uhubctl device info: {result}") | ||
self.state = result["state"] | ||
|
||
async def _run_uhubctl(self, action: str) -> Dict[str, Any]: | ||
cmd = f"uhubctl -l {self.location} -p {self.port}" | ||
search_prefix = "Current status" | ||
if action in ["on", "off"]: | ||
cmd += f" -a {action}" | ||
search_prefix = "New status" | ||
resp: str = await self.scmd.exec_cmd(cmd, log_complete=False) | ||
for line in resp.splitlines(): | ||
if search_prefix: | ||
if line.startswith(search_prefix): | ||
search_prefix = "" | ||
continue | ||
match = self._uhubctrl_regex.match(line.strip()) | ||
if match is None: | ||
continue | ||
result = match.groupdict() | ||
try: | ||
port = int(result["port"]) | ||
status_bits = int(result["bits"], 16) | ||
except (TypeError, ValueError): | ||
continue | ||
if port != self.port: | ||
continue | ||
if result["pstate"] is None: | ||
continue | ||
state = "on" if result["pstate"] == "power" else "off" | ||
flags: List[str] = [] | ||
if result["flags"] is not None: | ||
flags = result["flags"].strip().split() | ||
return { | ||
"port": port, | ||
"status_bits": status_bits, | ||
"state": state, | ||
"flags": flags, | ||
"desc": result["desc"] | ||
} | ||
raise self.server.error( | ||
f"Failed to receive response for device at location {self.location}, " | ||
f"port {self.port}, " | ||
) | ||
|
||
|
||
# The power component has multiple configuration sections | ||
def load_component(config: ConfigHelper) -> PrinterPower: | ||
return PrinterPower(config) |