diff --git a/app.py b/app.py index 88ff128..4ab69e4 100644 --- a/app.py +++ b/app.py @@ -1,9 +1,8 @@ # -*- coding: utf-8 -*- -# Run this app with `python app.py` and +# Run this app using: `python app.py <ini>` and # visit http://127.0.0.1:8050/ in your web browser. import argparse import copy -import os import shutil import string import typing @@ -15,7 +14,7 @@ from dash import ctx import dash_daq as daq from dash_canvas.DashCanvas import DashCanvas -from dash_canvas.utils.io_utils import image_string_to_PILImage, array_to_data_url +from dash_canvas.utils.io_utils import array_to_data_url from app_callbacks import callbacks from repair_algorithms import * # NOSONAR @@ -23,7 +22,6 @@ from repair_algorithms.FileSpecificRepair import FileSpecificRepair from NOREC4DNA.ConfigWorker import ConfigReadAndExecute -from repair_algorithms.LangaugeToolTextRepair import LangaugeToolTextRepair from semi_automatic_reconstruction_toolkit import SemiAutomaticReconstructionToolkit @@ -36,8 +34,6 @@ def update_point(trace, points, selector): EXTERNAL_STYLESHEETS = ["https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css"] META_TAGS = [{"name": "viewport", "content": "width=device-width, initial-scale=1"}] -app = DashProxy(__name__, external_stylesheets=EXTERNAL_STYLESHEETS, meta_tags=META_TAGS, - prevent_initial_callbacks=True, transforms=[MultiplexerTransform()]) white_button_style = {'backgroundColor': 'white'} red_button_style = {'backgroundColor': 'red'} green_button_style = {'backgroundColor': 'green'} @@ -60,14 +56,30 @@ def update_point(trace, points, selector): chunk_tag = [] column_tag = [] -input_callback_handler = [Input('repair-button', 'n_clicks'), Input('repair-reorder-button-possible', 'n_clicks'), - Input('repair-id-input-box', 'value'), Input('hex-repair-input', 'value'), - Input('txt-repair-input', 'value'), Input('repair-chunks-button', 'n_clicks'), - Input('analyze-button', 'n_clicks'), Input('repair-exclusion-button', 'n_clicks'), -
Input('repair-reorder-button', 'n_clicks'), Input('reset-chunk-tag-button', 'n_clicks'), - Input('calculate-rank-button', 'n_clicks'), Input('save-button', 'n_clicks'), +canvas_list = [] +plugin_manager = PluginManager() +plugin_manager.plugin_instances.clear() +force_load_plugins = [] +all_plugins_childs = [] + +app = DashProxy(__name__, external_stylesheets=EXTERNAL_STYLESHEETS, meta_tags=META_TAGS, + prevent_initial_callbacks=True, transforms=[MultiplexerTransform()]) + +input_callback_handler = [Input('repair-button', 'n_clicks'), + Input('repair-reorder-button-possible', 'n_clicks'), + Input('repair-id-input-box', 'value'), + Input('hex-repair-input', 'value'), + Input('txt-repair-input', 'value'), + Input('repair-chunks-button', 'n_clicks'), + Input('analyze-button', 'n_clicks'), + Input('repair-exclusion-button', 'n_clicks'), + Input('repair-reorder-button', 'n_clicks'), + Input('reset-chunk-tag-button', 'n_clicks'), + Input('calculate-rank-button', 'n_clicks'), + Input('save-button', 'n_clicks'), Input('packet-tag-chunk-invalid-button', 'n_clicks'), - Input('packet-tag-chunk-valid-button', 'n_clicks'), Input('mode-switch', 'value'), + Input('packet-tag-chunk-valid-button', 'n_clicks'), + Input('mode-switch', 'value'), Input('colorblind-switch', 'value'), Input({'type': 'forceload-plugin-button', 'index': ALL}, 'n_clicks'), Input({'type': 'plugin_io_upload-data', 'index': ALL}, 'contents'), @@ -75,12 +87,6 @@ def update_point(trace, points, selector): Input({'type': 'plugin_io_value', 'index': ALL}, 'value'), Input({'type': 'plugin_io_upload-data', "index": ALL}, 'contents')] -canvas_list = [] -plugin_manager = PluginManager() -plugin_manager.plugin_instances.clear() -force_load_plugins = [] -all_plugins_childs = [] - def init_globals(semi_automatic_solver): global chunk_tag, column_tag @@ -94,7 +100,7 @@ def init_globals(semi_automatic_solver): force_load_plugins.append(html.Button(plugin_instance.__class__.__name__, id={"type": "forceload-plugin-button", 
"index": plugin_instance.__class__.__name__})) if plugin_instance.is_compatible(semi_automatic_solver.predict_file_type()): - plugin_childs = load_plugin(plugin_instance) + plugin_childs = plugin_manager.load_plugin(plugin_instance) if len(plugin_childs) > 0: div = html.Div(id="plugin_" + plugin_instance.__class__.__name__.lower(), className="box", children=plugin_childs) @@ -205,11 +211,6 @@ def update_column_tag(tag): column_tag = tag -def update_single_element_column_tag(key, value): - global column_tag - column_tag[key] = value - - def reset_column_tag(): update_column_tag([0 for _ in range(len(get_column_tag()))]) @@ -233,87 +234,6 @@ def reset_chunk_tag(): update_chunk_tag([0 for _ in range(len(get_chunk_tag()))]) -def load_plugin(plugin_inst): - global input_callback_handler, show_canvas - - plugin_inst.on_load() - - # Get the UI elements from the plugin instance: - ui: typing.Dict[ - str, typing.Dict[str, typing.Union[str, bool, typing.Callable]]] = plugin_inst.get_ui_elements() - # Initialize a list to store the plugin's child elements: - _plugin_childs = [html.H4(f'Plugin: "{plugin_inst.__class__.__name__}"', className="tag")] - - # Iterate over the UI elements and create the corresponding Dash elements: - for key, value in ui.items(): - if value["type"] == "button": - _plugin_childs.append( - html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, className="button")) - if "updates_canvas" in value and value["updates_canvas"]: - show_canvas = True - elif value["type"] == "int": - default_value = 0 if "default" not in value else value["default"] - _plugin_childs.append(html.Div([html.Label(value["text"], className="label"), - html.Div( - [dcc.Input(id={'type': 'plugin_io_value', 'index': key}, type="number", - className="input", value=default_value), ], - className="control")], className="field")) - if "updates_canvas" in value and value["updates_canvas"]: - show_canvas = True - elif value["type"] == "text": - 
_plugin_childs.append(html.Div([html.Label(value["text"], className="label"), - html.Div( - [dcc.Input(id={'type': 'plugin_io_value', 'index': key}, type="text", - className="input"), ], - className="control")], className="field")) - if "updates_canvas" in value and value["updates_canvas"]: - show_canvas = True - elif value["type"] == "canvas": - show_canvas = True - elif value["type"] == "kaitai_view": - _plugin_childs.append( - html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, className="button")) - elif value["type"] == "upload": - _plugin_childs.append(html.Div([ - dcc.Upload( - id={'type': 'plugin_io_upload-data', 'index': key}, - children=html.Div([ - 'Drag and Drop or ', - html.A('Select Files') - ]), - style={ - 'width': '100%', - 'height': '60px', - 'lineHeight': '60px', - 'borderWidth': '1px', - 'borderStyle': 'dashed', - 'borderRadius': '5px', - 'textAlign': 'center', - 'margin': '10px' - }, - # Don't allow multiple files to be uploaded - multiple=False - ), - html.Div(id={'type': 'output-data-upload', 'index': key}), - ])) - if "updates_canvas" in value and value["updates_canvas"]: - show_canvas = True - elif value["type"] == "download": - _plugin_childs.append(dcc.Download(id={'type': 'plugin_io_download-data', 'index': key})) - _plugin_childs.append( - html.A('Download Data', id={'type': 'plugin_io_download', 'index': key}, className="button") - ) - elif value["type"] == "toggle": - _plugin_childs.append(html.Div([html.Label(value["off_label"]), - daq.ToggleSwitch(id={'type': 'plugin_io_switch', 'index': key}, - label=value["label"], - labelPosition='bottom', className="inline-switch" - ), html.Label(value["on_label"])])) - return _plugin_childs - - -# input_callback_handler.append(Input({'type': 'download', 'index': ALL}, 'n_clicks')) - @app.callback(Output({'type': 'plugin_io_download-data', 'index': MATCH}, "data"), Input({'type': 'plugin_io_download', 'index': MATCH}, "n_clicks"), prevent_initial_call=True, ) @@ -389,15 +309,6 
@@ def propagate_gepp_update(): propagete_chunk_tag_update() -def get_language(): - repair_tool = LangaugeToolTextRepair(semi_automatic_solver.decoder.GEPP) - lang = repair_tool.detect_language() - language = lang.lang if lang.confidence > 0.5 else "en" - print(f"Detected language: {language} with confidence {lang.confidence}") - x = repair_tool.find_error_region_by_words(language=language) - return x - - def repair_chunks(repair_id, hex_value): if sum(common_packets) != 1 and not semi_automatic_solver.multi_error_packets_mode: return html.Div("More than one packet still possible!"), dash.no_update, dash.no_update @@ -417,40 +328,6 @@ def repair_chunks(repair_id, hex_value): return recalculate_view() -def repair_and_store_by_packet(chunk_id, packet_id, hex_value, clear_working_dir=False, correctness_function=None): - # this function will be used if we have multiple invalid packets (and corrected chunks) to save multiple version, - # where each saved version used a different possible packet to repair the chunk. - bkp_A = semi_automatic_solver.decoder.GEPP.A.copy() - bkp_b = semi_automatic_solver.decoder.GEPP.b.copy() - semi_automatic_solver.manual_repair(chunk_id, packet_id, hex_value) - working_dir = "multi_file_repair" - if clear_working_dir: - # delete the folder working_dir if it exists: - if Path(working_dir).exists(): - shutil.rmtree(working_dir) - # create the folder working_dir: - Path(working_dir).mkdir(parents=True, exist_ok=True) - # we might have to check if header chunk is used! 
- semi_automatic_solver.parse_header("I") - if semi_automatic_solver.headerChunk is not None and semi_automatic_solver.headerChunk.checksum_len_format is not None: - is_correct = semi_automatic_solver.is_checksum_correct() - else: - if correctness_function is not None: - is_correct = correctness_function(semi_automatic_solver.decoder.GEPP.b) - else: - is_correct = False - try: - filename = semi_automatic_solver.decoder.saveDecodedFile(return_file_name=True, print_to_output=False) - except ValueError as ve: - filename = ve.args[1] - _file = Path(filename) - stem = ("CORRECT_" if is_correct else "") + _file.stem + f"_{chunk_id}_{packet_id}" - _new_file = _file.rename(Path(working_dir + "/" + stem + _file.suffix)) - semi_automatic_solver.decoder.GEPP.A = bkp_A - semi_automatic_solver.decoder.GEPP.b = bkp_b - return f"{_new_file.name}" - - @app.callback( Output({'type': 'e_row', 'index': MATCH}, 'style'), [Input({'type': 'e_row', 'index': MATCH}, 'n_clicks'), @@ -459,7 +336,7 @@ def repair_and_store_by_packet(chunk_id, packet_id, hex_value, clear_working_dir def change_button_style(n_clicks, n_clicks2): clicked_line = ctx.triggered_id["index"] if get_chunk_tag()[clicked_line] == 3: - # the selected chunk is not decoded yet, return yellow and dont update the state. + # the selected chunk is not decoded yet, return yellow and don't update the state. 
return yellow_button_style update_single_element_chunk_tag(clicked_line, (get_chunk_tag()[clicked_line] + 1) % 3) if get_chunk_tag()[clicked_line] == 1: @@ -542,13 +419,6 @@ def update_analytics(n): return dash.no_update -def parse_contents(contents): - img = image_string_to_PILImage(contents) - pix = np.array(img) - img_content = array_to_data_url(pix) - return img_content - - @app.callback(Output('analyze-count-output', 'children'), Output('row_view', 'children'), Output("ls-loading-output-2", "children"), @@ -590,7 +460,7 @@ def update_canvas_data(json_data): # Canvas style: Output("canvas", "style"), # Canvas data: - Output("dashCanvas", "image_content"), # Output("dashCanvas", "json_data"), + Output("dashCanvas", "image_content"), Output("kaitai_view", "children"), State("packet-tag-chunk-input", "value"), State("dashCanvas", "json_data"), @@ -618,7 +488,6 @@ def callback_handler(*args, **kwargs): for key, value in ui.items(): if trigger_id == key: res = value["callback"](chunk_tag=get_chunk_tag(), c_ctx=c_ctx, *args, **kwargs) - # res is a dict with (optional) keys: chunk_tag:typing.List[int], update_b:bool update_b = False refresh_view = True for k, res_value in res.items(): @@ -635,17 +504,12 @@ def callback_handler(*args, **kwargs): elif k == "canvas_data": if "updates_canvas" in res and res["updates_canvas"]: # we may want to update all canvas data (the image including ALL tags/drawings) - if "height" in res and "width" in res: canvas_height = res["height"] canvas_width = res["width"] canvas_image_content = array_to_data_url(res_value) elif k == "kaitai_content": - # if res["kaitai_content"] is not None: - # # set the corresponding div ({'type': 'plugin_io_value', 'index': key}) to the kaitai content kaitai_view = res["kaitai_content"] - # else: - # return html.Div() elif k == "info": info_str = res_value @@ -664,8 +528,10 @@ def callback_handler(*args, **kwargs): chunk_id in res_value: # packet _i_ was used to create chunk _chunk_id_, # thus we can 
back-propagate the repair to the packet: - tmp.append(repair_and_store_by_packet(chunk_id, i, res_value[chunk_id], - len(tmp) == 0)) + tmp.append(semi_automatic_solver.repair_and_store_by_packet(chunk_id, i, + res_value[ + chunk_id], + len(tmp) == 0)) if not generate_all: break res += f"{', '.join(tmp)}]" @@ -687,8 +553,10 @@ def callback_handler(*args, **kwargs): # packet _i_ was used to create chunk _chunk_id_, # thus we can back-propagate the repair to the packet: tmp.append( - repair_and_store_by_packet(invalid_row, packet_to_repair, repaired_content_row, - len(tmp) == 0, correctness_function)) + semi_automatic_solver.repair_and_store_by_packet(invalid_row, packet_to_repair, + repaired_content_row, + len(tmp) == 0, + correctness_function)) if not generate_all and any([x.startswith("CORRECT_") for x in tmp]): break res += f"{', '.join(tmp)}]" @@ -864,13 +732,13 @@ def callback_handler(*args, **kwargs): return (res, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update, dash.no_update,) + recalculate_view() + ( dash.no_update, dash.no_update, canvas_image_content, kaitai_view) - elif c_ctx.triggered_id is not None and not isinstance(c_ctx.triggered_id, str) and c_ctx.triggered_id[ - "type"] == "forceload-plugin-button": + elif (c_ctx.triggered_id is not None and not isinstance(c_ctx.triggered_id, str) and + c_ctx.triggered_id["type"] == "forceload-plugin-button"): canvas_style = dash.no_update for _plugin in plugin_manager.plugin_instances: if _plugin.__class__.__name__ == c_ctx.triggered_id["index"]: _div = html.Div(id="plugin_" + _plugin.__class__.__name__.lower(), className="box", - children=load_plugin(_plugin)) + children=plugin_manager.load_plugin(_plugin)) _plugin.on_load() all_plugins_childs.append(_div) canvas_style = {"display": "block"} if show_canvas else {"display": "none"} @@ -945,13 +813,11 @@ def recalculate_view(): return html.Div(poss_packet_str), html.Div(child_view), html.Div("") -""" -""" if __name__ == '__main__': 
parser = argparse.ArgumentParser() parser.add_argument("ini", metavar="ini", type=str, help="config file (ini)") - args = parser.parse_args() - ini_file = args.ini + parsed_args = parser.parse_args() + ini_file = parsed_args.ini cfg_worker = ConfigReadAndExecute(ini_file) x = cfg_worker.execute(return_decoder=True, skip_solve=True)[0] @@ -962,9 +828,9 @@ def recalculate_view(): CHECKSUM_LEN_FORMAT = None init_globals(semi_automatic_solver) callbacks(app) - app.run(host="0.0.0.0", dev_tools_ui=True, dev_tools_hot_reload=False, debug=False, threaded=True, + app.run(threaded=True, host="0.0.0.0") + """ + # to enable debugging / dev tools: + app.run(host="0.0.0.0", dev_tools_ui=True, dev_tools_hot_reload=True, debug=True, threaded=True, dev_tools_hot_reload_interval=10000, dev_tools_hot_reload_watch_interval=10000) - """, dev_tools_silence_routes_logging=None, - dev_tools_hot_reload=None, dev_tools_hot_reload_interval=None, dev_tools_hot_reload_watch_interval=None, - dev_tools_hot_reload_max_retry=None) # _server(debug=True) """ diff --git a/repair_algorithms/BMPFileRepair.py b/repair_algorithms/BMPFileRepair.py index 3794bdc..cd2bba1 100644 --- a/repair_algorithms/BMPFileRepair.py +++ b/repair_algorithms/BMPFileRepair.py @@ -68,7 +68,6 @@ def parse_bmp(self, *args, **kwargs): dtype=error_pos.dtype) res = Bmp.from_bytes(self.reconstructed_bmp_bytes) - # TODO: check if correct: if res.file_hdr.reserved1 != 0: self.reconstructed_bmp_bytes[6] = 0 self.reconstructed_bmp_bytes[7] = 0 @@ -89,8 +88,6 @@ def parse_bmp(self, *args, **kwargs): [a ^ b for a, b in zip(self.bmp_bytes[10:14], self.reconstructed_bmp_bytes[10:14])], dtype=error_pos.dtype) res = Bmp.from_bytes(self.reconstructed_bmp_bytes) - # TODO: depends: right now this is only correct for 16bit - # check up res.dib_info.header.bits_per_pixel! 
mask_mask = res.dib_info.color_mask_red ^ res.dib_info.color_mask_blue ^ res.dib_info.color_mask_alpha ^ res.dib_info.color_mask_green if mask_mask != 2 ** res.dib_info.header.bits_per_pixel - 1 or ( res.dib_info.header.bits_per_pixel == 32 and mask_mask | 0b11100000000000000000000000000000 != 2 ** res.dib_info.header.bits_per_pixel - 1): @@ -147,7 +144,6 @@ def repair(self, *args, **kwargs): "refresh_view": True, "chunk_tag": self.chunk_tag} def reload_image(self, *args, **kwargs): - # todo: load canvas_json from args + chunk_tag to find the invalid packet self.parser_error_matrix = None self.no_inspect_chunks = self.gepp.b.shape[0] if self.reconstructed_bmp_bytes is not None: @@ -165,7 +161,6 @@ def reload_image(self, *args, **kwargs): def is_compatible(self, meta_info, *args, **kwargs): # parse magic info string: - # TODO: add check for filename / extension return meta_info == "Bitmap" or "PC bitmap" in meta_info def set_image_width(self, width, *args, **kwargs): @@ -402,7 +397,7 @@ def update_canvas(self, canvas_json, *args, **kwargs): self.error_matrix = self.error_matrix.reshape(-1, self.gepp.b.shape[1]) res = self.find_errors_tags() res["updates_canvas"] = True - res["image_content"] = None # todo: add line to each chunk that is incorrect? 
+ res["image_content"] = None return {"updates_canvas": True, "image_content": None} diff --git a/repair_algorithms/LangaugeToolTextRepair.py b/repair_algorithms/LangaugeToolTextRepair.py index 02354cd..2d634b5 100644 --- a/repair_algorithms/LangaugeToolTextRepair.py +++ b/repair_algorithms/LangaugeToolTextRepair.py @@ -68,7 +68,7 @@ def find_error_region_by_words(self, language=None, *args, **kwargs): # WARNING: this method is rather slow but will yield better results than the character based method # IF the words stored in the file are in the used dictionary - # TODO change language according to "language" parameter: + # change language according to "language" parameter: if self.tool is None or self.tool.language.normalized_tag != lang_to_LanguageTool.get(language, "en-US"): self.tool = language_tool_python.LanguageTool(lang_to_LanguageTool.get(language, "en-US")) # , config={'disabledRuleIds': "DROP_DOWN,SOME_OF_THE,THE_SUPERLATIVE,UPPERCASE_SENTENCE_START,DOPPELPUNKT_GROSS,KOMMA_ZWISCHEN_HAUPT_UND_NEBENSATZ_2,VIELZAHL_PLUS_SINGULAR,EMPFOHLENE_ZUSAMMENSCHREIBUNG,SEMIKOLON_VOR_ANFUEHRUNGSZEICHEN,DURCHEINANDER"}) @@ -91,7 +91,7 @@ def find_error_region_by_words(self, language=None, *args, **kwargs): matches = self.tool.check(blob) for matching_rule in matches: offset = matching_rule.offset - # TODO: extend error_length to include non-printable characters behind the error, + # TODO: we might want to extend error_length to include non-printable characters behind the error, # then iterate over range(error_length, error_length + len(offset)) to find the correct word error_length = matching_rule.errorLength if matching_rule.category == "TYPOS" or matching_rule.ruleIssueType == "misspelling": diff --git a/repair_algorithms/PluginManager.py b/repair_algorithms/PluginManager.py index ea4420c..a96465c 100644 --- a/repair_algorithms/PluginManager.py +++ b/repair_algorithms/PluginManager.py @@ -1,6 +1,7 @@ import typing - from singleton_decorator.decorator import singleton 
+from dash_extensions.enrich import html, dcc +import dash_daq as daq from repair_algorithms.FileSpecificRepair import FileSpecificRepair @@ -20,3 +21,83 @@ def get_plugins(self): def get_plugin_instances(self): return self.plugin_instances + + def load_plugin(self, plugin_inst): + global input_callback_handler, show_canvas + + plugin_inst.on_load() + + # Get the UI elements from the plugin instance: + ui: typing.Dict[ + str, typing.Dict[str, typing.Union[str, bool, typing.Callable]]] = plugin_inst.get_ui_elements() + # Initialize a list to store the plugin's child elements: + _plugin_childs = [html.H4(f'Plugin: "{plugin_inst.__class__.__name__}"', className="tag")] + + # Iterate over the UI elements and create the corresponding Dash elements: + for key, value in ui.items(): + if value["type"] == "button": + _plugin_childs.append( + html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, className="button")) + if "updates_canvas" in value and value["updates_canvas"]: + show_canvas = True + elif value["type"] == "int": + default_value = 0 if "default" not in value else value["default"] + _plugin_childs.append(html.Div([html.Label(value["text"], className="label"), + html.Div( + [dcc.Input(id={'type': 'plugin_io_value', 'index': key}, + type="number", + className="input", value=default_value), ], + className="control")], className="field")) + if "updates_canvas" in value and value["updates_canvas"]: + show_canvas = True + elif value["type"] == "text": + _plugin_childs.append(html.Div([html.Label(value["text"], className="label"), + html.Div( + [dcc.Input(id={'type': 'plugin_io_value', 'index': key}, + type="text", + className="input"), ], + className="control")], className="field")) + if "updates_canvas" in value and value["updates_canvas"]: + show_canvas = True + elif value["type"] == "canvas": + show_canvas = True + elif value["type"] == "kaitai_view": + _plugin_childs.append( + html.Button(value["text"], id={'type': 'plugin_io_btn', 'index': key}, 
className="button")) + elif value["type"] == "upload": + _plugin_childs.append(html.Div([ + dcc.Upload( + id={'type': 'plugin_io_upload-data', 'index': key}, + children=html.Div([ + 'Drag and Drop or ', + html.A('Select Files') + ]), + style={ + 'width': '100%', + 'height': '60px', + 'lineHeight': '60px', + 'borderWidth': '1px', + 'borderStyle': 'dashed', + 'borderRadius': '5px', + 'textAlign': 'center', + 'margin': '10px' + }, + # Don't allow multiple files to be uploaded + multiple=False + ), + html.Div(id={'type': 'output-data-upload', 'index': key}), + ])) + if "updates_canvas" in value and value["updates_canvas"]: + show_canvas = True + elif value["type"] == "download": + _plugin_childs.append(dcc.Download(id={'type': 'plugin_io_download-data', 'index': key})) + _plugin_childs.append( + html.A('Download Data', id={'type': 'plugin_io_download', 'index': key}, className="button") + ) + elif value["type"] == "toggle": + _plugin_childs.append(html.Div([html.Label(value["off_label"]), + daq.ToggleSwitch(id={'type': 'plugin_io_switch', 'index': key}, + label=value["label"], + labelPosition='bottom', className="inline-switch" + ), html.Label(value["on_label"])])) + return _plugin_childs diff --git a/repair_algorithms/RandomShuffleRepair.py b/repair_algorithms/RandomShuffleRepair.py index 8b95143..93e1e78 100644 --- a/repair_algorithms/RandomShuffleRepair.py +++ b/repair_algorithms/RandomShuffleRepair.py @@ -344,7 +344,7 @@ def update_num_shuffle(self, *args, **kwargs): num_shuffle = kwargs["c_ctx"].triggered[0]["value"] # we could check if kwargs["c_ctx"].triggered[X] has a prop_io equal to the textbox's id if num_shuffle is None or num_shuffle < 1: - self.num_shuffles = self.gepp.b[0] + self.num_shuffles = len(self.gepp.b[0]) else: self.num_shuffles = num_shuffle diff --git a/repair_algorithms/UploadRepair.py b/repair_algorithms/UploadRepair.py index 4501e39..20c02a5 100644 --- a/repair_algorithms/UploadRepair.py +++ b/repair_algorithms/UploadRepair.py @@ -21,6 
+21,7 @@ class UploadRepair(FileSpecificRepair): # TODO: we might want to create and save __all__ possible results for a modified chunk + # (however, this can already be done using partial repair / multi file) # example: we change a byte in a (or multiple) chunk(s) and we want to decode assuming the error happening in all possible packets. # to further limit the number of packets we might aswell use the chunktags to pinpoint the corrupt packet! def __init__(self, *args, **kwargs): diff --git a/repair_algorithms/ZipFileRepair.py b/repair_algorithms/ZipFileRepair.py index c73e2a5..4141400 100644 --- a/repair_algorithms/ZipFileRepair.py +++ b/repair_algorithms/ZipFileRepair.py @@ -141,7 +141,7 @@ def parse_zipfile(self, iterations=50): def is_compatible(self, meta_info): # parse magic info string: - return "zip" in meta_info.lower() # TODO check... + return "zip" in meta_info.lower() def repair(self, *args, **kwargs): if self.zip_structure is None or self.parser_error_matrix is None: @@ -309,7 +309,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove # general purpose bit flag: if section.body.header.flags.reserved_1 != 0 or section.body.header.flags.reserved_2 != 0 or \ section.body.header.flags.reserved_3 != 0 or section.body.header.flags.reserved_4 != 0: - # todo check if offset is correct for all reserved fields + # we might need to check if offset is correct for all reserved fields error_counter += update_error_pos(section.body.header.start + 2 + 1, section.body.header.start + 2 + 2, [1]) # if section.body.header.general_purpose_bit_flag & 0b00000011101011 == 0: @@ -318,6 +318,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove # error_counter += update_error_pos(section.body.header.start + 2, section.body.header.start + 4, [1] * 4) # TODO: make some basic sanity check that these numbers are not too large: + # these are currently inactive as we have to make sure that all zip implementations 
correctly use these: # file last modification time: # error_pos[section.header.start + 10:section.header.start + 10 + 2] = [0] * 2 # file last modification date: @@ -435,7 +436,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove else: error_counter += update_error_pos(section.body.start + 16, section.body.start + 16 + 2, [0] * 2) if error_counter > 4: - # TODO find best magic number... + # TODO: we might want to find best (closest) magic number... error_pos = error_pos_bkp self.reconstructed_zip_bytes = reconstructed_zip_bytes_bkp if start: @@ -467,7 +468,6 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove return np.array(error_pos).reshape(-1, self.gepp.b.shape[1]) def compare_sections(self, error_pos, sections): - # TODO. this is not DRY: def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, overwrite=False): offset = start * self.gepp.b.shape[1] _parser_error_pos = self.parser_error_matrix[offset + _start: offset + _end] @@ -666,7 +666,6 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove # error_counter += update_error_pos(central_dir_section.body.start + 34, # central_dir_section.body.start + 34 + 2, [1] * 2) # file comment len: - # TODO weiter machen mit richtigem offset next_signature = self.get_raw_bytes( central_dir_section.body.start + 42 + central_dir_section.body.len_file_name + central_dir_section.body.len_extra + central_dir_section.body.len_comment, 2) @@ -689,7 +688,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove central_dir_section.body.start + 36 + 2, [1] * 2) # internal file attributes: - # TODO: check bit 1 and 3-16 (reserved/unused!) + # TODO: test if we can check bit 1 and 3-16 (reserved/unused!) # external file attributes: # NO way to check... 
# error_counter += update_error_pos(section.body.header.start + 18, section.body.header.start + 18 + 4, [0] * 4) @@ -700,7 +699,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove error_counter += update_error_pos(central_dir_section.start + 42, central_dir_section.start + 42 + 4, [0] * 4) - # TODO: we have to go trough all local file headers and invalidate the one BEFORE the reference... + # TODO: go through all local file headers and invalidate the one BEFORE the reference... else: # either the reference is wrong or this was no real central directory entry! error_counter += update_error_pos(central_dir_section.start + 42, @@ -718,6 +717,7 @@ def update_error_pos(_start, _end, new_error_pos=None, corrected_bytes=None, ove # TODO we might be able to match it with an unmatched central directory entry by comparing # other entries such as the filename # alternatively we choose the central directory entry with the smallest edit distance + # This might further increase the recovery chance but may increase the complexity.
pass return error_pos @@ -918,7 +918,7 @@ def sweep_zip_header(self, error_pos=None): flat_signature_positions = [item for sublist in flat_signature_positions for item in sublist] sections = [] for start_offset in flat_signature_positions: - # todo: create a copy of error_pos for each section canididate and merge them at the end (only if the section was "valid") + # create a copy of error_pos for each section candidate and merge them at the end (only if the section was "valid") # make sure the sections are not overlapping and if they are, choose the one that produces the least errors error_pos_bkp = error_pos.copy() diff --git a/repair_algorithms/zip.py b/repair_algorithms/zip.py index a42193b..1d7dc4a 100644 --- a/repair_algorithms/zip.py +++ b/repair_algorithms/zip.py @@ -371,7 +371,7 @@ def _read(self, start_offset): position_of_signature = Zip.find_signtures(self._raw_file_name) if len(self._raw_file_name) != self.len_file_name or position_of_signature is not None: # filename-length should be: - # filename_length = filename_length - position_of_signature - len_extra - len_comment (TODO: check if "-1" ???) + # filename_length = filename_length - position_of_signature - len_extra - len_comment expected_val = self.len_file_name - max(0, self.len_file_name - position_of_signature - self.len_extra - self.len_comment) raise InvalidDataException("/types/filename/invalid", len_filename_io_pos, expected=expected_val) @@ -490,7 +490,7 @@ def _read(self, start_offset=0): self.file_mod_time = dos_datetime.DosDatetime(_io__raw_file_mod_time, self, self._root) self.crc32 = self._io.read_u4le() self.len_body_compressed = self._io.read_u4le() - # TODO we should check if len_body_compress is unreasonalbe (e.g. if there is a valid signature in it) + # TODO we may want to check if len_body_compress is unreasonable (e.g.
if there is a valid signature in it) self.len_body_uncompressed = self._io.read_u4le() len_filename_io_pos = self._io.pos() self.len_file_name = self._io.read_u2le() diff --git a/semi_automatic_reconstruction_toolkit.py b/semi_automatic_reconstruction_toolkit.py index 39fc170..959bc24 100644 --- a/semi_automatic_reconstruction_toolkit.py +++ b/semi_automatic_reconstruction_toolkit.py @@ -355,6 +355,41 @@ def solve_lin_dep(a, b): return None + + def repair_and_store_by_packet(self, chunk_id, packet_id, hex_value, clear_working_dir=False, correctness_function=None): + # this function will be used if we have multiple invalid packets (and corrected chunks) to save multiple version, + # where each saved version used a different possible packet to repair the chunk. + bkp_A = self.decoder.GEPP.A.copy() + bkp_b = self.decoder.GEPP.b.copy() + self.manual_repair(chunk_id, packet_id, hex_value) + working_dir = "multi_file_repair" + if clear_working_dir: + # delete the folder working_dir if it exists: + if Path(working_dir).exists(): + shutil.rmtree(working_dir) + # create the folder working_dir: + Path(working_dir).mkdir(parents=True, exist_ok=True) + # we might have to check if header chunk is used! 
+ self.parse_header("I") + if self.headerChunk is not None and self.headerChunk.checksum_len_format is not None: + is_correct = self.is_checksum_correct() + else: + if correctness_function is not None: + is_correct = correctness_function(self.decoder.GEPP.b) + else: + is_correct = False + try: + filename = self.decoder.saveDecodedFile(return_file_name=True, print_to_output=False) + except ValueError as ve: + filename = ve.args[1] + _file = Path(filename) + stem = ("CORRECT_" if is_correct else "") + _file.stem + f"_{chunk_id}_{packet_id}" + _new_file = _file.rename(Path(working_dir + "/" + stem + _file.suffix)) + self.decoder.GEPP.A = bkp_A + self.decoder.GEPP.b = bkp_b + return f"{_new_file.name}" + + if __name__ == "__main__": x = ConfigReadAndExecute("NOREC4DNA/logo.jpg_Fri_Jan__7_13_18_39_2022.ini").execute(return_decoder=True)[0] semi_automatic_solver = SemiAutomaticReconstructionToolkit(x)