Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that Postgres always get a consistent snapshot #929

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
158 changes: 101 additions & 57 deletions pipelinewise/fastsync/commons/tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import logging
import re
import sys
import textwrap
import time
import psycopg2
import psycopg2.errors
import psycopg2.extras

from typing import Dict
Expand All @@ -25,9 +28,8 @@ def __init__(self, connection_config, tap_type_to_target_type, target_quote=None
self.tap_type_to_target_type = tap_type_to_target_type
self.target_quote = target_quote
self.conn = None
self.curr = None
self.primary_host_conn = None
self.primary_host_curr = None
self.version = None

@staticmethod
def generate_replication_slot_name(dbname, tap_id=None, prefix='pipelinewise'):
Expand Down Expand Up @@ -184,7 +186,6 @@ def open_connection(self):
self.conn = self.get_connection(
self.connection_config, prioritize_primary=False
)
self.curr = self.conn.cursor()

def close_connection(self):
"""
Expand Down Expand Up @@ -221,7 +222,7 @@ def primary_host_query(self, query, params=None):
return []

# pylint: disable=no-member
def create_replication_slot(self):
def create_replication_slot(self) -> int:
"""
Create replication slot on the primary host

Expand All @@ -236,24 +237,76 @@ def create_replication_slot(self):
format you won't be able to do LOG_BASED replication from the same postgres
database by multiple taps. If that the case then you need to drop the old
replication slot and full-resync the new taps.

Returns the LSN at which the replication slot is consistent.
"""
try:
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)

# Create the replication host
# Create the replication host
return parse_lsn(
self.primary_host_query(
f"SELECT * FROM pg_create_logical_replication_slot('{slot_name}', 'wal2json')"
)
except Exception as exc:
# ERROR: replication slot already exists SQL state: 42710
if hasattr(exc, 'pgcode') and exc.pgcode == '42710':
pass
)[0]['lsn']
)

def get_current_lsn(self) -> int:
"""Obtain the most recent LSN availiable."""
# is replica_host set ?
if self.connection_config.get('replica_host'):
# Get latest applied lsn from replica_host
if self.version >= 100000:
result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn')
elif self.version >= 90400:
result = self.query(
'SELECT pg_last_xlog_replay_location() AS current_lsn'
)
else:
raise exc
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
else:
# Get current lsn from primary host
if self.version >= 100000:
result = self.query('SELECT pg_current_wal_lsn() AS current_lsn')
elif self.version >= 90400:
result = self.query('SELECT pg_current_xlog_location() AS current_lsn')
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
return parse_lsn(result[0]['current_lsn'])

def get_confirmed_flush_lsn(self) -> int:
"""
Get the last flushed LSN for the replication slot.

For Postgres <9.6 this defaults to the restart_lsn as confirmed_flush_lsn
is not availiable.
"""
slot_name = self.__get_slot_name(
self.primary_host_conn,
self.connection_config['dbname'],
self.connection_config['tap_id'],
)

res = self.primary_host_query(
textwrap.dedent(
f"""\
SELECT *
FROM pg_replication_slots
WHERE slot_name = '{slot_name}'
AND plugin = 'wal2json'
AND slot_type = 'logical'"""
)
)[0]

# confirmed_flush_lsn was introduced in Postgres 9.6 so fallback to
# restart_lsn if needed.
return parse_lsn(res.get('confirmed_flush_lsn', res['restart_lsn']))

# pylint: disable=too-many-branches,no-member,chained-comparison
def fetch_current_log_pos(self):
Expand All @@ -265,63 +318,52 @@ def fetch_current_log_pos(self):
self.primary_host_conn = self.get_connection(
self.connection_config, prioritize_primary=True
)
self.primary_host_curr = self.primary_host_conn.cursor()

# Make sure PostgreSQL version is 9.4 or higher
result = self.primary_host_query(
"SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'"
)
version = result[0].get('version')
self.version = result[0].get('version')

# Do not allow minor versions with PostgreSQL BUG #15114
if (version >= 110000) and (version < 110002):
if (self.version >= 110000) and (self.version < 110002):
raise Exception('PostgreSQL upgrade required to minor version 11.2')
if (version >= 100000) and (version < 100007):
if (self.version >= 100000) and (self.version < 100007):
raise Exception('PostgreSQL upgrade required to minor version 10.7')
if (version >= 90600) and (version < 90612):
if (self.version >= 90600) and (self.version < 90612):
raise Exception('PostgreSQL upgrade required to minor version 9.6.12')
if (version >= 90500) and (version < 90516):
if (self.version >= 90500) and (self.version < 90516):
raise Exception('PostgreSQL upgrade required to minor version 9.5.16')
if (version >= 90400) and (version < 90421):
if (self.version >= 90400) and (self.version < 90421):
raise Exception('PostgreSQL upgrade required to minor version 9.4.21')
if version < 90400:
if self.version < 90400:
raise Exception('Logical replication not supported before PostgreSQL 9.4')

# Create replication slot
self.create_replication_slot()
try:
# Create replication slot and obtain the oldest LSN
# that it can deliver.
oldest_lsn = self.create_replication_slot()
except psycopg2.errors.DuplicateObject:
# Replication slot exists already so just get the
# oldest LSN which the slot can deliver.
oldest_lsn = self.get_confirmed_flush_lsn()

# Close replication slot dedicated connection
self.primary_host_conn.close()

# is replica_host set ?
if self.connection_config.get('replica_host'):
# Get latest applied lsn from replica_host
if version >= 100000:
result = self.query('SELECT pg_last_wal_replay_lsn() AS current_lsn')
elif version >= 90400:
result = self.query(
'SELECT pg_last_xlog_replay_location() AS current_lsn'
)
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
else:
# Get current lsn from primary host
if version >= 100000:
result = self.query('SELECT pg_current_wal_lsn() AS current_lsn')
elif version >= 90400:
result = self.query('SELECT pg_current_xlog_location() AS current_lsn')
else:
raise Exception(
'Logical replication not supported before PostgreSQL 9.4'
)
current_lsn = self.get_current_lsn()

current_lsn = result[0].get('current_lsn')
file, index = current_lsn.split('/')
lsn = (int(file, 16) << 32) + int(index, 16)
if self.connection_config.get('replica_host'):
# Ensure that the newest LSN availiable on the replica is newer than
# the oldest LSN which is deliverable by the replication slot.
max_time_waited = 60 # seconds
now = time.time()
while oldest_lsn > current_lsn:
if time.time() - now > max_time_waited:
raise RuntimeError('Replica database is lagging too far behind Primary.')
current_lsn = self.get_current_lsn()

return {'lsn': lsn, 'version': 1}
return {'lsn': current_lsn, 'version': 1}

# pylint: disable=invalid-name
def fetch_current_incremental_key_pos(self, table, replication_key):
Expand Down Expand Up @@ -485,8 +527,9 @@ def copy_table(
split_file_chunk_size_mb: File chunk sizes if `split_large_files` enabled. (Default: 1000)
split_file_max_chunks: Max number of chunks if `split_large_files` enabled. (Default: 20)
"""
table_columns = self.get_table_columns(table_name, max_num, date_type)
column_safe_sql_values = [c.get('safe_sql_value') for c in table_columns]
column_safe_sql_values = [
c.get('safe_sql_value') for c in self.get_table_columns(table_name, max_num, date_type)
]

# If self.get_table_columns returns zero row then table not exist
if len(column_safe_sql_values) == 0:
Expand All @@ -513,4 +556,5 @@ def copy_table(
)

with gzip_splitter as split_gzip_files:
self.curr.copy_expert(sql, split_gzip_files, size=131072)
with self.conn.cursor() as cur:
cur.copy_expert(sql, split_gzip_files, size=131072)
23 changes: 12 additions & 11 deletions tests/units/fastsync/commons/test_fastsync_tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ def setUp(self) -> None:
self.postgres.executed_queries_primary_host = []
self.postgres.executed_queries = []

def primary_host_query_mock(query, _=None):
self.postgres.executed_queries_primary_host.append(query)

self.postgres.primary_host_query = primary_host_query_mock

def test_generate_repl_slot_name(self):
"""Validate if the replication slot name generated correctly"""
# Provide only database name
Expand Down Expand Up @@ -70,17 +65,20 @@ def test_create_replication_slot_1(self):
Validate if replication slot creation SQL commands generated correctly in case no v15 slots exists
"""

def execute_mock(query):
def execute_mock(query, _=None):
print('Mocked execute called')
self.postgres.executed_queries_primary_host.append(query)

# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=0)
# First query -> 0 rows, second query -> 1 row
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[0, 1])
cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}]

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con = MagicMock()
pg_con.__enter__.return_value = pg_con
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con
Expand All @@ -96,17 +94,20 @@ def test_create_replication_slot_2(self):
Validate if replication slot creation SQL commands generated correctly in case a v15 slots exists
"""

def execute_mock(query):
def execute_mock(query, _=None):
print('Mocked execute called')
self.postgres.executed_queries_primary_host.append(query)

# mock cursor with execute method
cursor_mock = MagicMock().return_value
cursor_mock.__enter__.return_value.execute.side_effect = execute_mock
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(return_value=1)
# First query -> 1 row, second query -> 1 row
type(cursor_mock.__enter__.return_value).rowcount = PropertyMock(side_effect=[1, 1])
cursor_mock.__enter__.return_value.fetchall.return_value = [{'lsn': '24368/44107178'}]

# mock PG connection instance with ability to open cursor
pg_con = Mock()
pg_con = MagicMock()
pg_con.__enter__.return_value = pg_con
pg_con.cursor.return_value = cursor_mock

self.postgres.primary_host_conn = pg_con
Expand Down