From 58107197e0c62004532850dbc31e43963039c2af Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Mon, 20 Apr 2020 10:45:43 +0100 Subject: [PATCH] [AP-659] Fix extracting data from tables with space in the name (#49) --- .../sync_strategies/logical_replication.py | 49 ++++++++++++++++--- tests/test_logical_replication.py | 41 ++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 tests/test_logical_replication.py diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 50c28226..3e1a0ecb 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -401,6 +401,36 @@ def locate_replication_slot(conn_info): raise Exception("Unable to find replication slot {} with wal2json".format(db_specific_slot)) +# pylint: disable=anomalous-backslash-in-string +def streams_to_wal2json_tables(streams): + """Converts a list of singer stream dictionaries to wal2json plugin compatible string list. + The output is compatible with the 'filter-tables' and 'add-tables' option of wal2json plugin. + + Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. + Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as "public.Foo\ bar". + Documentation in wal2json plugin: https://github.com/eulerto/wal2json/blob/master/README.md#parameters + + :param streams: List of singer stream dictionaries + :return: tables(str): comma separated and escaped list of tables, compatible for wal2json plugin + :rtype: str + """ + def escape_spec_chars(string): + escaped = string + wal2json_special_chars = " ',.*" + for ch in wal2json_special_chars: + escaped = escaped.replace(ch, f'\\{ch}') + return escaped + + tables = [] + for s in streams: + schema_name = escape_spec_chars(s['metadata'][0]['metadata']['schema-name']) + table_name = escape_spec_chars(s['table_name']) + + tables.append(f'{schema_name}.{table_name}') + + return ','.join(tables) + + def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): state_comitted = state lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams]) @@ -419,10 +449,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): poll_interval = 10 poll_timestamp = None - selected_tables = [] - for s in logical_streams: - selected_tables.append("{}.{}".format(s['metadata'][0]['metadata']['schema-name'], s['table_name'])) - for s in logical_streams: sync_common.send_schema_message(s, ['lsn']) @@ -444,10 +470,17 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): int_to_lsn(end_lsn), slot) # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval - cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval, - options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)}) - except psycopg2.ProgrammingError: - raise Exception("Unable to start replication with logical replication (slot {})".format(slot)) + cur.start_replication(slot_name=slot, + decode=True, + start_lsn=start_lsn, + status_interval=poll_interval, + options={ + 'write-in-chunks': 1, + 'add-tables': streams_to_wal2json_tables(logical_streams) + }) + + except psycopg2.ProgrammingError as ex: + raise Exception("Unable to start replication with logical replication (slot {})".format(ex)) lsn_received_timestamp = datetime.datetime.utcnow() poll_timestamp = datetime.datetime.utcnow() diff --git a/tests/test_logical_replication.py b/tests/test_logical_replication.py new file mode 100644 index 00000000..dd0300cf --- /dev/null +++ b/tests/test_logical_replication.py @@ -0,0 +1,41 @@ +import unittest + +from tap_postgres.sync_strategies import logical_replication + + +class TestLogicalReplication(unittest.TestCase): + maxDiff = None + + def setUp(self): + pass + + def test_streams_to_wal2json_tables(self): + """Validate if table names are escaped to wal2json format""" + streams = [ + {'metadata': [{'metadata': {'schema-name': 'public'}}], + 'table_name': 'dummy_table'}, + {'metadata': [{'metadata': {'schema-name': 'public'}}], + 'table_name': 'CaseSensitiveTable'}, + {'metadata': [{'metadata': {'schema-name': 'public'}}], + 'table_name': 'Case Sensitive Table With Space'}, + {'metadata': [{'metadata': {'schema-name': 'CaseSensitiveSchema'}}], + 'table_name': 'dummy_table'}, + {'metadata': [{'metadata': {'schema-name': 'Case Sensitive Schema With Space'}}], + 'table_name': 'CaseSensitiveTable'}, + {'metadata': [{'metadata': {'schema-name': 'Case Sensitive Schema With Space'}}], + 'table_name': 'Case Sensitive Table With Space'}, + {'metadata': [{'metadata': {'schema-name': 'public'}}], + 'table_name': 'table_with_comma_,'}, + {'metadata': [{'metadata': {'schema-name': 'public'}}], + 'table_name': "table_with_quote_'"} + ] + + self.assertEqual(logical_replication.streams_to_wal2json_tables(streams), + 'public.dummy_table,' + 'public.CaseSensitiveTable,' + 'public.Case\\ Sensitive\\ Table\\ With\\ Space,' + 'CaseSensitiveSchema.dummy_table,' + 'Case\\ Sensitive\\ Schema\\ With\\ Space.CaseSensitiveTable,' + 'Case\\ Sensitive\\ Schema\\ With\\ Space.Case\\ Sensitive\\ Table\\ With\\ Space,' + 'public.table_with_comma_\\,,' + "public.table_with_quote_\\'")