From 1de3134b3bbf798b09f0b63d58270c997fc17760 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:03:54 -0400 Subject: [PATCH] WIP: Advanced energy scan --- zigpy_cli/radio.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/zigpy_cli/radio.py b/zigpy_cli/radio.py index 089069f..b7f1540 100644 --- a/zigpy_cli/radio.py +++ b/zigpy_cli/radio.py @@ -7,6 +7,8 @@ import itertools import json import logging +import random +import time import click import zigpy.state @@ -227,6 +229,57 @@ async def energy_scan(app, num_scans): print() +@radio.command() +@click.pass_obj +@click.option("-n", "--num-scans", type=int, default=10 * 2**8) +@click.option("-r", "--randomize", type=bool, default=True) +@click.argument("output", type=click.File("w"), default="-") +@click_coroutine +async def advanced_energy_scan(app, output, num_scans, randomize): + await app.startup() + LOGGER.info("Running scan...") + + channels = zigpy.types.Channels.ALL_CHANNELS + scan_counts = {channel: num_scans for channel in channels} + + if randomize: + + def iter_channels(): + while scan_counts: + channel = random.choice(tuple(scan_counts)) + scan_counts[channel] -= 1 + + yield channel + + if scan_counts[channel] <= 0: + del scan_counts[channel] + + else: + + def iter_channels(): + for channel, count in scan_counts.items(): + for i in range(count): + yield channel + + with click.progressbar( + iterable=iter_channels(), + length=len(list(channels)) * num_scans, + item_show_func=lambda item: None if item is None else f"Channel {item}", + ) as bar: + output.write("Timestamp,Channel,Energy\n") + + for channel in bar: + results = await app.energy_scan( + channels=zigpy.types.Channels.from_channel_list([channel]), + duration_exp=0, + count=1, + ) + + energy = results[channel] + timestamp = time.time() + output.write(f"{timestamp:0.4f},{channel},{energy:0.4f}\n") + + @radio.command() @click.pass_obj @click.option("-c", "--channel", type=int)