Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Fixes #107
Browse files Browse the repository at this point in the history
  • Loading branch information
deanmorin committed Sep 15, 2021
1 parent 280aaf0 commit f8b4440
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn):
return state


def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_file):
def sync_logical_streams(conn_config, logical_streams, traditional_streams, state, end_lsn, state_file):
"""
Sync streams that use LOG_BASED method
"""
Expand All @@ -212,10 +212,20 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
selected_streams.add("{}".format(stream['tap_stream_id']))

new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={})
traditional_stream_ids = [s['tap_stream_id'] for s in traditional_streams]

for stream, bookmark in state['bookmarks'].items():
if bookmark == {} or bookmark['last_replication_method'] != 'LOG_BASED' or stream in selected_streams:
if (
bookmark == {}
or bookmark['last_replication_method'] != 'LOG_BASED'
or stream in selected_streams
# The first time a LOG_BASED stream runs it needs to do an
# initial full table sync, and so will be treated as a
# traditional stream.
or (stream in traditional_stream_ids and bookmark['last_replication_method'] == 'LOG_BASED')
):
new_state['bookmarks'][stream] = bookmark

state = new_state

state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)
Expand Down Expand Up @@ -319,7 +329,7 @@ def do_sync(conn_config, catalog, default_replication_method, state, state_file=
for dbname, streams in itertools.groupby(logical_streams,
lambda s: metadata.to_map(s['metadata']).get(()).get('database-name')):
conn_config['dbname'] = dbname
state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file)
state = sync_logical_streams(conn_config, list(streams), traditional_streams, state, end_lsn, state_file)
return state


Expand Down

0 comments on commit f8b4440

Please sign in to comment.