Skip to content

Commit

Permalink
dt/dl: Integration test for past-schema fallback
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Leiman <[email protected]>
  • Loading branch information
oleiman committed Jan 31, 2025
1 parent a5d9fba commit ac6735d
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions tests/rptest/tests/datalake/schema_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,3 +586,36 @@ def test_reorder_columns(self, cloud_storage_type, query_engine):
f"Expected {count*2} rows, got {len(select_out)}"
assert all(r[1] == field for r in select_out), \
f"{field} column mangled: {select_out}"

@cluster(num_nodes=3)
@matrix(
cloud_storage_type=supported_storage_types(),
query_engine=QUERY_ENGINES,
test_case=list(LEGAL_TEST_CASES.keys()),
)
def test_old_schema_writer(self, cloud_storage_type, query_engine,
test_case):
"""
Tests that, after a backwards compatible update from schema A to schema B, we can keep
tranlsating records produced with schema A without another schema update by falling back
to an already extant parquet writer for schema A.
"""
with self.setup_services(query_engine) as dl:

count = 10
ctx = TranslationContext()

initial_schema, next_schema = LEGAL_TEST_CASES[test_case]

for schema in [initial_schema, next_schema]:
self.produce(dl, schema, count, ctx)
self.check_table_schema(dl, query_engine, schema)

self.produce(dl, initial_schema, count, ctx)
self.check_table_schema(dl, query_engine, next_schema)

select_out = self._select(dl, query_engine,
next_schema.field_names)

assert len(select_out) == count * 3, \
f"Expected {count*3} rows, got {len(select_out)}"

0 comments on commit ac6735d

Please sign in to comment.