From f775fb5c7da6a4bc194f264777fcd1852e698906 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Sun, 11 Aug 2024 21:39:23 -0400 Subject: [PATCH] Switch to JSON and dump network information --- zigpy_cli/radio.py | 87 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 23 deletions(-) diff --git a/zigpy_cli/radio.py b/zigpy_cli/radio.py index 6de4990..acd3005 100644 --- a/zigpy_cli/radio.py +++ b/zigpy_cli/radio.py @@ -228,16 +228,34 @@ async def energy_scan(app, num_scans): @radio.command() @click.pass_obj -@click.option("-n", "--num-scans", type=int, default=10 * 2**8) +@click.option("-e", "--num-energy-scans", type=int, default=10 * 2**8) +@click.option("-n", "--num-network-scans", type=int, default=5) @click.option("-r", "--randomize", type=bool, default=True) @click.argument("output", type=click.File("w"), default="-") @click_coroutine -async def advanced_energy_scan(app, output, num_scans, randomize): +async def advanced_energy_scan( + app, + output, + num_energy_scans, + num_network_scans, + randomize, +): + import bellows.types + from bellows.zigbee.application import ( + ControllerApplication as EzspControllerApplication, + ) + from bellows.zigbee.util import map_energy_to_rssi as ezsp_map_energy_to_rssi + await app.startup() LOGGER.info("Running scan...") channels = zigpy.types.Channels.ALL_CHANNELS - scan_counts = {channel: num_scans for channel in channels} + scan_counts = {channel: num_energy_scans for channel in channels} + + scan_data = { + "energy_scan": [], + "network_scan": [], + } if randomize: @@ -260,11 +278,9 @@ def iter_channels(): with click.progressbar( iterable=iter_channels(), - length=len(list(channels)) * num_scans, + length=len(list(channels)) * num_energy_scans, item_show_func=lambda item: None if item is None else f"Channel {item}", ) as bar: - output.write("Timestamp,Channel,Energy\n") - for channel in bar: results = await app.energy_scan( channels=zigpy.types.Channels.from_channel_list([channel]), @@ -272,28 +288,53 @@ def iter_channels(): count=1, ) - energy = results[channel] - timestamp = time.time() - output.write(f"{timestamp:0.4f},{channel},{energy:0.4f}\n") + rssi = None - import bellows.types - from bellows.zigbee.application import ( - ControllerApplication as EzspControllerApplication, - ) + if isinstance(app, EzspControllerApplication): + rssi = ezsp_map_energy_to_rssi(results[channel]) - if not isinstance(app, EzspControllerApplication): - return + scan_data["energy_scan"].append( + { + "timestamp": time.time(), + "channel": channel, + "energy": results[channel], + "rssi": rssi, + } + ) await asyncio.sleep(1) for channel in channels: - networks = await app._ezsp.startScan( - scanType=bellows.types.EzspNetworkScanType.ACTIVE_SCAN, - channelMask=zigpy.types.Channels.from_channel_list([channel]), - duration=6, - ) - - for network, lqi, rssi in networks: - print(f"Found network {network}: LQI={lqi}, RSSI={rssi}") + print(f"Scanning for networks on channel {channel}") + networks = set() + + for attempt in range(num_network_scans): + networks_scan = await app._ezsp.startScan( + scanType=bellows.types.EzspNetworkScanType.ACTIVE_SCAN, + channelMask=zigpy.types.Channels.from_channel_list([channel]), + duration=6, + ) + networks_scan = tuple( + [(network.freeze(), lqi, rssi) for network, lqi, rssi in networks_scan] + ) + new_networks = set(networks_scan) - networks + + for network, lqi, rssi in new_networks: + print(f"Found network {network}: LQI={lqi}, RSSI={rssi}") + scan_data["network_scan"].append( + { + "channel": channel, + "lqi": lqi, + "rssi": rssi, + "network": { + **network.as_dict(), + "extendedPanId": str(network.extendedPanId), + }, + } + ) + + networks.update(new_networks) + + json.dump(scan_data, output, separators=(",", ":")) @radio.command()