-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add download and download_file methods #5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
dab255f
321b103
f6cc44e
75909db
4016c5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,3 +22,16 @@ async def upload_file( | |
async def upload(transport: Transport, name: str, content: bytes) -> ResponseMessage: | ||
params = UploadParams(name=name, binary=base64.b64encode(content).decode()) | ||
return await transport.request("file:upload", params.model_dump()) | ||
|
||
|
||
async def download(transport: Transport, name: str) -> ResponseMessage: | ||
return await transport.request("file:download", {"name": name}) | ||
|
||
|
||
async def download_file(transport: Transport, name: str, local_path: Optional[Path] = None) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WDYT about removing The idea is that most users won't use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have found a way to create the wrapper based on an async client without duplicating the code. So I have moved the download_file logic to the API client. |
||
if local_path is None: | ||
local_path = Path(name) | ||
|
||
result = await download(transport, name) | ||
with open(local_path, "wb") as f: | ||
f.write(base64.b64decode(result["result"]["binary"])) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
"""Framebuffer command helpers. | ||
|
||
Provides utilities to interact with devices exposing a framebuffer (e.g. LCD | ||
modules) via the `framebuffer:read` command. | ||
|
||
Exposed helpers: | ||
* framebuffer_read -> raw response (contains base64 PNG at result.png) | ||
* framebuffer_png_bytes -> decoded PNG bytes | ||
* save_framebuffer_png -> save PNG to disk | ||
* compare_framebuffer_png -> compare current framebuffer against reference | ||
""" | ||
|
||
# SPDX-FileCopyrightText: 2025-present CodeMagic LTD | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
from __future__ import annotations | ||
|
||
import base64 | ||
from pathlib import Path | ||
|
||
from .exceptions import WokwiError | ||
from .protocol_types import ResponseMessage | ||
from .transport import Transport | ||
|
||
__all__ = [ | ||
"framebuffer_read", | ||
"framebuffer_png_bytes", | ||
"save_framebuffer_png", | ||
"compare_framebuffer_png", | ||
] | ||
|
||
|
||
async def framebuffer_read(transport: Transport, *, id: str) -> ResponseMessage: | ||
"""Issue `framebuffer:read` for the given device id and return raw response.""" | ||
return await transport.request("framebuffer:read", {"id": id}) | ||
|
||
|
||
def _extract_png_b64(resp: ResponseMessage) -> str: | ||
result = resp.get("result", {}) | ||
png_b64 = result.get("png") | ||
if not isinstance(png_b64, str): # pragma: no cover - defensive | ||
raise WokwiError("Malformed framebuffer:read response: missing 'png' base64 string") | ||
return png_b64 | ||
|
||
|
||
async def framebuffer_png_bytes(transport: Transport, *, id: str) -> bytes: | ||
"""Return decoded PNG bytes for the framebuffer of device `id`.""" | ||
resp = await framebuffer_read(transport, id=id) | ||
return base64.b64decode(_extract_png_b64(resp)) | ||
|
||
|
||
async def save_framebuffer_png( | ||
transport: Transport, *, id: str, path: Path, overwrite: bool = True | ||
) -> Path: | ||
"""Save the framebuffer PNG to `path` and return the path. | ||
|
||
Args: | ||
transport: Active transport. | ||
id: Device id (e.g. "lcd1"). | ||
path: Destination file path. | ||
overwrite: Overwrite existing file (default True). If False and file | ||
exists, raises WokwiError. | ||
""" | ||
if path.exists() and not overwrite: | ||
raise WokwiError(f"File already exists and overwrite=False: {path}") | ||
data = await framebuffer_png_bytes(transport, id=id) | ||
path.parent.mkdir(parents=True, exist_ok=True) | ||
with open(path, "wb") as f: | ||
f.write(data) | ||
return path | ||
|
||
|
||
async def compare_framebuffer_png( | ||
transport: Transport, *, id: str, reference: Path, save_mismatch: Path | None = None | ||
) -> bool: | ||
"""Compare the current framebuffer PNG with a reference file. | ||
|
||
Performs a byte-for-byte comparison. If different and `save_mismatch` is | ||
provided, writes the current framebuffer PNG there. | ||
|
||
Returns True if identical, False otherwise. | ||
""" | ||
if not reference.exists(): | ||
raise WokwiError(f"Reference image does not exist: {reference}") | ||
current = await framebuffer_png_bytes(transport, id=id) | ||
ref_bytes = reference.read_bytes() | ||
if current == ref_bytes: | ||
return True | ||
if save_mismatch: | ||
save_mismatch.parent.mkdir(parents=True, exist_ok=True) | ||
save_mismatch.write_bytes(current) | ||
return False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just returned the bytes directly, as returning ResponseMessage isn't very useful for users of this API (they either want to bytes or to save the file, they shouldn't care about the format of the repsonse / or the base64 encoded data)