From fa48041a464c33257684e7ade254d7324ced0b64 Mon Sep 17 00:00:00 2001 From: ashishparimi Date: Wed, 10 Apr 2024 08:55:17 -0700 Subject: [PATCH] Apply fixes from community Pr: https://github.com/transferwise/pipelinewise-tap-postgres/pull/219 --- tap_postgres/__init__.py | 4 ++++ tap_postgres/sync_strategies/logical_replication.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index bd52be3..d20c275 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -131,6 +131,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # finishing previously interrupted full-table (first stage of logical replication) lookup[stream['tap_stream_id']] = 'logical_initial_interrupted' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) # inconsistent state elif get_bookmark(state, stream['tap_stream_id'], 'xmin') and \ @@ -142,6 +144,8 @@ def sync_method_for_streams(streams, state, default_replication_method): # initial full-table phase of logical replication lookup[stream['tap_stream_id']] = 'logical_initial' traditional_steams.append(stream) + # do any required logical replication after inital sync is complete + logical_streams.append(stream) else: # no xmin but we have an lsn # initial stage of logical replication(full-table) has been completed. moving onto pure logical replication diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index a594b5a..54c8b2f 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -702,7 +702,8 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file): LOGGER.debug('Unable to open and parse %s', state_file) finally: lsn_comitted = min( - [get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams]) + [get_bookmark(state_comitted, s['tap_stream_id'], 'lsn', start_lsn) + for s in logical_streams]) if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush): lsn_to_flush = lsn_comitted LOGGER.info('Confirming write up to %s, flush to %s',