From 7d8bc9e1efec3599c726541a13003246414f40b2 Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Fri, 8 Nov 2024 08:14:15 +0100 Subject: [PATCH 1/7] MF4 reader updates --- can/io/mf4.py | 401 ++++++++++++++++++++++++++------------------------ 1 file changed, 211 insertions(+), 190 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index 042bf8765..cb48616db 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -10,11 +10,11 @@ from hashlib import md5 from io import BufferedIOBase, BytesIO from pathlib import Path -from typing import Any, BinaryIO, Generator, Optional, Union, cast +from typing import Any, BinaryIO, Generator, Iterable, Optional, Union, cast from ..message import Message from ..typechecking import StringPathLike -from ..util import channel2int, dlc2len, len2dlc +from ..util import channel2int, len2dlc from .generic import BinaryIOMessageReader, BinaryIOMessageWriter logger = logging.getLogger("can.io.mf4") @@ -22,10 +22,10 @@ try: import asammdf import numpy as np - from asammdf import Signal + from asammdf import Signal, Source from asammdf.blocks.mdf_v4 import MDF4 - from asammdf.blocks.v4_blocks import SourceInformation - from asammdf.blocks.v4_constants import BUS_TYPE_CAN, SOURCE_BUS + from asammdf.blocks.v4_blocks import ChannelGroup, SourceInformation + from asammdf.blocks.v4_constants import BUS_TYPE_CAN, FLAG_CG_BUS_EVENT, SOURCE_BUS from asammdf.mdf import MDF STD_DTYPE = np.dtype( @@ -270,9 +270,168 @@ class MF4Reader(BinaryIOMessageReader): """ Iterator of CAN messages from a MF4 logging file. - The MF4Reader only supports MF4 files that were recorded with python-can. + The MF4Reader only supports MF4 files with CAN bus logging. """ - + + # NOTE: Readout based on the bus logging code from asammdf GUI + + class FrameIterator(object): + """ + Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. + """ + + # Number of records to request for each asammdf call + _chunk_size = 1000 + + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float, name: str): + self._mdf = mdf + self._group_index = group_index + self._start_timestamp = start_timestamp + self._name = name + + # Extract names + channel_group: ChannelGroup = self._mdf.groups[self._group_index] + + self._channel_names = [] + + for channel in channel_group.channels: + if channel.name.startwith(f"{self._name}."): + self._channel_names.append(channel.name) + + return + + def _get_data(self, current_offset: int) -> asammdf.Signal: + # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which + # get does convert as expected. + data_raw = self._mdf.get( + self._name, + self._group_index, + record_offset=current_offset, + record_count=self._chunk_size, + raw=False + ) + + return data_raw + + pass + + class CANDataFrameIterator(FrameIterator): + + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame") + + return + + def __iter__(self) -> Generator[Message, None, None]: + for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + data = self._get_data(current_offset) + names = data.samples[0].dtype.names + + for i in range(len(data)): + data_length = int(data["CAN_DataFrame.DataLength"][i]) + + kv = { + "timestamp": float(data.timestamps[i]) + self._start_timestamp, + "arbitration_id": int(data["CAN_DataFrame.ID"][i]) & 0x1FFFFFFF, + "data": data["CAN_DataFrame.DataBytes"][i][:data_length].tobytes(), + } + + if "CAN_DataFrame.BusChannel" in names: + kv["channel"] = int(data["CAN_DataFrame.BusChannel"][i]) + if "CAN_DataFrame.Dir" in names: + kv["is_rx"] = int(data["CAN_DataFrame.Dir"][i]) == 0 + if "CAN_DataFrame.IDE" in names: + kv["is_extended_id"] = bool(data["CAN_DataFrame.IDE"][i]) + if "CAN_DataFrame.EDL" in names: + kv["is_fd"] = bool(data["CAN_DataFrame.EDL"][i]) + if "CAN_DataFrame.BRS" in names: + kv["bitrate_switch"] = bool(data["CAN_DataFrame.BRS"][i]) + if "CAN_DataFrame.ESI" in names: + kv["error_state_indicator"] = bool(data["CAN_DataFrame.ESI"][i]) + + yield Message(**kv) + + return None + + pass + + class CANErrorFrameIterator(FrameIterator): + + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame") + + return + + def __iter__(self) -> Generator[Message, None, None]: + for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + data = self._get_data(current_offset) + names = data.samples[0].dtype.names + + for i in range(len(data)): + kv = { + "timestamp": float(data.timestamps[i]) + self._start_timestamp, + "is_error_frame": True, + } + + if "CAN_ErrorFrame.BusChannel" in names: + kv["channel"] = int(data["CAN_ErrorFrame.BusChannel"][i]) + if "CAN_ErrorFrame.Dir" in names: + kv["is_rx"] = int(data["CAN_ErrorFrame.Dir"][i]) == 0 + if "CAN_ErrorFrame.ID" in names: + kv["arbitration_id"] = int(data["CAN_ErrorFrame.ID"][i]) & 0x1FFFFFFF + if "CAN_ErrorFrame.IDE" in names: + kv["is_extended_id"] = bool(data["CAN_ErrorFrame.IDE"][i]) + if "CAN_ErrorFrame.EDL" in names: + kv["is_fd"] = bool(data["CAN_ErrorFrame.EDL"][i]) + if "CAN_ErrorFrame.BRS" in names: + kv["bitrate_switch"] = bool(data["CAN_ErrorFrame.BRS"][i]) + if "CAN_ErrorFrame.ESI" in names: + kv["error_state_indicator"] = bool(data["CAN_ErrorFrame.ESI"][i]) + if "CAN_ErrorFrame.RTR" in names: + kv["is_remote_frame"] = bool(data["CAN_ErrorFrame.RTR"][i]) + if "CAN_ErrorFrame.DataLength" in names and "CAN_ErrorFrame.DataBytes" in names: + data_length = int(data["CAN_ErrorFrame.DataLength"][i]) + kv["data"] = data["CAN_ErrorFrame.DataBytes"][i][:data_length].tobytes() + + yield Message(**kv) + + return None + + pass + + class CANRemoteFrameIterator(FrameIterator): + + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame") + + return + + def __iter__(self) -> Generator[Message, None, None]: + for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + data = self._get_data(current_offset) + names = data.samples[0].dtype.names + + for i in range(len(data)): + kv = { + "timestamp": float(data.timestamps[i]) + self._start_timestamp, + "arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) & 0x1FFFFFFF, + "dlc": int(data["CAN_RemoteFrame.DLC"][i]), + "is_remote_frame": True, + } + + if "CAN_RemoteFrame.BusChannel" in names: + kv["channel"] = int(data["CAN_RemoteFrame.BusChannel"][i]) + if "CAN_RemoteFrame.Dir" in names: + kv["is_rx"] = int(data["CAN_RemoteFrame.Dir"][i]) == 0 + if "CAN_RemoteFrame.IDE" in names: + kv["is_extended_id"] = bool(data["CAN_RemoteFrame.IDE"][i]) + + yield Message(**kv) + + return None + + pass + def __init__( self, file: Union[StringPathLike, BinaryIO], @@ -291,195 +450,57 @@ def __init__( super().__init__(file, mode="rb") - self._mdf: MDF4 + self._mdf: MDF if isinstance(file, BufferedIOBase): self._mdf = MDF(BytesIO(file.read())) else: self._mdf = MDF(file) - - self.start_timestamp = self._mdf.header.start_time.timestamp() - - masters = [self._mdf.get_master(i) for i in range(3)] - - masters = [ - np.core.records.fromarrays((master, np.ones(len(master)) * i)) - for i, master in enumerate(masters) - ] - - self.masters = np.sort(np.concatenate(masters)) - - def __iter__(self) -> Generator[Message, None, None]: - standard_counter = 0 - error_counter = 0 - rtr_counter = 0 - - for timestamp, group_index in self.masters: - # standard frames - if group_index == 0: - sample = self._mdf.get( - "CAN_DataFrame", - group=group_index, - raw=True, - record_offset=standard_counter, - record_count=1, - ) - - try: - channel = int(sample["CAN_DataFrame.BusChannel"][0]) - except ValueError: - channel = None - - if sample["CAN_DataFrame.EDL"] == 0: - is_extended_id = bool(sample["CAN_DataFrame.IDE"][0]) - arbitration_id = int(sample["CAN_DataFrame.ID"][0]) - is_rx = int(sample["CAN_DataFrame.Dir"][0]) == 0 - size = int(sample["CAN_DataFrame.DataLength"][0]) - dlc = int(sample["CAN_DataFrame.DLC"][0]) - data = sample["CAN_DataFrame.DataBytes"][0, :size].tobytes() - - msg = Message( - timestamp=timestamp + self.start_timestamp, - is_error_frame=False, - is_remote_frame=False, - is_fd=False, - is_extended_id=is_extended_id, - channel=channel, - is_rx=is_rx, - arbitration_id=arbitration_id, - data=data, - dlc=dlc, - ) - - else: - is_extended_id = bool(sample["CAN_DataFrame.IDE"][0]) - arbitration_id = int(sample["CAN_DataFrame.ID"][0]) - is_rx = int(sample["CAN_DataFrame.Dir"][0]) == 0 - size = int(sample["CAN_DataFrame.DataLength"][0]) - dlc = dlc2len(sample["CAN_DataFrame.DLC"][0]) - data = sample["CAN_DataFrame.DataBytes"][0, :size].tobytes() - error_state_indicator = bool(sample["CAN_DataFrame.ESI"][0]) - bitrate_switch = bool(sample["CAN_DataFrame.BRS"][0]) - - msg = Message( - timestamp=timestamp + self.start_timestamp, - is_error_frame=False, - is_remote_frame=False, - is_fd=True, - is_extended_id=is_extended_id, - channel=channel, - arbitration_id=arbitration_id, - is_rx=is_rx, - data=data, - dlc=dlc, - bitrate_switch=bitrate_switch, - error_state_indicator=error_state_indicator, - ) - - yield msg - standard_counter += 1 - - # error frames - elif group_index == 1: - sample = self._mdf.get( - "CAN_ErrorFrame", - group=group_index, - raw=True, - record_offset=error_counter, - record_count=1, - ) - - try: - channel = int(sample["CAN_ErrorFrame.BusChannel"][0]) - except ValueError: - channel = None - - if sample["CAN_ErrorFrame.EDL"] == 0: - is_extended_id = bool(sample["CAN_ErrorFrame.IDE"][0]) - arbitration_id = int(sample["CAN_ErrorFrame.ID"][0]) - is_rx = int(sample["CAN_ErrorFrame.Dir"][0]) == 0 - size = int(sample["CAN_ErrorFrame.DataLength"][0]) - dlc = int(sample["CAN_ErrorFrame.DLC"][0]) - data = sample["CAN_ErrorFrame.DataBytes"][0, :size].tobytes() - - msg = Message( - timestamp=timestamp + self.start_timestamp, - is_error_frame=True, - is_remote_frame=False, - is_fd=False, - is_extended_id=is_extended_id, - channel=channel, - arbitration_id=arbitration_id, - is_rx=is_rx, - data=data, - dlc=dlc, - ) - - else: - is_extended_id = bool(sample["CAN_ErrorFrame.IDE"][0]) - arbitration_id = int(sample["CAN_ErrorFrame.ID"][0]) - is_rx = int(sample["CAN_ErrorFrame.Dir"][0]) == 0 - size = int(sample["CAN_ErrorFrame.DataLength"][0]) - dlc = dlc2len(sample["CAN_ErrorFrame.DLC"][0]) - data = sample["CAN_ErrorFrame.DataBytes"][0, :size].tobytes() - error_state_indicator = bool(sample["CAN_ErrorFrame.ESI"][0]) - bitrate_switch = bool(sample["CAN_ErrorFrame.BRS"][0]) - - msg = Message( - timestamp=timestamp + self.start_timestamp, - is_error_frame=True, - is_remote_frame=False, - is_fd=True, - is_extended_id=is_extended_id, - channel=channel, - arbitration_id=arbitration_id, - is_rx=is_rx, - data=data, - dlc=dlc, - bitrate_switch=bitrate_switch, - error_state_indicator=error_state_indicator, - ) - - yield msg - error_counter += 1 - - # remote frames + + self._start_timestamp = self._mdf.header.start_time.timestamp() + + def __iter__(self) -> Iterable[Message]: + import heapq + + # To handle messages split over multiple channel groups, create a single iterator per channel group and merge + # these iterators into a single iterator using heapq. + iterators = [] + for group_index, group in enumerate(self._mdf.groups): + channel_group: ChannelGroup = group.channel_group + + if not channel_group.flags & FLAG_CG_BUS_EVENT: + # Not a bus event, skip + continue + + if channel_group.cycles_nr == 0: + # No data, skip + continue + + acquisition_source: Optional[Source] = channel_group.acq_source + + if acquisition_source is None: + # No source information, skip + continue + elif not acquisition_source.source_type & Source.SOURCE_BUS: + # Not a bus type (likely already covered by the channel group flag), skip + continue + + channel_names = [channel.name for channel in group.channels] + + if acquisition_source.bus_type == Source.BUS_TYPE_CAN: + if "CAN_DataFrame" in channel_names: + iterators.append(self.CANDataFrameIterator(self._mdf, group_index, self._start_timestamp)) + elif "CAN_ErrorFrame" in channel_names: + iterators.append(self.CANErrorFrameIterator(self._mdf, group_index, self._start_timestamp)) + elif "CAN_RemoteFrame" in channel_names: + iterators.append(self.CANRemoteFrameIterator(self._mdf, group_index, self._start_timestamp)) else: - sample = self._mdf.get( - "CAN_RemoteFrame", - group=group_index, - raw=True, - record_offset=rtr_counter, - record_count=1, - ) - - try: - channel = int(sample["CAN_RemoteFrame.BusChannel"][0]) - except ValueError: - channel = None - - is_extended_id = bool(sample["CAN_RemoteFrame.IDE"][0]) - arbitration_id = int(sample["CAN_RemoteFrame.ID"][0]) - is_rx = int(sample["CAN_RemoteFrame.Dir"][0]) == 0 - dlc = int(sample["CAN_RemoteFrame.DLC"][0]) - - msg = Message( - timestamp=timestamp + self.start_timestamp, - is_error_frame=False, - is_remote_frame=True, - is_fd=False, - is_extended_id=is_extended_id, - channel=channel, - arbitration_id=arbitration_id, - is_rx=is_rx, - dlc=dlc, - ) - - yield msg - - rtr_counter += 1 - - self.stop() + # Unknown bus type, skip + continue + + # Create merged iterator over all the groups, using the timestamps as comparison key + return heapq.merge(*iterators, key=lambda x: x.timestamp) def stop(self) -> None: self._mdf.close() + self._mdf = None super().stop() From 39dd1e02c75d69381428f127c73c791b35def8b6 Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 06:44:56 +0100 Subject: [PATCH 2/7] Typo fix --- can/io/mf4.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index cb48616db..75ba0cd8a 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -295,7 +295,7 @@ def __init__(self, mdf: MDF, group_index: int, start_timestamp: float, name: str self._channel_names = [] for channel in channel_group.channels: - if channel.name.startwith(f"{self._name}."): + if str(channel.name).startswith(f"{self._name}."): self._channel_names.append(channel.name) return From 8582f09bbb5b2056ee6868b766b443de8a740051 Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 06:49:26 +0100 Subject: [PATCH 3/7] Reformat using black --- can/io/mf4.py | 164 +++++++++++++++++++++++++++++++------------------- 1 file changed, 101 insertions(+), 63 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index 75ba0cd8a..6ebac4b1d 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -272,34 +272,36 @@ class MF4Reader(BinaryIOMessageReader): The MF4Reader only supports MF4 files with CAN bus logging. """ - + # NOTE: Readout based on the bus logging code from asammdf GUI - + class FrameIterator(object): """ Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. """ - + # Number of records to request for each asammdf call _chunk_size = 1000 - - def __init__(self, mdf: MDF, group_index: int, start_timestamp: float, name: str): + + def __init__( + self, mdf: MDF, group_index: int, start_timestamp: float, name: str + ): self._mdf = mdf self._group_index = group_index self._start_timestamp = start_timestamp self._name = name - + # Extract names channel_group: ChannelGroup = self._mdf.groups[self._group_index] - + self._channel_names = [] - + for channel in channel_group.channels: if str(channel.name).startswith(f"{self._name}."): self._channel_names.append(channel.name) - + return - + def _get_data(self, current_offset: int) -> asammdf.Signal: # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which # get does convert as expected. @@ -308,34 +310,40 @@ def _get_data(self, current_offset: int) -> asammdf.Signal: self._group_index, record_offset=current_offset, record_count=self._chunk_size, - raw=False + raw=False, ) - + return data_raw - + pass - + class CANDataFrameIterator(FrameIterator): - + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame") - + return - + def __iter__(self) -> Generator[Message, None, None]: - for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + for current_offset in range( + 0, + self._mdf.groups[self._group_index].channel_group.cycles_nr, + self._chunk_size, + ): data = self._get_data(current_offset) names = data.samples[0].dtype.names - + for i in range(len(data)): data_length = int(data["CAN_DataFrame.DataLength"][i]) - + kv = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "arbitration_id": int(data["CAN_DataFrame.ID"][i]) & 0x1FFFFFFF, - "data": data["CAN_DataFrame.DataBytes"][i][:data_length].tobytes(), + "data": data["CAN_DataFrame.DataBytes"][i][ + :data_length + ].tobytes(), } - + if "CAN_DataFrame.BusChannel" in names: kv["channel"] = int(data["CAN_DataFrame.BusChannel"][i]) if "CAN_DataFrame.Dir" in names: @@ -348,37 +356,43 @@ def __iter__(self) -> Generator[Message, None, None]: kv["bitrate_switch"] = bool(data["CAN_DataFrame.BRS"][i]) if "CAN_DataFrame.ESI" in names: kv["error_state_indicator"] = bool(data["CAN_DataFrame.ESI"][i]) - + yield Message(**kv) - + return None - + pass - + class CANErrorFrameIterator(FrameIterator): - + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame") - + return - + def __iter__(self) -> Generator[Message, None, None]: - for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + for current_offset in range( + 0, + self._mdf.groups[self._group_index].channel_group.cycles_nr, + self._chunk_size, + ): data = self._get_data(current_offset) names = data.samples[0].dtype.names - + for i in range(len(data)): kv = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "is_error_frame": True, } - + if "CAN_ErrorFrame.BusChannel" in names: kv["channel"] = int(data["CAN_ErrorFrame.BusChannel"][i]) if "CAN_ErrorFrame.Dir" in names: kv["is_rx"] = int(data["CAN_ErrorFrame.Dir"][i]) == 0 if "CAN_ErrorFrame.ID" in names: - kv["arbitration_id"] = int(data["CAN_ErrorFrame.ID"][i]) & 0x1FFFFFFF + kv["arbitration_id"] = ( + int(data["CAN_ErrorFrame.ID"][i]) & 0x1FFFFFFF + ) if "CAN_ErrorFrame.IDE" in names: kv["is_extended_id"] = bool(data["CAN_ErrorFrame.IDE"][i]) if "CAN_ErrorFrame.EDL" in names: @@ -386,52 +400,64 @@ def __iter__(self) -> Generator[Message, None, None]: if "CAN_ErrorFrame.BRS" in names: kv["bitrate_switch"] = bool(data["CAN_ErrorFrame.BRS"][i]) if "CAN_ErrorFrame.ESI" in names: - kv["error_state_indicator"] = bool(data["CAN_ErrorFrame.ESI"][i]) + kv["error_state_indicator"] = bool( + data["CAN_ErrorFrame.ESI"][i] + ) if "CAN_ErrorFrame.RTR" in names: kv["is_remote_frame"] = bool(data["CAN_ErrorFrame.RTR"][i]) - if "CAN_ErrorFrame.DataLength" in names and "CAN_ErrorFrame.DataBytes" in names: + if ( + "CAN_ErrorFrame.DataLength" in names + and "CAN_ErrorFrame.DataBytes" in names + ): data_length = int(data["CAN_ErrorFrame.DataLength"][i]) - kv["data"] = data["CAN_ErrorFrame.DataBytes"][i][:data_length].tobytes() - + kv["data"] = data["CAN_ErrorFrame.DataBytes"][i][ + :data_length + ].tobytes() + yield Message(**kv) - + return None - + pass - + class CANRemoteFrameIterator(FrameIterator): - + def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame") - + return - + def __iter__(self) -> Generator[Message, None, None]: - for current_offset in range(0, self._mdf.groups[self._group_index].channel_group.cycles_nr, self._chunk_size): + for current_offset in range( + 0, + self._mdf.groups[self._group_index].channel_group.cycles_nr, + self._chunk_size, + ): data = self._get_data(current_offset) names = data.samples[0].dtype.names - + for i in range(len(data)): kv = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, - "arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) & 0x1FFFFFFF, + "arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) + & 0x1FFFFFFF, "dlc": int(data["CAN_RemoteFrame.DLC"][i]), "is_remote_frame": True, } - + if "CAN_RemoteFrame.BusChannel" in names: kv["channel"] = int(data["CAN_RemoteFrame.BusChannel"][i]) if "CAN_RemoteFrame.Dir" in names: kv["is_rx"] = int(data["CAN_RemoteFrame.Dir"][i]) == 0 if "CAN_RemoteFrame.IDE" in names: kv["is_extended_id"] = bool(data["CAN_RemoteFrame.IDE"][i]) - + yield Message(**kv) - + return None - + pass - + def __init__( self, file: Union[StringPathLike, BinaryIO], @@ -455,48 +481,60 @@ def __init__( self._mdf = MDF(BytesIO(file.read())) else: self._mdf = MDF(file) - + self._start_timestamp = self._mdf.header.start_time.timestamp() def __iter__(self) -> Iterable[Message]: import heapq - + # To handle messages split over multiple channel groups, create a single iterator per channel group and merge # these iterators into a single iterator using heapq. iterators = [] for group_index, group in enumerate(self._mdf.groups): channel_group: ChannelGroup = group.channel_group - + if not channel_group.flags & FLAG_CG_BUS_EVENT: # Not a bus event, skip continue - + if channel_group.cycles_nr == 0: # No data, skip continue - + acquisition_source: Optional[Source] = channel_group.acq_source - + if acquisition_source is None: # No source information, skip continue elif not acquisition_source.source_type & Source.SOURCE_BUS: # Not a bus type (likely already covered by the channel group flag), skip continue - + channel_names = [channel.name for channel in group.channels] - + if acquisition_source.bus_type == Source.BUS_TYPE_CAN: if "CAN_DataFrame" in channel_names: - iterators.append(self.CANDataFrameIterator(self._mdf, group_index, self._start_timestamp)) + iterators.append( + self.CANDataFrameIterator( + self._mdf, group_index, self._start_timestamp + ) + ) elif "CAN_ErrorFrame" in channel_names: - iterators.append(self.CANErrorFrameIterator(self._mdf, group_index, self._start_timestamp)) + iterators.append( + self.CANErrorFrameIterator( + self._mdf, group_index, self._start_timestamp + ) + ) elif "CAN_RemoteFrame" in channel_names: - iterators.append(self.CANRemoteFrameIterator(self._mdf, group_index, self._start_timestamp)) + iterators.append( + self.CANRemoteFrameIterator( + self._mdf, group_index, self._start_timestamp + ) + ) else: # Unknown bus type, skip continue - + # Create merged iterator over all the groups, using the timestamps as comparison key return heapq.merge(*iterators, key=lambda x: x.timestamp) From 8dd8b4fb9c620569ead26a48e2b5376c913ce8fa Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 07:48:32 +0100 Subject: [PATCH 4/7] Cleanup for mypy --- can/io/mf4.py | 106 ++++++++++++++++++++++++++------------------------ 1 file changed, 56 insertions(+), 50 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index 6ebac4b1d..36c0c436b 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -5,12 +5,13 @@ the ASAM MDF standard (see https://www.asam.net/standards/detail/mdf/) """ +import abc import logging from datetime import datetime from hashlib import md5 from io import BufferedIOBase, BytesIO from pathlib import Path -from typing import Any, BinaryIO, Generator, Iterable, Optional, Union, cast +from typing import Any, BinaryIO, Dict, Generator, Iterator, List, Optional, Union, cast from ..message import Message from ..typechecking import StringPathLike @@ -70,6 +71,8 @@ ) except ImportError: asammdf = None + MDF4 = None + Signal = None CAN_MSG_EXT = 0x80000000 @@ -266,60 +269,63 @@ def on_message_received(self, msg: Message) -> None: self._rtr_buffer = np.zeros(1, dtype=RTR_DTYPE) -class MF4Reader(BinaryIOMessageReader): +class FrameIterator(object, metaclass=abc.ABCMeta): """ - Iterator of CAN messages from a MF4 logging file. - - The MF4Reader only supports MF4 files with CAN bus logging. + Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. """ - # NOTE: Readout based on the bus logging code from asammdf GUI + # Number of records to request for each asammdf call + _chunk_size = 1000 - class FrameIterator(object): - """ - Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. - """ + def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float, name: str): + self._mdf = mdf + self._group_index = group_index + self._start_timestamp = start_timestamp + self._name = name - # Number of records to request for each asammdf call - _chunk_size = 1000 + # Extract names + channel_group: ChannelGroup = self._mdf.groups[self._group_index] - def __init__( - self, mdf: MDF, group_index: int, start_timestamp: float, name: str - ): - self._mdf = mdf - self._group_index = group_index - self._start_timestamp = start_timestamp - self._name = name + self._channel_names = [] - # Extract names - channel_group: ChannelGroup = self._mdf.groups[self._group_index] + for channel in channel_group.channels: + if str(channel.name).startswith(f"{self._name}."): + self._channel_names.append(channel.name) - self._channel_names = [] + return - for channel in channel_group.channels: - if str(channel.name).startswith(f"{self._name}."): - self._channel_names.append(channel.name) + def _get_data(self, current_offset: int) -> Signal: + # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which + # get does convert as expected. + data_raw = self._mdf.get( + self._name, + self._group_index, + record_offset=current_offset, + record_count=self._chunk_size, + raw=False, + ) - return + return data_raw - def _get_data(self, current_offset: int) -> asammdf.Signal: - # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which - # get does convert as expected. - data_raw = self._mdf.get( - self._name, - self._group_index, - record_offset=current_offset, - record_count=self._chunk_size, - raw=False, - ) + @abc.abstractmethod + def __iter__(self) -> Generator[Message, None, None]: + pass - return data_raw + pass - pass + +class MF4Reader(BinaryIOMessageReader): + """ + Iterator of CAN messages from a MF4 logging file. + + The MF4Reader only supports MF4 files with CAN bus logging. + """ + + # NOTE: Readout based on the bus logging code from asammdf GUI class CANDataFrameIterator(FrameIterator): - def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame") return @@ -336,7 +342,7 @@ def __iter__(self) -> Generator[Message, None, None]: for i in range(len(data)): data_length = int(data["CAN_DataFrame.DataLength"][i]) - kv = { + kv: Dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "arbitration_id": int(data["CAN_DataFrame.ID"][i]) & 0x1FFFFFFF, "data": data["CAN_DataFrame.DataBytes"][i][ @@ -365,7 +371,7 @@ def __iter__(self) -> Generator[Message, None, None]: class CANErrorFrameIterator(FrameIterator): - def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame") return @@ -380,7 +386,7 @@ def __iter__(self) -> Generator[Message, None, None]: names = data.samples[0].dtype.names for i in range(len(data)): - kv = { + kv: Dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "is_error_frame": True, } @@ -422,7 +428,7 @@ def __iter__(self) -> Generator[Message, None, None]: class CANRemoteFrameIterator(FrameIterator): - def __init__(self, mdf: MDF, group_index: int, start_timestamp: float): + def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame") return @@ -437,7 +443,7 @@ def __iter__(self) -> Generator[Message, None, None]: names = data.samples[0].dtype.names for i in range(len(data)): - kv = { + kv: Dict[str, Any] = { "timestamp": float(data.timestamps[i]) + self._start_timestamp, "arbitration_id": int(data["CAN_RemoteFrame.ID"][i]) & 0x1FFFFFFF, @@ -476,20 +482,20 @@ def __init__( super().__init__(file, mode="rb") - self._mdf: MDF + self._mdf: MDF4 if isinstance(file, BufferedIOBase): - self._mdf = MDF(BytesIO(file.read())) + self._mdf = cast(MDF4, MDF(BytesIO(file.read()))) else: - self._mdf = MDF(file) + self._mdf = cast(MDF4, MDF(file)) self._start_timestamp = self._mdf.header.start_time.timestamp() - def __iter__(self) -> Iterable[Message]: + def __iter__(self) -> Iterator[Message]: import heapq # To handle messages split over multiple channel groups, create a single iterator per channel group and merge # these iterators into a single iterator using heapq. - iterators = [] + iterators: List[FrameIterator] = [] for group_index, group in enumerate(self._mdf.groups): channel_group: ChannelGroup = group.channel_group @@ -536,7 +542,7 @@ def __iter__(self) -> Iterable[Message]: continue # Create merged iterator over all the groups, using the timestamps as comparison key - return heapq.merge(*iterators, key=lambda x: x.timestamp) + return iter(heapq.merge(*iterators, key=lambda x: x.timestamp)) def stop(self) -> None: self._mdf.close() From 640f6ceac08c37aaa9d5caf5e3b8d9f9a7bbb5f1 Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:07:02 +0100 Subject: [PATCH 5/7] Fix ruff checks --- can/io/mf4.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index 36c0c436b..d3455ea67 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -269,7 +269,7 @@ def on_message_received(self, msg: Message) -> None: self._rtr_buffer = np.zeros(1, dtype=RTR_DTYPE) -class FrameIterator(object, metaclass=abc.ABCMeta): +class FrameIterator(metaclass=abc.ABCMeta): """ Iterator helper class for common handling among CAN DataFrames, ErrorFrames and RemoteFrames. """ @@ -295,8 +295,8 @@ def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float, name: st return def _get_data(self, current_offset: int) -> Signal: - # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some channels which - # get does convert as expected. + # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some + # channels which get does convert as expected. data_raw = self._mdf.get( self._name, self._group_index, @@ -493,8 +493,8 @@ def __init__( def __iter__(self) -> Iterator[Message]: import heapq - # To handle messages split over multiple channel groups, create a single iterator per channel group and merge - # these iterators into a single iterator using heapq. + # To handle messages split over multiple channel groups, create a single iterator per + # channel group and merge these iterators into a single iterator using heapq. iterators: List[FrameIterator] = [] for group_index, group in enumerate(self._mdf.groups): channel_group: ChannelGroup = group.channel_group From 240ad2dc8d6c9f8d239bb88febf038be80d6636f Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 11:24:01 +0100 Subject: [PATCH 6/7] Doc fixes --- can/io/mf4.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index d3455ea67..615468ed4 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -323,7 +323,7 @@ class MF4Reader(BinaryIOMessageReader): # NOTE: Readout based on the bus logging code from asammdf GUI - class CANDataFrameIterator(FrameIterator): + class _CANDataFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame") @@ -369,7 +369,7 @@ def __iter__(self) -> Generator[Message, None, None]: pass - class CANErrorFrameIterator(FrameIterator): + class _CANErrorFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame") @@ -426,7 +426,7 @@ def __iter__(self) -> Generator[Message, None, None]: pass - class CANRemoteFrameIterator(FrameIterator): + class _CANRemoteFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame") @@ -521,19 +521,19 @@ def __iter__(self) -> Iterator[Message]: if acquisition_source.bus_type == Source.BUS_TYPE_CAN: if "CAN_DataFrame" in channel_names: iterators.append( - self.CANDataFrameIterator( + self._CANDataFrameIterator( self._mdf, group_index, self._start_timestamp ) ) elif "CAN_ErrorFrame" in channel_names: iterators.append( - self.CANErrorFrameIterator( + self._CANErrorFrameIterator( self._mdf, group_index, self._start_timestamp ) ) elif "CAN_RemoteFrame" in channel_names: iterators.append( - self.CANRemoteFrameIterator( + self._CANRemoteFrameIterator( self._mdf, group_index, self._start_timestamp ) ) From 56dd88227204eaac133849573c2a062f7d592275 Mon Sep 17 00:00:00 2001 From: CSS Electronics <187732701+cssedev@users.noreply.github.com> Date: Mon, 11 Nov 2024 12:36:50 +0100 Subject: [PATCH 7/7] Pylint fixes --- can/io/mf4.py | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/can/io/mf4.py b/can/io/mf4.py index 615468ed4..4f5336b42 100644 --- a/can/io/mf4.py +++ b/can/io/mf4.py @@ -6,6 +6,7 @@ """ import abc +import heapq import logging from datetime import datetime from hashlib import md5 @@ -292,8 +293,6 @@ def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float, name: st if str(channel.name).startswith(f"{self._name}."): self._channel_names.append(channel.name) - return - def _get_data(self, current_offset: int) -> Signal: # NOTE: asammdf suggests using select instead of get. Select seem to miss converting some # channels which get does convert as expected. @@ -311,8 +310,6 @@ def _get_data(self, current_offset: int) -> Signal: def __iter__(self) -> Generator[Message, None, None]: pass - pass - class MF4Reader(BinaryIOMessageReader): """ @@ -328,8 +325,6 @@ class _CANDataFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_DataFrame") - return - def __iter__(self) -> Generator[Message, None, None]: for current_offset in range( 0, @@ -365,17 +360,11 @@ def __iter__(self) -> Generator[Message, None, None]: yield Message(**kv) - return None - - pass - class _CANErrorFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_ErrorFrame") - return - def __iter__(self) -> Generator[Message, None, None]: for current_offset in range( 0, @@ -422,17 +411,11 @@ def __iter__(self) -> Generator[Message, None, None]: yield Message(**kv) - return None - - pass - class _CANRemoteFrameIterator(FrameIterator): def __init__(self, mdf: MDF4, group_index: int, start_timestamp: float): super().__init__(mdf, group_index, start_timestamp, "CAN_RemoteFrame") - return - def __iter__(self) -> Generator[Message, None, None]: for current_offset in range( 0, @@ -460,10 +443,6 @@ def __iter__(self) -> Generator[Message, None, None]: yield Message(**kv) - return None - - pass - def __init__( self, file: Union[StringPathLike, BinaryIO], @@ -491,8 +470,6 @@ def __init__( self._start_timestamp = self._mdf.header.start_time.timestamp() def __iter__(self) -> Iterator[Message]: - import heapq - # To handle messages split over multiple channel groups, create a single iterator per # channel group and merge these iterators into a single iterator using heapq. iterators: List[FrameIterator] = [] @@ -512,7 +489,7 @@ def __iter__(self) -> Iterator[Message]: if acquisition_source is None: # No source information, skip continue - elif not acquisition_source.source_type & Source.SOURCE_BUS: + if not acquisition_source.source_type & Source.SOURCE_BUS: # Not a bus type (likely already covered by the channel group flag), skip continue