Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
deanmorin committed Jan 28, 2022
1 parent 443e67d commit aa05bad
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions tap_postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def sync_traditional_stream(conn_config, stream, state, sync_method, end_lsn):
return state


def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_file):
# pylint: disable=too-many-arguments
def sync_logical_streams(conn_config, logical_streams, traditional_streams, state, end_lsn, state_file):
"""
Sync streams that use LOG_BASED method
"""
Expand All @@ -212,10 +213,20 @@ def sync_logical_streams(conn_config, logical_streams, state, end_lsn, state_fil
selected_streams.add(stream['tap_stream_id'])

new_state = dict(currently_syncing=state['currently_syncing'], bookmarks={})
traditional_stream_ids = [s['tap_stream_id'] for s in traditional_streams]

for stream, bookmark in state['bookmarks'].items():
if bookmark == {} or bookmark['last_replication_method'] != 'LOG_BASED' or stream in selected_streams:
if (
bookmark == {}
or bookmark['last_replication_method'] != 'LOG_BASED'
or stream in selected_streams
# The first time a LOG_BASED stream runs it needs to do an
# initial full table sync, and so will be treated as a
# traditional stream.
or (stream in traditional_stream_ids and bookmark['last_replication_method'] == 'LOG_BASED')
):
new_state['bookmarks'][stream] = bookmark

state = new_state

state = logical_replication.sync_tables(conn_config, logical_streams, state, end_lsn, state_file)
Expand Down Expand Up @@ -319,7 +330,7 @@ def do_sync(conn_config, catalog, default_replication_method, state, state_file=
for dbname, streams in itertools.groupby(logical_streams,
lambda s: metadata.to_map(s['metadata']).get(()).get('database-name')):
conn_config['dbname'] = dbname
state = sync_logical_streams(conn_config, list(streams), state, end_lsn, state_file)
state = sync_logical_streams(conn_config, list(streams), traditional_streams, state, end_lsn, state_file)
return state


Expand Down

0 comments on commit aa05bad

Please sign in to comment.