diff --git a/CHANGELOG.MD b/CHANGELOG.MD
index f13fd1f..b84dc51 100644
--- a/CHANGELOG.MD
+++ b/CHANGELOG.MD
@@ -4,6 +4,18 @@
 All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.7.0]
+### Fixed
+ - UnionBlock failing to flush if u_node size is defined by FieldType
+ - BytesBuffer failing to seek to offset 0 (start of buffer)
+ - BytearrayBuffer failing to initialize \_pos on instantiation
+
+### Added
+ - More complete definition for WAV audio files
+ - Ability to use BlockDef.str_to_name as a class method
+ - util.desc_variant can now verify that replacements are valid, based on field sizes
+ - util.desc_variant no longer needs the name of the field specified (it is taken from the replacement desc)
+
 ## [1.5.4]
 ### Changed
  - Update build config for Python 3.9.
diff --git a/setup.py b/setup.py
index 9cb470f..24b9e80 100644
--- a/setup.py
+++ b/setup.py
@@ -70,6 +70,9 @@
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.8",
         "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Programming Language :: Python :: 3.11",
+        "Programming Language :: Python :: 3.12",
         "Programming Language :: Python :: 3 :: Only",
     ],
     zip_safe=False,
diff --git a/supyr_struct/__init__.py b/supyr_struct/__init__.py
index 76aacfb..9f67614 100644
--- a/supyr_struct/__init__.py
+++ b/supyr_struct/__init__.py
@@ -87,8 +87,8 @@
 # ##############
 __author__ = "Sigmmma"
 #           YYYY.MM.DD
-__date__ = "2020.10.30"
-__version__ = (1, 5, 4)
+__date__ = "2025.01.18"
+__version__ = (1, 7, 0)
 
 __website__ = "https://github.com/Sigmmma/supyr_struct"
diff --git a/supyr_struct/blocks/array_block.py b/supyr_struct/blocks/array_block.py
index ba774a3..a3f71a1 100644
--- a/supyr_struct/blocks/array_block.py
+++ b/supyr_struct/blocks/array_block.py
@@ -2,7 +2,7 @@
 from sys import getsizeof
 
 from supyr_struct.blocks.block import Block
-from supyr_struct.blocks.list_block import ListBlock
+from supyr_struct.blocks.list_block import ListBlock, repeat
 from supyr_struct.defs.constants import NAME, UNNAMED, NAME_MAP
 from supyr_struct.exceptions import DescEditError, DescKeyError
 from supyr_struct.buffer import get_rawdata_context
@@ -37,7 +37,7 @@ def __init__(self, desc, parent=None, init_attrs=None, **kwargs):
             self.parse(init_attrs=init_attrs, **kwargs)
         else:
             # populate the listblock with the right number of fields
-            list.__init__(self, [None]*self.get_size())
+            list.__init__(self, repeat(None, self.get_size()))
 
     def __sizeof__(self, seenset=None):
         '''
@@ -98,8 +98,6 @@ def __setitem__(self, index, new_value):
                 # a Block, set its parent attribute to this Block.
if isinstance(new_value, Block): new_value.parent = self - - desc = object.__getattribute__(self, 'desc') elif isinstance(index, slice): # if this is an array, dont worry about # the descriptor since its list indexes @@ -150,10 +148,9 @@ def __delitem__(self, index): # the descriptor since its list indexes # aren't attributes, but instanced objects start, stop, step = index.indices(len(self)) + step = -step if step < 0 else step if start < stop: start, stop = stop, start - if step > 0: - step = -step list.__delitem__(self, index) self.set_size() @@ -239,7 +236,7 @@ def extend(self, new_attrs, **kwargs): index = len(self) # create new, empty indices - list.extend(self, [None]*new_attrs) + list.extend(self, repeat(None, new_attrs)) # read new sub_structs into the empty indices for i in range(index, index + new_attrs): attr_f_type.parser(attr_desc, parent=self, @@ -705,10 +702,10 @@ def parse(self, **kwargs): # parsing/initializing all array elements, so clear and resize list.__delitem__(self, slice(None, None, None)) if initdata is not None: - list.extend(self, [None]*len(initdata)) + list.extend(self, repeat(None, len(initdata))) self.set_size() # update the size to the initdata length else: - list.extend(self, [None]*self.get_size()) + list.extend(self, repeat(None, self.get_size())) if rawdata is not None: # parse the ArrayBlock from raw data @@ -774,7 +771,7 @@ class PArrayBlock(ArrayBlock): node it describes to be stored as well as a reference to whatever Block it is parented to ''' - __slots__ = ('STEPTREE') + __slots__ = ('STEPTREE', ) def __init__(self, desc, parent=None, steptree=None, init_attrs=None, **kwargs): @@ -798,7 +795,7 @@ def __init__(self, desc, parent=None, steptree=None, self.parse(init_attrs=init_attrs, **kwargs) else: # populate the listblock with the right number of fields - list.__init__(self, [None]*self.get_size()) + list.__init__(self, repeat(None, self.get_size())) def __sizeof__(self, seenset=None): ''' diff --git a/supyr_struct/blocks/block.py b/supyr_struct/blocks/block.py index be556ef..f4c1f7b 100644 --- a/supyr_struct/blocks/block.py +++ b/supyr_struct/blocks/block.py @@ -252,11 +252,7 @@ def __sizeof__(self, seenset=None): return 0 seenset.add(id(self)) - bytes_total = object.__sizeof__(self) - - desc = object.__getattribute__(self, 'desc') - - return bytes_total + return object.__sizeof__(self) def __binsize__(self, node, substruct=False): '''You must override this method''' @@ -312,16 +308,19 @@ def get_desc(self, desc_key, attr_name=None): "descriptor of '%s'.") % (desc_key, desc.get('NAME'))) - def get_root(node): + def get_root(self): '''Navigates up the given node and returns the root node.''' # rather than name the function argument 'self' it's slightly # faster to just name it 'root' and not have to do 'root = self' try: - while node.parent: - node = node.parent + while self.parent: + self = self.parent # pylint: disable=W0642 # SHUTUP PYLINT + # for the sake of minimal operations and + # keeping the method signature clean, we + # will be evil and redefine self except AttributeError: pass - return node + return self def get_neighbor(self, path, node=None): ''' @@ -680,12 +679,10 @@ def serialize(self, **kwargs): else: parent_tag = self.get_root() - if "calc_pointers" in kwargs: - calc_pointers = kwargs["calc_pointers"] + calc_pointers = kwargs.get("calc_pointers", True) if isinstance(parent_tag, supyr_struct.tag.Tag): calc_pointers = parent_tag.calc_pointers else: - calc_pointers = True parent_tag = None # convert string attr_indexes to ints @@ 
-820,7 +817,7 @@ def parent(self, new_val): except TypeError: # some object types don't support __weakref__ so we have to # wrap them in something that our getter will still work with - new_val = lambda val=new_val: val + new_val = lambda val=new_val: val # pylint: disable=C3001 # we just need to set self._parent to the new wrapped value. # we want to do this as fast as possible, so we're going to diff --git a/supyr_struct/blocks/data_block.py b/supyr_struct/blocks/data_block.py index bf2646e..39dd865 100644 --- a/supyr_struct/blocks/data_block.py +++ b/supyr_struct/blocks/data_block.py @@ -131,14 +131,12 @@ def __sizeof__(self, seenset=None): seenset.add(id(self)) data = self.data - if isinstance(data, Block): - bytes_total = object.__sizeof__(self) + data.__sizeof__(seenset) - else: - bytes_total = object.__sizeof__(self) + getsizeof(data) - - desc = object.__getattribute__(self, 'desc') - return bytes_total + return object.__sizeof__(self) + ( + data.__sizeof__(seenset) + if isinstance(data, Block) else + getsizeof(data) + ) def __copy__(self): ''' diff --git a/supyr_struct/blocks/list_block.py b/supyr_struct/blocks/list_block.py index 8706f96..480c401 100644 --- a/supyr_struct/blocks/list_block.py +++ b/supyr_struct/blocks/list_block.py @@ -1,6 +1,7 @@ ''' ''' from copy import deepcopy +from itertools import repeat, takewhile from sys import getsizeof from supyr_struct.blocks.block import Block @@ -46,7 +47,7 @@ def __init__(self, desc, parent=None, init_attrs=None, **kwargs): self.parse(init_attrs=init_attrs, **kwargs) else: # populate the listblock with the right number of fields - list.__init__(self, [None]*desc['ENTRIES']) + list.__init__(self, repeat(None, desc['ENTRIES'])) def __str__(self, **kwargs): ''' @@ -214,7 +215,7 @@ def __deepcopy__(self, memo): # clear the Block so it can be populated list.__delitem__(dup_block, slice(None, None, None)) - list.extend(dup_block, [None]*len(self)) + list.extend(dup_block, repeat(None, len(self))) # populate the duplicate for i in range(len(self)): @@ -248,8 +249,6 @@ def __sizeof__(self, seenset=None): seenset.add(id(self)) bytes_total = list.__sizeof__(self) - desc = object.__getattribute__(self, 'desc') - for i in range(len(self)): item = list.__getitem__(self, i) if not id(item) in seenset: @@ -268,9 +267,9 @@ def __getitem__(self, index): If index is a string, returns self.__getattr__(index) ''' - if isinstance(index, str): - return self.__getattr__(index) - return list.__getitem__(self, index) + return (self.__getattr__(index) if isinstance(index, str) else + list.__getitem__(self, index) + ) def __setitem__(self, index, new_value): ''' @@ -299,8 +298,7 @@ def __setitem__(self, index, new_value): ''' if isinstance(index, int): # handle accessing negative indexes - if index < 0: - index += len(self) + index = index + len(self) if index < 0 else index assert not self.assert_is_valid_field_value(index, new_value) list.__setitem__(self, index, new_value) @@ -331,10 +329,9 @@ def __setitem__(self, index, new_value): elif isinstance(index, slice): start, stop, step = index.indices(len(self)) + step = -step if step < 0 else step if start > stop: start, stop = stop, start - if step < 0: - step = -step assert hasattr(new_value, '__iter__'), ( "must assign iterable to extended slice") @@ -422,8 +419,14 @@ def index_by_id(self, node): Returns the index that node is in. Raises ValueError if node can not be found. 
''' - return [id(list.__getitem__(self, i)) for - i in range(len(self))].index(id(node)) + index_finder = takewhile( + id(node).__ne__, map(id, self) + ) + index = sum(map(bool, index_finder)) + if index < len(self): + return index + + raise ValueError("Item '%s' is not in the list" % node) def get_size(self, attr_index=None, **context): ''' @@ -803,7 +806,7 @@ def parse(self, **kwargs): if kwargs.get("clear", True): # parsing/initializing all attributes, so clear the block # and create as many elements as it needs to hold - list.__init__(self, [None]*desc['ENTRIES']) + list.__init__(self, repeat(None, desc['ENTRIES'])) if rawdata is not None: # parse the ListBlock from raw data @@ -868,7 +871,7 @@ class PListBlock(ListBlock): node it describes to be stored as well as a reference to whatever Block it is parented to. ''' - __slots__ = ('STEPTREE') + __slots__ = ('STEPTREE', ) def __init__(self, desc, parent=None, steptree=None, init_attrs=None, **kwargs): @@ -892,7 +895,7 @@ def __init__(self, desc, parent=None, steptree=None, self.parse(init_attrs=init_attrs, **kwargs) else: # populate the listblock with the right number of fields - list.__init__(self, [None]*desc['ENTRIES']) + list.__init__(self, repeat(None, desc['ENTRIES'])) def __sizeof__(self, seenset=None): ''' @@ -920,8 +923,6 @@ def __sizeof__(self, seenset=None): seenset.add(id(steptree)) bytes_total += getsizeof(steptree) - desc = object.__getattribute__(self, 'desc') - for i in range(len(self)): item = list.__getitem__(self, i) if not id(item) in seenset: diff --git a/supyr_struct/blocks/union_block.py b/supyr_struct/blocks/union_block.py index 47d1a41..cab02d4 100644 --- a/supyr_struct/blocks/union_block.py +++ b/supyr_struct/blocks/union_block.py @@ -334,8 +334,6 @@ def __sizeof__(self, seenset=None): seenset.add(id(self)) bytes_total = object.__sizeof__(self) + getsizeof(self.u_node) - desc = object.__getattribute__(self, 'desc') - return bytes_total def __binsize__(self, node, substruct=False): @@ -387,7 +385,7 @@ def flush(self): # If they are smaller, some of the most significant bytes # arent used, which in big endian are the first bytes. u_type.serializer(u_node, self, None, self, 0, - desc.get(SIZE) - u_desc.get(SIZE)) + desc.get(SIZE, 0) - u_desc.get(SIZE, u_type.size)) else: u_type.serializer(u_node, self, None, self) diff --git a/supyr_struct/blocks/void_block.py b/supyr_struct/blocks/void_block.py index 72bb6da..70cac32 100644 --- a/supyr_struct/blocks/void_block.py +++ b/supyr_struct/blocks/void_block.py @@ -32,6 +32,9 @@ def __init__(self, desc=None, parent=None, **kwargs): object.__setattr__(self, "desc", desc) self.parent = parent + def set_size(self, new_value, attr_index=None, **context): + pass + def __copy__(self): ''' Creates a copy of this Block which references @@ -125,4 +128,4 @@ def get_size(self, attr_index=None, **context): def parse(self, **kwargs): '''VoidBlocks have nothing to parse. Does nothing.''' - pass + pass # pylint: disable=W0107 # removing pass here would look horrible diff --git a/supyr_struct/blocks/while_block.py b/supyr_struct/blocks/while_block.py index acdbd46..82d3f13 100644 --- a/supyr_struct/blocks/while_block.py +++ b/supyr_struct/blocks/while_block.py @@ -4,7 +4,7 @@ stored anywhere and must be parsed until some function says to stop. 
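The rewritten ListBlock.index_by_id above scans by identity: takewhile yields ids until it reaches id(node), so counting what it yields gives the index, and counting the whole list means the node is absent. A hedged, standalone sketch of the same idiom (names here are illustrative, not library API):

from itertools import takewhile

def index_by_identity(seq, node):
    # count the leading elements whose id differs from the target's;
    # takewhile stops at the first identity match
    index = sum(1 for _ in takewhile(id(node).__ne__, map(id, seq)))
    if index < len(seq):
        return index
    raise ValueError("%r is not in the list" % (node,))

a, b = object(), object()
assert index_by_identity([a, b], b) == 1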
''' from supyr_struct.blocks.block import Block -from supyr_struct.blocks.list_block import ListBlock +from supyr_struct.blocks.list_block import ListBlock, repeat from supyr_struct.blocks.array_block import ArrayBlock, PArrayBlock from supyr_struct.defs.constants import SUB_STRUCT, NAME, UNNAMED from supyr_struct.exceptions import DescEditError, DescKeyError @@ -36,8 +36,7 @@ def __setitem__(self, index, new_value): ''' if isinstance(index, int): # handle accessing negative indexes - if index < 0: - index += len(self) + index = index + len(self) if index < 0 else index assert not self.assert_is_valid_field_value(index, new_value) list.__setitem__(self, index, new_value) @@ -48,10 +47,9 @@ def __setitem__(self, index, new_value): elif isinstance(index, slice): start, stop, step = index.indices(len(self)) + step = -step if step < 0 else step if start > stop: start, stop = stop, start - if step < 0: - step = -step assert hasattr(new_value, '__iter__'), ( "must assign iterable to extended slice") @@ -138,8 +136,7 @@ def extend(self, new_attrs, **kwargs): elif isinstance(new_attrs, int): # if this Block is an array and "new_attr" is an int it means # that we are supposed to append this many of the SUB_STRUCT - for i in range(new_attrs): - self.append(**kwargs) + [self.append(**kwargs) for _ in range(new_attrs)] else: raise TypeError("Argument type for 'extend' must be an " + "instance of ListBlock or int, not %s" % @@ -266,24 +263,21 @@ def set_size(self, new_value=None, attr_index=None, **context): self_desc.get('NAME', UNNAMED), f_type, f_type.size)) + # if a new size wasnt provided then it needs to be calculated + newsize = desc['TYPE'].sizecalc( + node, parent=self, attr_index=attr_index, **context + ) if new_value is None else new_value + if isinstance(size, int): # Because literal descriptor sizes are supposed to be static # (unless you're changing the structure), we don't even try to # change the size if the new size is less than the current one. - if new_value is None and newsize <= size: + if newsize <= size: return raise DescEditError("Changing a size statically defined in a " + "descriptor is not supported through " + "set_size. Make a new descriptor instead.") - - # if a new size wasnt provided then it needs to be calculated - if new_value is not None: - newsize = new_value - else: - newsize = desc['TYPE'].sizecalc(node, parent=self, - attr_index=attr_index, **context) - - if isinstance(size, str): + elif isinstance(size, str): # set size by traversing the tag structure # along the path specified by the string self.set_neighbor(size, newsize, node) @@ -428,7 +422,7 @@ def parse(self, **kwargs): raise TypeError("Could not locate the sub-struct descriptor." + "\nCould not initialize array") - list.extend(self, [None]*init_len) + list.extend(self, repeat(None, init_len)) if kwargs.get('init_attrs', True) or issubclass(attr_f_type.node_cls, Block): # loop through each element in the array and initialize it @@ -465,7 +459,7 @@ class PWhileBlock(WhileBlock): See supyr_struct.blocks.while_block.WhileBlock.__doc__ for more help. 
''' - __slots__ = ('STEPTREE') + __slots__ = ('STEPTREE', ) __init__ = PArrayBlock.__init__ diff --git a/supyr_struct/buffer.py b/supyr_struct/buffer.py index d869025..ca89a26 100644 --- a/supyr_struct/buffer.py +++ b/supyr_struct/buffer.py @@ -37,7 +37,7 @@ def __exit__(self, except_type, except_value, traceback): if self._close_rawdata: self._rawdata.close() except AttributeError: - return + pass def get_rawdata(**kwargs): @@ -121,7 +121,7 @@ class Buffer(): def __init__(self, *args): # Dummy __init__ that makes sure there is always a self._pos. # Accepts args like *args to account for child objects. - self._pos = 0 + self._pos = 0 # pylint: disable=E0237 def read(self, count=None): ''' @@ -194,14 +194,11 @@ def peek(self, count=None, offset=None): Reads and returns 'count' number of bytes without changing the current read/write pointer position. ''' - if offset is None: - pos = self._pos - else: - pos = offset + pos = self._pos if offset is None else offset try: - if pos + count < len(self): - return self[pos:pos + count] - return self[pos:pos + len(self)] + len_self = len(self) + peek_end = pos + count + return self[pos: len_self if peek_end >= len_self else peek_end] except TypeError: pass @@ -243,7 +240,7 @@ def seek(self, pos, whence=SEEK_SET): if whence == SEEK_SET: assert pos >= 0, "Read position cannot be negative." - if pos - 1 not in range(len(self)): + if pos not in range(len(self) + 1): raise IndexError('seek position out of range') self._pos = pos @@ -251,7 +248,7 @@ def seek(self, pos, whence=SEEK_SET): pos = self._pos + pos assert pos >= 0, "Read position cannot be negative." - if pos - 1 not in range(len(self)): + if pos not in range(len(self) + 1): raise IndexError('seek position out of range') self._pos = pos @@ -259,7 +256,7 @@ def seek(self, pos, whence=SEEK_SET): pos += len(self) assert pos >= 0, "Read position cannot be negative." - if pos - 1 not in range(len(self)): + if pos not in range(len(self) + 1): raise IndexError('seek position out of range') self._pos = pos @@ -287,6 +284,9 @@ class BytearrayBuffer(bytearray, Buffer): Uses os.SEEK_SET, os.SEEK_CUR, and os.SEEK_END when calling seek. 
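The three seek fixes in buffer.py above all swap `pos - 1 not in range(len(self))` for `pos not in range(len(self) + 1)`. The old guard could never accept pos == 0 (pos - 1 is -1, which is in no range), which is the "failing to seek to offset 0" bug named in the changelog; the new guard accepts exactly 0 through len(self), matching file-object semantics. A small standalone check of both predicates, assuming nothing beyond the arithmetic itself:

for length in (0, 4):
    for pos in range(length + 1):
        old_ok = (pos - 1) in range(length)  # always False when pos == 0
        new_ok = pos in range(length + 1)    # True for 0 <= pos <= length
        assert new_ok
        assert old_ok == (1 <= pos <= length)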
''' __slots__ = ('_pos',) + def __init__(self, *args): + bytearray.__init__(self, *args) + Buffer.__init__(self, *args) def peek(self, count=None, offset=None): ''' diff --git a/supyr_struct/defs/audio/wav.py b/supyr_struct/defs/audio/wav.py index 95a50c1..97f85fe 100644 --- a/supyr_struct/defs/audio/wav.py +++ b/supyr_struct/defs/audio/wav.py @@ -11,6 +11,46 @@ "data", "fact", "PEAK", + "LIST", + "id3 ", + ) + +list_type_sigs = ( + "INFO", + "adtl", + ) + +list_info_type_sigs = ( + # taken from here: + # https://www.recordingblogs.com/wiki/list-chunk-of-a-wave-file + "IARL", # The location where the subject of the file is archived + "IART", # The artist of the original subject of the file + "ICMS", # The name of the person or organization that commissioned the original subject of the file + "ICMT", # General comments about the file or its subject + "ICOP", # Copyright information about the file (e.g., "Copyright Some Company 2011") + "ICRD", # The date the subject of the file was created (creation date) (e.g., "2022-12-31") + "ICRP", # Whether and how an image was cropped + "IDIM", # The dimensions of the original subject of the file + "IDPI", # Dots per inch settings used to digitize the file + "IENG", # The name of the engineer who worked on the file + "IGNR", # The genre of the subject + "IKEY", # A list of keywords for the file or its subject + "ILGT", # Lightness settings used to digitize the file + "IMED", # Medium for the original subject of the file + "INAM", # Title of the subject of the file (name) + "IPLT", # The number of colors in the color palette used to digitize the file + "IPRD", # Name of the title the subject was originally intended for + "ISBJ", # Description of the contents of the file (subject) + "ISFT", # Name of the software package used to create the file + "ISRC", # The name of the person or organization that supplied the original subject of the file + "ISRF", # The original form of the material that was digitized (source form) + "ITCH", # The name of the technician who digitized the subject file + ) + +list_adtl_type_sigs = ( + "labl", + "note", + "ltxt", ) wav_formats = ( @@ -38,6 +78,19 @@ def fmt_extra_data_size(parent=None, new_value=None, *args, **kwargs): parent.length = new_value + 16 +def get_set_chunk_size(parent=None, new_value=None, *args, **kwargs): + if parent is None: + return 0 + if new_value is None: + return ((parent.data_size+3)//4)*4 + + parent.data_size = ((new_value+3)//4)*4 + +def get_list_chunk_size(list_data): + return 4 + sum( + 8 + get_set_chunk_size(parent=p) for p in list_data + ) + def has_next_chunk(rawdata=None, **kwargs): try: data = rawdata.peek(8) @@ -49,6 +102,18 @@ def has_next_chunk(rawdata=None, **kwargs): except AttributeError: return False +def has_next_list_sub_chunk(parent=None, rawdata=None, **kwargs): + if None in (parent, rawdata): + return False + + try: + return ( + get_set_chunk_size(parent=parent.parent) > + get_list_chunk_size(parent.parent.list_data) + ) + except Exception: + pass + def get_chunk_type(rawdata=None, **kwargs): try: data = rawdata.peek(4) @@ -57,7 +122,6 @@ def get_chunk_type(rawdata=None, **kwargs): except AttributeError: pass - def chunk_extra_data_size(parent=None, rawdata=None, new_value=None, extra_size=0, **kwargs): if new_value is None: @@ -85,39 +149,64 @@ def peak_chunk_extra_data_size(parent=None, rawdata=None, new_value=None, parent=parent, rawdata=rawdata, new_value=new_value, extra_size=8 + 8 * channel_count, **kwargs) +def read_write_id3_data_size( + parent=None, writebuffer=None, 
rawdata=None, offset=0, root_offset=0, **kwargs
+        ):
+    buffer = writebuffer if rawdata is None else rawdata
+    if not parent or buffer is None:
+        return
 
-peak_position = QStruct("peak_position",
-    Float("value"),
-    UInt32("position"),
-    )
+    try:
+        buffer.seek(offset + root_offset)
+
+        # it's weird, but here's how they define the size:
+        # The ID3 tag size is encoded with four bytes where the first bit (bit 7)
+        # is set to zero in every byte, making a total of 28 bits. The zeroed bits
+        # are ignored, so a 257 bytes long tag is represented as $00 00 02 01.
+        if writebuffer is not None:
+            buffer.write(bytes(
+                (val >> (7 * (3 - i))) & 0x7F
+                for i, val in enumerate([parent.frame_data_size] * 4)
+                ))
+        else:
+            # decoding mirrors the encoding above: 7 payload bits
+            # per byte, most significant byte first
+            parent.frame_data_size = sum(
+                (b & 0x7F) << (7 * (3 - i))
+                for i, b in enumerate(buffer.read(4))
+                )
 
-chunk_sig_enum = UEnum32("sig",
-    *((fourcc, fourcc[::-1]) for fourcc in chunk_sigs),
-    EDITABLE=False
-    )
+        return offset + 4
+    except Exception:
+        pass
+
+def Chunk(name, all_sigs, sig_default, *fields, **desc):
+    return Container(name,
+        UEnum32("sig",
+            *((sig, sig[::-1]) for sig in all_sigs),
+            DEFAULT=sig_default[::-1]
+            ),
+        UInt32("data_size", EDITABLE=False),
+        *fields,
+        **desc
+        )
 
-unknown_chunk = Container("unknown_chunk",
-    UEnum32("sig", INCLUDE=chunk_sig_enum),
-    UInt32("data_size", EDITABLE=False),
+unknown_chunk = Chunk("unknown_chunk",
+    chunk_sigs, '\x00\x00\x00\x00',
     BytesRaw("data", SIZE=chunk_extra_data_size)
     )
 
-data_chunk = Container("data_chunk",
-    UEnum32("sig", INCLUDE=chunk_sig_enum, DEFAULT="atad"),
-    UInt32("data_size", EDITABLE=False),
+data_chunk = Chunk("data_chunk",
+    chunk_sigs, 'data',
     BytesRaw("data", SIZE=chunk_extra_data_size)
     )
 
-fact_chunk = Container("fact_chunk",
-    UEnum32("sig", INCLUDE=chunk_sig_enum, DEFAULT="tcaf"),
-    UInt32("data_size", DEFAULT=4, EDITABLE=False),
-    UInt32("sample_count"),
-    BytesRaw("data", SIZE=fact_chunk_extra_data_size, VISIBLE=False)
+
+peak_position = QStruct("peak_position",
+    Float("value"),
+    UInt32("position"),
     )
 
-peak_chunk = Container("peak_chunk",
-    UEnum32("sig", INCLUDE=chunk_sig_enum, DEFAULT="KAEP"),
-    UInt32("data_size", EDITABLE=False),
+peak_chunk = Chunk("peak",
+    chunk_sigs, 'PEAK',
     UInt32("version"),
     Timestamp32("timestamp"),
     Array("peak",
@@ -128,6 +217,70 @@ def peak_chunk_extra_data_size(parent=None, rawdata=None, new_value=None,
     )
 
 
+fact_chunk = Chunk("fact",
+    chunk_sigs, 'fact',
+    UInt32("sample_count"),
+    BytesRaw("data", SIZE=fact_chunk_extra_data_size, VISIBLE=False)
+    )
+
+
+adtl_sub_chunk = Chunk("label",
+    list_adtl_type_sigs, "\x00\x00\x00\x00",
+    BytesRaw("data", SIZE=chunk_extra_data_size, VISIBLE=False)
+    )
+
+info_sub_chunk = Chunk("list_info",
+    list_info_type_sigs, "\x00\x00\x00\x00",
+    StrLatin1("info", SIZE=get_set_chunk_size),
+    )
+
+list_chunk = Chunk("list_chunk",
+    chunk_sigs, 'LIST',
+    UEnum32("list_type_sig",
+        *((sig, sig[::-1]) for sig in list_type_sigs)
+        ),
+    Switch("list_data",
+        CASE=".list_type_sig.enum_name",
+        CASES={
+            "INFO": WhileArray("list_data",
+                SUB_STRUCT=info_sub_chunk,
+                CASE=has_next_list_sub_chunk
+                ),
+            "adtl": WhileArray("list_data",
+                SUB_STRUCT=adtl_sub_chunk,
+                CASE=has_next_list_sub_chunk
+                )
+            }
+        )
+    )
+
+
+id3_chunk = Chunk("id3_chunk",
+    chunk_sigs, 'id3 ',
+    # this spec is bizarre, and the way it encodes sizes is stranger still.
+    # Look at the comment in read_write_id3_data_size
+    # to get an idea.
anyway, the spec is here: + # https://mutagen-specs.readthedocs.io/en/latest/id3/id3v2.2.html#id3v2-header + UInt24("id3_sig", DEFAULT="ID3", EDITABLE=False, ENDIAN=">"), + UInt8("version"), + UInt8("revision"), + Bool8("flags", + "uses_unsynchronisation", + "uses_compression", + ), + # okay so, this value was designed by essentially a skooma + # eater, so you'll have to bear with how it's calculated. + WritableComputed("frame_data_size", + COMPUTE_READ=read_write_id3_data_size, + COMPUTE_WRITE=read_write_id3_data_size, + SIZE=4, EDITABLE=False, MAX=((1<<27) - 1) + ), + # yeah so, the frame data spec is even more weird. we're not gonna bother + # trying to parse it, and instead just read it as a byte string. + BytesRaw("frame_data", SIZE=".frame_data_size", MAX=((1<<27) - 1)), + ) + + wav_header = QStruct("wav_header", UInt32("riff_sig", DEFAULT="FFIR", EDITABLE=False), UInt32("filesize"), @@ -160,6 +313,8 @@ def peak_chunk_extra_data_size(parent=None, rawdata=None, new_value=None, "data": data_chunk, "fact": fact_chunk, "PEAK": peak_chunk, + "LIST": list_chunk, + "id3 ": id3_chunk, } ) diff --git a/supyr_struct/defs/block_def.py b/supyr_struct/defs/block_def.py index 33c020e..73f678c 100644 --- a/supyr_struct/defs/block_def.py +++ b/supyr_struct/defs/block_def.py @@ -482,7 +482,6 @@ def make_desc(self, *desc_entries, **desc): for key in tuple(desc.keys()): if key not in desc_keywords: del desc[key] - continue elif isinstance(desc[key], BlockDef): # if the entry in desc is a BlockDef, it # needs to be replaced with its descriptor. @@ -505,8 +504,6 @@ def make_subdefs(self, replace_subdefs=False): Converts all the entries in self.subdefs into BlockDefs and tries to make BlockDefs for all the entries in the descriptor. ''' - desc = self.descriptor - sub_kwargs = {'align_mode': self.align_mode, 'endian': self.endian} # make sure all the subdefs are BlockDefs @@ -515,10 +512,11 @@ def make_subdefs(self, replace_subdefs=False): if not isinstance(d, BlockDef): self.subdefs[i] = BlockDef(str(i), descriptor=d, **sub_kwargs) - # DO NOT REMOVE THE RETURN!!!!! + # DO NOT UNCOMMENT!!!!! # The below code was causing a 300% memory bloat and making library # startup take much longer. Only enable if a solution is found. - return + ''' + desc = self.descriptor # try to make all descriptors in this Blockdef into their own BlockDefs for i in desc: @@ -533,6 +531,7 @@ def make_subdefs(self, replace_subdefs=False): **sub_kwargs) except Exception: pass + ''' def sanitize(self, desc=None): ''' @@ -662,29 +661,35 @@ def set_entry_count(self, src_dict, key=None): int_count += 1 src_dict[ENTRIES] = int_count - def str_to_name(self, string, reserved_names=reserved_desc_names, **kwargs): - try: + def str_to_name(self, string, reserved_names=None, **kwargs): + if reserved_names is None: + reserved_names = reserved_desc_names + e_str = "" + try: if not isinstance(string, str): - self._e_str += (("ERROR: INVALID TYPE FOR NAME. EXPECTED " + + e_str += (("ERROR: INVALID TYPE FOR NAME. 
EXPECTED " + "%s, GOT %s.\n") % (str, type(string))) - self._bad = True - return None - sanitized_str = str_to_identifier(string) + sanitized_str = "" if e_str else str_to_identifier(string) if not sanitized_str: - self._e_str += (("ERROR: CANNOT USE '%s' AS AN ATTRIBUTE " + + e_str += (("ERROR: CANNOT USE '%s' AS AN ATTRIBUTE " + "NAME.\nWHEN SANITIZED IT BECAME ''\n\n") % string) - self._bad = True - return None elif sanitized_str in reserved_names and\ not kwargs.get('allow_reserved', False): - self._e_str += ("ERROR: CANNOT USE THE RESERVED KEYWORD " + + e_str += ("ERROR: CANNOT USE THE RESERVED KEYWORD " + "'%s' AS AN ATTRIBUTE NAME.\n\n" % string) - self._bad = True - return None + + if e_str: + if self is None: + raise ValueError(e_str) + + self._e_str = e_str + self._bad = True + sanitized_str = None + return sanitized_str except Exception: print(format_exc()) diff --git a/supyr_struct/defs/filesystem/objs/olecf.py b/supyr_struct/defs/filesystem/objs/olecf.py index 38b6484..6564709 100644 --- a/supyr_struct/defs/filesystem/objs/olecf.py +++ b/supyr_struct/defs/filesystem/objs/olecf.py @@ -24,49 +24,48 @@ class OlecfDataStream(Buffer): _contig_ministream = None # the ministream after it's been assembled into # a contiguous bytes object. much faster to read - _sector_chain = () # an iterable which contains the sector numbers - # of the FAT sectors of the olecf Tag being parsed. - # If the stream being parsed is in the ministream, - # this will instead contain the miniFAT sector numbers. - # This basically functions as a contiguous DIFAT array. - - _pos = 0 # the virtual offset within the data stream that the - # read/write pointer would be at if it were contiguous - _sector = 0 # the offset sector the read/write pointer is at. - _cell = 0 # the offset within the sector the read/write pointer is at. - _sector_size = 512 # number of bytes in a sector - _sects_per_fat = 128 # number of array entries in each FAT/miniFAT sector + _sectors = () + _fat_chain = () # an iterable which contains the sector numbers + # of the FAT sectors of the olecf Tag being parsed. + # If the stream being parsed is in the ministream, + # this will instead contain the miniFAT sector numbers. + # This basically functions as a contiguous DIFAT array. 
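The sector-chain properties defined just below (chain_idx, sub_sector_idx, fat_sector_idx, next_sector_idx) all encode one lookup: divide a sector number by the entries-per-FAT-sector to pick the FAT sector out of the chain, and use the remainder to index that FAT sector's sect_nums. A hedged, toy-scale model of that mapping (4 entries per FAT sector instead of the real 128; names are illustrative, not library API):

from types import SimpleNamespace

SECTS_PER_FAT = 4

def next_sector(sectors, fat_chain, sector_idx):
    # fat_chain[sector_idx // SECTS_PER_FAT] names the FAT sector covering
    # sector_idx; its sect_nums slot holds the number of the next sector
    fat_sector = sectors[fat_chain[sector_idx // SECTS_PER_FAT]]
    return fat_sector.sect_nums[sector_idx % SECTS_PER_FAT]

# the single FAT sector here says: after sector 1 comes sector 2
sectors = [SimpleNamespace(sect_nums=[9, 2, 3, 0xFFFFFFFE])]
assert next_sector(sectors, fat_chain=[0], sector_idx=1) == 2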
+ + _start_sector = 0 + _sector_idx = 0 + _pos = 0 + _mini_sector_size = 64 # number of bytes in a miniFAT sector + _sector_size = 512 # number of bytes in a sector + _sects_per_fat = 128 # number of array entries in each FAT/miniFAT sector def __init__(self, storage_block): self._storage_block = storage_block self._tag = tag = storage_block.get_root() header = tag.data.header - self._pos = self._cell = 0 + self._pos = 0 - self._sector = storage_block.stream_sect_start - self._sects_per_fat = (1 << header.sector_shift) // 4 + self._sectors = self._tag.data.sectors + self._start_sector = self._storage_block.stream_sect_start + self._sector_idx = self._start_sector + self._mini_sector_size = 1 << header.mini_sector_shift + self._sector_size = 1 << header.sector_shift + self._sects_per_fat = self._sector_size // 4 - if (storage_block.stream_len < header.mini_stream_cutoff and - storage_block.storage_type.enum_name != 'root'): + is_minifat = ( + storage_block.stream_len < header.mini_stream_cutoff and + storage_block.storage_type.enum_name != 'root' + ) + self._fat_chain = tag.minifat_sectors if is_minifat else tag.fat_sectors + + if is_minifat: # this stream exists in the ministream, so make an instance # of OlecfDataStream to handle parsing the ministream so this # can simply focus on parsing the stream within the ministream. - self._sector_size = 1 << header.mini_sector_shift - self._sector_chain = tag.minifat_sectors - if hasattr(tag, 'ministream'): - self._ministream = tag.ministream - else: - self._ministream = tag.get_stream_by_index(0) - if hasattr(tag, 'contig_ministream'): - self._contig_ministream = tag.contig_ministream - else: - self._contig_ministream = self._ministream.peek() - else: - # this is either too large to be in the - # ministream, or it IS the ministream - self._sector_size = 1 << header.sector_shift - self._sector_chain = tag.fat_sectors + self._ministream = (getattr(tag, "ministream", None) + or tag.get_stream_by_index(0)) + self._contig_ministream = (getattr(tag, "contig_ministream", None) + or self._ministream.peek()) def flush_ministream(self): ''' @@ -87,142 +86,81 @@ def recache_ministream(self): self._ministream.seek(0) self._contig_ministream = self._ministream.peek() - def __len__(self): - return self._storage_block.stream_len - - def size(self): - return self._storage_block.stream_len + @property + def contig_ministream(self): + return self._contig_ministream or ( + self._ministream.peek() if self._ministream else None + ) + + @property + def fat_sector_idx(self): return self._fat_chain[self.chain_idx] + @property + def sub_sector_idx(self): return self.sector_idx % self._sects_per_fat + @property + def chain_idx(self): return self.sector_idx // self._sects_per_fat + @property + def sector_idx(self): return self._sector_idx + @property + def next_sector_idx(self): return self.fat_sector.sect_nums[self.sub_sector_idx] + @property + def sector(self): return self._sectors[self.sector_idx] + @property + def fat_sector(self): return self._sectors[self.fat_sector_idx] + @property + def sector_data(self): return self.contig_ministream or self.sector.data + + def __len__(self): return self._storage_block.stream_len + def size(self): return len(self) + def tell(self): return self._pos def read(self, count=None): '''Reads and returns 'count' number of bytes as a bytes object.''' + streamsize = self.size() + remainder = streamsize - self.tell() + + # make sure to clip 'count' to how many can actually be read + count = remainder if count is None else min(remainder, 
count)
+
+        # read and return 'count' number of bytes
+        assert isinstance(count, int), "'count' must be None or an int."
+
+        # determine if we need to read from the sectors array or the ministream.
+        # if it's the ministream, we have it assemble it from all the sectors
+        data = b''
+
+        is_mini = bool(self.contig_ministream)
+        mini_stride = self._mini_sector_size if is_mini else 0
+        stride = mini_stride or self._sector_size
+        while count > 0:
+            offset = (self._pos % stride) + self.sector_idx * mini_stride
+            # clip each copy to the end of the current sector so a read
+            # never crosses a stride boundary within the ministream
+            size = min(count, stride - self._pos % stride)
+            chunk = self.sector_data[offset: offset + size]
+            if not chunk:
+                break
+
+            data += chunk
+            size = len(chunk)
+            # if we've moved onto the next FAT or miniFAT sector,
+            # update the sector index to reflect it
+            if (self._pos % stride) + size >= stride:
+                self._sector_idx = self.next_sector_idx
+
+            self._pos += len(chunk)
+            count -= len(chunk)
+
+        return data
 
-        if count is None:
-            # read and return everything after self._pos
-            count = len(self) - self._pos
-        else:
-            # read and return 'count' number of bytes
-            assert isinstance(count, int), "'count' must be None or an int."
-
-            # make sure to clip 'count' to how many can actually be read
-            count = min(len(self) - self._pos, count)
-
-        if count == 0:
-            return b''
-
-        sect = self._sector
-        sect_size = self._sector_size
-        sect_chain = self._sector_chain
-        sect_array = self._tag.data.sectors
-        sects_per_fat = self._sects_per_fat
-        start_cell = self._cell
-
-        # determine how many bytes need to be read from the first
-        # sector in the chain and the last sector in the chain.
-        # Every sector between the first and last is fully read.
-        sect_0_len = sect_size - (self._pos % sect_size)
-        sect_n_len = (self._pos + count) % sect_size
-
-        # determine if more than one sector is being read(if there
-        # is a last sector in the chain instead of just a beginning)
-        if sect_0_len < count:
-            has_last = True
-
-            # if the number of bytes being read is an exact multiple of the
-            # sector size, the last sector's size will be set to 0. Fix this.
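To make the new read loop concrete: each step copies at most up to the next stride boundary, then hops to the next sector in the chain whenever a step lands on that boundary. A standalone toy model of the walk, with a dict of sector bytes standing in for the sector array (assumed layout, not library code):

def read_stream(sector_bytes, chain, pos, count, stride=4):
    out = b''
    idx = 0  # position within `chain`, the logical sector order
    while count > 0:
        # never copy past the current stride boundary
        size = min(count, stride - pos % stride)
        start = pos % stride
        out += sector_bytes[chain[idx]][start:start + size]
        if start + size >= stride:
            idx += 1  # crossed into the next sector of the chain
        pos += size
        count -= size
    return out

sectors = {0: b'abcd', 7: b'efgh'}
assert read_stream(sectors, chain=[0, 7], pos=2, count=4) == b'cdef'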
- if sect_n_len == 0: - sect_n_len = sect_size - # determine how many sectors are between the first and last sectors - if sect_0_len + sect_n_len < count: - middle_count = (count - sect_0_len - sect_n_len) // sect_size - else: - middle_count = 0 - else: - has_last = False - middle_count = 0 - - # get the FAT or miniFAT sect_nums of the next sector - fat_sect = sect_array[sect_chain[sect // sects_per_fat]].sect_nums - - # determine if we need to read from the sectors array or the ministream - mini_stream = self._ministream - contig_ministream = self._contig_ministream - - if mini_stream or contig_ministream: - if not contig_ministream: - # reading from the ministream, so have it assemble it for us - contig_ministream = mini_stream.peek() - - # the offset within the ministream to read at - offset = sect * sect_size + start_cell - - # slice out the bytes we want from the first sector - contig_stream = contig_ministream[offset:offset + sect_0_len] - - # get the next sect and its FAT or miniFAT sect_nums - sect = fat_sect[sect % sects_per_fat] - offset = sect * sect_size + start_cell - - # add the middle sectors to the contiguous stream - while middle_count: - fat_sect = sect_array[sect_chain[sect // - sects_per_fat]].sect_nums - contig_stream += contig_ministream[offset:offset + sect_size] - - # decrement the number of remaining middle sectors - middle_count -= 1 - - # get the next sector and its FAT or miniFAT sect_nums - sect = fat_sect[sect % sects_per_fat] - offset = sect * sect_size - - # add the last sector to the contiguous stream - if has_last: - fat_sect = sect_array[sect_chain[sect // - sects_per_fat]].sect_nums - contig_stream += contig_ministream[offset:offset + sect_n_len] - sect = fat_sect[sect % sects_per_fat] - else: - # slice out the bytes we want from the first sector - contig_stream = sect_array[sect].data[start_cell: - start_cell + sect_0_len] - - # get the next sect and its FAT or miniFAT sect_nums - sect = fat_sect[sect % sects_per_fat] - - # add the middle sectors to the contiguous stream - while middle_count: - fat_sect = sect_array[sect_chain[sect // - sects_per_fat]].sect_nums - contig_stream += sect_array[sect].data[:] - - # decrement the number of remaining middle sectors - middle_count -= 1 - - # get the next sector and its FAT or miniFAT sect_nums - sect = fat_sect[sect % sects_per_fat] - - # add the last sector to the contiguous stream - if has_last: - fat_sect = sect_array[sect_chain[sect // - sects_per_fat]].sect_nums - contig_stream += sect_array[sect].data[:sect_n_len] - sect = fat_sect[sect % sects_per_fat] - - # change the pos, sector, and cell to reflect the change - self._pos += count - self._sector = sect - self._cell = sect_n_len - - return contig_stream - - def peek(self, count=None): + def peek(self, count=None, offset=None): ''' Reads and returns 'count' number of bytes from the Buffer without changing the current read/write pointer position. ''' - self._pos, self._sector, self._cell, data = ( - self._pos, self._sector, self._cell, self.read(count)) - return data + orig = (self._sector_idx, self._pos) + try: + offset is None or self.seek(min(offset, self.size())) + return self.read(count) + finally: + (self._sector_idx, self._pos) = orig def seek(self, pos, whence=SEEK_SET): ''' @@ -234,48 +172,34 @@ def seek(self, pos, whence=SEEK_SET): Raises AssertionError if the read pointer would be outside the buffer. Raises ValueError if whence is not SEEK_SET, SEEK_CUR, or SEEK_END. - Raises TypeError if whence is not an int. 
'''
-
-        if whence == SEEK_SET:
-            assert pos < len(self), "Read position cannot be outside buffer."
-            assert pos >= 0, "Read position cannot be negative."
-            self._pos = pos
-        elif whence == SEEK_CUR:
-            p = self._pos + pos
-            assert p < len(self), "Read position cannot be outside buffer."
-            assert p >= 0, "Read position cannot be negative."
-            self._pos += pos
-        elif whence == SEEK_END:
-            assert pos <= 0, "Read position cannot be outside buffer."
-            pos += len(self)
-            assert pos >= 0, "Read position cannot be negative."
-            self._pos = pos
-        elif type(whence) is int:
+        if whence not in (SEEK_SET, SEEK_CUR, SEEK_END):
             raise ValueError("Invalid value for whence. Expected " +
                              "0, 1, or 2, got %s." % whence)
-        else:
-            raise TypeError("Invalid type for whence. Expected " +
-                            "%s, got %s" % (int, type(whence)))
 
-        pos = self._pos
+        # convert pos to an absolute position, then rewind to the start
+        # sector, since the chain can only be walked forward
+        if whence == SEEK_CUR:
+            pos += self.tell()
+        elif whence == SEEK_END:
+            pos += len(self)
 
-        # change the sector and cell to reflect the new pos
-        self._cell = pos % self._sector_size
-        self._sector = self._sector_chain[pos // self._sects_per_fat]
+        assert pos >= 0, "Read position cannot be negative."
+        assert pos <= self.size(), "Read position cannot be outside the buffer."
 
-    def write(self, s):
-        raise NotImplementedError('Cant do that yet.')
+        self._sector_idx = self._start_sector
+        self._pos = 0
 
-        # NEED TO MAKE DIS CRAP WURK
-        s = memoryview(s).tobytes()
-        str_len = len(s)
+        # to seek, need to actually jump from sector to sector
+        is_mini = bool(self.contig_ministream)
+        stride = self._mini_sector_size if is_mini else self._sector_size
+        while pos > 0:
+            size = pos if pos < stride else stride
+            if (self._pos % stride) + size >= stride:
+                self._sector_idx = self.next_sector_idx
 
-        if len(s) + self._pos > len(self):
-            raise IndexError(
-                'Input too long to write to data stream at the current offset')
-
-        self._pos += str_len
+            pos -= size
+            self._pos += size
+
+    def write(self, s):
+        raise NotImplementedError('Writing to Olecf not currently supported.')
 
 
 class OlecfTag(Tag):
@@ -310,6 +234,15 @@ def __init__(self, **kwargs):
 
         Tag.__init__(self, **kwargs)
 
+        try:
+            self.ministream = self.get_stream_by_index(0)
+        except Exception:
+            self.ministream = None
+        try:
+            self.contig_ministream = self.ministream.peek()
+        except Exception:
+            self.contig_ministream = b''
+
     def get_dir_entry_by_name(self, name):
         '''Returns the directory entry linked to the given name.'''
         return self.get_dir_entry_by_index(self.dir_names.index(name))
diff --git a/supyr_struct/defs/filesystem/objs/thumbs.py b/supyr_struct/defs/filesystem/objs/thumbs.py
index 62f9fb0..d4d06c8 100644
--- a/supyr_struct/defs/filesystem/objs/thumbs.py
+++ b/supyr_struct/defs/filesystem/objs/thumbs.py
@@ -59,7 +59,7 @@ def jfif_stream_size(node=None, parent=None, attr_index=None,
 jfif_stream = Container('jfif_stream',
     BytesRaw('segment_mark', SIZE=2),
     UInt16('stream_len', ENDIAN='>'),  # length of the upcoming data stream
-                                       # plus the size of this field
+                                       # plus the size of this field
     BytesRaw('data_stream', SIZE=jfif_stream_size)
     )
@@ -104,7 +104,7 @@ def jfif_stream_size(node=None, parent=None, attr_index=None,
 catalog_entry = Container('catalog_entry',
     UInt32('record_len'),  # the number of bytes of this entry
     UInt32('thumb_id'),    # begins at 1 for the first thumbnail and
-                           # increments by 1 for each subsequent thumbnail
+                           # increments by 1 for each subsequent thumbnail
     UInt64('timestamp'),   # timestamp in win32 standard time.
# Use win32time_to_pytime to convert to a # python timestamp and pytime_to_win32time @@ -130,19 +130,29 @@ def jfif_stream_size(node=None, parent=None, attr_index=None, class ThumbsTag(OlecfTag): - ''' - ''' - def __init__(self, **kwargs): - OlecfTag.__init__(self, **kwargs) - try: - self.data.sectors - except (AttributeError, IndexError, KeyError): - return - try: - self.ministream = self.get_stream_by_index(0) - except Exception: - self.ministream = None - try: - self.contig_ministream = self.ministream.peek() - except Exception: - self.contig_ministream = b'' + _catalog = None + + @property + def catalog(self): + if self._catalog is None: + self._catalog = catalog_def.build( + rawdata = self.get_stream_by_name('Catalog').read() + ) + return self._catalog.catalog_array + + def get_thumbnail_path(self, index): + return self.catalog[index].name.replace('\\', '/') + ".jpg" + + def get_thumbnail_data(self, index, offset=0, size=None): + # Get a stream buffer to read the thumbnail from. + # The name is the reversed thumbnail index as a string + thumb_stream = self.get_stream_by_name(str(index+1)[::-1]) + + # get the raw thumbnail stream data + thumb_data = thumb_stream.peek() + # if this is a headered thumbnail then build the jpeg struct + if thumb_data[:2] == SOI: + thumb_data = fast_thumb_stream_def.build( + rawdata = thumb_data).data_stream + + return thumb_data diff --git a/supyr_struct/defs/filesystem/olecf.py b/supyr_struct/defs/filesystem/olecf.py index c2f79c4..3167808 100644 --- a/supyr_struct/defs/filesystem/olecf.py +++ b/supyr_struct/defs/filesystem/olecf.py @@ -172,17 +172,16 @@ def sector_parser(self, desc, node=None, parent=None, attr_index=None, sector_desc = sector_array.desc[SUB_STRUCT] sector_field_parser = sector_desc[TYPE].parser - sector_size = 1 << sector_array.get_root().data.header.sector_shift - sector_count = len(rawdata) // sector_size - 1 + size = 1 << sector_array.get_root().data.header.sector_shift + count = len(rawdata) // size - 1 # the number of entries in a FAT sect_nums array - fat_array_size = sector_size // 4 + fat_array_size = size // 4 # This is the last sector number whose FAT is addressed within # the header_difat. 
Any sector number higher than it will be # allocated to a FAT sector which is allocated to a DIFAT sector header_difat_max_sect = (HEADER_DIFAT_LEN - 1)*fat_array_size - 1 - sects_per_difat = (fat_array_size - 1)*fat_array_size # get the tag that will be used for caching quick sector mappings parent_tag = parent.get_root() @@ -214,19 +213,19 @@ def sector_parser(self, desc, node=None, parent=None, attr_index=None, dir_sectors = parent_tag.dir_sectors dir_names = parent_tag.dir_names - parent_tag.sector_size = sector_size + parent_tag.sector_size = size # clear the quick sector mappings difat_sectors[:] = fat_sectors[:] = minifat_sectors[:] =\ dir_sectors[:] = dir_names[:] = () # read all the sectors as regular sectors - sector_array.extend(sector_count) + sector_array.extend(count) kwargs.update(parent=sector_array, rawdata=rawdata, root_offset=root_offset, offset=offset, case='regular') - for i in range(sector_count): + for i in range(count): kwargs['offset'] = sector_field_parser(sector_desc, attr_index=i, **kwargs) diff --git a/supyr_struct/defs/filesystem/thumbs.py b/supyr_struct/defs/filesystem/thumbs.py index 003d750..0e23d9b 100644 --- a/supyr_struct/defs/filesystem/thumbs.py +++ b/supyr_struct/defs/filesystem/thumbs.py @@ -12,6 +12,7 @@ def get(): return thumbs_def thumbs_def = TagDef("thumbs", - descriptor=olecf_def.descriptor, sanitize=False, + descriptor=olecf_def.descriptor, + sanitize=False, # already been sanitized for olecf_def ext=".db", endian="<", tag_cls=thumbs.ThumbsTag ) diff --git a/supyr_struct/defs/sanitizers.py b/supyr_struct/defs/sanitizers.py index 3cadb12..705223c 100644 --- a/supyr_struct/defs/sanitizers.py +++ b/supyr_struct/defs/sanitizers.py @@ -195,7 +195,6 @@ def struct_sanitizer(blockdef, src_dict, **kwargs): nameset = set() # contains the name of each entriy in the desc rem = 0 # number of dict entries removed key = 0 - pad_count = 0 # loops through the entire descriptor and # finalizes each of the integer keyed attributes @@ -231,15 +230,16 @@ def struct_sanitizer(blockdef, src_dict, **kwargs): rem += 1 src_dict[ENTRIES] -= 1 continue - elif f_type is not None: + + if f_type is not None: # make sure the node has an offset if it needs one if OFFSET not in this_d: this_d[OFFSET] = def_offset elif p_f_type: blockdef._bad = True blockdef._e_str += ( - "ERROR: DESCRIPTOR FOUND MISSING ITS TYPE IN '%s' OF " + - "TYPE '%s' AT INDEX %s.\n" % (p_name, p_f_type, key)) + ("ERROR: DESCRIPTOR FOUND MISSING ITS TYPE IN '%s' OF " + "TYPE '%s' AT INDEX %s.\n") % (p_name, p_f_type, key)) kwargs["key_name"] = key this_d = src_dict[key] = blockdef.sanitize_loop(this_d, **kwargs) @@ -252,7 +252,7 @@ def struct_sanitizer(blockdef, src_dict, **kwargs): name = this_d[NAME] if name in nameset: blockdef._e_str += ( - ("ERROR: DUPLICATE NAME FOUND IN '%s' AT INDEX " + + ("ERROR: DUPLICATE NAME FOUND IN '%s' AT INDEX " "%s.\n NAME OF OFFENDING ELEMENT IS '%s'\n") % (p_name, key, name)) blockdef._bad = True @@ -381,7 +381,7 @@ def sequence_sanitizer(blockdef, src_dict, **kwargs): if size is None: blockdef._bad = True blockdef._e_str += ( - ("ERROR: Pad ENTRY IN '%s' OF TYPE %s AT INDEX %s " + + ("ERROR: Pad ENTRY IN '%s' OF TYPE %s AT INDEX %s " "IS MISSING A SIZE KEY.\n") % (p_name, p_f_type, key)) # make sure the padding follows convention and has a name this_d.setdefault(NAME, 'pad_entry_%s' % pad_count) @@ -389,28 +389,31 @@ def sequence_sanitizer(blockdef, src_dict, **kwargs): src_dict[NAME_MAP][this_d[NAME]] = key pad_count += 1 continue - elif f_type is None and p_f_type: + 
+ if f_type is None and p_f_type: blockdef._bad = True blockdef._e_str += ( - "ERROR: DESCRIPTOR FOUND MISSING ITS TYPE IN '%s' OF " + - "TYPE '%s' AT INDEX %s.\n" % (p_name, p_f_type, key)) + ("ERROR: DESCRIPTOR FOUND MISSING ITS TYPE IN '%s' OF " + "TYPE '%s' AT INDEX %s.\n") % (p_name, p_f_type, key)) kwargs["key_name"] = key this_d = src_dict[key] = blockdef.sanitize_loop(this_d, **kwargs) - if f_type: - sani_name = blockdef.sanitize_name(src_dict, key, **kwargs) - if NAME_MAP in src_dict: - src_dict[NAME_MAP][sani_name] = key + if not f_type: + continue - name = this_d[NAME] - if name in nameset: - blockdef._e_str += ( - ("ERROR: DUPLICATE NAME FOUND IN '%s' AT INDEX " + - "%s.\n NAME OF OFFENDING ELEMENT IS '%s'\n") % - (p_name, key, name)) - blockdef._bad = True - nameset.add(name) + sani_name = blockdef.sanitize_name(src_dict, key, **kwargs) + if NAME_MAP in src_dict: + src_dict[NAME_MAP][sani_name] = key + + name = this_d[NAME] + if name in nameset: + blockdef._e_str += ( + ("ERROR: DUPLICATE NAME FOUND IN '%s' AT INDEX " + "%s.\n NAME OF OFFENDING ELEMENT IS '%s'\n") % + (p_name, key, name)) + blockdef._bad = True + nameset.add(name) return src_dict @@ -453,9 +456,9 @@ def standard_sanitizer(blockdef, src_dict, **kwargs): except AttributeError: blockdef._bad = True blockdef._e_str += ( - ("ERROR: FOUND DESCRIPTOR WHICH SPECIFIES A STEPTREE, BUT " + - "THE CORROSPONDING Block\nHAS NO SLOT FOR A STEPTREE " + - "AND DOES NOT SPECIFY A BLOCK THAT HAS A SLOT.\n " + + ("ERROR: FOUND DESCRIPTOR WHICH SPECIFIES A STEPTREE, BUT " + "THE CORROSPONDING Block\nHAS NO SLOT FOR A STEPTREE " + "AND DOES NOT SPECIFY A BLOCK THAT HAS A SLOT.\n " "OFFENDING ELEMENT IS %s OF TYPE %s\n") % (p_name, p_f_type)) # loops through the descriptors non-integer keyed sub-sections @@ -467,14 +470,16 @@ def standard_sanitizer(blockdef, src_dict, **kwargs): #blockdef._bad = True src_dict.pop(key) continue + if isinstance(src_dict[key], dict) and key != ADDED: kwargs["key_name"] = key f_type = src_dict[key].get(TYPE) this_d = dict(src_dict[key]) # replace with the modified copy so the original is intact - src_dict[key] = this_d = blockdef.sanitize_loop(this_d, - **kwargs) + src_dict[key] = this_d = blockdef.sanitize_loop( + this_d, **kwargs + ) if f_type: # if this is the repeated substruct of an array @@ -484,7 +489,7 @@ def standard_sanitizer(blockdef, src_dict, **kwargs): align = blockdef.get_align(src_dict, key) # if the alignment is 1 then adjustments arent needed if align > 1: - this_d[ALIGN] + this_d[ALIGN] = align sani_name = blockdef.sanitize_name(src_dict, key, **kwargs) if key != SUB_STRUCT: diff --git a/supyr_struct/examples/olecf_extractor.py b/supyr_struct/examples/olecf_extractor.py index fe8aa5a..b2763cc 100644 --- a/supyr_struct/examples/olecf_extractor.py +++ b/supyr_struct/examples/olecf_extractor.py @@ -6,24 +6,24 @@ import tkinter as tk import tkinter.filedialog +from pathlib import Path from traceback import format_exc from supyr_struct.defs.filesystem.olecf import olecf_def from supyr_struct.defs.filesystem.objs.olecf import OlecfTag -test_path = (__file__.split('\\olecf_extractor.py')[0] + - '\\test_tags\\documents\\test.doc') -curr_dir = os.path.abspath(os.curdir) -RESERVED_WINDOWS_FILENAME_MAP = {} -INVALID_PATH_CHARS = set([str(i.to_bytes(1, 'little'), 'ascii') - for i in range(32)]) -for name in ('CON', 'PRN', 'AUX', 'NUL'): - RESERVED_WINDOWS_FILENAME_MAP[name] = '_' + name -for i in range(1, 9): - RESERVED_WINDOWS_FILENAME_MAP['COM%s' % i] = '_COM%s' % i - 
RESERVED_WINDOWS_FILENAME_MAP['LPT%s' % i] = '_LPT%s' % i
-INVALID_PATH_CHARS.update(('<', '>', ':', '"', '/', '\\', '|', '?', '*'))
+TAGS_DIR = Path(__file__).parent.joinpath('test_tags')
+
+INVALID_PATH_CHARS = set(
+    "".join(str(i.to_bytes(1, 'little'), 'latin-1')
+            for i in (*range(32), *range(128, 256))) + '<>:"|?*'
+    )
+RESERVED_WINDOWS_FILENAME_MAP = {
+    **{name: '_%s' % name for name in ('CON', 'PRN', 'AUX', 'NUL')},
+    **{'COM%s' % i: '_COM%s' % i for i in range(10)},
+    **{'LPT%s' % i: '_LPT%s' % i for i in range(10)},
+    }
 
 
 class OlecfExtractor(tk.Tk):
@@ -31,7 +31,7 @@ class OlecfExtractor(tk.Tk):
     loaded_tag = None
     listbox_entries = None
-    initial_dir = curr_dir
+    initial_dir = TAGS_DIR
 
     # each index in the listbox_map maps linearly to the entries in
     # the listbox and each stores the SID of the dir_entry it points to
@@ -45,9 +45,9 @@ def __init__(self, **kwargs):
 
         tk.Tk.__init__(self, **kwargs)
 
-        self.title("OLECF File Extractor v1.0")
+        self.title("OLECF File Extractor v1.1")
         self.geometry("368x243+0+0")
-        self.resizable(0, 0)
+        self.resizable(1, 1)
 
         self.filepath = tk.StringVar(self, filepath)
         self.listbox_entries = {}
@@ -60,7 +60,7 @@ def __init__(self, **kwargs):
         self.filepath_entry.insert(tk.INSERT, self.filepath.get())
         self.filepath_entry.config(width=59, state=tk.DISABLED)
 
-        # add the buttons
+        # add the buttons and listbox
         self.btn_load = tk.Button(
             self, text="Select file", width=15, command=self.browse)
         self.btn_extract = tk.Button(
@@ -68,22 +68,18 @@ def __init__(self, **kwargs):
             command=lambda: self.extract(extract_selected=True))
         self.btn_extract_all = tk.Button(
             self, text="Extract all", width=15, command=self.extract_all)
+        self.file_listbox = tk.Listbox(self, selectmode=tk.EXTENDED)
 
-        # add the listbox
-        self.listbox_canvas = tk.Canvas(self, highlightthickness=0)
-        self.file_listbox = tk.Listbox(
-            self.listbox_canvas, width=61, height=13,
-            selectmode=tk.EXTENDED, highlightthickness=0)
+        self.columnconfigure(3, weight=1)
+        self.rowconfigure(2, weight=1)
 
         # place the buttons and filepath field
-        self.filepath_entry.place(x=5, y=5, anchor=tk.NW)
-        self.btn_load.place(x=15, y=30, anchor=tk.NW)
-        self.btn_extract.place(x=150, y=30, anchor=tk.NW)
-        self.btn_extract_all.place(x=250, y=30, anchor=tk.NW)
+        self.filepath_entry.grid(row=0, column=0, columnspan=3, sticky="EW")
+        self.btn_load.grid(row=1, column=0)
+        self.btn_extract.grid(row=1, column=1)
+        self.btn_extract_all.grid(row=1, column=2)
 
-        # pack the listbox and scrollbars
-        self.listbox_canvas.place(x=0, y=60)
-        self.file_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
+        self.file_listbox.grid(row=2, column=0, columnspan=4, sticky="NSEW")
 
         if filepath:
             self.load_tag(filepath)
@@ -100,57 +96,42 @@ def browse(self):
             self.load_tag(filepath)
 
     def extract(self, file_indices=(), extract_selected=False):
-        loaded_tag = self.loaded_tag
-        if not isinstance(loaded_tag, self.tag_cls):
+        '''
+        Extracts the specified files from the given loaded_tag to a folder
+        with the same name as the OLECF file in the same parent folder.
+        '''
+        if not isinstance(self.loaded_tag, self.tag_cls):
             print('Loaded tag is not an instance of %s' % self.tag_cls)
             return
 
-        # faster local reference and shortens line lengths
-        get_stream = loaded_tag.get_stream_by_index
-        dirname = os.path.dirname
-        exists = os.path.exists
-        makedirs = os.makedirs
-
         # get the filepath of the tag without the extension
-        tag_path = os.path.splitext(loaded_tag.filepath)[0]
-        output_path_template = tag_path + '\\%s.stream'
-
-        # make sure an output folder exists
-        makedirs(tag_path + '\\', exist_ok=True)
+        tag_path = Path(self.loaded_tag.filepath).with_suffix("")
 
         if extract_selected:
            file_indices = [self.listbox_map[i] for i in
                            self.file_listbox.curselection()]
+        else:
+            file_indices = self.listbox_entries
 
         print('extracting %s files' % len(file_indices))
 
-        # loop over every directory entry
+        # loop over every chosen entry, get the raw
+        # stream data, and write it to a file
         for i in file_indices:
             try:
-                dir_entry = loaded_tag.get_dir_entry_by_index(i)
-                if dir_entry.storage_type.enum_name == 'unallocated':
-                    continue
+                name = Path(self.sanitize_filename(self.listbox_entries[i]))
+                print('    %s' % name)
 
-                print('    %s' % dir_entry.name)
+                data = self.get_item_data(i)
 
-                # get the filename
-                name = self.sanitize_filename(dir_entry.name)
+                # make sure an output folder exists for the file
+                out_path = tag_path.joinpath(name)
+                out_path.parent.mkdir(parents=True, exist_ok=True)
 
-                # make the output path for the thumbnail
-                output_path = output_path_template % name
-                output_folder = dirname(output_path)
+                with out_path.open('w+b') as f:
+                    f.write(data)
 
-                # make sure an output folder exists
-                if not exists(output_folder):
-                    makedirs(output_folder, exist_ok=True)
-
-                # open the output file
-                with open(output_path_template % name, 'w+b') as f:
-                    # Get a stream buffer to read the data
-                    # from and write it to the output file
-                    f.write(get_stream(i).read())
             except Exception:
-                print('    FAILED TO EXTRACT FILE STREAM AT INDEX %s' % i)
+                print('FAILED TO EXTRACT FILE STREAM AT INDEX %s' % i)
                 print(format_exc())
                 continue
 
@@ -163,6 +144,9 @@ def extract_all(self):
             return
         self.extract(range(len(loaded_tag.dir_names)))
 
+    def get_item_data(self, index):
+        return self.loaded_tag.get_stream_by_index(index).read()
+
     def get_listbox_entries(self):
         loaded_tag = self.loaded_tag
         if not loaded_tag:
@@ -181,19 +165,20 @@ def get_listbox_entries(self):
         return listbox_entries, listbox_map
 
     def load_tag(self, filepath=None):
-        if filepath is None:
-            filepath = self.filepath.get()
-        if filepath:
-            del self.loaded_tag
-            self.loaded_tag = None
-            gc.collect()
+        filepath = filepath or self.filepath.get()
+        if not filepath:
+            return
 
-            try:
-                self.loaded_tag = self.tag_def_cls.build(filepath=filepath)
-                self.filepath.set(filepath)
-            except Exception:
-                self.filepath.set('')
-            self.populate_listbox()
+        del self.loaded_tag
+        self.loaded_tag = None
+        gc.collect()
+
+        try:
+            self.loaded_tag = self.tag_def_cls.build(filepath=filepath)
+            self.filepath.set(filepath)
+        except Exception:
+            self.filepath.set('')
+        self.populate_listbox()
 
     def populate_listbox(self):
         if not self.populating_listbox:
@@ -209,20 +194,24 @@ def populate_listbox(self):
             self.populating_listbox = False
 
     def sanitize_filename(self, name):
+        if not name:
+            return 'EMPTY FILENAME'
+
         # make sure to rename reserved windows filenames to a valid one
        if name in RESERVED_WINDOWS_FILENAME_MAP:
            return RESERVED_WINDOWS_FILENAME_MAP[name]
-        final_name = ''
-        for c in name:
-            if c not in INVALID_PATH_CHARS:
-                
final_name += c - if final_name == '': - return 'BAD %s CHAR FILENAME' % len(name) - return final_name + + return ''.join( + "".join("%%%02x" % b for b in c.encode("utf8")).upper() + if c in INVALID_PATH_CHARS else c + for c in name + ) try: if __name__ == '__main__': - extractor = OlecfExtractor(filepath=test_path) + extractor = OlecfExtractor( + filepath=TAGS_DIR.joinpath('documents/test.doc') + ) extractor.mainloop() except Exception: print(format_exc()) diff --git a/supyr_struct/examples/print_test.py b/supyr_struct/examples/print_test.py index 1954371..e826644 100644 --- a/supyr_struct/examples/print_test.py +++ b/supyr_struct/examples/print_test.py @@ -10,7 +10,8 @@ images_folder = join(folder, "images") keyblobs_folder = join(folder, "keyblobs") -def test(tags): +def test(): + tags = [] for bmp_name in ("test16color", "test24_dibv2", "test24_dibv3", "test32_dibv4", "test32_dibv5", "test32_unknown_dib", "test256color", "testmono_os2"): @@ -34,16 +35,17 @@ def test(tags): for tga_name in ("aeskey", "rsaprikey", "rsapubkey"): tags.append(keyblob.keyblob_def.build( filepath=join(keyblobs_folder, tga_name + ".bin"))) + return tags if __name__ == "__main__": try: - tags = [] + loaded_tags = [] try: - test(tags) + loaded_tags.extend(test()) except Exception: print(traceback.format_exc()) - for tag in tags: + for tag in loaded_tags: print(tag) except Exception: print(traceback.format_exc()) diff --git a/supyr_struct/examples/thumbnail_extractor.py b/supyr_struct/examples/thumbnail_extractor.py index d2a720c..e76e3d1 100644 --- a/supyr_struct/examples/thumbnail_extractor.py +++ b/supyr_struct/examples/thumbnail_extractor.py @@ -3,132 +3,45 @@ ''' import os +from pathlib import Path from traceback import format_exc from supyr_struct.defs.filesystem.thumbs import thumbs_def from supyr_struct.defs.filesystem.objs.thumbs import catalog_def,\ fast_thumb_stream_def, thumb_stream_def, SOI from supyr_struct.defs.filesystem.objs.thumbs import ThumbsTag -from supyr_struct.examples.olecf_extractor import OlecfExtractor - -test_path = (__file__.split('\\thumbnail_extractor.py')[0] + - '\\test_tags\\images\\test_thumbs.db') +from supyr_struct.examples.olecf_extractor import OlecfExtractor, TAGS_DIR class ThumbsExtractor(OlecfExtractor): tag_def_cls = thumbs_def tag_cls = ThumbsTag - catalog = None def __init__(self, **kwargs): OlecfExtractor.__init__(self, **kwargs) - self.title("Thumbnail database extractor v1.0") - - def load_tag(self, filepath=None): - OlecfExtractor.load_tag(self, filepath) - self.catalog = None - - if self.loaded_tag: - self.catalog = catalog_def.build( - rawdata=self.loaded_tag.get_stream_by_name('Catalog').read()) - self.populate_listbox() + self.title("Thumbnail database extractor v1.1") def extract_all(self): loaded_tag = self.loaded_tag if not isinstance(loaded_tag, self.tag_cls): print('Loaded tag is not an instance of %s' % self.tag_cls) return - self.extract(range(len(self.catalog.catalog_array))) - - def extract(self, file_indices=(), extract_selected=False): - ''' - Extracts the specified thumbnails from the given loaded_tag to a - folder with the same name as the thumbs file in the same parent folder. 
- ''' - loaded_tag = self.loaded_tag - catalog = self.catalog - - if not isinstance(loaded_tag, self.tag_cls): - print('Loaded tag is not an instance of %s' % self.tag_cls) - return - - # faster local reference and shortens line lengths - get_stream = loaded_tag.get_stream_by_name - dirname = os.path.dirname - exists = os.path.exists - makedirs = os.makedirs - - # get the filepath of the tag without the extension - thumbs_path = os.path.splitext(loaded_tag.filepath)[0] - output_path_template = thumbs_path + '\\%s.jpg' - - # make sure a thumbnail output folder exists - makedirs(thumbs_path + '\\', exist_ok=True) - - if extract_selected: - file_indices = [self.listbox_map[i] for i in - self.file_listbox.curselection()] + self.extract(range(len(loaded_tag.catalog))) - print('extracting %s thumbnails' % len(file_indices)) - - # loop over every entry in the catalog - for i in file_indices: - try: - catalog_entry = catalog.catalog_array[i] - print(' %s' % catalog_entry.name) - - # get the filename - name = catalog_entry.name.replace('/', '\\').split('\\', 1)[-1] - - # make the output path for the thumbnail - output_path = output_path_template % name - output_folder = dirname(output_path) - - # make sure an output folder exists - if not exists(output_folder): - makedirs(output_folder, exist_ok=True) - - # Get a stream buffer to read the thumbnail from. - # The name is the reversed thumbnail index as a string - thumb_stream = get_stream(str(i+1)[::-1]) - - # get the raw thumbnail stream data - thumb_data = thumb_stream.read() - - with open(output_path, 'w+b') as f: - # if this is a non-headered thumbnail then just write it - if thumb_stream.peek(2) == SOI: - f.write(thumb_stream) - continue - - # othewise build the jpeg thumbnail object - thumbnail = fast_thumb_stream_def.build(rawdata=thumb_data) - - # write the thumbnails jpeg stream to the output file - f.write(thumbnail.data_stream) - except Exception: - print(' FAILED TO EXTRACT THUMBNAIL AT INDEX %s' % i) - print(format_exc()) - continue - - print('\n%s\n%s\n%s' % ('-'*79, 'Finished extracting', '-'*79)) + def get_item_data(self, index): + return self.loaded_tag.get_thumbnail_data(index) def get_listbox_entries(self): - catalog = self.catalog - loaded_tag = self.loaded_tag - if not(loaded_tag and catalog): - return {}, () - - listbox_entries = {} - - # loop over every directory entry - for i in range(len(catalog.catalog_array)): - listbox_entries[i] = catalog.catalog_array[i].name - return listbox_entries, list(range(len(catalog.catalog_array))) + tag = self.loaded_tag + indices = list(range(len(tag.catalog))) if tag else [] + entries = {i: tag.get_thumbnail_path(i) for i in indices} + return entries, indices try: if __name__ == '__main__': - extractor = ThumbsExtractor(filepath=test_path) + extractor = ThumbsExtractor( + filepath=TAGS_DIR.joinpath('images/test_thumbs.db') + ) extractor.mainloop() except Exception: print(format_exc()) diff --git a/supyr_struct/field_type_methods/decoders.py b/supyr_struct/field_type_methods/decoders.py index 4c1c520..8e6913b 100644 --- a/supyr_struct/field_type_methods/decoders.py +++ b/supyr_struct/field_type_methods/decoders.py @@ -67,12 +67,9 @@ def decode_decimal(self, rawdata, desc=None, parent=None, attr_index=None): Returns a Decimal represention of the "rawdata" argument. 
'''
-    if self.endian == '<':
-        endian = 'little'
-    else:
-        endian = 'big'
-    d_exp = parent.get_meta('DECIMAL_EXP', attr_index)
-    bigint = str(int.from_bytes(
+    endian = 'little' if self.endian == '<' else 'big'
+    d_exp = parent.get_meta('DECIMAL_EXP', attr_index)
+    bigint = str(int.from_bytes(
         rawdata, endian, signed=self.enc.endswith('S')))
 
     return Decimal(bigint[:len(bigint)-d_exp] + '.' +
@@ -87,11 +84,10 @@ def decode_24bit_numeric(self, rawdata, desc=None,
                          parent=None, attr_index=None):
 
     Returns an int decoded represention of the "rawdata" argument.
     '''
-    if self.endian == '<':
-        rawint = unpack('<I', rawdata + b'\x00')[0]
-    else:
-        rawint = unpack('>I', b'\x00' + rawdata)[0]
-
+    rawint = (
+        unpack('<I', rawdata + b'\x00') if self.endian == '<' else
+        unpack('>I', b'\x00' + rawdata)
+    )[0]
    # if the int can be signed and IS signed then take care of that
    if rawint & 0x800000 and self.enc[1] == 't':
        return rawint - 0x1000000  # 0x1000000 == 0x800000 * 2
@@ -148,20 +144,14 @@ def decode_big_int(self, rawdata, desc=None, parent=None, attr_index=None):
     if not len(rawdata):
         return 0
 
-    if self.endian == '<':
-        endian = 'little'
-    else:
-        endian = 'big'
-
-    if self.enc[-1] == 's':
-        # ones compliment
-        bigint = int.from_bytes(rawdata, endian, signed=True)
-        if bigint < 0:
-            return bigint + 1
-        return bigint
-    elif self.enc[-1] == 'S':
+    endian = 'little' if self.endian == '<' else 'big'
+    if self.enc[-1] == 'S':
         # twos compliment
         return int.from_bytes(rawdata, endian, signed=True)
+    elif self.enc[-1] == 's':
+        # ones' complement
+        bigint = int.from_bytes(rawdata, endian, signed=True)
+        return bigint + (bigint < 0)
 
     return int.from_bytes(rawdata, endian)
 
diff --git a/supyr_struct/field_type_methods/encoders.py b/supyr_struct/field_type_methods/encoders.py
index f7081d5..ea56de8 100644
--- a/supyr_struct/field_type_methods/encoders.py
+++ b/supyr_struct/field_type_methods/encoders.py
@@ -78,17 +78,17 @@ def encode_24bit_numeric(self, node, parent=None, attr_index=None):
         # int can be signed
         assert node >= -0x800000 and node <= 0x7fffff, (
             '%s is too large to pack as a 24bit signed int.' % node)
-        if node < 0:
-            # int IS signed
-            node += 0x1000000
+        # int IS signed
+        node += 0x1000000 if node < 0 else 0
     else:
         assert node >= 0 and node <= 0xffffff, (
             '%s is too large to pack as a 24bit unsigned int.'
            % node)
 
     # pack and return the int
-    if self.endian == '<':
-        return pack('<I', node)[0:3]
-    return pack('>I', node)[1:4]
+    return (
+        pack('<I', node)[0:3] if self.endian == '<' else
+        pack('>I', node)[1:4]
+    )
 
 
 def encode_int_timestamp(self, node, parent=None, attr_index=None):
@@ -148,19 +148,13 @@ def encode_big_int(self, node, parent=None, attr_index=None):
     if not bytecount:
         return b''
 
-    if self.endian == '<':
-        endian = 'little'
-    else:
-        endian = 'big'
-
+    endian = 'little' if self.endian == '<' else 'big'
     if self.enc[-1] == 'S':
         # twos compliment
         return node.to_bytes(bytecount, endian, signed=True)
     elif self.enc[-1] == 's':
         # ones compliment
-        if node < 0:
-            return (node-1).to_bytes(bytecount, endian, signed=True)
-        return node.to_bytes(bytecount, endian, signed=False)
+        return (node - (node < 0)).to_bytes(bytecount, endian, signed=True)
 
     return node.to_bytes(bytecount, endian)
 
diff --git a/supyr_struct/field_type_methods/parsers.py b/supyr_struct/field_type_methods/parsers.py
index e0b7ad0..3f35e56 100644
--- a/supyr_struct/field_type_methods/parsers.py
+++ b/supyr_struct/field_type_methods/parsers.py
@@ -111,8 +111,8 @@ def container_parser(self, desc, node=None, parent=None, attr_index=None,
     """
     """
+    orig_offset = offset
     try:
-        orig_offset = offset
         if node is None:
             parent[attr_index] = node = desc.get(NODE_CLS, self.node_cls)\
                 (desc, parent=parent)
@@ -124,17 +124,17 @@ def container_parser(self, desc, node=None, parent=None, attr_index=None,
         if 'STEPTREE' in desc:
             kwargs['steptree_parents'].append(node)
 
-        align = desc.get('ALIGN')
-
         # If there is a specific pointer to read the node from then go to it.
         # Only do this, however, if the POINTER can be expected to be accurate.
         # If the pointer is a path to a previously parsed field, but this node
         # is being built without a parent(such as from an exported block)
         # then the path wont be valid. The current offset will be used instead.
-        if attr_index is not None and desc.get('POINTER') is not None:
-            offset = node.get_meta('POINTER', **kwargs)
-        elif align:
-            offset += (align - (offset % align)) % align
+        align = desc.get('ALIGN')
+        offset = (
+            offset + ((align - (offset%align))%align if align else 0)
+            if None in (attr_index, desc.get('POINTER')) else
+            node.get_meta('POINTER', **kwargs)
+        )
 
         # loop once for each field in the node
         for i in range(len(node)):
@@ -178,8 +178,8 @@ def array_parser(self, desc, node=None, parent=None, attr_index=None,
     """
     """
+    orig_offset = offset
     try:
-        orig_offset = offset
         if node is None:
             parent[attr_index] = node = desc.get(NODE_CLS, self.node_cls)\
                 (desc, parent=parent)
@@ -193,17 +193,17 @@ def array_parser(self, desc, node=None, parent=None, attr_index=None,
         a_desc = desc['SUB_STRUCT']
         a_parser = a_desc['TYPE'].parser
 
-        align = desc.get('ALIGN')
-
         # If there is a specific pointer to read the node from then go to it.
         # Only do this, however, if the POINTER can be expected to be accurate.
         # If the pointer is a path to a previously parsed field, but this node
        # is being built without a parent(such as from an exported block)
         # then the path wont be valid. The current offset will be used instead.
- if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # loop once for each field in the node for i in range(node.get_size(**kwargs)): @@ -247,8 +247,8 @@ def while_array_parser(self, desc, node=None, parent=None, attr_index=None, """ """ + orig_offset = offset try: - orig_offset = offset if node is None: parent[attr_index] = node = desc.get(NODE_CLS, self.node_cls)\ (desc, parent=parent) @@ -262,17 +262,17 @@ def while_array_parser(self, desc, node=None, parent=None, attr_index=None, a_desc = desc['SUB_STRUCT'] a_parser = a_desc['TYPE'].parser - align = desc.get('ALIGN') - # If there is a specific pointer to read the node from then go to it. # Only do this, however, if the POINTER can be expected to be accurate. # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. The current offset will be used instead. - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) i = 0 decider = desc.get('CASE') @@ -360,6 +360,7 @@ def switch_parser(self, desc, node=None, parent=None, attr_index=None, align = desc.get('ALIGN') if align: offset += (align - (offset % align)) % align + try: # try to reposition the rawdata if it needs to be peeked rawdata.seek(root_offset + offset) @@ -395,8 +396,8 @@ def struct_parser(self, desc, node=None, parent=None, attr_index=None, """ """ + orig_offset = offset try: - orig_offset = offset if node is None: parent[attr_index] = node = desc.get(NODE_CLS, self.node_cls)\ (desc, parent=parent, init_attrs=rawdata is None) @@ -415,11 +416,12 @@ def struct_parser(self, desc, node=None, parent=None, attr_index=None, # a previously parsed field, but this node is being built # without a parent(such as from an exported block) then # the path wont be valid. The current offset will be used instead. 
- if attr_index is not None and 'POINTER' in desc: - offset = node.get_meta('POINTER', **kwargs) - elif 'ALIGN' in desc: - align = desc['ALIGN'] - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # loop once for each field in the node for i, off in enumerate(desc['ATTR_OFFS']): @@ -465,12 +467,12 @@ def quickstruct_parser(self, desc, node=None, parent=None, attr_index=None, """ """ + orig_offset = offset try: # we wanna go as fast as possible, so we completely skip over the # nodes __setitem__ magic method by calling the lists one directly __lsi__ = list.__setitem__ - orig_offset = offset if node is None: parent[attr_index] = node = desc.get(NODE_CLS, self.node_cls)\ (desc, parent=parent) @@ -483,39 +485,39 @@ def quickstruct_parser(self, desc, node=None, parent=None, attr_index=None, # a previously parsed field, but this node is being built # without a parent(such as from an exported block) then # the path wont be valid. The current offset will be used instead. - if attr_index is not None and 'POINTER' in desc: - offset = node.get_meta('POINTER', **kwargs) - elif 'ALIGN' in desc: - align = desc['ALIGN'] - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) struct_off = root_offset + offset f_endian = self.f_endian # loop once for each field in the node for i, off in enumerate(desc['ATTR_OFFS']): - off += struct_off typ = desc[i]['TYPE'] # check the forced endianness of the typ being parsed # before trying to use the endianness of the struct - if f_endian == "=" and typ.f_endian == "=": - pass - elif typ.f_endian == ">": - typ = typ.big - elif typ.f_endian == "<" or f_endian == "<": - typ = typ.little - else: - typ = typ.big + endian = f_endian if typ.f_endian == "=" else typ.f_endian + typ = ( + typ if endian == "=" else + typ.little if endian == "<" else + typ.big + ) __lsi__(node, i, typ.struct_unpacker( - rawdata[off:off + typ.size])[0]) + rawdata[off + struct_off: off + struct_off + typ.size] + )[0]) # increment offset by the size of the struct offset += desc['SIZE'] else: for i in range(len(node)): - __lsi__(node, i, - desc[i].get(DEFAULT, desc[i]['TYPE'].default())) + sub_desc = desc[i] + sub_type = sub_desc['TYPE'] + __lsi__(node, i, sub_desc.get(DEFAULT, sub_type.default())) if 'STEPTREE' in desc: s_desc = desc['STEPTREE'] @@ -551,11 +553,9 @@ def quickstruct_parser(self, desc, node=None, parent=None, attr_index=None, def stream_adapter_parser(self, desc, node=None, parent=None, attr_index=None, rawdata=None, root_offset=0, offset=0, **kwargs): - - + orig_root_offset = root_offset + orig_offset = offset try: - orig_root_offset = root_offset - orig_offset = offset if node is None: parent[attr_index] = node = ( desc.get(NODE_CLS, self.node_cls)(desc, parent=parent)) @@ -564,18 +564,18 @@ def stream_adapter_parser(self, desc, node=None, parent=None, attr_index=None, # If there is rawdata to build from if rawdata is not None: - align = desc.get('ALIGN') - # If there is a specific pointer to read the node from # then go to it. Only do this, however, if the POINTER can # be expected to be accurate. 
If the pointer is a path to # a previously parsed field, but this node is being built # without a parent(such as from an exported block) then # the path wont be valid. The current offset will be used instead. - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # use the decoder method to get a decoded stream and # the length of the stream before it was decoded @@ -604,10 +604,9 @@ def stream_adapter_parser(self, desc, node=None, parent=None, attr_index=None, def union_parser(self, desc, node=None, parent=None, attr_index=None, rawdata=None, root_offset=0, offset=0, **kwargs): - + orig_offset = offset try: - orig_offset = offset if node is None: parent[attr_index] = node = ( desc.get(NODE_CLS, self.node_cls)(desc, parent=parent)) @@ -618,14 +617,14 @@ def union_parser(self, desc, node=None, parent=None, attr_index=None, # A case may be provided through kwargs. # This is to allow overriding behavior of the union and # to allow creating a node specified by the user - case_i = case = desc.get('CASE') case_map = desc['CASE_MAP'] - align = desc.get('ALIGN') - - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + case_i = case = desc.get('CASE') + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # read and store the rawdata to the new node rawdata.seek(root_offset + offset) @@ -743,15 +742,15 @@ def cstring_parser(self, desc, node=None, parent=None, attr_index=None, "and not None when reading a data field.") if rawdata is not None: - orig_offset = offset - align = desc.get('ALIGN') - if attr_index is not None and desc.get('POINTER') is not None: - offset = parent.get_meta('POINTER', attr_index, **kwargs) - elif align: - offset += (align - (offset % align)) % align - - start = root_offset + offset - charsize = self.size + align = desc.get('ALIGN') + offset = ( + parent.get_meta('POINTER', attr_index, **kwargs) + if desc.get('POINTER') is not None else + offset + ((align - (offset%align))%align if align else 0) + ) + + start = root_offset + offset + charsize = self.size delimiter = self.delimiter # if the character size is greater than 1 we need to do special @@ -794,12 +793,12 @@ def py_array_parser(self, desc, node=None, parent=None, attr_index=None, "and not None when reading a data field.") if rawdata is not None: - orig_offset = offset - align = desc.get('ALIGN') - if attr_index is not None and desc.get('POINTER') is not None: - offset = parent.get_meta('POINTER', attr_index, **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + parent.get_meta('POINTER', attr_index, **kwargs) + if desc.get('POINTER') is not None else + offset + ((align - (offset%align))%align if align else 0) + ) bytecount = parent.get_size(attr_index, offset=offset, rawdata=rawdata, **kwargs) @@ -807,16 +806,12 @@ def py_array_parser(self, desc, node=None, parent=None, attr_index=None, rawdata.seek(root_offset + offset) offset += bytecount + py_array = self.node_cls(self.enc, rawdata.read(bytecount)) 
# if the system the array is being created on # has a different endianness than what the array is # packed as, swap the endianness after reading it. - if self.endian != byteorder_char and self.endian != '=': - parent[attr_index] = py_array = self.node_cls( - self.enc, rawdata.read(bytecount)) - py_array.byteswap() - return offset - - parent[attr_index] = self.node_cls(self.enc, rawdata.read(bytecount)) + self.endian in (byteorder_char, '=') or py_array.byteswap() + parent[attr_index] = py_array # pass the incremented offset to the caller return offset @@ -844,8 +839,7 @@ def bytes_parser(self, desc, node=None, parent=None, attr_index=None, "parent and attr_index must be provided " + "and not None when reading a data field.") if rawdata is not None: - orig_offset = offset - if attr_index is not None and desc.get('POINTER') is not None: + if desc.get('POINTER') is not None: offset = parent.get_meta('POINTER', attr_index, **kwargs) bytecount = parent.get_size(attr_index, offset=offset, @@ -885,11 +879,10 @@ def bit_struct_parser(self, desc, node=None, parent=None, attr_index=None, """If there is file data to build the structure from""" if rawdata is not None: rawdata.seek(root_offset + offset) - structsize = desc['SIZE'] - if self.endian == '<': - rawint = int.from_bytes(rawdata.read(structsize), 'little') - else: - rawint = int.from_bytes(rawdata.read(structsize), 'big') + size = desc['SIZE'] + rawint = int.from_bytes( + rawdata.read(size), 'little' if self.endian == '<' else 'big' + ) # loop once for each field in the node for i in range(len(node)): @@ -897,7 +890,7 @@ def bit_struct_parser(self, desc, node=None, parent=None, attr_index=None, rawint, desc=desc[i], parent=node, attr_index=i) # increment offset by the size of the struct - offset += structsize + offset += size return offset except (Exception, KeyboardInterrupt) as e: diff --git a/supyr_struct/field_type_methods/serializers.py b/supyr_struct/field_type_methods/serializers.py index 75abfbb..2d18d9f 100644 --- a/supyr_struct/field_type_methods/serializers.py +++ b/supyr_struct/field_type_methods/serializers.py @@ -87,9 +87,8 @@ def container_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - + orig_offset = offset try: - orig_offset = offset desc = node.desc is_steptree_root = (desc.get('STEPTREE_ROOT') or @@ -99,17 +98,17 @@ def container_serializer(self, node, parent=None, attr_index=None, if hasattr(node, 'STEPTREE'): kwargs['steptree_parents'].append(node) - align = desc.get('ALIGN') - # If there is a specific pointer to read the node from then go to it. # Only do this, however, if the POINTER can be expected to be accurate. # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. The current offset will be used instead. 
- if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # loop once for each node in the node for i in range(len(node)): @@ -164,9 +163,8 @@ def array_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - + orig_offset = offset try: - orig_offset = offset desc = node.desc a_desc = desc['SUB_STRUCT'] a_serializer = a_desc['TYPE'].serializer @@ -178,16 +176,17 @@ def array_serializer(self, node, parent=None, attr_index=None, if hasattr(node, 'STEPTREE'): kwargs['steptree_parents'].append(node) - align = desc.get('ALIGN') # If there is a specific pointer to read the node from then go to it. # Only do this, however, if the POINTER can be expected to be accurate. # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. The current offset will be used instead. - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # loop once for each node in the node for i in range(len(node)): @@ -246,9 +245,8 @@ def struct_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - + orig_offset = offset try: - orig_offset = offset desc = node.desc structsize = desc['SIZE'] is_tree_root = 'steptree_parents' not in kwargs @@ -258,17 +256,17 @@ def struct_serializer(self, node, parent=None, attr_index=None, if hasattr(node, 'STEPTREE'): kwargs['steptree_parents'].append(node) - align = desc.get('ALIGN') - # If there is a specific pointer to read the node from then go to it. # Only do this, however, if the POINTER can be expected to be accurate. # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. The current offset will be used instead. - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # write the whole size of the node so # any padding is filled in properly @@ -332,25 +330,23 @@ def quickstruct_serializer(self, node, parent=None, attr_index=None, **kwargs): """ """ - + orig_offset = offset try: __lgi__ = list.__getitem__ - orig_offset = offset desc = node.desc - offsets = desc['ATTR_OFFS'] structsize = desc['SIZE'] - align = desc.get('ALIGN') - # If there is a specific pointer to read the node from then go to it. # Only do this, however, if the POINTER can be expected to be accurate. # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. 
The current offset will be used instead. - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # write the whole size of the node so # any padding is filled in properly @@ -365,14 +361,12 @@ def quickstruct_serializer(self, node, parent=None, attr_index=None, typ = desc[i]['TYPE'] # check the forced endianness of the typ being serialized # before trying to use the endianness of the struct - if f_endian == "=" and typ.f_endian == "=": - pass - elif typ.f_endian == ">": - typ = typ.big - elif typ.f_endian == "<" or f_endian == "<": - typ = typ.little - else: - typ = typ.big + endian = f_endian if typ.f_endian == "=" else typ.f_endian + typ = ( + typ if endian == "=" else + typ.little if endian == "<" else + typ.big + ) writebuffer.seek(struct_off + off) writebuffer.write(typ.struct_packer(__lgi__(node, i))) @@ -421,14 +415,11 @@ def quickstruct_serializer(self, node, parent=None, attr_index=None, def stream_adapter_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): - - + orig_offset = offset try: # make a new buffer to write the data to temp_buffer = BytearrayBuffer() - orig_offset = offset desc = node.desc - align = desc.get('ALIGN') try: sub_desc = node.data.desc @@ -440,10 +431,12 @@ def stream_adapter_serializer(self, node, parent=None, attr_index=None, # If the pointer is a path to a previously parsed field, but this node # is being built without a parent(such as from an exported block) # then the path wont be valid. The current offset will be used instead. 
- if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # write the sub_struct to the temp buffer sub_desc['TYPE'].serializer(node.data, node, 'SUB_STRUCT', @@ -462,9 +455,9 @@ def stream_adapter_serializer(self, node, parent=None, attr_index=None, except (Exception, KeyboardInterrupt) as e: desc = locals().get('desc', None) error = format_serialize_error( - e, field_type=self, desc=desc, parent=parent, buffer=temp_buffer, - attr_index=attr_index, root_offset=root_offset, offset=offset, - **kwargs) + e, field_type=self, desc=desc, parent=parent, + attr_index=attr_index, buffer=temp_buffer, + root_offset=root_offset, offset=orig_offset, **kwargs) # raise a new error if it was replaced, otherwise reraise if error is e: raise @@ -473,22 +466,19 @@ def stream_adapter_serializer(self, node, parent=None, attr_index=None, def union_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): - - + orig_offset = offset try: - orig_offset = offset - desc = node.desc - align = desc.get('ALIGN') - - if attr_index is not None and desc.get('POINTER') is not None: - offset = node.get_meta('POINTER', **kwargs) - elif align: - offset += (align - (offset % align)) % align + desc = node.desc + align = desc.get('ALIGN') + offset = ( + offset + ((align - (offset%align))%align if align else 0) + if None in (attr_index, desc.get('POINTER')) else + node.get_meta('POINTER', **kwargs) + ) # if the u_node is not flushed to the UnionBlock, do it # before writing the UnionBlock to the writebuffer - if node.u_index is not None: - node.flush() + node.u_index is None or node.flush() # write the UnionBlock to the writebuffer writebuffer.seek(root_offset + offset) @@ -502,9 +492,9 @@ def union_serializer(self, node, parent=None, attr_index=None, except (Exception, KeyboardInterrupt) as e: desc = locals().get('desc', None) error = format_serialize_error( - e, field_type=self, desc=desc, parent=parent, buffer=writebuffer, - attr_index=attr_index, root_offset=root_offset, offset=offset, - **kwargs) + e, field_type=self, desc=desc, parent=parent, + attr_index=attr_index, buffer=writebuffer, + root_offset=root_offset, offset=orig_offset, **kwargs) # raise a new error if it was replaced, otherwise reraise if error is e: raise @@ -528,12 +518,13 @@ def data_serializer(self, node, parent=None, attr_index=None, """ """ node_bytes = self.encoder(node, parent, attr_index) - writebuffer.seek(root_offset + offset) - writebuffer.write(node_bytes) size = parent.get_size(attr_index, root_offset=root_offset, offset=offset, **kwargs) if size - len(node_bytes): - writebuffer.write(b'\x00'*(size - len(node_bytes))) + node_bytes += b'\x00'*(size - len(node_bytes)) + + writebuffer.seek(root_offset + offset) + writebuffer.write(node_bytes) return offset + size @@ -541,24 +532,18 @@ def cstring_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - orig_offset = offset - p_desc = parent.desc - if p_desc['TYPE'].is_array: - desc = p_desc['SUB_STRUCT'] - else: - desc = p_desc[attr_index] - - if attr_index is not None: - if parent is not None: - # if the parent and attr_index arent - # None, pointers may need to be used - align = 
desc.get('ALIGN') - if desc.get('POINTER') is not None: - offset = parent.get_meta('POINTER', attr_index, **kwargs) - elif align: - offset += (align - (offset % align)) % align - elif align: - offset += (align - (offset % align)) % align + p_desc = parent.desc + desc = p_desc[ + 'SUB_STRUCT' if p_desc['TYPE'].is_array else attr_index + ] + # if the parent and attr_index arent + # None, pointers may need to be used + align = desc.get('ALIGN') + offset = ( + parent.get_meta('POINTER', attr_index, **kwargs) + if desc.get('POINTER') is not None else + offset + ((align - (offset%align))%align if align else 0) + ) node = self.encoder(node, parent, attr_index) writebuffer.seek(root_offset + offset) @@ -572,39 +557,31 @@ def py_array_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - orig_offset = offset - p_desc = parent.desc - if p_desc['TYPE'].is_array: - desc = p_desc['SUB_STRUCT'] - else: - desc = p_desc[attr_index] - - if attr_index is not None: - if parent is not None: - # if the parent and attr_index arent - # None, pointers may need to be used - align = desc.get('ALIGN') - if desc.get('POINTER') is not None: - offset = parent.get_meta('POINTER', attr_index, **kwargs) - elif align: - offset += (align - (offset % align)) % align - elif align: - offset += (align - (offset % align)) % align + p_desc = parent.desc + desc = p_desc[ + 'SUB_STRUCT' if p_desc['TYPE'].is_array else attr_index + ] + # if the parent and attr_index arent None, pointers may need to be used + align = desc.get('ALIGN') + offset = ( + parent.get_meta('POINTER', attr_index, **kwargs) + if desc.get('POINTER') is not None else + offset + ((align - (offset%align))%align if align else 0) + ) writebuffer.seek(root_offset + offset) # This is the only method I can think of to tell if # the endianness of an array needs to be changed since # the array.array objects dont know their own endianness''' - if self.endian != byteorder_char and self.endian != '=': - # if the system the array exists on has a different - # endianness than what the array should be written as, - # then the endianness is swapped before writing it. - node.byteswap() - writebuffer.write(node) - node.byteswap() - else: - writebuffer.write(node) + endian_is_correct = self.endian in (byteorder_char, '=') + + # if the system the array exists on has a different + # endianness than what the array should be written as, + # then the endianness is swapped before writing it. 
+ endian_is_correct or node.byteswap() + writebuffer.write(node) + endian_is_correct or node.byteswap() size = parent.get_size(attr_index, root_offset=root_offset, offset=offset, **kwargs) @@ -618,17 +595,12 @@ def bytes_serializer(self, node, parent=None, attr_index=None, writebuffer=None, root_offset=0, offset=0, **kwargs): """ """ - orig_offset = offset - - if parent and attr_index is not None: - p_desc = parent.desc - if p_desc['TYPE'].is_array: - desc = p_desc['SUB_STRUCT'] - else: - desc = p_desc[attr_index] - - if desc.get('POINTER') is not None: - offset = parent.get_meta('POINTER', attr_index, **kwargs) + p_desc = parent.desc + desc = p_desc[ + 'SUB_STRUCT' if p_desc['TYPE'].is_array else attr_index + ] + if desc.get('POINTER') is not None: + offset = parent.get_meta('POINTER', attr_index, **kwargs) writebuffer.seek(root_offset + offset) writebuffer.write(node) @@ -647,28 +619,26 @@ def bit_struct_serializer(self, node, parent=None, attr_index=None, try: data = 0 desc = node.desc - structsize = desc['SIZE'] + size = desc['SIZE'] # get a list of everything as unsigned # ints with their masks and offsets - for i in range(len(node)): + for i, subnode in enumerate(node): try: - bitint = node[i].desc[TYPE].encoder(node[i], node, i) + bitint = subnode.desc[TYPE].encoder(subnode, node, i) except AttributeError: - bitint = desc[i][TYPE].encoder(node[i], node, i) + bitint = desc[i][TYPE].encoder(subnode, node, i) # combine with the other data # 0=U_Int being written, 1=bit offset of U_Int, 2=U_Int mask - data += (bitint[0] & bitint[2]) << bitint[1] + data |= (bitint[0] & bitint[2]) << bitint[1] writebuffer.seek(root_offset + offset) + writebuffer.write(data.to_bytes( + size, ('little' if self.endian == '<' else 'big') + )) - if self.endian == '<': - writebuffer.write(data.to_bytes(structsize, 'little')) - else: - writebuffer.write(data.to_bytes(structsize, 'big')) - - return offset + structsize + return offset + size except (Exception, KeyboardInterrupt) as e: # if the error occurred while parsing something that doesnt have an # error report routine built into the function, do it for it. 
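
Note for reviewers: every parser and serializer hunk above folds the same four-line POINTER/ALIGN branch into a single conditional expression. A minimal, self-contained sketch of what that expression computes (the names below are illustrative stand-ins, not part of the patch):

```py
def next_offset(offset, align=None, pointer=None, attr_index=0):
    # Mirrors the expression used throughout parsers.py/serializers.py:
    # jump to an explicit POINTER when one is usable, otherwise pad the
    # current offset up to the next multiple of ALIGN.
    if None in (attr_index, pointer):
        # (align - offset % align) % align pads up to an align boundary,
        # but leaves already-aligned offsets untouched
        return offset + ((align - (offset % align)) % align if align else 0)
    return pointer  # stands in for node.get_meta('POINTER', **kwargs)

assert next_offset(5, align=4) == 8                       # padded up to 8
assert next_offset(8, align=4) == 8                       # already aligned
assert next_offset(5, align=4, pointer=32) == 32          # usable pointer wins
assert next_offset(5, pointer=32, attr_index=None) == 5   # pointer unusable
```

The outer `% align` is what keeps an already-aligned offset from being bumped a full `align` forward, matching the behavior of the branchy code being replaced.
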
diff --git a/supyr_struct/field_type_methods/sizecalcs.py b/supyr_struct/field_type_methods/sizecalcs.py
index 0b5b2fb..11c0c10 100644
--- a/supyr_struct/field_type_methods/sizecalcs.py
+++ b/supyr_struct/field_type_methods/sizecalcs.py
@@ -21,7 +21,7 @@ def sizecalc_wrapper(sc):
     '''
     '''
-    def wrapped_sizecalc(self, node, _sizecalc=sc, *a, **kw):
+    def wrapped_sizecalc(self, node, *a, _sizecalc=sc, **kw):
         return _sizecalc(self, node.data, *a, **kw)
 
     return wrapped_sizecalc
diff --git a/supyr_struct/field_types.py b/supyr_struct/field_types.py
index 4c82c86..e8cd96d 100644
--- a/supyr_struct/field_types.py
+++ b/supyr_struct/field_types.py
@@ -616,14 +616,14 @@ def __init__(self, **kwargs):
         if isinstance(kwargs.get("enc"), str):
             self.enc = kwargs["enc"]
         elif isinstance(kwargs.get("enc"), dict):
-            enc = kwargs["enc"]
-            if not('<' in enc and '>' in enc):
+            enc_dict = kwargs["enc"]
+            if not('<' in enc_dict and '>' in enc_dict):
                 raise TypeError(
                     "When providing endianness reliant encodings, " +
                     "big and little endian\nmust both be provided " +
                     "under the keys '>' and '<' respectively.")
 
             # make the first encoding the endianness of the system
-            self.enc = enc['<']
+            self.enc = enc_dict['<']
             self.endian = byteorder_char
 
         if self.is_container and self.is_struct:
diff --git a/supyr_struct/util.py b/supyr_struct/util.py
index 88aa1be..9f25fcd 100644
--- a/supyr_struct/util.py
+++ b/supyr_struct/util.py
@@ -87,7 +87,7 @@ def str_to_identifier(string):
     of invalid non-alphanumeric characters with an underscore.
     Trailing underscores are removed.
     '''
-    assert isinstance(string, str)
+    assert isinstance(string, str), "Expected str, but got %s" % type(string)
 
     new_string = re.sub(non_alphanum_set, '_', string)
     new_string = re.sub(digits_at_start, '', new_string)
@@ -98,13 +98,19 @@ def str_to_identifier(string):
     return new_string
 
 
-def desc_variant(desc, *replacements):
+def desc_variant(desc, *replacements, verify=False, **kwargs):
     '''
     Fringe: Used to generate a new descriptor using a set of replacements.
+    Replacements can either be a field descriptor, or a tuple containing
+    the name of the field to replace, and the replacement field descriptor.
+    If verify is True, each replacement field has its size checked to
+    ensure it matches the size of the field it replaces. If the sizes
+    don't match, or it can't be determined whether they do, a ValueError
+    is raised.
 
     desc_variant(some_descriptor,
         (str:name_of_old_field, FieldType:new_field_def),
         (str:name_of_another_old_field, FieldType:some_other_field_def),
+        FieldType:new_field_def_with_same_name_as_old,
         )
     Ex:
     ```py
@@ -112,33 +118,124 @@ def desc_variant(desc, *replacements):
         UInt32("one"), UInt32("two"), UInt32("three"),
-        )
+    )
     thing_variant = desc_variant(thing,
-        ("two",
-         Struct("new_two", UInt16("something"), Uint16("some_other"))
-         ),
-        )
+        ("two", Struct("new_two", UInt16("something"), UInt16("some_other"))),
+        Struct("three", UInt16("aaaa"), UInt16("bbbb")),
+    )
     ```
-    This would make thing_variant a variant of thing where UInt32 "two"
-    is replaced by a Struct called "new_two".
+    This would make thing_variant a variant of thing where UInt32 "two" is
+    replaced by a Struct called "new_two", and "three" is similarly replaced.
     '''
     desc, name_map = dict(desc), dict()
+    desc.update(kwargs)
+
+    # NOTE: this function has been improved to make it much harder to
+    #       accidentally replace the wrong field, or use a field with
+    #       a mismatched size. If a name isn't provided, the field to
+    #       replace is assumed to share the replacement's name.
Additionally, we can check that + # the size of the replacement is the same as what is replaced for i in range(desc['ENTRIES']): - name = desc[i].get('NAME', '_') - # padding uses _ as its name - if name == '_': - # Doing this is midly faster + sub_desc = desc[i] + name = sub_desc.get('NAME', None) + ftyp = sub_desc.get('TYPE') + + # padding uses an underscore as its name, so + # we only match by name if it's not padding + if name != "_": + name_map[str_to_identifier(name)] = i + elif not ftyp or ftyp.name != "Pad": + # dont let this silently cause bugs + raise ValueError("Expected padding, but got %s" % ftyp) + else: + # generate the name we expect the user to pass for the padding name_map['pad_%d' % i] = i - continue - name_map[str_to_identifier(name)] = i - for name, new_sub_desc in replacements: - desc[name_map[str_to_identifier(name)]] = new_sub_desc + for replacement in replacements: + name, new_sub_desc = None, None + + if isinstance(replacement, dict): + # we were provided just a desc + new_sub_desc = replacement + elif not(replacement and isinstance(replacement, (list, tuple))): + raise ValueError("Invalid replacement supplied: %s of type %s" % + (replacement, type(replacement)) + ) + else: + # we were given a list or tuple. figure out what was passed + for val in replacement: + if not name and isinstance(val, str): + name = val + elif not isinstance(val, dict): + raise ValueError("Unknown replacement value passed: %s" % val) + elif not new_sub_desc and val.get('TYPE'): + new_sub_desc = val + else: + raise ValueError("Unexpected replacement value passed: %s" % val) + + if not new_sub_desc: + raise ValueError("No replacement desc provided.") + + if name is None: + # we were provided a replacement desc without a target name. + # assume the name of the field we're replacing matches its name + name = new_sub_desc["NAME"] + + # figure out what index to put the replacement into, and + # (if requested) do some validation on the replacement. + index = name_map[str_to_identifier(name)] + verify_args = (desc, new_sub_desc, desc[index]) + if verify and get_replacement_field_conflict(*verify_args): + raise ValueError( + "Incompatible replacement detected for field '%s':\n\t%s" % + (name, get_replacement_field_conflict(*verify_args)) + ) + + desc[index] = new_sub_desc return desc +def desc_variant_with_verify(desc, *replacements, **kwargs): + '''Version of desc_variant with size verification defaulted to True.''' + kwargs.setdefault("verify", True) + return desc_variant(desc, *replacements, **kwargs) + + +def get_replacement_field_conflict(parent_desc, new_desc, old_desc): + ''' + Returns a string to indicate why a field is not a valid + replacement for another field. An empty string will be + returned if there are no conflicts. + ''' + parent_type = parent_desc["TYPE"] + old_type = old_desc["TYPE"] + new_type = new_desc["TYPE"] + old_size = old_desc.get("SIZE") if old_type.is_var_size else old_type.size + new_size = new_desc.get("SIZE") if new_type.is_var_size else new_type.size + + error_str = "" + error_args = () + if old_size is None and new_size is None: + # sizes are both undefined, so the parent must be a container. + # if not, then the size of both will be calculated later, and + # that means we can't verify they match here. + if not parent_type.is_container: + error_str = "Neither field size defined in non open-ended parent" + elif None in (old_size, new_size): + # only one field is missing its size. this is a problem, as + # we can't verify they match. 
+ error_str = "One field size could not be determined - %s vs %s" + error_args = (old_size, new_size) + elif old_size != new_size: + error_str = "Field sizes dont match - %s vs %s" + error_args = (old_size, new_size) + + return error_str % error_args + + def is_in_dir(path, directory): '''Checks if path is in directory. Respects symlinks.''' try: @@ -214,7 +311,11 @@ def tagpath_to_fullpath( return fullpath # Check if we can find the right file at the end of the chain - files = os.listdir(cur_path) # Get all files in the current dir + try: + files = os.listdir(cur_path) # Get all files in the current dir + except FileNotFoundError: + files = () + for file in files: fullpath = os.path.join(cur_path, file) if file.lower() == tagname and os.path.isfile(fullpath):