Skip to content

Commit

Permalink
Ensure that replica is ahead of replication slot
Browse files Browse the repository at this point in the history
There is a replication lag between a primary and replica database.
We get information about what data the replication slot can provide
from the primary but perform the initial data snapshot on the replica.
Therefore, we must make sure that the replica has caught up to at
least the LSN which we are going to begin streaming from the
replication slot.
  • Loading branch information
judahrand committed Apr 5, 2022
1 parent 6dc27f2 commit 03f5505
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions pipelinewise/fastsync/commons/tap_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import re
import sys
import textwrap
import time
import psycopg2
import psycopg2.errors
import psycopg2.extras
Expand Down Expand Up @@ -348,18 +349,33 @@ def fetch_current_log_pos(self):
raise Exception('Logical replication not supported before PostgreSQL 9.4')

try:
# Create replication slot.
self.create_replication_slot()
# Create replication slot and obtain the oldest LSN
# that it can deliver.
oldest_lsn = self.create_replication_slot()
except psycopg2.errors.DuplicateObject:
# Replication slot exists already so continue.
pass
# Replication slot exists already so just get the
# oldest LSN which the slot can deliver.
oldest_lsn = self.get_confirmed_flush_lsn()

# Close replication slot dedicated connection
self.primary_host_conn.close()

lsn = self.get_current_lsn()
current_lsn = self.get_current_lsn()

return {'lsn': lsn, 'version': 1}
if self.connection_config.get('replica_host'):
# Ensure that the newest LSN availiable on the replica is newer than
# the oldest LSN which is deliverable by the replication slot.
time_waited = 0 # seconds
wait_period = 1 # seconds
max_time_waited = 60 # seconds
while oldest_lsn > current_lsn:
time.sleep(wait_period)
time_waited += wait_period
if time_waited > max_time_waited:
raise RuntimeError('Replica database is lagging too far behind Primary.')
current_lsn = self.get_current_lsn()

return {'lsn': current_lsn, 'version': 1}

# pylint: disable=invalid-name
def fetch_current_incremental_key_pos(self, table, replication_key):
Expand Down

0 comments on commit 03f5505

Please sign in to comment.