Skip to content

Commit

Permalink
maint
Browse files Browse the repository at this point in the history
  • Loading branch information
FilipDominec committed Feb 15, 2025
1 parent c6abc3c commit 471f812
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions rp2daq.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ def _register_commands(self):

def _report_processor(self):
"""
Thread to continuously check for incoming data.
When a byte comes in, place it onto the deque.
A thread to continuously check for incoming data. When a byte comes in, place it onto the deque.
When the bytes in the deque are enough for the expected length of a report, process them into one.
"""

def queue_recv_bytes(length): # note: should re-implement with io.BytesIO() ring buffer?
Expand Down Expand Up @@ -179,30 +179,27 @@ def unpack_data_payload(data_bytes, count, bitwidth):
logging.debug(f"received packet header {report_type} {bytes(report_header_bytes)}")

# 3rd: Get the data payload (if present), and convert it into a list of ints
#if cb_kwargs.get("_data_count") and cb_kwargs["_data_count"]:
if "data_count" in self.report_header_varnames[report_type]:
cb_kwargs = dict(zip(self.report_header_varnames[report_type], report_args))
count, bitwidth = cb_kwargs["data_count"], cb_kwargs["data_bitwidth"]
payload_length = -((-count*bitwidth)//8) # int div like floor(); this makes it ceil()
payload_raw = queue_recv_bytes(payload_length)

# todo pre-cache classes for each report type...
cb_nt = self.report_namedtuple_classes[report_type](
# Use pre-cached classes for each report type
return_values = self.report_namedtuple_classes[report_type](
*report_args, unpack_data_payload(payload_raw, count, bitwidth))
else:
cb_nt = self.report_namedtuple_classes[report_type](
return_values = self.report_namedtuple_classes[report_type](
*report_args)

#cb_kwargs['_nt'] = cb_nt # XXX

# 4th: Register callback (if async), or wait (if sync)
cb = self.report_callbacks.get(report_type, False) # false for unexpected reports
if cb:
self.async_report_cb_queue.put((cb, cb_nt))
self.async_report_cb_queue.put((cb, return_values))
elif cb is None: # expected report from blocking command
self.sync_report_cb_queues[report_type].put(cb_nt) # unblock default callback (& send it data)
self.sync_report_cb_queues[report_type].put(return_values) # unblock default callback (& send it data)
elif cb is False: # unexpected report, from command that was not yet called in this script instance
logging.warning(f"Warning: Got report type that was not asked for\n\tDebug info: {cb_kwargs}")
logging.warning(f"Warning: Unexpected report type; you may want to reset the device. \n\tDebug info: {return_values}")
pass
## TODO: enqueue to be called by yet another thread (so that sync cmds work within callbacks,too)
## TODO: check if sync cmd works correctly after async cmd (of the same type)
Expand All @@ -213,12 +210,13 @@ def unpack_data_payload(data_bytes, count, bitwidth):

def _callback_dispatcher(self):
"""
A separate thread of the main process to call all callbacks.
"""
self.run_event.wait()

while self.run_event.is_set():
(cb, cb_kwargs) = self.async_report_cb_queue.get()
cb(cb_kwargs)
(cb, return_values) = self.async_report_cb_queue.get()
cb(return_values)

def default_blocking_callback(self, command_code): #
"""
Expand Down

0 comments on commit 471f812

Please sign in to comment.