From 886ee6549f7889ab12d30e5a012aa8eb241592f9 Mon Sep 17 00:00:00 2001 From: Nils Mueller Date: Wed, 20 Apr 2022 00:45:28 +0200 Subject: [PATCH] Add support for Postgres' hstore type to fast sync Fast sync would map an hstore type to VARCHAR in Snowflake. However, that log based replication would then migrate the column to VARIANT during the first run. --- pipelinewise/fastsync/commons/tap_postgres.py | 6 +++++- .../fastsync/postgres_to_snowflake.py | 8 +++++++- tests/db/tap_postgres_data.sql | 20 +++++++++++++++++++ tests/end_to_end/helpers/db.py | 2 +- .../tap_postgres_to_sf.yml.template | 4 ++++ 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index c1d46b03a..da91c4eca 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -415,9 +415,13 @@ def get_table_columns(self, table_name, max_num=None, date_type='date'): ,character_maximum_length FROM (SELECT column_name, - data_type, + CASE + WHEN data_type = 'USER-DEFINED' AND udt_name = 'hstore' THEN 'hstore' + ELSE data_type + END as data_type, CASE WHEN data_type = 'ARRAY' THEN 'array_to_json("' || column_name || '") AS ' || column_name + WHEN data_type = 'USER-DEFINED' AND udt_name = 'hstore' THEN column_name|| '::json AS ' || column_name WHEN data_type = 'date' THEN 'CASE WHEN "' ||column_name|| E'" < \\'0001-01-01\\' ' 'OR "' ||column_name|| E'" > \\'9999-12-31\\' THEN \\'9999-12-31\\' ' diff --git a/pipelinewise/fastsync/postgres_to_snowflake.py b/pipelinewise/fastsync/postgres_to_snowflake.py index cbbe8abfd..de097bf3a 100644 --- a/pipelinewise/fastsync/postgres_to_snowflake.py +++ b/pipelinewise/fastsync/postgres_to_snowflake.py @@ -41,8 +41,13 @@ LOCK = multiprocessing.Lock() -def tap_type_to_target_type(pg_type, *_): +def tap_type_to_target_type(pg_type, extra=None): """Data type mapping from Postgres to Snowflake""" + + # Needed for call from assertion test helper + if pg_type == 'user-defined' and extra == 'hstore': + pg_type = 'hstore' + return { 'char': 'VARCHAR', 'character': 'VARCHAR', @@ -75,6 +80,7 @@ def tap_type_to_target_type(pg_type, *_): 'ARRAY': 'VARIANT', 'json': 'VARIANT', 'jsonb': 'VARIANT', + 'hstore': 'VARIANT' }.get(pg_type, 'VARCHAR') diff --git a/tests/db/tap_postgres_data.sql b/tests/db/tap_postgres_data.sql index e83367f46..f93f6a12c 100644 --- a/tests/db/tap_postgres_data.sql +++ b/tests/db/tap_postgres_data.sql @@ -58,6 +58,26 @@ Maatjies', DATE '2021-01-30'), COMMIT; +BEGIN; +SET client_encoding = 'UTF8'; + +CREATE EXTENSION IF NOT EXISTS hstore; + +DROP TABLE IF EXISTS public.special_data_types; + +CREATE TABLE public.special_data_types( + id serial PRIMARY KEY, + cHstore hstore +); + +INSERt INTO public.special_data_types + (cHstore) +VALUES + ('"key" => "some value"'), + ('"spaced key" => "another value"'); +COMMIT; + + BEGIN; SET client_encoding = 'LATIN1'; diff --git a/tests/end_to_end/helpers/db.py b/tests/end_to_end/helpers/db.py index 700b58305..11bdc0683 100644 --- a/tests/end_to_end/helpers/db.py +++ b/tests/end_to_end/helpers/db.py @@ -133,7 +133,7 @@ def sql_get_columns_postgres(schemas: list) -> str: sql_schemas = ', '.join(f"'{schema}'" for schema in schemas) return f""" - SELECT table_name, STRING_AGG(CONCAT(column_name, ':', data_type, ':'), ';' ORDER BY column_name) + SELECT table_name, STRING_AGG(CONCAT(column_name, ':', data_type, ':', udt_name), ';' ORDER BY column_name) FROM information_schema.columns WHERE table_schema IN ({sql_schemas}) GROUP BY table_name diff --git a/tests/end_to_end/test-project/tap_postgres_to_sf.yml.template b/tests/end_to_end/test-project/tap_postgres_to_sf.yml.template index 4705983c9..2ce48b619 100644 --- a/tests/end_to_end/test-project/tap_postgres_to_sf.yml.template +++ b/tests/end_to_end/test-project/tap_postgres_to_sf.yml.template @@ -67,6 +67,10 @@ schemas: replication_method: "INCREMENTAL" replication_key: "id" + ### Table with special data types + - table_name: "special_data_types" + replication_method: "LOG_BASED" + ### Table with space and mixed upper and lowercase characters - table_name: "table_with_space and UPPERCase" replication_method: "LOG_BASED"