Skip to content

Commit

Permalink
feat: Implement insert condition for merge strategy. (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
talhasenSC authored Oct 25, 2023
1 parent d9439ca commit d2bd2c8
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 3 deletions.
25 changes: 23 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,9 @@ It is possible to use Iceberg in an incremental fashion, specifically two strate
be useful for improving performance via predicate pushdown on the target table.
* `delete_condition` (optional): SQL condition used to identify records that should be deleted.
* `update_condition` (optional): SQL condition used to identify records that should be updated.
* `incremental_predicates`, `delete_condition` and `update_condition` can include any column of the incremental
table (`src`) or the final table (`target`).
* `insert_condition` (optional): SQL condition used to identify records that should be inserted.
* `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of
the incremental table (`src`) or the final table (`target`).
Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error.

`delete_condition` example:
Expand Down Expand Up @@ -444,6 +445,26 @@ select * from (
{% endif %}
```

`insert_condition` example:

```sql
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key=['id'],
insert_condition='src.status != 0',
schema='sandbox'
)
}}
select * from (
values
(1, 0)
, (2, 1)
) as t (id, status)
```

### Highly available table (HA)

The current implementation of the table materialization can lead to downtime, as the target table is dropped and re-created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
{% set incremental_predicates = config.get('incremental_predicates') %}
{% set delete_condition = config.get('delete_condition') %}
{% set update_condition = config.get('update_condition') %}
{% set insert_condition = config.get('insert_condition') %}
{% set empty_unique_key -%}
Merge strategy must implement unique_key as a single column or a list of columns.
{%- endset %}
Expand Down Expand Up @@ -81,6 +82,7 @@
existing_relation=existing_relation,
delete_condition=delete_condition,
update_condition=update_condition,
insert_condition=insert_condition,
)
%}
{% do to_drop.append(tmp_relation) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
existing_relation,
delete_condition,
update_condition,
insert_condition,
statement_name="main"
)
%}
Expand Down Expand Up @@ -118,7 +119,7 @@
{%- endif -%}
{%- endfor %}
{%- endif %}
when not matched
when not matched {% if insert_condition is not none -%} and {{ insert_condition }} {%- endif %}
then insert ({{ dest_cols_csv }})
values ({{ src_cols_csv }})
{%- endset -%}
Expand Down
63 changes: 63 additions & 0 deletions tests/functional/adapter/test_incremental_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,42 @@
2,v2-updated
"""

# dbt model source used by TestIcebergInsertCondition.
# The model is an Iceberg incremental model using the merge strategy, keyed on
# `id`, with insert_condition='src.status != 0'.
#   * Non-incremental branch: emits a single row (id=0, status=1).
#   * Incremental branch: emits ids 1, 2 and 3 with statuses -1, 0 and 1.
# The expected seed defined alongside this model contains ids 0, 1 and 3 only,
# i.e. the incremental row with status 0 (id=2) is expected not to be inserted.
models__insert_condition_sql = """
{{ config(
table_type='iceberg',
materialized='incremental',
incremental_strategy='merge',
unique_key=['id'],
insert_condition='src.status != 0'
)
}}
{% if is_incremental() %}
select * from (
values
(1, -1)
, (2, 0)
, (3, 1)
) as t (id, status)
{% else %}
select * from (
values
(0, 1)
) as t (id, status)
{% endif %}
"""

# Seed CSV with the rows the insert-condition model is compared against in
# TestIcebergInsertCondition: ids 0, 1 and 3 (there is no row for id 2).
seeds__expected_insert_condition_csv = """id,status
0, 1
1,-1
3,1
"""


class TestIcebergIncrementalUniqueKey(BaseIncrementalUniqueKey):
@pytest.fixture(scope="class")
Expand Down Expand Up @@ -325,3 +361,30 @@ def replace_cast_date(model: str) -> str:

new_model = re.sub("'[0-9]{4}-[0-9]{2}-[0-9]{2}'", r"cast(\g<0> as date)", model)
return new_model


class TestIcebergInsertCondition:
    """Functional test for the `insert_condition` option of the merge strategy."""

    @pytest.fixture(scope="class")
    def seeds(self):
        """Register the CSV holding the rows expected in the model's table."""
        return {"expected_merge_insert_condition.csv": seeds__expected_insert_condition_csv}

    @pytest.fixture(scope="class")
    def models(self):
        """Register the incremental model configured with `insert_condition`."""
        return {"merge_insert_condition.sql": models__insert_condition_sql}

    def test__merge_insert_condition(self, project):
        """Seed should match the model after run"""
        seed_name = "expected_merge_insert_condition"
        model_name = "merge_insert_condition"

        run_dbt(["seed", "--select", seed_name, "--full-refresh"])

        # The model is run twice with identical arguments: the first run builds
        # the table, the second is intended to exercise the model's
        # `is_incremental()` branch (and therefore the merge statement).
        for _ in range(2):
            run_outcome = run_dbt(["run", "--select", model_name])
            assert run_outcome.results[0].status == RunStatus.Success

        check_relations_equal(project.adapter, [model_name, seed_name])

0 comments on commit d2bd2c8

Please sign in to comment.