From 879e4e5ea61229c79b4d297a917e3930abf65609 Mon Sep 17 00:00:00 2001
From: Steinthor Palsson
Date: Wed, 29 May 2024 17:40:11 -0400
Subject: [PATCH] Fix filesystem + test

Filter stored schema files by schema name when dropping tables and when
resolving the newest stored schema, so pipelines that share a dataset no
longer delete or pick up each other's schemas. Use a unique dataset name
in the filesystem layout test to avoid collisions between runs.
---
 dlt/destinations/impl/filesystem/filesystem.py  | 16 ++++++++++------
 tests/load/pipeline/test_filesystem_pipeline.py |  5 +++--
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 1efbcbc9f3..d75226be13 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -176,8 +176,9 @@ def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
         if not delete_schema:
             return
         # Delete all stored schemas
-        for filename, _ in self._iter_stored_schema_files():
-            self._delete_file(filename)
+        for filename, fileparts in self._iter_stored_schema_files():
+            if fileparts[0] == self.schema.name:
+                self._delete_file(filename)
 
     def truncate_tables(self, table_names: List[str]) -> None:
         """Truncate table with given name"""
@@ -414,10 +415,9 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
         )
 
     def _iter_stored_schema_files(self) -> Iterator[Tuple[str, List[str]]]:
-        """Iterator over all schema files matching the current schema name"""
+        """Iterator over all stored schema files"""
         for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
-            if fileparts[0] == self.schema.name:
-                yield filepath, fileparts
+            yield filepath, fileparts
 
     def _get_stored_schema_by_hash_or_newest(
         self, version_hash: str = None
@@ -428,7 +428,11 @@
         selected_path = None
         newest_load_id = "0"
         for filepath, fileparts in self._iter_stored_schema_files():
-            if not version_hash and fileparts[1] > newest_load_id:
+            if (
+                not version_hash
+                and fileparts[0] == self.schema.name
+                and fileparts[1] > newest_load_id
+            ):
                 newest_load_id = fileparts[1]
                 selected_path = filepath
             elif fileparts[2] == version_hash:
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 5f24daf57f..623284d8a7 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -362,8 +362,9 @@ def _collect_table_counts(p) -> Dict[str, int]:
     )
 
     # generate 4 loads from 2 pipelines, store load ids
-    p1 = destination_config.setup_pipeline("p1", dataset_name="layout_test")
-    p2 = destination_config.setup_pipeline("p2", dataset_name="layout_test")
+    dataset_name = "layout_test_" + uniq_id()
+    p1 = destination_config.setup_pipeline("p1", dataset_name=dataset_name)
+    p2 = destination_config.setup_pipeline("p2", dataset_name=dataset_name)
     c1 = cast(FilesystemClient, p1.destination_client())
     c2 = cast(FilesystemClient, p2.destination_client())
 
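
For reviewers: a minimal sketch of the failure mode this patch guards against,
assuming only the public dlt API (dlt.pipeline, dlt.resource, Pipeline.run) and
a configured filesystem destination. The pipeline, resource, and dataset names
below are illustrative, not taken from the patch.

# Sketch: two pipelines, each with its own schema, share one dataset on the
# filesystem destination. Per the diff, stored schema file names split into
# parts (fileparts) where fileparts[0] is the schema name, fileparts[1] the
# load id, and fileparts[2] the version hash. Any deletion or "newest schema"
# lookup that ignores fileparts[0] can therefore hit the other pipeline's
# schema files -- which is what the added schema-name checks prevent.
import dlt

@dlt.resource
def items():
    yield {"id": 1}

@dlt.resource
def other_items():
    yield {"id": 2, "name": "a"}

# Assumes a bucket_url is configured for the filesystem destination,
# e.g. DESTINATION__FILESYSTEM__BUCKET_URL=file:///tmp/dlt_data
p1 = dlt.pipeline("p1", destination="filesystem", dataset_name="shared_dataset")
p2 = dlt.pipeline("p2", destination="filesystem", dataset_name="shared_dataset")

p1.run(items())
p2.run(other_items())

# After the patch: dropping p1's tables deletes only schema files whose
# schema-name part matches p1's schema, and resolving p1's newest stored
# schema never selects a file written by p2.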