Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Backward compatibility for PG version > 10 (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
louis-pie authored Mar 10, 2020
1 parent 68e8cb5 commit 193f4d8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
long_description = f.read()

setup(name='pipelinewise-tap-postgres',
version='1.5.1',
version='1.5.2',
description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
46 changes: 24 additions & 22 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

UPDATE_BOOKMARK_PERIOD = 10000

def get_pg_version(cur):
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
version = cur.fetchone()[0]
LOGGER.debug("Detected PostgreSQL version: %s", version)
def get_pg_version(conn_info):
with post_db.open_connection(conn_info, False) as conn:
with conn.cursor() as cur:
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
version = cur.fetchone()[0]
LOGGER.debug("Detected PostgreSQL version: {}".format(version))
return version


Expand Down Expand Up @@ -60,25 +62,24 @@ def int_to_lsn(lsni):


def fetch_current_lsn(conn_config):
version = get_pg_version(conn_config)
# Make sure PostgreSQL version is 9.4 or higher
# Do not allow minor versions with PostgreSQL BUG #15114
if (version >= 110000) and (version < 110002):
raise Exception('PostgreSQL upgrade required to minor version 11.2')
elif (version >= 100000) and (version < 100007):
raise Exception('PostgreSQL upgrade required to minor version 10.7')
elif (version >= 90600) and (version < 90612):
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
elif (version >= 90500) and (version < 90516):
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
elif (version >= 90400) and (version < 90421):
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
elif (version < 90400):
raise Exception('Logical replication not supported before PostgreSQL 9.4')

with post_db.open_connection(conn_config, False) as conn:
with conn.cursor() as cur:
# Make sure PostgreSQL version is 9.4 or higher
version = get_pg_version(cur)

# Do not allow minor versions with PostgreSQL BUG #15114
if (version >= 110000) and (version < 110002):
raise Exception('PostgreSQL upgrade required to minor version 11.2')
elif (version >= 100000) and (version < 100007):
raise Exception('PostgreSQL upgrade required to minor version 10.7')
elif (version >= 90600) and (version < 90612):
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
elif (version >= 90500) and (version < 90516):
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
elif (version >= 90400) and (version < 90421):
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
elif (version < 90400):
raise Exception('Logical replication not supported before PostgreSQL 9.4')

# Use version specific lsn command
if version >= 100000:
cur.execute("SELECT pg_current_wal_lsn() AS current_lsn")
Expand Down Expand Up @@ -388,12 +389,13 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
for s in logical_streams:
sync_common.send_schema_message(s, ['lsn'])

version = get_pg_version(conn_info)

# Create replication connection and cursor
conn = post_db.open_connection(conn_info, True)
cur = conn.cursor()

# Set session wal_sender_timeout for PG12 and above
version = get_pg_version(cur)
if (version >= 120000):
wal_sender_timeout = 10800000 #10800000ms = 3 hours
LOGGER.info("Set session wal_sender_timeout = {} milliseconds".format(wal_sender_timeout))
Expand Down

0 comments on commit 193f4d8

Please sign in to comment.