
Commit

code cleanup
thejanky committed Jan 24, 2024
1 parent 2e0b6b7 commit 693d12e
Showing 9 changed files with 177 additions and 199 deletions.
224 changes: 45 additions & 179 deletions app.py

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions repair_algorithms/BMPFileRepair.py
@@ -68,7 +68,6 @@ def parse_bmp(self, *args, **kwargs):
dtype=error_pos.dtype)
res = Bmp.from_bytes(self.reconstructed_bmp_bytes)

- # TODO: check if correct:
if res.file_hdr.reserved1 != 0:
self.reconstructed_bmp_bytes[6] = 0
self.reconstructed_bmp_bytes[7] = 0
@@ -89,8 +88,6 @@ def parse_bmp(self, *args, **kwargs):
[a ^ b for a, b in zip(self.bmp_bytes[10:14], self.reconstructed_bmp_bytes[10:14])],
dtype=error_pos.dtype)
res = Bmp.from_bytes(self.reconstructed_bmp_bytes)
- # TODO: depends: right now this is only correct for 16bit
- # check up res.dib_info.header.bits_per_pixel!
mask_mask = res.dib_info.color_mask_red ^ res.dib_info.color_mask_blue ^ res.dib_info.color_mask_alpha ^ res.dib_info.color_mask_green
if mask_mask != 2 ** res.dib_info.header.bits_per_pixel - 1 or (
res.dib_info.header.bits_per_pixel == 32 and mask_mask | 0b11100000000000000000000000000000 != 2 ** res.dib_info.header.bits_per_pixel - 1):
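
Editor's note on the check above: the channel masks of a BI_BITFIELDS BMP partition the pixel bits, so XOR-ing them should recover a full mask of bits_per_pixel ones. A minimal sketch with assumed 16-bit RGB565 values, not the plugin's exact code:

# Assumed RGB565 masks; an intact bitfields header should satisfy this.
bits_per_pixel = 16
mask_red, mask_green, mask_blue, mask_alpha = 0xF800, 0x07E0, 0x001F, 0x0000

# The masks are disjoint and together cover all pixel bits,
# so their XOR equals 2**bits_per_pixel - 1 (0xFFFF here).
mask_mask = mask_red ^ mask_green ^ mask_blue ^ mask_alpha
assert mask_mask == 2 ** bits_per_pixel - 1
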
@@ -147,7 +144,6 @@ def repair(self, *args, **kwargs):
"refresh_view": True, "chunk_tag": self.chunk_tag}

def reload_image(self, *args, **kwargs):
- # todo: load canvas_json from args + chunk_tag to find the invalid packet
self.parser_error_matrix = None
self.no_inspect_chunks = self.gepp.b.shape[0]
if self.reconstructed_bmp_bytes is not None:
@@ -165,7 +161,6 @@ def reload_image(self, *args, **kwargs):

def is_compatible(self, meta_info, *args, **kwargs):
# parse magic info string:
- # TODO: add check for filename / extension
return meta_info == "Bitmap" or "PC bitmap" in meta_info

def set_image_width(self, width, *args, **kwargs):
@@ -402,7 +397,7 @@ def update_canvas(self, canvas_json, *args, **kwargs):
self.error_matrix = self.error_matrix.reshape(-1, self.gepp.b.shape[1])
res = self.find_errors_tags()
res["updates_canvas"] = True
res["image_content"] = None # todo: add line to each chunk that is incorrect?
res["image_content"] = None
return {"updates_canvas": True, "image_content": None}


4 changes: 2 additions & 2 deletions repair_algorithms/LangaugeToolTextRepair.py
@@ -68,7 +68,7 @@ def find_error_region_by_words(self, language=None, *args, **kwargs):

# WARNING: this method is rather slow but will yield better results than the character based method
# IF the words stored in the file are in the used dictionary
- # TODO change language according to "language" parameter:
+ # change language according to "language" parameter:
if self.tool is None or self.tool.language.normalized_tag != lang_to_LanguageTool.get(language, "en-US"):
self.tool = language_tool_python.LanguageTool(lang_to_LanguageTool.get(language, "en-US"))
# , config={'disabledRuleIds': "DROP_DOWN,SOME_OF_THE,THE_SUPERLATIVE,UPPERCASE_SENTENCE_START,DOPPELPUNKT_GROSS,KOMMA_ZWISCHEN_HAUPT_UND_NEBENSATZ_2,VIELZAHL_PLUS_SINGULAR,EMPFOHLENE_ZUSAMMENSCHREIBUNG,SEMIKOLON_VOR_ANFUEHRUNGSZEICHEN,DURCHEINANDER"})
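
A condensed sketch of the language-switching logic above; the mapping entries are assumptions (the real lang_to_LanguageTool dict is defined elsewhere in this file):

import language_tool_python

lang_to_LanguageTool = {"english": "en-US", "german": "de-DE"}  # assumed entries

def get_tool(tool, language):
    # Re-create the LanguageTool instance only when the requested language
    # differs from the one the cached instance was built for.
    tag = lang_to_LanguageTool.get(language, "en-US")
    if tool is None or tool.language.normalized_tag != tag:
        tool = language_tool_python.LanguageTool(tag)
    return tool
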
@@ -91,7 +91,7 @@ def find_error_region_by_words(self, language=None, *args, **kwargs):
matches = self.tool.check(blob)
for matching_rule in matches:
offset = matching_rule.offset
- # TODO: extend error_length to include non-printable characters behind the error,
+ # TODO: we might want to extend error_length to include non-printable characters behind the error,
# then iterate over range(error_length, error_length + len(offset)) to find the correct word
error_length = matching_rule.errorLength
if matching_rule.category == "TYPOS" or matching_rule.ruleIssueType == "misspelling":
83 changes: 82 additions & 1 deletion repair_algorithms/PluginManager.py
@@ -1,6 +1,7 @@
import typing

from singleton_decorator.decorator import singleton
from dash_extensions.enrich import html, dcc
import dash_daq as daq

from repair_algorithms.FileSpecificRepair import FileSpecificRepair

@@ -20,3 +21,83 @@ def get_plugins(self):

def get_plugin_instances(self):
return self.plugin_instances

def load_plugin(self, plugin_inst):
global input_callback_handler, show_canvas

plugin_inst.on_load()

# Get the UI elements from the plugin instance:
ui: typing.Dict[
str, typing.Dict[str, typing.Union[str, bool, typing.Callable]]] = plugin_inst.get_ui_elements()
# Initialize a list to store the plugin's child elements:
_plugin_childs = [html.H4(f'Plugin: "{plugin_inst.__class__.__name__}"', className="tag")]

# Iterate over the UI elements and create the corresponding Dash elements:
for key, value in ui.items():
if value["type"] == "button":
_plugin_childs.append(
html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, className="button"))
if "updates_canvas" in value and value["updates_canvas"]:
show_canvas = True
elif value["type"] == "int":
default_value = 0 if "default" not in value else value["default"]
_plugin_childs.append(html.Div([html.Label(value["text"], className="label"),
html.Div(
[dcc.Input(id={'type': 'plugin_io_value', 'index': key},
type="number",
className="input", value=default_value), ],
className="control")], className="field"))
if "updates_canvas" in value and value["updates_canvas"]:
show_canvas = True
elif value["type"] == "text":
_plugin_childs.append(html.Div([html.Label(value["text"], className="label"),
html.Div(
[dcc.Input(id={'type': 'plugin_io_value', 'index': key},
type="text",
className="input"), ],
className="control")], className="field"))
if "updates_canvas" in value and value["updates_canvas"]:
show_canvas = True
elif value["type"] == "canvas":
show_canvas = True
elif value["type"] == "kaitai_view":
_plugin_childs.append(
html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, className="button"))
elif value["type"] == "upload":
_plugin_childs.append(html.Div([
dcc.Upload(
id={'type': 'plugin_io_upload-data', 'index': key},
children=html.Div([
'Drag and Drop or ',
html.A('Select Files')
]),
style={
'width': '100%',
'height': '60px',
'lineHeight': '60px',
'borderWidth': '1px',
'borderStyle': 'dashed',
'borderRadius': '5px',
'textAlign': 'center',
'margin': '10px'
},
# Don't allow multiple files to be uploaded
multiple=False
),
html.Div(id={'type': 'output-data-upload', 'index': key}),
]))
if "updates_canvas" in value and value["updates_canvas"]:
show_canvas = True
elif value["type"] == "download":
_plugin_childs.append(dcc.Download(id={'type': 'plugin_io_download-data', 'index': key}))
_plugin_childs.append(
html.A('Download Data', id={'type': 'plugin_io_download', 'index': key}, className="button")
)
elif value["type"] == "toggle":
_plugin_childs.append(html.Div([html.Label(value["off_label"]),
daq.ToggleSwitch(id={'type': 'plugin_io_switch', 'index': key},
label=value["label"],
labelPosition='bottom', className="inline-switch"
), html.Label(value["on_label"])]))
return _plugin_childs
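
A hypothetical usage sketch for the new load_plugin method; the DashProxy app, the container id, and the loop are assumptions, not part of this commit:

from dash_extensions.enrich import DashProxy, html

app = DashProxy(__name__)
manager = PluginManager()

# Collect the generated controls of every registered plugin into one container.
children = []
for plugin in manager.get_plugin_instances():
    children.extend(manager.load_plugin(plugin))
app.layout = html.Div(children, id="plugin-container")
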
2 changes: 1 addition & 1 deletion repair_algorithms/RandomShuffleRepair.py
@@ -344,7 +344,7 @@ def update_num_shuffle(self, *args, **kwargs):
num_shuffle = kwargs["c_ctx"].triggered[0]["value"]
# we could check if kwargs["c_ctx"].triggered[X] has a prop_io equal to the textbox's id
if num_shuffle is None or num_shuffle < 1:
- self.num_shuffles = self.gepp.b[0]
+ self.num_shuffles = len(self.gepp.b[0])
else:
self.num_shuffles = num_shuffle
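
The fix above matters because gepp.b is a 2D chunk matrix, so gepp.b[0] is a whole row rather than a count. A small illustration with an assumed shape:

import numpy as np

b = np.zeros((4, 40), dtype=np.uint8)  # assumed: 4 chunks of 40 bytes each
row = b[0]                # an ndarray of 40 bytes - the old fallback assigned this
num_shuffles = len(b[0])  # 40 - the per-chunk length the fallback intends
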

1 change: 1 addition & 0 deletions repair_algorithms/UploadRepair.py
@@ -21,6 +21,7 @@

class UploadRepair(FileSpecificRepair):
    # TODO: we might want to create and save __all__ possible results for a modified chunk
+   # (however, this can already be done using partial repair / multi file)
    # example: we change a byte in one (or multiple) chunk(s) and want to decode assuming the error happened in each possible packet.
    # to further limit the number of packets we might as well use the chunk tags to pinpoint the corrupt packet!
def __init__(self, *args, **kwargs):
16 changes: 8 additions & 8 deletions repair_algorithms/ZipFileRepair.py
@@ -141,7 +141,7 @@ def parse_zipfile(self, iterations=50):

def is_compatible(self, meta_info):
# parse magic info string:
return "zip" in meta_info.lower() # TODO check...
return "zip" in meta_info.lower()

def repair(self, *args, **kwargs):
if self.zip_structure is None or self.parser_error_matrix is None:
@@ -309,7 +309,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
# general purpose bit flag:
if section.body.header.flags.reserved_1 != 0 or section.body.header.flags.reserved_2 != 0 or \
section.body.header.flags.reserved_3 != 0 or section.body.header.flags.reserved_4 != 0:
- # todo check if offset is correct for all reserved fields
+ # we might need to check if offset is correct for all reserved fields
error_counter += update_error_pos(section.body.header.start + 2 + 1,
section.body.header.start + 2 + 2, [1])
# if section.body.header.general_purpose_bit_flag & 0b00000011101011 == 0:
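
An illustrative check for the reserved-flag logic above; which bits of the general purpose bit flag count as reserved depends on the APPNOTE revision, so the mask below is an assumption:

import struct

def reserved_bits_set(flag_bytes: bytes) -> bool:
    # The flag is stored as a little-endian u2; any set reserved bit
    # marks these header bytes as damaged.
    (flags,) = struct.unpack("<H", flag_bytes)
    reserved_mask = 0b1101_0111_1000_0000  # assumed: bits 7-10, 12, 14-15
    return flags & reserved_mask != 0
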
@@ -318,6 +318,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
# error_counter += update_error_pos(section.body.header.start + 2, section.body.header.start + 4, [1] * 4)

  # TODO: make some basic sanity check that these numbers are not too large:
+ # these are currently inactive as we have to make sure that all zip implementations correctly use these:
# file last modification time:
# error_pos[section.header.start + 10:section.header.start + 10 + 2] = [0] * 2
# file last modification date:
@@ -435,7 +436,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
else:
error_counter += update_error_pos(section.body.start + 16, section.body.start + 16 + 2, [0] * 2)
if error_counter > 4:
- # TODO find best magic number...
+ # TODO: we might want to find best (closest) magic number...
error_pos = error_pos_bkp
self.reconstructed_zip_bytes = reconstructed_zip_bytes_bkp
if start:
@@ -467,7 +468,6 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
return np.array(error_pos).reshape(-1, self.gepp.b.shape[1])

def compare_sections(self, error_pos, sections):
- # TODO. this is not DRY:
def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
offset = start * self.gepp.b.shape[1]
_parser_error_pos = self.parser_error_matrix[offset + _start: offset + _end]
@@ -666,7 +666,6 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
# error_counter += update_error_pos(central_dir_section.body.start + 34,
# central_dir_section.body.start + 34 + 2, [1] * 2)
# file comment len:
- # TODO: continue with the correct offset
next_signature = self.get_raw_bytes(
central_dir_section.body.start + 42 + central_dir_section.body.len_file_name
+ central_dir_section.body.len_extra + central_dir_section.body.len_comment, 2)
@@ -689,7 +688,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
central_dir_section.body.start + 36 + 2, [1] * 2)

# internal file attributes:
- # TODO: check bit 1 and 3-16 (reserved/unused!)
+ # TODO: test if we can check bit 1 and 3-16 (reserved/unused!)
# external file attributes:
# NO way to check...
# error_counter += update_error_pos(section.body.header.start + 18, section.body.header.start + 18 + 4, [0] * 4)
@@ -700,7 +699,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
error_counter += update_error_pos(central_dir_section.start + 42,
central_dir_section.start + 42 + 4,
[0] * 4)
- # TODO: we have to go through all local file headers and invalidate the one BEFORE the reference...
+ # TODO: go through all local file headers and invalidate the one BEFORE the reference...
else:
# either the reference is wrong or this was no real central directory entry!
error_counter += update_error_pos(central_dir_section.start + 42,
@@ -718,6 +717,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False):
# TODO we might be able to match it with an unmatched central directory entry by comparing
# other entries such as the filename
# alternatively we choose the central directory entry with the smallest edit distance
+ # This might further increase the recovery chance but may increase the complexity.
pass
return error_pos

@@ -918,7 +918,7 @@ def sweep_zip_header(self, error_pos=None):
flat_signature_positions = [item for sublist in flat_signature_positions for item in sublist]
sections = []
for start_offset in flat_signature_positions:
- # todo: create a copy of error_pos for each section candidate and merge them at the end (only if the section was "valid")
+ # create a copy of error_pos for each section candidate and merge them at the end (only if the section was "valid")
# make sure the sections are not overlapping and if they are, choose the one that produces the least errors

error_pos_bkp = error_pos.copy()
4 changes: 2 additions & 2 deletions repair_algorithms/zip.py
@@ -371,7 +371,7 @@ def _read(self, start_offset):
position_of_signature = Zip.find_signtures(self._raw_file_name)
if len(self._raw_file_name) != self.len_file_name or position_of_signature is not None:
# filename-length should be:
- # filename_length = filename_length - position_of_signature - len_extra - len_comment (TODO: check if "-1" ???)
+ # filename_length = filename_length - position_of_signature - len_extra - len_comment
expected_val = self.len_file_name - max(0,
self.len_file_name - position_of_signature - self.len_extra - self.len_comment)
raise InvalidDataException("/types/filename/invalid", len_filename_io_pos, expected=expected_val)
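
A worked example of the expected-length reconstruction above, with hypothetical numbers:

# A corrupted len_file_name of 300 swallowed the next header; its signature
# was found 20 bytes into the raw "filename", with no extra field or comment.
len_file_name, position_of_signature = 300, 20
len_extra, len_comment = 0, 0
expected_val = len_file_name - max(
    0, len_file_name - position_of_signature - len_extra - len_comment)
assert expected_val == 20  # the plausible true filename length
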
@@ -490,7 +490,7 @@ def _read(self, start_offset=0):
self.file_mod_time = dos_datetime.DosDatetime(_io__raw_file_mod_time, self, self._root)
self.crc32 = self._io.read_u4le()
self.len_body_compressed = self._io.read_u4le()
- # TODO we should check if len_body_compressed is unreasonable (e.g. if there is a valid signature in it)
+ # TODO: we may want to check if len_body_compressed is unreasonable (e.g. if there is a valid signature in it)
self.len_body_uncompressed = self._io.read_u4le()
len_filename_io_pos = self._io.pos()
self.len_file_name = self._io.read_u2le()
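
One way the suggested sanity check could look - a hypothetical helper scanning the claimed compressed body for ZIP signatures (not part of this commit):

ZIP_SIGNATURES = (b"PK\x03\x04", b"PK\x01\x02", b"PK\x05\x06")

def body_len_suspicious(raw_body: bytes) -> bool:
    # A header signature inside the claimed compressed body suggests that
    # len_body_compressed is too large and overlaps the next entry.
    return any(sig in raw_body for sig in ZIP_SIGNATURES)
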
35 changes: 35 additions & 0 deletions semi_automatic_reconstruction_toolkit.py
@@ -355,6 +355,41 @@ def solve_lin_dep(a, b):
return None



def repair_and_store_by_packet(self, chunk_id, packet_id, hex_value, clear_working_dir=False, correctness_function=None):
# this function will be used if we have multiple invalid packets (and corrected chunks) to save multiple versions,
# where each saved version uses a different possible packet to repair the chunk.
bkp_A = self.decoder.GEPP.A.copy()
bkp_b = self.decoder.GEPP.b.copy()
self.manual_repair(chunk_id, packet_id, hex_value)
working_dir = "multi_file_repair"
if clear_working_dir:
# delete the folder working_dir if it exists:
if Path(working_dir).exists():
shutil.rmtree(working_dir)
# create the folder working_dir:
Path(working_dir).mkdir(parents=True, exist_ok=True)
# we might have to check if header chunk is used!
self.parse_header("I")
if self.headerChunk is not None and self.headerChunk.checksum_len_format is not None:
is_correct = self.is_checksum_correct()
else:
if correctness_function is not None:
is_correct = correctness_function(self.decoder.GEPP.b)
else:
is_correct = False
try:
filename = self.decoder.saveDecodedFile(return_file_name=True, print_to_output=False)
except ValueError as ve:
filename = ve.args[1]
_file = Path(filename)
stem = ("CORRECT_" if is_correct else "") + _file.stem + f"_{chunk_id}_{packet_id}"
_new_file = _file.rename(Path(working_dir + "/" + stem + _file.suffix))
self.decoder.GEPP.A = bkp_A
self.decoder.GEPP.b = bkp_b
return f"{_new_file.name}"


if __name__ == "__main__":
x = ConfigReadAndExecute("NOREC4DNA/logo.jpg_Fri_Jan__7_13_18_39_2022.ini").execute(return_decoder=True)[0]
semi_automatic_solver = SemiAutomaticReconstructionToolkit(x)
