From 7fc8fa1bd32fefb2e1058dcfaf5d5d790c159a8d Mon Sep 17 00:00:00 2001 From: Dean Morin Date: Tue, 20 Jul 2021 14:26:37 -0700 Subject: [PATCH] Fixes https://github.com/transferwise/pipelinewise-tap-postgres/issues/107 --- tap_postgres/__init__.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tap_postgres/__init__.py b/tap_postgres/__init__.py index 553f2d57..59d18470 100644 --- a/tap_postgres/__init__.py +++ b/tap_postgres/__init__.py @@ -194,7 +194,8 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn): return state -def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_file): +# pylint: disable=too-many-arguments +def sync_logical_streams(conn_config, logical_streams, traditional_streams, state, end_lsn, state_file): """ Sync streams that use LOG_BASED method """ @@ -212,10 +213,20 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil selected_streams.add("{}".format(stream['tap_stream_id'])) new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={}) + traditional_stream_ids = [s['tap_stream_id'] for s in traditional_streams] for stream, bookmark in state['bookmarks'].items(): - if bookmark == {} or bookmark['last_replication_method'] != 'LOG_BASED' or stream in selected_streams: + if ( + bookmark == {} + or bookmark['last_replication_method'] != 'LOG_BASED' + or stream in selected_streams + # The first time a LOG_BASED stream runs it needs to do an + # initial full table sync, and so will be treated as a + # traditional stream. + or (stream in traditional_stream_ids and bookmark['last_replication_method'] == 'LOG_BASED') + ): new_state['bookmarks'][stream] = bookmark + state = new_state state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file) @@ -319,7 +330,7 @@ def do_sync(conn_config, catalog, default_replication_method, state, state_file= for dbname, streams in itertools.groupby(logical_streams, lambda s: metadata.to_map(s['metadata']).get(()).get('database-name')): conn_config['dbname'] = dbname - state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file) + state = sync_logical_streams(conn_config, list(streams), traditional_streams, state, end_lsn, state_file) return state