Add e2e test for deletion with pg log-based replication
Currently, deleting a record when using Postgres with log-based replication
will overwrite all non-PK and non-metadata columns in Snowflake with `null`.

This is an integration test for the fix in
transferwise/pipelinewise-target-snowflake#276
Tolsto committed May 10, 2022
1 parent 8e0eaf0 commit 5aa5f8c
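
The commit message describes the symptom; the likely mechanism (an assumption here, not spelled out in the commit) is that a LOG_BASED delete event carries only the primary key and metadata columns, so a target MERGE that updates every known column nulls out the rest. Below is a minimal sketch of the pre- and post-fix update-clause idea, with hypothetical column names; this is not pipelinewise-target-snowflake's actual code:

TABLE_COLUMNS = ['ID', 'CVARCHAR', 'CINT', '_SDC_DELETED_AT']  # hypothetical columns

def update_clause_buggy(record):
    # Pre-fix: update every known column; columns absent from a delete
    # event come through as NULL and wipe the existing values
    return ', '.join(f'{col} = s.{col}' for col in TABLE_COLUMNS)

def update_clause_fixed(record):
    # Post-fix idea: only update the columns the event actually carries
    present = [col for col in TABLE_COLUMNS if col.lower() in record]
    return ', '.join(f'{col} = s.{col}' for col in present)

delete_event = {'id': 1, '_sdc_deleted_at': '2022-05-10T12:00:00+00:00'}
print(update_clause_fixed(delete_event))  # ID = s.ID, _SDC_DELETED_AT = s._SDC_DELETED_AT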
Showing 3 changed files with 109 additions and 0 deletions.
1 change: 1 addition & 0 deletions setup.py
@@ -46,6 +46,7 @@
     extras_require={
         'test': [
             'pre-commit==2.18.1',
+            'python-dateutil==2.8.2',
             'flake8==4.0.1',
             'pytest==7.1.1',
             'pytest-dependency==0.4.0',
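The `python-dateutil` pin is new here because the test file below imports it; a quick illustration of the kind of check it enables (the timestamp literal is made up):

from datetime import datetime

import dateutil.parser

# Confirm a _SDC_DELETED_AT-style value parses as a real timestamp
deleted_at = dateutil.parser.parse('2022-05-10 12:34:56.789000')
assert isinstance(deleted_at, datetime)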
61 changes: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
from datetime import datetime

import dateutil.parser

from tests.end_to_end.helpers import assertions
from tests.end_to_end.target_snowflake.tap_postgres import TapPostgres

TAP_ID = 'postgres_to_sf_without_delete_in_target'
TARGET_ID = 'snowflake'


class TestReplicatePGToSFWithoutDeleteInTarget(TapPostgres):
    """
    Resync tables from Postgres to Snowflake without replicating deletes
    """

    # pylint: disable=arguments-differ
    def setUp(self):
        super().setUp(tap_id=TAP_ID, target_id=TARGET_ID)

    def test_replicate_pg_to_sf_without_delete_in_target(self):
        # Initial load: fastsync the table, then continue with singer
        assertions.assert_run_tap_success(
            self.tap_id, self.target_id, ['fastsync', 'singer']
        )

        result = self.e2e_env.run_query_target_snowflake(
            f'SELECT _SDC_DELETED_AT FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )[0][0]

        # The row has not been deleted yet, so _SDC_DELETED_AT must be unset
        self.assertIsNone(result)

        # Delete row in source
        self.e2e_env.run_query_tap_postgres(
            'DELETE FROM public."table_with_space and UPPERCase" WHERE cvarchar = \'A\';'
        )

        # Run tap a second time so LOG_BASED replication picks up the delete
        assertions.assert_run_tap_success(
            self.tap_id, self.target_id, ['singer'], profiling=True
        )

        deleted_row = self.e2e_env.run_query_target_snowflake(
            f'SELECT * FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )[0]

        # Validate that the entire row data is still in the target
        for column in deleted_row:
            self.assertIsNotNone(column)

        deleted_at = self.e2e_env.run_query_target_snowflake(
            f'SELECT _SDC_DELETED_AT FROM '
            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
            f" WHERE cvarchar='A';"
        )[0][0]

        # Validate that _sdc_deleted_at exists and has been set to a valid timestamp
        self.assertIsNotNone(deleted_at)
        self.assertIsInstance(dateutil.parser.parse(str(deleted_at)), datetime)
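To run just this test locally, something like the sketch below should work; the directory is inferred from the imports above, the filename is a guess, and the e2e suite additionally needs live Postgres and Snowflake connections configured:

import pytest

# Directory inferred from the package imports; the filename is hypothetical
pytest.main([
    'tests/end_to_end/target_snowflake/tap_postgres/'
    'test_replicate_pg_to_sf_without_delete_in_target.py'
])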
47 changes: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
---

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "postgres_to_sf_without_delete_in_target"
name: "PostgreSQL source test database"
type: "tap-postgres"
owner: "test-runner"


# ------------------------------------------------------------------------------
# Source (Tap) - PostgreSQL connection details
# ------------------------------------------------------------------------------
db_conn:
  host: "${TAP_POSTGRES_HOST}"            # PostgreSQL host
  logical_poll_total_seconds: 30          # Time out if no LOG_BASED changes received for 30 seconds
  port: ${TAP_POSTGRES_PORT}              # PostgreSQL port
  user: "${TAP_POSTGRES_USER}"            # PostgreSQL user
  password: "${TAP_POSTGRES_PASSWORD}"    # Plain string or vault encrypted
  dbname: "${TAP_POSTGRES_DB}"            # PostgreSQL database name

hard_delete: false                        # Keep deleted rows in the target, only mark them as deleted
add_metadata_columns: true                # Add _sdc_* metadata columns, including _sdc_deleted_at

# ------------------------------------------------------------------------------
# Destination (Target) - Target properties
# Connection details should be in the relevant target YAML file
# ------------------------------------------------------------------------------
target: "snowflake" # ID of the target connector where the data will be loaded
batch_size_rows: 1000 # Batch size for the stream to optimise load performance
stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes


# ------------------------------------------------------------------------------
# Source to target Schema mapping
# ------------------------------------------------------------------------------
schemas:

  ### SOURCE SCHEMA 1: public
  - source_schema: "public"
    target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}"

    tables:
      ### Table with space and mixed upper and lowercase characters
      - table_name: "table_with_space and UPPERCase"
        replication_method: "LOG_BASED"
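
For reference, `hard_delete: false` makes the target soft-delete: the row stays in Snowflake and only its `_SDC_DELETED_AT` metadata column gets stamped, whereas `hard_delete: true` would remove the row outright. A small self-contained sketch of that contract, using made-up rows rather than real query results:

def is_soft_deleted(row):
    """True when the target kept the row but stamped its deletion time."""
    return row.get('_SDC_DELETED_AT') is not None

before = {'CVARCHAR': 'A', '_SDC_DELETED_AT': None}
after = {'CVARCHAR': 'A', '_SDC_DELETED_AT': '2022-05-10 12:00:00'}

assert not is_soft_deleted(before)
assert is_soft_deleted(after)
assert after['CVARCHAR'] == before['CVARCHAR']  # row data is preserved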
