Skip to content

Commit

Permalink
Valid history strategy (#196)
Browse files Browse the repository at this point in the history
* Better target table management  and valid "end" time feature.

* Updated readme to document valid_history parameters

* added tests for valid_hostory

* workflow changes

* removed unwanted and duplicate file

* removed .gitkeep

* changes in the test models

* incremental_strategy_change

* deleted foreign tables models

* deleted older test file

* setting permission for run.sh file

* Update ci-integration-tests-csae.yml

* Update ci-integration-tests-csae.yml

* Update ci-integration-tests-csae.yml

* added use_valid_to_time for a testing model

* removed python 3.8 from workflow as dbt-core removed it from latest version

---------

Co-authored-by: Turpaud, Remi <[email protected]>
Co-authored-by: Mohan Talla <[email protected]>
  • Loading branch information
3 people authored Oct 28, 2024
1 parent 116924f commit 89f7cc7
Show file tree
Hide file tree
Showing 37 changed files with 1,675 additions and 154 deletions.
22 changes: 20 additions & 2 deletions .github/workflows/ci-integration-tests-csae.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
test:
strategy:
matrix:
python: ['3.8', '3.9', '3.10', '3.11', '3.12']
python: ['3.9', '3.10', '3.11', '3.12']
runs-on: ubuntu-latest
name: Functional test
steps:
Expand Down Expand Up @@ -205,7 +205,7 @@ jobs:
cd $GITHUB_WORKSPACE/test/catalog/without_qvci
./run.sh
- name: Setup environment with TERA for performance and catalog tests
- name: Setup environment with TERA for performance, catalog and valid history tests
run: |
rm ~/.dbt/profiles.yml
cat << EOF > ~/.dbt/profiles.yml
Expand Down Expand Up @@ -247,6 +247,18 @@ jobs:
timeout_seconds: 300
priority: interactive
retries: 1
dbt_test_valid_history:
type: teradata
host: $DBT_TERADATA_SERVER_NAME
user: $DBT_TERADATA_USERNAME
password: $DBT_TERADATA_PASSWORD
logmech: TD2
schema: dbt_valid_history
tmode: TERA
threads: 4
timeout_seconds: 300
priority: interactive
retries: 1
EOF
env:
DBT_TERADATA_SERVER_NAME: ${{ steps.create-csae-environments.outputs.teradata-server-name }}
Expand All @@ -268,6 +280,12 @@ jobs:
cd $GITHUB_WORKSPACE/test/catalog/without_qvci
./run.sh
- name: Run valid history tests
run: |
cd $GITHUB_WORKSPACE/test/valid_history_test
chmod 777 run.sh
./run.sh
- name: Run nopi tests
run: |
sed -i "s/hostname=dbt.*/hostname='$DBT_TERADATA_SERVER_NAME'/g" tests/conftest.py
Expand Down
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,27 +270,28 @@ The following incremental materialization strategies are supported:
unique_key='id',
on_schema_change='fail',
incremental_strategy='valid_history',
valid_from='valid_from_column',
history_column_in_target='history_period_column'
valid_period='valid_period_col',
use_valid_to_time='no',
)
}}
```
`valid_history` incremental strategy requires the following parameters:
* `valid_from` - Column in the source table of **timestamp** datatype indicating when each record became valid.
* `history_column_in_target` - Column in the target table of **period** datatype that tracks history.
* `unique_key`: The primary key of the model (excluding the valid time components), specified as a column name or list of column names.
* `valid_period`: Name of the model column indicating the period for which the record is considered to be valid. The datatype must be `PERIOD(DATE)` or `PERIOD(TIMESTAMP)`.
* `use_valid_to_time`: Wether the end bound value of the valid period in the input is considered by the strategy when building the valid timeline. Use 'no' if you consider your record to be valid until changed (and supply any value greater to the begin bound for the end bound of the period - a typical convention is `9999-12-31` of ``9999-12-31 23:59:59.999999`). Use 'yes' if you know until when the record is valid (typically this is a correction in the history timeline).



