Skip to content

Commit 9fbb35e

Browse files
feat: add pin control and reading functionalities (#1)
* feat: add pin control and reading functionalities * docs: update features list to include control of peripherals and GPIO pins * feat: enhance serial monitor functionality with write support and update documentation
1 parent 4f60bea commit 9fbb35e

File tree

6 files changed

+139
-4
lines changed

6 files changed

+139
-4
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ The basic example is in the [examples/hello_esp32/main.py](examples/hello_esp32/
3232
- Connect to the Wokwi Simulator
3333
- Upload a diagram and firmware files
3434
- Start a simulation
35-
- Monitor the serial output
35+
- Monitor the serial output and write to them
36+
- Read GPIO pins
37+
- Control peripherals (buttons, MPU6050, etc.)
3638

3739
You can run the example with:
3840

docs/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ Typed, asyncio-friendly Python SDK for the **Wokwi Simulation API**.
77
- Connect to the Wokwi Simulator from Python
88
- Upload diagrams and firmware files
99
- Start, pause, resume, and restart simulations
10-
- Monitor serial output asynchronously
10+
- Monitor serial output asynchronously and write to them
11+
- Control peripherals and read GPIO pins
1112
- Fully type-annotated and easy to use with asyncio
1213

1314
## Installation

src/wokwi_client/client.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
from .__version__ import get_version
99
from .constants import DEFAULT_WS_URL
10+
from .control import set_control
1011
from .event_queue import EventQueue
1112
from .file_ops import upload, upload_file
13+
from .pins import pin_listen, pin_read
1214
from .protocol_types import EventMessage, ResponseMessage
13-
from .serial import monitor_lines
15+
from .serial import monitor_lines, write_serial
1416
from .simulation import pause, restart, resume, start
1517
from .transport import Transport
1618

@@ -191,5 +193,41 @@ async def serial_monitor_cat(self, decode_utf8: bool = True, errors: str = "repl
191193
else:
192194
print(line, end="", flush=True)
193195

196+
async def serial_write(self, data: bytes | str | list[int]) -> None:
197+
"""Write data to the simulation serial monitor interface."""
198+
await write_serial(self._transport, data)
199+
194200
def _on_pause(self, event: EventMessage) -> None:
195201
self.last_pause_nanos = int(event["nanos"])
202+
203+
async def read_pin(self, part: str, pin: str) -> ResponseMessage:
204+
"""Read the current state of a pin.
205+
206+
Args:
207+
part: The part id (e.g. "uno").
208+
pin: The pin name (e.g. "A2").
209+
"""
210+
return await pin_read(self._transport, part=part, pin=pin)
211+
212+
async def listen_pin(self, part: str, pin: str, listen: bool = True) -> ResponseMessage:
213+
"""Start or stop listening for changes on a pin.
214+
215+
When enabled, "pin:change" events will be delivered via the transport's
216+
event mechanism.
217+
218+
Args:
219+
part: The part id.
220+
pin: The pin name.
221+
listen: True to start listening, False to stop.
222+
"""
223+
return await pin_listen(self._transport, part=part, pin=pin, listen=listen)
224+
225+
async def set_control(self, part: str, control: str, value: int | bool | float) -> ResponseMessage:
226+
"""Set a control value (e.g. simulate button press).
227+
228+
Args:
229+
part: Part id (e.g. "btn1").
230+
control: Control name (e.g. "pressed").
231+
value: Control value to set (float).
232+
"""
233+
return await set_control(self._transport, part=part, control=control, value=value)

src/wokwi_client/control.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Control command helpers for virtual parts.
2+
3+
Provides `set_control` to manipulate part controls (e.g. press a button).
4+
5+
Assumptions:
6+
* Underlying websocket command name: "control:set".
7+
* Parameter names expected by server: part, control, value.
8+
"""
9+
10+
# SPDX-FileCopyrightText: 2025-present CodeMagic LTD
11+
#
12+
# SPDX-License-Identifier: MIT
13+
14+
from .protocol_types import ResponseMessage
15+
from .transport import Transport
16+
17+
18+
async def set_control(
19+
transport: Transport, *, part: str, control: str, value: int | bool | float
20+
) -> ResponseMessage:
21+
"""Set a control value on a part (e.g. simulate button press/release).
22+
23+
Args:
24+
transport: Active Transport.
25+
part: Part identifier (e.g. "btn1").
26+
control: Control name (e.g. "pressed").
27+
value: Control value to set (float).
28+
"""
29+
return await transport.request(
30+
"control:set", {"part": part, "control": control, "value": float(value)}
31+
)

src/wokwi_client/pins.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""Pin command helpers for the Wokwi Simulation API.
2+
3+
This module exposes helper coroutines for issuing pin-related commands:
4+
5+
* pin:read - Read the current state of a pin.
6+
* pin:listen - Start/stop listening for changes on a pin (emits pin:change
7+
events).
8+
"""
9+
10+
# SPDX-FileCopyrightText: 2025-present CodeMagic LTD
11+
#
12+
# SPDX-License-Identifier: MIT
13+
14+
from .protocol_types import ResponseMessage
15+
from .transport import Transport
16+
17+
18+
async def pin_read(
19+
transport: Transport, *, part: str, pin: str
20+
) -> ResponseMessage:
21+
"""Read the state of a pin.
22+
23+
Args:
24+
transport: The active Transport instance.
25+
part: Part identifier (e.g. "uno").
26+
pin: Pin name (e.g. "A2", "13").
27+
"""
28+
29+
return await transport.request("pin:read", {"part": part, "pin": pin})
30+
31+
32+
async def pin_listen(
33+
transport: Transport, *, part: str, pin: str, listen: bool = True
34+
) -> ResponseMessage:
35+
"""Enable or disable listening for changes on a pin.
36+
37+
When listening is enabled, "pin:change" events will be emitted with the
38+
pin state.
39+
40+
Args:
41+
transport: The active Transport instance.
42+
part: Part identifier.
43+
pin: Pin name.
44+
listen: True to start listening, False to stop.
45+
"""
46+
47+
return await transport.request(
48+
"pin:listen", {"part": part, "pin": pin, "listen": listen}
49+
)

src/wokwi_client/serial.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
# SPDX-License-Identifier: MIT
44

5-
from collections.abc import AsyncGenerator
5+
from collections.abc import AsyncGenerator, Iterable
66

77
from .event_queue import EventQueue
88
from .transport import Transport
@@ -14,3 +14,17 @@ async def monitor_lines(transport: Transport) -> AsyncGenerator[bytes, None]:
1414
while True:
1515
event_msg = await queue.get()
1616
yield bytes(event_msg["payload"]["bytes"])
17+
18+
19+
async def write_serial(transport: Transport, data: bytes | str | Iterable[int]) -> None:
20+
"""Write data to the serial monitor.
21+
22+
Accepts bytes, str (encoded as utf-8), or an iterable of integer byte values.
23+
"""
24+
if isinstance(data, str):
25+
payload = list(data.encode("utf-8"))
26+
elif isinstance(data, bytes):
27+
payload = list(data)
28+
else:
29+
payload = list(int(b) & 0xFF for b in data)
30+
await transport.request("serial-monitor:write", {"bytes": payload})

0 commit comments

Comments
 (0)