diff --git a/dlt/destinations/impl/filesystem/sql_client.py b/dlt/destinations/impl/filesystem/sql_client.py
index d39f4c3431..e6b84343bb 100644
--- a/dlt/destinations/impl/filesystem/sql_client.py
+++ b/dlt/destinations/impl/filesystem/sql_client.py
@@ -214,14 +214,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
                 # unknown views will not be created
                 continue
 
-            # only create view if it does not exist in the current schema yet
-            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
-            if view_name in existing_tables:
-                continue
-
             # NOTE: if this is staging configuration then `prepare_load_table` will remove some info
             # from table schema, if we ever extend this to handle staging destination, this needs to change
             schema_table = self.fs_client.prepare_load_table(table_name)
+            table_format = schema_table.get("table_format")
+
+            # skip if view already exists and does not need to be replaced each time
+            existing_tables = [tname[0] for tname in self._conn.execute("SHOW TABLES").fetchall()]
+            needs_replace = table_format == "iceberg" or self.fs_client.config.protocol == "abfss"
+            if view_name in existing_tables and not needs_replace:
+                continue
+
             # discover file type
             folder = self.fs_client.get_table_dir(table_name)
             files = self.fs_client.list_table_files(table_name)
@@ -258,15 +261,17 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
 
             # create from statement
             from_statement = ""
-            if schema_table.get("table_format") == "delta":
+            if table_format == "delta":
                 from_statement = f"delta_scan('{resolved_folder}')"
-            elif schema_table.get("table_format") == "iceberg":
+            elif table_format == "iceberg":
                 from dlt.common.libs.pyiceberg import _get_last_metadata_file
 
                 self._setup_iceberg(self._conn)
                 metadata_path = f"{resolved_folder}/metadata"
                 last_metadata_file = _get_last_metadata_file(metadata_path, self.fs_client)
-                from_statement = f"iceberg_scan('{last_metadata_file}')"
+                # skip schema inference to make nested data types work
+                # https://github.com/duckdb/duckdb_iceberg/issues/47
+                from_statement = f"iceberg_scan('{last_metadata_file}', skip_schema_inference=True)"
             elif first_file_type == "parquet":
                 from_statement = f"read_parquet([{resolved_files_string}])"
             elif first_file_type == "jsonl":
@@ -281,7 +286,9 @@ def create_views_for_tables(self, tables: Dict[str, str]) -> None:
 
             # create table
             view_name = self.make_qualified_table_name(view_name)
-            create_table_sql_base = f"CREATE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            create_table_sql_base = (
+                f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {from_statement}"
+            )
             self._conn.execute(create_table_sql_base)
 
     @contextmanager
diff --git a/tests/load/filesystem/test_sql_client.py b/tests/load/filesystem/test_sql_client.py
index a73b0f7e31..4f537d129c 100644
--- a/tests/load/filesystem/test_sql_client.py
+++ b/tests/load/filesystem/test_sql_client.py
@@ -22,6 +22,7 @@
 )
 from dlt.destinations import filesystem
 from tests.utils import TEST_STORAGE_ROOT
+from tests.cases import arrow_table_all_data_types
 from dlt.destinations.exceptions import DatabaseUndefinedRelation
 
 
@@ -81,12 +82,17 @@ def double_items():
                 for i in range(total_records)
             ]
 
-        return [items, double_items]
+        @dlt.resource(table_format=table_format)
+        def arrow_all_types():
+            yield arrow_table_all_data_types("arrow-table", num_rows=total_records)[0]
+
+        return [items, double_items, arrow_all_types]
 
     # run source
     pipeline.run(source(), loader_file_format=destination_config.file_format)
 
     if alternate_access_pipeline:
+        orig_dest = pipeline.destination
         pipeline.destination = alternate_access_pipeline.destination
 
     import duckdb
@@ -96,8 +102,11 @@ def double_items():
         DuckDbCredentials,
     )
 
-    # check we can create new tables from the views
     with pipeline.sql_client() as c:
+        # check if all data types are handled properly
+        c.execute_sql("SELECT * FROM arrow_all_types;")
+
+        # check we can create new tables from the views
         c.execute_sql(
             "CREATE TABLE items_joined AS (SELECT i.id, di.double_id FROM items as i JOIN"
             " double_items as di ON (i.id = di.id));"
@@ -109,16 +118,14 @@ def double_items():
         assert list(joined_table[5]) == [5, 10]
         assert list(joined_table[10]) == [10, 20]
 
-    # inserting values into a view should fail gracefully
-    with pipeline.sql_client() as c:
+        # inserting values into a view should fail gracefully
         try:
             c.execute_sql("INSERT INTO double_items VALUES (1, 2)")
         except Exception as exc:
             assert "double_items is not an table" in str(exc)
 
-    # check that no automated views are created for a schema different than
-    # the known one
-    with pipeline.sql_client() as c:
+        # check that no automated views are created for a schema different than
+        # the known one
         c.execute_sql("CREATE SCHEMA other_schema;")
         with pytest.raises(DatabaseUndefinedRelation):
             with c.execute_query("SELECT * FROM other_schema.items ORDER BY id ASC;") as cursor:
@@ -172,6 +179,24 @@ def _fs_sql_client_for_external_db(
     # views exist
     assert len(external_db.sql("SELECT * FROM second.referenced_items").fetchall()) == total_records
     assert len(external_db.sql("SELECT * FROM first.items").fetchall()) == 3
+
+    # test if view reflects source table accurately after it has changed
+    # concretely, this tests if an existing view is replaced with formats that need it, such as
+    # the `iceberg` table format
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == total_records
+    if alternate_access_pipeline:
+        # switch back for the write path
+        pipeline.destination = orig_dest
+    pipeline.run(  # run pipeline again to add rows to source table
+        source().with_resources("arrow_all_types"),
+        loader_file_format=destination_config.file_format,
+    )
+    with fs_sql_client as sql_client:
+        sql_client.create_views_for_tables({"arrow_all_types": "arrow_all_types"})
+    assert external_db.sql("FROM second.arrow_all_types;").arrow().num_rows == (2 * total_records)
+
     external_db.close()
 
     # in case we are not connecting to a bucket that needs secrets, views should still be here after connection reopen
@@ -298,6 +323,7 @@ def test_table_formats(
     pipeline = destination_config.setup_pipeline(
         "read_pipeline",
         dataset_name="read_test",
+        dev_mode=True,
     )
 
     # in case of gcs we use the s3 compat layer for reading
@@ -310,7 +336,7 @@ def test_table_formats(
         GCS_BUCKET.replace("gs://", "s3://"), destination_name="filesystem_s3_gcs_comp"
     )
     access_pipeline = destination_config.setup_pipeline(
-        "read_pipeline", dataset_name="read_test", destination=gcp_bucket
+        "read_pipeline", dataset_name="read_test", dev_mode=True, destination=gcp_bucket
     )
 
     _run_dataset_checks(