Skip to content

Commit

Permalink
Add PBP
Browse files Browse the repository at this point in the history
  • Loading branch information
mringwal committed Sep 9, 2024
1 parent e8804ac commit 49b2045
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 0 deletions.
1 change: 1 addition & 0 deletions autopts/ptsprojects/stack/layers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
from .gtbs import *
from .tmap import *
from .ots import *
from .pbp import *
# GENERATOR append 1
33 changes: 33 additions & 0 deletions autopts/ptsprojects/stack/layers/pbp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# auto-pts - The Bluetooth PTS Automation Framework
#
# Copyright (c) 2024, BlueKitchen GmbH.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#

from autopts.ptsprojects.stack.common import wait_for_queue_event
from autopts.pybtp import defs


class PBP:
def __init__(self):
self.event_queues = {
defs.PBP_EV_PUBLIC_BROADCAST_ANNOUNCEMENT_FOUND: [],
}

def event_received(self, event_type, event_data):
self.event_queues[event_type].append(event_data)

def wait_public_broadcast_event_found_ev(self, addr_type, addr, broadcast_name, timeout, remove=True):
return wait_for_queue_event(
self.event_queues[defs.PBP_EV_PUBLIC_BROADCAST_ANNOUNCEMENT_FOUND],
lambda ev: (addr_type, addr, broadcast_name) == (ev['addr_type'], ev['addr'], ev['broadcast_name']),
timeout, remove)
8 changes: 8 additions & 0 deletions autopts/ptsprojects/stack/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"TBS": 1 << defs.BTP_SERVICE_ID_TBS,
"TMAP": 1 << defs.BTP_SERVICE_ID_TMAP,
"OTS": 1 << defs.BTP_SERVICE_ID_OTS,
"PBP": 1 << defs.BTP_SERVICE_ID_PBP,
# GENERATOR append 1
}

Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(self):
self.gtbs = None
self.tmap = None
self.ots = None
self.pbp = None
# GENERATOR append 2

def is_svc_supported(self, svc):
Expand Down Expand Up @@ -186,6 +188,9 @@ def tmap_init(self):
def ots_init(self):
self.ots = OTS()

def pbp_init(self):
self.pbp = PBP()

# GENERATOR append 3

def cleanup(self):
Expand Down Expand Up @@ -267,6 +272,9 @@ def cleanup(self):
if self.ots:
self.ots_init()

if self.pbp:
self.pbp_init()

# GENERATOR append 4


Expand Down
1 change: 1 addition & 0 deletions autopts/pybtp/btp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@
from autopts.pybtp.btp.tbs import *
from autopts.pybtp.btp.tmap import *
from autopts.pybtp.btp.ots import *
from autopts.pybtp.btp.pbp import *
# GENERATOR append 1
11 changes: 11 additions & 0 deletions autopts/pybtp/btp/btp.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def read_supp_svcs():
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_TMAP),
"ots_reg": (defs.BTP_SERVICE_ID_CORE, defs.CORE_REGISTER_SERVICE,
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_OTS),
"pbp_reg": (defs.BTP_SERVICE_ID_CORE, defs.CORE_REGISTER_SERVICE,
defs.BTP_INDEX_NONE, defs.BTP_SERVICE_ID_PBP),
# GENERATOR append 4
"read_supp_cmds": (defs.BTP_SERVICE_ID_CORE,
defs.CORE_READ_SUPPORTED_COMMANDS,
Expand Down Expand Up @@ -704,6 +706,13 @@ def core_reg_svc_ots():
iutctl.btp_socket.send_wait_rsp(*CORE['ots_reg'])


def core_reg_svc_pbp():
logging.debug("%s", core_reg_svc_pbp.__name__)

iutctl = get_iut()
iutctl.btp_socket.send_wait_rsp(*CORE['pbp_reg'])


# GENERATOR append 1

def core_reg_svc_rsp_succ():
Expand Down Expand Up @@ -805,6 +814,7 @@ def init(get_iut_method):
from .tbs import TBS_EV
from .tmap import TMAP_EV
from .ots import OTS_EV
from .pbp import PBP_EV
# GENERATOR append 2

from autopts.pybtp.iutctl_common import set_event_handler
Expand Down Expand Up @@ -844,6 +854,7 @@ def event_handler(hdr, data):
defs.BTP_SERVICE_ID_TBS: (TBS_EV, stack.tbs),
defs.BTP_SERVICE_ID_TMAP: (TMAP_EV, stack.tmap),
defs.BTP_SERVICE_ID_OTS: (OTS_EV, stack.ots),
defs.BTP_SERVICE_ID_PBP: (PBP_EV, stack.pbp),
# GENERATOR append 3
}

Expand Down
140 changes: 140 additions & 0 deletions autopts/pybtp/btp/pbp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#
# auto-pts - The Bluetooth PTS Automation Framework
#
# Copyright (c) 2024, BlueKitchen GmbH.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms and conditions of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#

import binascii
import logging
import struct

from autopts.pybtp import defs
from autopts.pybtp.btp.btp import CONTROLLER_INDEX, get_iut_method as get_iut, \
btp_hdr_check
from autopts.pybtp.types import BTPError

log = logging.debug

