diff --git a/setup.py b/setup.py
index 43bd50bb9..fa0a5058f 100644
--- a/setup.py
+++ b/setup.py
@@ -46,6 +46,7 @@
     extras_require={
         'test': [
             'pre-commit==2.18.1',
+            'python-dateutil==2.8.2',
             'flake8==4.0.1',
             'pytest==7.1.1',
             'pytest-dependency==0.4.0',
diff --git a/tests/end_to_end/target_snowflake/tap_postgres/test_replicate_pg_to_sf_without_delete_in_target.py b/tests/end_to_end/target_snowflake/tap_postgres/test_replicate_pg_to_sf_without_delete_in_target.py
new file mode 100644
index 000000000..859e136e9
--- /dev/null
+++ b/tests/end_to_end/target_snowflake/tap_postgres/test_replicate_pg_to_sf_without_delete_in_target.py
@@ -0,0 +1,53 @@
+from datetime import datetime
+import dateutil.parser
+
+from tests.end_to_end.helpers import assertions
+from tests.end_to_end.target_snowflake.tap_postgres import TapPostgres
+
+TAP_ID = 'postgres_to_sf_without_delete_in_target'
+TARGET_ID = 'snowflake'
+
+
+class TestReplicatePGToSFWithoutDeleteInTarget(TapPostgres):
+    """
+    Replicate tables from Postgres to Snowflake without hard-deleting rows in the target
+    """
+
+    # pylint: disable=arguments-differ
+    def setUp(self):
+        super().setUp(tap_id=TAP_ID, target_id=TARGET_ID)
+
+    def test_replicate_pg_to_sf_without_delete_in_target(self):
+        assertions.assert_run_tap_success(
+            self.tap_id, self.target_id, ['fastsync', 'singer']
+        )
+
+        # Validate that _SDC_DELETED_AT is empty while the row still exists in the source
+        result = self.e2e_env.run_query_target_snowflake(
+            f'SELECT _SDC_DELETED_AT FROM '
+            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
+            f" where cvarchar='A';"
+        )[0][0]
+
+        self.assertIsNone(result)
+
+        # Delete the row in the source database
+        self.e2e_env.run_query_tap_postgres(
+            'DELETE FROM public."table_with_space and UPPERCase" WHERE cvarchar = \'A\';'
+        )
+
+        # Run the tap a second time to replicate the delete
+        assertions.assert_run_tap_success(
+            self.tap_id, self.target_id, ['singer'], profiling=True
+        )
+
+        deleted_at = self.e2e_env.run_query_target_snowflake(
+            f'SELECT _SDC_DELETED_AT FROM '
+            f'ppw_e2e_tap_postgres{self.e2e_env.sf_schema_postfix}."TABLE_WITH_SPACE AND UPPERCASE"'
+            f" where cvarchar='A';"
+        )[0][0]
+
+        # Validate that the row is still in the target (no hard delete) and that
+        # _SDC_DELETED_AT has been set to a valid timestamp
+        self.assertIsNotNone(deleted_at)
+        self.assertIsInstance(dateutil.parser.parse(str(deleted_at)), datetime)
diff --git a/tests/end_to_end/test-project/tap_postgres_to_sf_without_delete_in_target.yml.template b/tests/end_to_end/test-project/tap_postgres_to_sf_without_delete_in_target.yml.template
new file mode 100644
index 000000000..f5c7ff3cc
--- /dev/null
+++ b/tests/end_to_end/test-project/tap_postgres_to_sf_without_delete_in_target.yml.template
@@ -0,0 +1,49 @@
+---
+
+# ------------------------------------------------------------------------------
+# General Properties
+# ------------------------------------------------------------------------------
+id: "postgres_to_sf_without_delete_in_target"
+name: "PostgreSQL source test database"
+type: "tap-postgres"
+owner: "test-runner"
+
+
+# ------------------------------------------------------------------------------
+# Source (Tap) - PostgreSQL connection details
+# ------------------------------------------------------------------------------
+db_conn:
+  host: "${TAP_POSTGRES_HOST}"            # PostgreSQL host
+  logical_poll_total_seconds: 30          # Time out if no LOG_BASED changes received for 30 seconds
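+# Keep rows that are deleted in the source in the target: with hard_delete disabled
+# the delete is only recorded in the _SDC_DELETED_AT metadata column (soft delete)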
+  port: ${TAP_POSTGRES_PORT}              # PostgreSQL port
+  user: "${TAP_POSTGRES_USER}"            # PostgreSQL user
+  password: "${TAP_POSTGRES_PASSWORD}"    # Plain string or vault encrypted
+  dbname: "${TAP_POSTGRES_DB}"            # PostgreSQL database name
+
+hard_delete: false
+add_metadata_columns: true
+
+# ------------------------------------------------------------------------------
+# Destination (Target) - Target properties
+# Connection details should be in the relevant target YAML file
+# ------------------------------------------------------------------------------
+target: "snowflake"                       # ID of the target connector where the data will be loaded
+batch_size_rows: 1000                     # Batch size for the stream to optimise load performance
+stream_buffer_size: 0                     # In-memory buffer size (MB) between taps and targets for asynchronous data pipes
+
+
+# ------------------------------------------------------------------------------
+# Source to target Schema mapping
+# ------------------------------------------------------------------------------
+schemas:
+
+  ### SOURCE SCHEMA 1: public
+  - source_schema: "public"
+    target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}"
+
+    tables:
+      ### Table with space and mixed upper and lowercase characters
+      - table_name: "table_with_space and UPPERCase"
+        replication_method: "LOG_BASED"