Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Send feedback messages to server while prosessing wal_logs to prevent…
Browse files Browse the repository at this point in the history
… wal_sender_timeout
  • Loading branch information
louis-pie authored and koszti committed Jul 11, 2019
1 parent 443aa63 commit 56d9c3f
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions tap_postgres/sync_strategies/logical_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,6 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn):

singer.write_message(record_message)
state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn)
LOGGER.debug("Sending keep-alive to server with NO flush_lsn")
msg.cursor.send_feedback()


if msg.data_start > end_lsn:
raise Exception("incorrectly attempting to flush an lsn({}) > end_lsn({})".format(msg.data_start, end_lsn))
Expand Down Expand Up @@ -312,7 +309,10 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
#When no data is received, poll every 5 seconds for 15 seconds total
poll_interval = 5.0
poll_total_seconds = conn_info['logical_poll_total_seconds'] or 15
begin_ts = datetime.datetime.now()
begin_timestamp = datetime.datetime.now()
# When data is received, send feedback to keep connection open
feedback_interval = 5.0
feedback_timestamp = datetime.datetime.now()

for s in logical_streams:
sync_common.send_schema_message(s, ['lsn'])
Expand All @@ -331,14 +331,14 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):

wal_entries_processed = 0
while True:
poll_duration = (datetime.datetime.now() - begin_ts).total_seconds()
poll_duration = (datetime.datetime.now() - begin_timestamp).total_seconds()
if poll_duration > poll_total_seconds:
LOGGER.info("Breaking after %s seconds of polling with no data", poll_duration)
break

msg = cur.read_message()
if msg:
begin_ts = datetime.datetime.now()
begin_timestamp = datetime.datetime.now()
if msg.data_start > end_lsn:
LOGGER.info("Gone past end_lsn %s for run. breaking", end_lsn)
break
Expand All @@ -350,6 +350,12 @@ def sync_tables(conn_info, logical_streams, state, end_lsn):
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
wal_entries_processed = 0

# Keep connection to server alive
if datetime.datetime.now() >= (feedback_timestamp + datetime.timedelta(seconds=feedback_interval)):
LOGGER.debug("Sending keep-alive to server with NO flush_lsn")
msg.cursor.send_feedback()
feedback_timestamp = datetime.datetime.now()

else:
now = datetime.datetime.now()
timeout = poll_interval - (now - cur.io_timestamp).total_seconds()
Expand Down

0 comments on commit 56d9c3f

Please sign in to comment.