Skip to content

Commit

Permalink
- first test
Browse files Browse the repository at this point in the history
  • Loading branch information
mringwal committed Sep 27, 2024
1 parent 001da31 commit 8c150e5
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 5 deletions.
18 changes: 17 additions & 1 deletion autopts/ptsprojects/btstack/gmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from autopts.ptsprojects.btstack.gmap_wid import gmap_wid_hdl
from autopts.client import get_unique_name
from autopts.pybtp.types import Addr
from autopts.utils import ResultWithFlag


def set_pixits(ptses):
Expand All @@ -40,6 +41,11 @@ def test_cases(ptses):
iut_device_name = get_unique_name(pts)
stack = get_stack()

# declare iut_addr as a kind of future
iut_addr = ResultWithFlag()
def set_addr(addr):
iut_addr.set(addr)

# Generic preconditions for all test case in the profile
pre_conditions = [
TestFunc(btp.core_reg_svc_gap),
Expand All @@ -52,8 +58,18 @@ def test_cases(ptses):
TestFunc(btp.set_pts_addr, pts_bd_addr, Addr.le_public),
TestFunc(btp.core_reg_svc_gatt),
TestFunc(stack.gatt_init),
TestFunc(btp.core_reg_svc_pacs),
TestFunc(stack.pacs_init),
TestFunc(btp.core_reg_svc_ascs),
TestFunc(stack.ascs_init),
TestFunc(btp.core_reg_svc_bap),
TestFunc(stack.bap_init),
TestFunc(btp.core_reg_svc_cap),
TestFunc(stack.cap_init),
TestFunc(btp.core_reg_svc_gmap),
TestFunc(stack.gmap_init)
TestFunc(stack.gmap_init),
# Gives a signal to the LT2 to continue its preconditions
TestFunc(lambda: set_addr(stack.gap.iut_addr_get_str())),
]

test_case_name_list = pts.get_test_case_list('GMAP')
Expand Down
183 changes: 179 additions & 4 deletions autopts/wid/gmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,194 @@
#

import logging
from autopts.pybtp.types import WIDParams
import struct
from argparse import Namespace

from autopts.pybtp import btp, defs
from autopts.ptsprojects.stack import get_stack
from autopts.pybtp.btp import lt2_addr_get, lt2_addr_type_get, pts_addr_get, pts_addr_type_get
from autopts.pybtp.defs import AUDIO_METADATA_STREAMING_AUDIO_CONTEXTS
from autopts.pybtp.types import WIDParams, AudioDir, ASCSState, CODEC_CONFIG_SETTINGS, QOS_CONFIG_SETTINGS, \
create_lc3_ltvs_bytes
from autopts.wid import generic_wid_hdl

log = logging.debug


def gmap_wid_hdl(wid, description, test_case_name):
log(f'{gmap_wid_hdl.__name__}, {wid}, {description}, {test_case_name}')
return generic_wid_hdl(wid, description, test_case_name, [__name__])


def create_default_config():
return Namespace(addr=pts_addr_get(),
addr_type=pts_addr_type_get(),
vid=0x0000,
cid=0x0000,
coding_format=0x06,
frames_per_sdu=0x01,
audio_locations=0x01,
cig_id=0x00,
cis_id=0x00,
presentation_delay=40000,
qos_config=None,
codec_set_name=None,
codec_ltvs=None,
metadata_ltvs=None,
mono=None)


# wid handlers section begin
def hdl_wid_1(params: WIDParams):
# Example WID

def hdl_wid_311(_: WIDParams):
"""
Please configure 1 SINK ASE with Config Setting: .
"""
addr = pts_addr_get()
addr_type = pts_addr_type_get()
stack = get_stack()

audio_dir = AudioDir.SINK
coding_format = 0x06
audio_location_list = [defs.PACS_AUDIO_LOCATION_FRONT_LEFT]
ase_ids = []

