Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-659] Fix extracting data from tables with space in the name (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti authored Apr 20, 2020
1 parent d79d7ef commit 5810719
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 8 deletions.
49 changes: 41 additions & 8 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,36 @@ def locate_replication_slot(conn_info):
raise Exception("Unable to find replication slot {} with wal2json".format(db_specific_slot))


# pylint: disable=anomalous-backslash-in-string
def streams_to_wal2json_tables(streams):
"""Converts a list of singer stream dictionaries to wal2json plugin compatible string list.
The output is compatible with the 'filter-tables' and 'add-tables' option of wal2json plugin.
Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash.
Schema and table are case-sensitive. Table "public"."Foo bar" should be specified as "public.Foo\ bar".
Documentation in wal2json plugin: https://github.com/eulerto/wal2json/blob/master/README.md#parameters
:param streams: List of singer stream dictionaries
:return: tables(str): comma separated and escaped list of tables, compatible for wal2json plugin
:rtype: str
"""
def escape_spec_chars(string):
escaped = string
wal2json_special_chars = " ',.*"
for ch in wal2json_special_chars:
escaped = escaped.replace(ch, f'\\{ch}')
return escaped

tables = []
for s in streams:
schema_name = escape_spec_chars(s['metadata'][0]['metadata']['schema-name'])
table_name = escape_spec_chars(s['table_name'])

tables.append(f'{schema_name}.{table_name}')

return ','.join(tables)


def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
state_comitted = state
lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
Expand All @@ -419,10 +449,6 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
poll_interval = 10
poll_timestamp = None

selected_tables = []
for s in logical_streams:
selected_tables.append("{}.{}".format(s['metadata'][0]['metadata']['schema-name'], s['table_name']))

for s in logical_streams:
sync_common.send_schema_message(s, ['lsn'])

Expand All @@ -444,10 +470,17 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
int_to_lsn(end_lsn),
slot)
# psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval,
options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
except psycopg2.ProgrammingError:
raise Exception("Unable to start replication with logical replication (slot {})".format(slot))
cur.start_replication(slot_name=slot,
decode=True,
start_lsn=start_lsn,
status_interval=poll_interval,
options={
'write-in-chunks': 1,
'add-tables': streams_to_wal2json_tables(logical_streams)
})

except psycopg2.ProgrammingError as ex:
raise Exception("Unable to start replication with logical replication (slot {})".format(ex))

lsn_received_timestamp = datetime.datetime.utcnow()
poll_timestamp = datetime.datetime.utcnow()
Expand Down
41 changes: 41 additions & 0 deletions tests/test_logical_replication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest

from tap_postgres.sync_strategies import logical_replication


class TestLogicalReplication(unittest.TestCase):
maxDiff = None

def setUp(self):
pass

def test_streams_to_wal2json_tables(self):
"""Validate if table names are escaped to wal2json format"""
streams = [
{'metadata': [{'metadata': {'schema-name': 'public'}}],
'table_name': 'dummy_table'},
{'metadata': [{'metadata': {'schema-name': 'public'}}],
'table_name': 'CaseSensitiveTable'},
{'metadata': [{'metadata': {'schema-name': 'public'}}],
'table_name': 'Case Sensitive Table With Space'},
{'metadata': [{'metadata': {'schema-name': 'CaseSensitiveSchema'}}],
'table_name': 'dummy_table'},
{'metadata': [{'metadata': {'schema-name': 'Case Sensitive Schema With Space'}}],
'table_name': 'CaseSensitiveTable'},
{'metadata': [{'metadata': {'schema-name': 'Case Sensitive Schema With Space'}}],
'table_name': 'Case Sensitive Table With Space'},
{'metadata': [{'metadata': {'schema-name': 'public'}}],
'table_name': 'table_with_comma_,'},
{'metadata': [{'metadata': {'schema-name': 'public'}}],
'table_name': "table_with_quote_'"}
]

self.assertEqual(logical_replication.streams_to_wal2json_tables(streams),
'public.dummy_table,'
'public.CaseSensitiveTable,'
'public.Case\\ Sensitive\\ Table\\ With\\ Space,'
'CaseSensitiveSchema.dummy_table,'
'Case\\ Sensitive\\ Schema\\ With\\ Space.CaseSensitiveTable,'
'Case\\ Sensitive\\ Schema\\ With\\ Space.Case\\ Sensitive\\ Table\\ With\\ Space,'
'public.table_with_comma_\\,,'
"public.table_with_quote_\\'")

0 comments on commit 5810719

Please sign in to comment.