From 4a8ab42ee84ff9dc35cc3b3747f35aa1d4397cb4 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 14:46:01 +0100 Subject: [PATCH 1/9] Make `create_replication_slot` return consistent LSN This change ensures that the LSN at which the replication slot is first consistent is available to the caller of the method. --- pipelinewise/fastsync/commons/tap_postgres.py | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index c1d46b03a..e5a7cc140 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -4,6 +4,7 @@ import re import sys import psycopg2 +import psycopg2.errors import psycopg2.extras from typing import Dict @@ -221,7 +222,7 @@ def primary_host_query(self, query, params=None): return [] # pylint: disable=no-member - def create_replication_slot(self): + def create_replication_slot(self) -> int: """ Create replication slot on the primary host @@ -236,24 +237,21 @@ def create_replication_slot(self): format you won't be able to do LOG_BASED replication from the same postgres database by multiple taps. If that the case then you need to drop the old replication slot and full-resync the new taps. + + Returns the LSN at which the replication slot is consistent. 
""" - try: - slot_name = self.__get_slot_name( - self.primary_host_conn, - self.connection_config['dbname'], - self.connection_config['tap_id'], - ) + slot_name = self.__get_slot_name( + self.primary_host_conn, + self.connection_config['dbname'], + self.connection_config['tap_id'], + ) - # Create the replication host + # Create the replication host + return parse_lsn( self.primary_host_query( f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')" - ) - except Exception as exc: - # ERROR: replication slot already exists SQL state: 42710 - if hasattr(exc, 'pgcode') and exc.pgcode == '42710': - pass - else: - raise exc + )[0]['lsn'] + ) # pylint: disable=too-many-branches,no-member,chained-comparison def fetch_current_log_pos(self): @@ -287,8 +285,12 @@ def fetch_current_log_pos(self): if version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') - # Create replication slot - self.create_replication_slot() + try: + # Create replication slot. + self.create_replication_slot() + except psycopg2.errors.DuplicateObject: + # Replication slot exists already so continue. 
+ pass # Close replication slot dedicated connection self.primary_host_conn.close() From 8ee54937557487380bb2ee12b40fd37dc1650b8f Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 14:57:04 +0100 Subject: [PATCH 2/9] Refactor fetching current LSN into method --- pipelinewise/fastsync/commons/tap_postgres.py | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index e5a7cc140..5dad43cb5 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -29,6 +29,7 @@ def __init__(self, connection_config, tap_type_to_target_type, target_quote=None self.curr = None self.primary_host_conn = None self.primary_host_curr = None + self.version = None @staticmethod def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'): @@ -253,6 +254,33 @@ def create_replication_slot(self) -> int: )[0]['lsn'] ) + def get_current_lsn(self) -> int: + """Obtain the most recent LSN availiable.""" + # is replica_host set ? 
+ if self.connection_config.get('replica_host'): + # Get latest applied lsn from replica_host + if self.version >= 100000: + result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn') + elif self.version >= 90400: + result = self.query( + 'SELECT pg_last_xlog_replay_location() AS current_lsn' + ) + else: + raise Exception( + 'Logical replication not supported before PostgreSQL 9.4' + ) + else: + # Get current lsn from primary host + if self.version >= 100000: + result = self.query('SELECT pg_current_wal_lsn() AS current_lsn') + elif self.version >= 90400: + result = self.query('SELECT pg_current_xlog_location() AS current_lsn') + else: + raise Exception( + 'Logical replication not supported before PostgreSQL 9.4' + ) + return parse_lsn(result[0]['current_lsn']) + # pylint: disable=too-many-branches,no-member,chained-comparison def fetch_current_log_pos(self): """ @@ -269,20 +297,20 @@ def fetch_current_log_pos(self): result = self.primary_host_query( "SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'" ) - version = result[0].get('version') + self.version = result[0].get('version') # Do not allow minor versions with PostgreSQL BUG #15114 - if (version >= 110000) and (version < 110002): + if (self.version >= 110000) and (self.version < 110002): raise Exception('PostgreSQL upgrade required to minor version 11.2') - if (version >= 100000) and (version < 100007): + if (self.version >= 100000) and (self.version < 100007): raise Exception('PostgreSQL upgrade required to minor version 10.7') - if (version >= 90600) and (version < 90612): + if (self.version >= 90600) and (self.version < 90612): raise Exception('PostgreSQL upgrade required to minor version 9.6.12') - if (version >= 90500) and (version < 90516): + if (self.version >= 90500) and (self.version < 90516): raise Exception('PostgreSQL upgrade required to minor version 9.5.16') - if (version >= 90400) and (version < 90421): + if (self.version >= 90400) and (self.version < 
90421): raise Exception('PostgreSQL upgrade required to minor version 9.4.21') - if version < 90400: + if self.version < 90400: raise Exception('Logical replication not supported before PostgreSQL 9.4') try: @@ -295,33 +323,7 @@ def fetch_current_log_pos(self): # Close replication slot dedicated connection self.primary_host_conn.close() - # is replica_host set ? - if self.connection_config.get('replica_host'): - # Get latest applied lsn from replica_host - if version >= 100000: - result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn') - elif version >= 90400: - result = self.query( - 'SELECT pg_last_xlog_replay_location() AS current_lsn' - ) - else: - raise Exception( - 'Logical replication not supported before PostgreSQL 9.4' - ) - else: - # Get current lsn from primary host - if version >= 100000: - result = self.query('SELECT pg_current_wal_lsn() AS current_lsn') - elif version >= 90400: - result = self.query('SELECT pg_current_xlog_location() AS current_lsn') - else: - raise Exception( - 'Logical replication not supported before PostgreSQL 9.4' - ) - - current_lsn = result[0].get('current_lsn') - file, index = current_lsn.split('/') - lsn = (int(file, 16) << 32) + int(index, 16) + lsn = self.get_current_lsn() return {'lsn': lsn, 'version': 1} From 0e91b926cba6b4b39e7879f6297b0176a540d8cb Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 15:05:15 +0100 Subject: [PATCH 3/9] Add method to get `confirmed_flush_lsn` --- pipelinewise/fastsync/commons/tap_postgres.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index 5dad43cb5..2fdba4c02 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -3,6 +3,7 @@ import logging import re import sys +import textwrap import psycopg2 import psycopg2.errors import psycopg2.extras @@ 
-281,6 +282,33 @@ def get_current_lsn(self) -> int: ) return parse_lsn(result[0]['current_lsn']) + def get_confirmed_flush_lsn(self) -> int: + """ + Get the last flushed LSN for the replication slot. + + For Postgres <9.6 this defaults to the restart_lsn as confirmed_flush_lsn + is not availiable. + """ + slot_name = self.__get_slot_name( + self.primary_host_conn, + self.connection_config['dbname'], + self.connection_config['tap_id'], + ) + + res = self.primary_host_query( + textwrap.dedent( + f"""SELECT * + FROM pg_replication_slots + WHERE slot_name = '{slot_name}' + AND plugin = 'wal2json' + AND slot_type = 'logical'""" + ) + )[0] + + # confirmed_flush_lsn was introduced in Postgres 9.6 so fallback to + # restart_lsn if needed. + return parse_lsn(res.get('confirmed_flush_lsn', res['restart_lsn'])) + # pylint: disable=too-many-branches,no-member,chained-comparison def fetch_current_log_pos(self): """ From b0fb60f37506293997381aba58a2dd8b5f6777b5 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 15:18:59 +0100 Subject: [PATCH 4/9] Ensure that replica is ahead of replication slot There is a replication lag between a primary and replica database. We get information about what data the replication slot can provide from the primary but perform the initial data snapshot on the replica. Therefore, we must make sure that the replica has caught up to at least the LSN which we are going to begin streaming from the replication slot. 
--- pipelinewise/fastsync/commons/tap_postgres.py | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index 2fdba4c02..c5c69ae68 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -4,6 +4,7 @@ import re import sys import textwrap +import time import psycopg2 import psycopg2.errors import psycopg2.extras @@ -342,18 +343,33 @@ def fetch_current_log_pos(self): raise Exception('Logical replication not supported before PostgreSQL 9.4') try: - # Create replication slot. - self.create_replication_slot() + # Create replication slot and obtain the oldest LSN + # that it can deliver. + oldest_lsn = self.create_replication_slot() except psycopg2.errors.DuplicateObject: - # Replication slot exists already so continue. - pass + # Replication slot exists already so just get the + # oldest LSN which the slot can deliver. + oldest_lsn = self.get_confirmed_flush_lsn() # Close replication slot dedicated connection self.primary_host_conn.close() - lsn = self.get_current_lsn() + current_lsn = self.get_current_lsn() - return {'lsn': lsn, 'version': 1} + if self.connection_config.get('replica_host'): + # Ensure that the newest LSN availiable on the replica is newer than + # the oldest LSN which is deliverable by the replication slot. 
+ time_waited = 0 # seconds + wait_period = 1 # seconds + max_time_waited = 60 # seconds + while oldest_lsn > current_lsn: + time.sleep(wait_period) + time_waited += wait_period + if time_waited > max_time_waited: + raise RuntimeError('Replica database is lagging too far behind Primary.') + current_lsn = self.get_current_lsn() + + return {'lsn': current_lsn, 'version': 1} # pylint: disable=invalid-name def fetch_current_incremental_key_pos(self, table, replication_key): From d78347bcfe4a5881a20498846fe0238433924814 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 15:26:57 +0100 Subject: [PATCH 5/9] Remove unnecessary attributes --- pipelinewise/fastsync/commons/tap_postgres.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index c5c69ae68..e605113fa 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -28,9 +28,7 @@ def __init__(self, connection_config, tap_type_to_target_type, target_quote=None self.tap_type_to_target_type = tap_type_to_target_type self.target_quote = target_quote self.conn = None - self.curr = None self.primary_host_conn = None - self.primary_host_curr = None self.version = None @staticmethod @@ -188,7 +186,6 @@ def open_connection(self): self.conn = self.get_connection( self.connection_config, prioritize_primary=False ) - self.curr = self.conn.cursor() def close_connection(self): """ @@ -320,7 +317,6 @@ def fetch_current_log_pos(self): self.primary_host_conn = self.get_connection( self.connection_config, prioritize_primary=True ) - self.primary_host_curr = self.primary_host_conn.cursor() # Make sure PostgreSQL version is 9.4 or higher result = self.primary_host_query( @@ -561,4 +557,5 @@ def copy_table( ) with gzip_splitter as split_gzip_files: - self.curr.copy_expert(sql, split_gzip_files, size=131072) + 
with self.conn.cursor() as cur: + cur.copy_expert(sql, split_gzip_files, size=131072) From d1f7ece74a3a387b82657769f53b5e7e0e4e18cc Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 15:35:20 +0100 Subject: [PATCH 6/9] Satisfy Pylint that there aren't too many variables --- pipelinewise/fastsync/commons/tap_postgres.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index e605113fa..6559a0515 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -529,8 +529,9 @@ def copy_table( split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000) split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20) """ - table_columns = self.get_table_columns(table_name, max_num, date_type) - column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns] + column_safe_sql_values = [ + c.get('safe_sql_value') for c in self.get_table_columns(table_name, max_num, date_type) + ] # If self.get_table_columns returns zero row then table not exist if len(column_safe_sql_values) == 0: From f40d130293581122e126c5f426b99d92313d9ed3 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 5 Apr 2022 16:12:55 +0100 Subject: [PATCH 7/9] Update tests --- .../commons/test_fastsync_tap_postgres.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py index 7dafa9b34..dee154041 100644 --- a/tests/units/fastsync/commons/test_fastsync_tap_postgres.py +++ b/tests/units/fastsync/commons/test_fastsync_tap_postgres.py @@ -21,11 +21,6 @@ def setUp(self) -> None: self.postgres.executed_queries_primary_host = [] 
self.postgres.executed_queries = [] - def primary_host_query_mock(query, _=None): - self.postgres.executed_queries_primary_host.append(query) - - self.postgres.primary_host_query = primary_host_query_mock - def test_generate_repl_slot_name(self): """Validate if the replication slot name generated correctly""" # Provide only database name @@ -70,17 +65,20 @@ def test_create_replication_slot_1(self): Validate if replication slot creation SQL commands generated correctly in case no v15 slots exists """ - def execute_mock(query): + def execute_mock(query, _=None): print('Mocked execute called') self.postgres.executed_queries_primary_host.append(query) # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock - type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0) + # First query -> 0 rows, second query -> 1 row + type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[0, 1]) + cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}] # mock PG connection instance with ability to open cursor - pg_con = Mock() + pg_con = MagicMock() + pg_con.__enter__.return_value = pg_con pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con @@ -96,17 +94,20 @@ def test_create_replication_slot_2(self): Validate if replication slot creation SQL commands generated correctly in case a v15 slots exists """ - def execute_mock(query): + def execute_mock(query, _=None): print('Mocked execute called') self.postgres.executed_queries_primary_host.append(query) # mock cursor with execute method cursor_mock = MagicMock().return_value cursor_mock.__enter__.return_value.execute.side_effect = execute_mock - type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1) + # First query -> 1 row, second query -> 1 row + type(cursor_mock.__enter__.return_value).rowcount = 
PropertyMock(side_effect=[1, 1]) + cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}] # mock PG connection instance with ability to open cursor - pg_con = Mock() + pg_con = MagicMock() + pg_con.__enter__.return_value = pg_con pg_con.cursor.return_value = cursor_mock self.postgres.primary_host_conn = pg_con From 0d6fb6334a6118739bd91f01ff278b5918efc228 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 19 Apr 2022 09:19:44 +0100 Subject: [PATCH 8/9] Fix call to `textwrap.dedent` --- pipelinewise/fastsync/commons/tap_postgres.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index 6559a0515..70346c1b4 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -295,7 +295,8 @@ def get_confirmed_flush_lsn(self) -> int: res = self.primary_host_query( textwrap.dedent( - f"""SELECT * + f"""\ + SELECT * FROM pg_replication_slots WHERE slot_name = '{slot_name}' AND plugin = 'wal2json' From f795486b2f397c1c1f9579b97d2907e1fdabc0c4 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 19 Apr 2022 09:41:22 +0100 Subject: [PATCH 9/9] Timeout better --- pipelinewise/fastsync/commons/tap_postgres.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index 70346c1b4..2c2e6fdba 100644 --- a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -356,13 +356,10 @@ def fetch_current_log_pos(self): if self.connection_config.get('replica_host'): # Ensure that the newest LSN availiable on the replica is newer than # the oldest LSN which is deliverable by the replication slot. 
- time_waited = 0 # seconds - wait_period = 1 # seconds max_time_waited = 60 # seconds + now = time.time() while oldest_lsn > current_lsn: - time.sleep(wait_period) - time_waited += wait_period - if time_waited > max_time_waited: + if time.time() - now > max_time_waited: raise RuntimeError('Replica database is lagging too far behind Primary.') current_lsn = self.get_current_lsn()