> The valid_history strategy in dbt-teradata involves several critical steps to ensure the integrity and accuracy of historical data management:
> * Remove duplicates and conflicting values from the source data:
> * This step ensures that the data is clean and ready for further processing by eliminating any redundant or conflicting records.
> * The process of removing duplicates and conflicting values from the source data involves using a ranking mechanism to ensure that only the highest-priority records are retained. This is accomplished using the SQL RANK() function.
> * Identify and adjust overlapping time slices:
> * Overlapping time periods in the data are detected and corrected to maintain a consistent and non-overlapping timeline.
> * Manage records needing to be overwritten or split based on the source and target data:
> * The process of removing primary key duplicates (ie. two or more records with the same value for the `unique_key` and BEGIN() bond of the `valid_period` fields) in the dataset produced by the model. If such duplicates exist, the row with the lowest value is retained for all non-primary-key fields (in the order specified in the model) is retained. Full-row duplicates are always de-duplicated.
> * Identify and adjust overlapping time slices (if use_valid_to_time='yes):
> * Overlapping time periods in the data are corrected to maintain a consistent and non-overlapping timeline. To do so, the valid period end bound of a record is adjusted to meet the begin bound of the next record with the same `unique_key` value and overlapping `valid_period` value if any.
> * Manage records needing to be adjusted, deleted or split based on the source and target data:
> * This involves handling scenarios where records in the source data overlap with or need to replace records in the target data, ensuring that the historical timeline remains accurate.
> * Utilize the TD_NORMALIZE_MEET function to compact history:
> * This function helps to normalize and compact the history by merging adjacent time periods, improving the efficiency and performance of the database.
> * Compact history:
> * Normalize and compact the history by merging records of adjacent time periods withe same value, optimizing database storage and performance. We use the function TD_NORMALIZE_MEET for this purpose.
> * Delete existing overlapping records from the target table:
> * Before inserting new or updated records, any existing records in the target table that overlap with the new data are removed to prevent conflicts.
> * Insert the processed data into the target table:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@


{% macro teradata__get_incremental_sql(strategy, target_relation, tmp_relation, unique_key, dest_columns,incremental_predicates,
valid_period, valid_from, valid_to, use_valid_to_time, history_column_in_target, resolve_conflicts) %}
valid_period, use_valid_to_time, resolve_conflicts) %}
{% if strategy == 'delete+insert' %}
{% do return(teradata__get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, incremental_predicates)) %}
{% elif strategy == 'append' %}
{% do return(teradata__get_incremental_append_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'merge' %}
{% do return(teradata__get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns,incremental_predicates)) %}
{% elif strategy == 'valid_history' %}
{% do return(teradata__get_incremental_valid_history_sql(target_relation, tmp_relation, unique_key, valid_period, valid_from, valid_to, use_valid_to_time, history_column_in_target, resolve_conflicts)) %}
{% do return(teradata__get_incremental_valid_history_sql(target_relation, tmp_relation, unique_key, valid_period, use_valid_to_time, resolve_conflicts)) %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid Strategy") %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@

-- Start: Below are the configuration options for the valid_history strategy
{% set valid_period = config.get('valid_period', none) %}
{% set valid_from = config.get('valid_from', none) %}
{% set valid_to = config.get('valid_to', none) %}
{% set use_valid_to_time = config.get('use_valid_to_time', default='no') %}
{% set resolve_conflicts = config.get('resolve_conflicts', default='yes') %}
{% set history_column_in_target = config.get('history_column_in_target', none) %}
-- End: Above are the configuration options for the valid_history strategy

{% set target_relation = this.incorporate(type='table') %}
Expand Down Expand Up @@ -62,7 +59,7 @@


{% set build_sql = teradata__get_incremental_sql(strategy, target_relation, tmp_relation, unique_key, dest_columns,incremental_predicates,
valid_period, valid_from, valid_to, use_valid_to_time, history_column_in_target, resolve_conflicts) %}
valid_period, use_valid_to_time, resolve_conflicts) %}


{% do to_drop.append(tmp_relation) %}
Expand Down
Loading

0 comments on commit 89f7cc7

Please sign in to comment.