From ac6735db91c6cdc1efea2b411e438f8672b54986 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Thu, 23 Jan 2025 16:26:50 -0800
Subject: [PATCH] dt/dl: Integration test for past-schema fallback

Signed-off-by: Oren Leiman
---
 .../tests/datalake/schema_evolution_test.py | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/tests/rptest/tests/datalake/schema_evolution_test.py b/tests/rptest/tests/datalake/schema_evolution_test.py
index ad9355b28c6f..6dee30e87446 100644
--- a/tests/rptest/tests/datalake/schema_evolution_test.py
+++ b/tests/rptest/tests/datalake/schema_evolution_test.py
@@ -586,3 +586,36 @@ def test_reorder_columns(self, cloud_storage_type, query_engine):
             f"Expected {count*2} rows, got {len(select_out)}"
         assert all(r[1] == field for r in select_out), \
             f"{field} column mangled: {select_out}"
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=supported_storage_types(),
+        query_engine=QUERY_ENGINES,
+        test_case=list(LEGAL_TEST_CASES.keys()),
+    )
+    def test_old_schema_writer(self, cloud_storage_type, query_engine,
+                               test_case):
+        """
+        Tests that, after a backwards compatible update from schema A to schema B, we can keep
+        translating records produced with schema A without another schema update by falling back
+        to an already extant parquet writer for schema A.
+        """
+        with self.setup_services(query_engine) as dl:
+
+            count = 10
+            ctx = TranslationContext()
+
+            initial_schema, next_schema = LEGAL_TEST_CASES[test_case]
+
+            for schema in [initial_schema, next_schema]:
+                self.produce(dl, schema, count, ctx)
+                self.check_table_schema(dl, query_engine, schema)
+
+            self.produce(dl, initial_schema, count, ctx)
+            self.check_table_schema(dl, query_engine, next_schema)
+
+            select_out = self._select(dl, query_engine,
+                                      next_schema.field_names)
+
+            assert len(select_out) == count * 3, \
+                f"Expected {count*3} rows, got {len(select_out)}"