# frequency from test case
# presentation delay = low latency
# streaming audio context = game
# retransmission number: 1
default_config = create_default_config()
default_config.codec_set_name = '32_1'
default_config.qos_set_name = '32_1_1'
default_config.metadata_ltvs = struct.pack('<BBH', 3, AUDIO_METADATA_STREAMING_AUDIO_CONTEXTS, 0x0008)
default_config.presentation_delay = 10000

(default_config.sampling_freq,
default_config.frame_duration,
default_config.octets_per_frame) = CODEC_CONFIG_SETTINGS[default_config.codec_set_name]

(default_config.sdu_interval,
default_config.framing,
default_config.max_sdu_size,
default_config.retransmission_number,
default_config.max_transport_latency) = QOS_CONFIG_SETTINGS[default_config.qos_set_name]

default_config.retransmission_number = 1

ases= []
cig_id = 0x00
cis_id = 0x00

for audio_location in audio_location_list:
# Find ID of the ASE
ev = stack.bap.wait_ase_found_ev(addr_type, addr, audio_dir, 30, remove=True)
if ev is None:
return False

_, _, _, ase_id = ev

logging.debug(f"Using ASE_ID: {ase_id} for Location: {audio_location}")

# 24_1
config = Namespace(**vars(default_config))
config.audio_dir = audio_dir
audio_locations = audio_location

# Perform Codec Config operation
config.ase_id = ase_id
config.cig_id = cig_id
config.cis_id = cis_id
config.codec_ltvs = create_lc3_ltvs_bytes(config.sampling_freq, config.frame_duration,
audio_locations, config.octets_per_frame,
config.frames_per_sdu)

btp.cap_unicast_setup_ase(config, config.addr_type, config.addr)
stack.bap.ase_configs.append(config)

ases.append(config)

cis_id += 1

btp.cap_unicast_audio_start(cig_id, defs.CAP_UNICAST_AUDIO_START_SET_TYPE_AD_HOC)
ev = stack.cap.wait_unicast_start_completed_ev(cig_id, 10)
if ev is None:
return False

# We could wait for this, but Zephyr controller has issue with the second CIS,
# so PTS does not send Streaming notification.
for config in ases:
# Wait for the ASE states to be changed to streaming
ev = stack.ascs.wait_ascs_ase_state_changed_ev(config.addr_type,
config.addr,
config.ase_id,
ASCSState.STREAMING,
20)
if ev is None:
log('hdl_wid_311 exit, not streaming')
return False

return True


def hdl_wid_364(params: WIDParams):
"""
After processed audio stream data, please click OK.
"""

return True


def hdl_wid_558(params: WIDParams):
"""
Please click ok if the IUT is in streaming state.
"""

# ToDo: verify streaming state

return True



def hdl_wid_20100(params: WIDParams):
"""
Please initiate a GATT connection to the PTS.
Description: Verify that the Implementation Under Test (IUT) can initiate a GATT connect request to the PTS.
"""
if params.test_case_name.endswith('LT2'):
addr = lt2_addr_get()
addr_type = lt2_addr_type_get()
else:
addr = pts_addr_get()
addr_type = pts_addr_type_get()

stack = get_stack()
btp.gap_conn(addr, addr_type)
stack.gap.wait_for_connection(timeout=10, addr=addr)
stack.gap.gap_wait_for_sec_lvl_change(level=2, timeout=30, addr=addr)

return True


def hdl_wid_20106(params: WIDParams):
"""
Please write to Client Characteristic Configuration Descriptor of ASE
Control Point characteristic to enable notification.
"""
if params.test_case_name.endswith('LT2'):
addr = lt2_addr_get()
addr_type = lt2_addr_type_get()
else:
addr = pts_addr_get()
addr_type = pts_addr_type_get()

stack = get_stack()
peer = stack.bap.get_peer(addr_type, addr)
if peer.discovery_completed:
log('Skip BAP discovery, discovery completed before')

# Skip if discovery has been done already
return True

btp.bap_discover(addr_type, addr)
stack.bap.wait_discovery_completed_ev(addr_type, addr, 30)

return True

0 comments on commit 8c150e5

Please sign in to comment.