Skip to content

Commit

Permalink
test: @dlt.transformer ignores write_disposition="replace"
Browse files Browse the repository at this point in the history
  • Loading branch information
joscha committed Nov 28, 2024
1 parent 9a49868 commit 3fd9d25
Showing 1 changed file with 35 additions and 1 deletion.
36 changes: 35 additions & 1 deletion tests/load/pipeline/test_merge_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from dlt.common.schema.typing import TLoaderMergeStrategy
from dlt.common.typing import StrAny
from dlt.common.utils import digest128
from dlt.common.utils import digest128, uniq_id
from dlt.common.destination import AnyDestination, DestinationCapabilitiesContext
from dlt.common.destination.exceptions import DestinationCapabilitiesException
from dlt.common.libs.pyarrow import row_tuples_to_arrow
Expand Down Expand Up @@ -1598,3 +1598,37 @@ def arrow_items(rows, schema_columns, timezone="UTC"):
{"id": 2, "name": "updated bar"},
],
)

def test_transformer_replace() -> None:
    """Load a transformer declared with ``write_disposition="replace"``.

    A parent resource yields the same two ids on two consecutive pages. The
    transformer stamps every row with a running page counter, and the test
    asserts that the loaded table contains only the rows stamped with page 2.
    """

    @dlt.resource()
    def gen_pages_of_ids():
        # Two pages carrying identical ids; a fresh list is built per page.
        for _ in range(2):
            yield [{"id": 1}, {"id": 2}]

    # Incremented once per page handed to the transformer below.
    pages_seen = 0

    @dlt.transformer(
        data_from=gen_pages_of_ids,
        write_disposition="replace",
        primary_key="id",
        merge_key="id",
    )
    def transformed_ids(page: List[StrAny]):
        nonlocal pages_seen
        pages_seen += 1
        for row in page:
            # The counter is read per row, at the moment the row is emitted.
            yield {**row, "current_page": pages_seen}

    pipeline = dlt.pipeline(
        pipeline_name="pipe_" + uniq_id(),
        destination="duckdb",
    )
    load_info = pipeline.run(transformed_ids)
    assert_load_info(load_info)

    table_name = transformed_ids.name
    loaded = load_tables_to_dicts(pipeline, table_name, exclude_system_cols=True)

    # NOTE(review): both pages are produced within a single pipeline run --
    # confirm that "replace" is expected to drop the page-1 rows in that case.
    expected_rows = [{"id": row_id, "current_page": 2} for row_id in (1, 2)]
    assert_records_as_set(loaded[table_name], expected_rows)

0 comments on commit 3fd9d25

Please sign in to comment.