Skip to content

Commit

Permalink
Add a new command for concurrent packet capture
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Jan 22, 2025
1 parent e29c3dd commit 0f1fe7c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
29 changes: 29 additions & 0 deletions zigpy_cli/pcap.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from __future__ import annotations

import datetime
import json
import logging
import sys

import click
import zigpy.types as t
from scapy.config import conf as scapy_conf
from scapy.layers.dot15d4 import Dot15d4 # NOQA: F401
from scapy.utils import PcapReader, PcapWriter

from zigpy_cli.cli import cli

from .helpers import PcapWriter as ZigpyPcapWriter

scapy_conf.dot15d4_protocol = "zigbee"

LOGGER = logging.getLogger(__name__)
Expand All @@ -29,3 +35,26 @@ def fix_fcs(input, output):
for packet in reader:
packet.fcs = None
writer.write(packet)


@pcap.command()
@click.option("-o", "--output", type=click.File("wb"), required=True)
def interleave_combine(output):
if output.name == "<stdout>":
output = sys.stdout.buffer.raw

writer = ZigpyPcapWriter(output)
writer.write_header()

while True:
line = sys.stdin.readline()
data = json.loads(line)
packet = t.CapturedPacket(
timestamp=datetime.datetime.fromisoformat(data["timestamp"]),
rssi=data["rssi"],
lqi=data["lqi"],
channel=data["channel"],
data=bytes.fromhex(data["data"]),
)

writer.write_packet(packet)
35 changes: 30 additions & 5 deletions zigpy_cli/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import json
import logging
import random
import sys

import click
import zigpy.state
Expand Down Expand Up @@ -251,10 +252,19 @@ async def change_channel(app, channel):
)
@click.option("-p", "--channel-hop-period", type=float, default=5.0)
@click.option("-o", "--output", type=click.File("wb"), required=True)
@click.option("--interleave", is_flag=True, type=bool, default=False)
@click_coroutine
async def packet_capture(
app, channel_hop_randomly, channels, channel_hop_period, output
app,
channel_hop_randomly,
channels,
channel_hop_period,
output,
interleave,
):
if output.name == "<stdout>" and not interleave:
output = sys.stdout.buffer.raw

if not channel_hop_randomly:
channels_iter = itertools.cycle(channels)
else:
Expand All @@ -270,8 +280,9 @@ def channels_iter_func():

await app.connect()

writer = PcapWriter(output)
writer.write_header()
if not interleave:
writer = PcapWriter(output)
writer.write_header()

async with asyncio.TaskGroup() as tg:
channel_hopper_task = None
Expand All @@ -287,7 +298,21 @@ async def channel_hopper():
channel_hopper_task = tg.create_task(channel_hopper())

LOGGER.debug("Got a packet %s", packet)
writer.write_packet(packet)

if output.name == "<stdout>": # Surely there's a better way?
if not interleave:
writer.write_packet(packet)
else:
# To do line interleaving, encode the packets as JSON
output.write(
json.dumps(
{
"timestamp": packet.timestamp.isoformat(),
"rssi": packet.rssi,
"lqi": packet.lqi,
"channel": packet.channel,
"data": packet.data.hex(),
}
).encode("ascii")
+ b"\n"
)
output.flush()

0 comments on commit 0f1fe7c

Please sign in to comment.