From 193f4d86c596b6851f5ab8623290cb526a31f432 Mon Sep 17 00:00:00 2001 From: Louis Pieterse <45560107+louis-pie@users.noreply.github.com> Date: Tue, 10 Mar 2020 13:30:21 +0000 Subject: [PATCH] Backward compatibility for PG version > 10 (#44) --- setup.py | 2 +- .../sync_strategies/logical_replication.py | 46 ++++++++++--------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/setup.py b/setup.py index 4edf2fd2..19959470 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = f.read() setup(name='pipelinewise-tap-postgres', - version='1.5.1', + version='1.5.2', description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible', long_description=long_description, long_description_content_type='text/markdown', diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 53004715..98daa121 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -19,10 +19,12 @@ UPDATE_BOOKMARK_PERIOD = 10000 -def get_pg_version(cur): - cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'") - version = cur.fetchone()[0] - LOGGER.debug("Detected PostgreSQL version: %s", version) +def get_pg_version(conn_info): + with post_db.open_connection(conn_info, False) as conn: + with conn.cursor() as cur: + cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'") + version = cur.fetchone()[0] + LOGGER.debug("Detected PostgreSQL version: {}".format(version)) return version @@ -60,25 +62,24 @@ def int_to_lsn(lsni): def fetch_current_lsn(conn_config): + version = get_pg_version(conn_config) + # Make sure PostgreSQL version is 9.4 or higher + # Do not allow minor versions with PostgreSQL BUG #15114 + if (version >= 110000) and (version < 110002): + raise Exception('PostgreSQL upgrade required to minor version 11.2') + elif (version >= 100000) and (version < 100007): + raise Exception('PostgreSQL upgrade required to minor version 10.7') + elif (version >= 90600) and (version < 90612): + raise Exception('PostgreSQL upgrade required to minor version 9.6.12') + elif (version >= 90500) and (version < 90516): + raise Exception('PostgreSQL upgrade required to minor version 9.5.16') + elif (version >= 90400) and (version < 90421): + raise Exception('PostgreSQL upgrade required to minor version 9.4.21') + elif (version < 90400): + raise Exception('Logical replication not supported before PostgreSQL 9.4') + with post_db.open_connection(conn_config, False) as conn: with conn.cursor() as cur: - # Make sure PostgreSQL version is 9.4 or higher - version = get_pg_version(cur) - - # Do not allow minor versions with PostgreSQL BUG #15114 - if (version >= 110000) and (version < 110002): - raise Exception('PostgreSQL upgrade required to minor version 11.2') - elif (version >= 100000) and (version < 100007): - raise Exception('PostgreSQL upgrade required to minor version 10.7') - elif (version >= 90600) and (version < 90612): - raise Exception('PostgreSQL upgrade required to minor version 9.6.12') - elif (version >= 90500) and (version < 90516): - raise Exception('PostgreSQL upgrade required to minor version 9.5.16') - elif (version >= 90400) and (version < 90421): - raise Exception('PostgreSQL upgrade required to minor version 9.4.21') - elif (version < 90400): - raise Exception('Logical replication not supported before PostgreSQL 9.4') - # Use version specific lsn command if version >= 100000: cur.execute("SELECT pg_current_wal_lsn() AS current_lsn") @@ -388,12 +389,13 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): for s in logical_streams: sync_common.send_schema_message(s, ['lsn']) + version = get_pg_version(conn_info) + # Create replication connection and cursor conn = post_db.open_connection(conn_info, True) cur = conn.cursor() # Set session wal_sender_timeout for PG12 and above - version = get_pg_version(cur) if (version >= 120000): wal_sender_timeout = 10800000 #10800000ms = 3 hours LOGGER.info("Set session wal_sender_timeout = {} milliseconds".format(wal_sender_timeout))