PBP = {
'read_supported_cmds': (defs.BTP_SERVICE_ID_PBP,
defs.PBP_READ_SUPPORTED_COMMANDS,
CONTROLLER_INDEX),
'set_public_broadcast_announcement': (defs.BTP_SERVICE_ID_PBP,
defs.PBP_SET_PUBLIC_BROADCAST_ANNOUNCEMENT,
CONTROLLER_INDEX),
'set_broadcast_name': (defs.BTP_SERVICE_ID_PBP,
defs.PBP_SET_BROADCAST_NAME,
CONTROLLER_INDEX),
'broadcast_scan_start': (defs.BTP_SERVICE_ID_PBP,
defs.PBP_BROADCAST_SCAN_START,
CONTROLLER_INDEX),
'broadcast_scan_stop': (defs.BTP_SERVICE_ID_PBP,
defs.PBP_BROADCAST_SCAN_STOP,
CONTROLLER_INDEX),
}


def pbp_command_rsp_succ(timeout=20.0):
logging.debug("%s", pbp_command_rsp_succ.__name__)

iutctl = get_iut()

tuple_hdr, tuple_data = iutctl.btp_socket.read(timeout)
logging.debug("received %r %r", tuple_hdr, tuple_data)

btp_hdr_check(tuple_hdr, defs.BTP_SERVICE_ID_PBP)

return tuple_data


def pbp_set_public_broadcast_announcement(features, metadata):
logging.debug(f"{pbp_set_public_broadcast_announcement.__name__}")

data = bytearray()
metadata_len = len(metadata)
data += struct.pack('BB', features, metadata_len)

if metadata_len:
data += metadata

iutctl = get_iut()
iutctl.btp_socket.send(*PBP['set_public_broadcast_announcement'], data=data)

pbp_command_rsp_succ()


def pbp_set_broadcast_name(name):
logging.debug(f"{pbp_set_broadcast_name.__name__}")

name_data = name.encode('utf-8')
data = bytearray()
data += struct.pack('B', len(name_data))
data += name_data

iutctl = get_iut()
iutctl.btp_socket.send(*PBP['set_broadcast_name'], data=data)

pbp_command_rsp_succ()


def pbp_broadcast_scan_start():
logging.debug(f"{pbp_broadcast_scan_start.__name__}")

data = bytearray()
iutctl = get_iut()
iutctl.btp_socket.send(*PBP['broadcast_scan_start'], data=data)

pbp_command_rsp_succ()


def pbp_broadcast_scan_stop():
logging.debug(f"{pbp_broadcast_scan_stop.__name__}")

data = bytearray()
iutctl = get_iut()
iutctl.btp_socket.send(*PBP['broadcast_scan_stop'], data=data)

pbp_command_rsp_succ()


def pbp_ev_public_broadcast_announcement_found(pbp, data, data_len):
logging.debug('%s %r', pbp_ev_public_broadcast_announcement_found.__name__, data)

fmt = '<B6s3sBHBB'
fmt_len = struct.calcsize(fmt)
if len(data) < fmt_len:
raise BTPError('Invalid data length')

addr_type, addr, broadcast_id, advertiser_sid, padv_interval, pba_features, broadcast_name_len = \
struct.unpack_from(fmt, data)

addr = binascii.hexlify(addr[::-1]).lower().decode('utf-8')
broadcast_id = int.from_bytes(broadcast_id, "little")

broadcast_name = data[fmt_len:].decode("utf-8")

ev = {'addr_type': addr_type,
'addr': addr,
'broadcast_id': broadcast_id,
'advertiser_sid': advertiser_sid,
'padv_interval': padv_interval,
'pba_features': pba_features,
'broadcast_name': broadcast_name}

logging.debug(f'Public Broadcast Announcement received: {ev}')

pbp.event_received(defs.PBP_EV_PUBLIC_BROADCAST_ANNOUNCEMENT_FOUND, ev)


PBP_EV = {
defs.PBP_EV_PUBLIC_BROADCAST_ANNOUNCEMENT_FOUND: pbp_ev_public_broadcast_announcement_found,
}
8 changes: 8 additions & 0 deletions autopts/pybtp/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def BIT(bit):
BTP_SERVICE_ID_TBS = 27
BTP_SERVICE_ID_TMAP = 28
BTP_SERVICE_ID_OTS = 29
BTP_SERVICE_ID_PBP = 30
# GENERATOR append 1

BTP_STATUS_SUCCESS = 0x00
Expand Down Expand Up @@ -883,4 +884,11 @@ def BIT(bit):
OTS_READ_SUPPORTED_COMMANDS = 0x01
OTS_REGISTER_OBJECT = 0x02

PBP_READ_SUPPORTED_COMMANDS = 0x01
PBP_SET_PUBLIC_BROADCAST_ANNOUNCEMENT = 0x02
PBP_SET_BROADCAST_NAME = 0x03
PBP_BROADCAST_SCAN_START= 0x04
PBP_BROADCAST_SCAN_STOP = 0x05
PBP_EV_PUBLIC_BROADCAST_ANNOUNCEMENT_FOUND = 0x80

# GENERATOR append 2
1 change: 1 addition & 0 deletions autopts/wid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@
from .gtbs import gtbs_wid_hdl
from .tmap import tmap_wid_hdl
from .ots import ots_wid_hdl
from .pbp import pbp_wid_hdl
# GENERATOR append 1
Loading

0 comments on commit 49b2045

Please sign in to comment.