Skip to content

Commit

Permalink
Switch to JSON and dump network information
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 12, 2024
1 parent db2e486 commit f775fb5
Showing 1 changed file with 64 additions and 23 deletions.
87 changes: 64 additions & 23 deletions zigpy_cli/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,34 @@ async def energy_scan(app, num_scans):

@radio.command()
@click.pass_obj
@click.option("-n", "--num-scans", type=int, default=10 * 2**8)
@click.option("-e", "--num-energy-scans", type=int, default=10 * 2**8)
@click.option("-n", "--num-network-scans", type=int, default=5)
@click.option("-r", "--randomize", type=bool, default=True)
@click.argument("output", type=click.File("w"), default="-")
@click_coroutine
async def advanced_energy_scan(app, output, num_scans, randomize):
async def advanced_energy_scan(
app,
output,
num_energy_scans,
num_network_scans,
randomize,
):
import bellows.types
from bellows.zigbee.application import (
ControllerApplication as EzspControllerApplication,
)
from bellows.zigbee.util import map_energy_to_rssi as ezsp_map_energy_to_rssi

await app.startup()
LOGGER.info("Running scan...")

channels = zigpy.types.Channels.ALL_CHANNELS
scan_counts = {channel: num_scans for channel in channels}
scan_counts = {channel: num_energy_scans for channel in channels}

scan_data = {
"energy_scan": [],
"network_scan": [],
}

if randomize:

Expand All @@ -260,40 +278,63 @@ def iter_channels():

with click.progressbar(
iterable=iter_channels(),
length=len(list(channels)) * num_scans,
length=len(list(channels)) * num_energy_scans,
item_show_func=lambda item: None if item is None else f"Channel {item}",
) as bar:
output.write("Timestamp,Channel,Energy\n")

for channel in bar:
results = await app.energy_scan(
channels=zigpy.types.Channels.from_channel_list([channel]),
duration_exp=0,
count=1,
)

energy = results[channel]
timestamp = time.time()
output.write(f"{timestamp:0.4f},{channel},{energy:0.4f}\n")
rssi = None

import bellows.types
from bellows.zigbee.application import (
ControllerApplication as EzspControllerApplication,
)
if isinstance(app, EzspControllerApplication):
rssi = ezsp_map_energy_to_rssi(results[channel])

if not isinstance(app, EzspControllerApplication):
return
scan_data["energy_scan"].append(
{
"timestamp": time.time(),
"channel": channel,
"energy": results[channel],
"rssi": rssi,
}
)

await asyncio.sleep(1)
for channel in channels:
networks = await app._ezsp.startScan(
scanType=bellows.types.EzspNetworkScanType.ACTIVE_SCAN,
channelMask=zigpy.types.Channels.from_channel_list([channel]),
duration=6,
)

for network, lqi, rssi in networks:
print(f"Found network {network}: LQI={lqi}, RSSI={rssi}")
print(f"Scanning for networks on channel {channel}")
networks = set()

for attempt in range(num_network_scans):
networks_scan = await app._ezsp.startScan(
scanType=bellows.types.EzspNetworkScanType.ACTIVE_SCAN,
channelMask=zigpy.types.Channels.from_channel_list([channel]),
duration=6,
)
networks_scan = tuple(
[(network.freeze(), lqi, rssi) for network, lqi, rssi in networks_scan]
)
new_networks = set(networks_scan) - networks

for network, lqi, rssi in new_networks:
print(f"Found network {network}: LQI={lqi}, RSSI={rssi}")
scan_data["network_scan"].append(
{
"channel": channel,
"lqi": lqi,
"rssi": rssi,
"network": {
**network.as_dict(),
"extendedPanId": str(network.extendedPanId),
},
}
)

networks.update(new_networks)

json.dump(scan_data, output, separators=(",", ":"))


@radio.command()
Expand Down

0 comments on commit f775fb5

Please sign in to comment.