diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py index 1d28eede..afe2442e 100644 --- a/tap_postgres/sync_strategies/logical_replication.py +++ b/tap_postgres/sync_strategies/logical_replication.py @@ -279,9 +279,6 @@ def consume_message(streams, state, msg, time_extracted, conn_info, end_lsn): singer.write_message(record_message) state = singer.write_bookmark(state, target_stream['tap_stream_id'], 'lsn', lsn) - LOGGER.debug("Sending keep-alive to server with NO flush_lsn") - msg.cursor.send_feedback() - if msg.data_start > end_lsn: raise Exception("incorrectly attempting to flush an lsn({}) > end_lsn({})".format(msg.data_start, end_lsn)) @@ -312,7 +309,10 @@ def sync_tables(conn_info, logical_streams, state, end_lsn): #When no data is received, poll every 5 seconds for 15 seconds total poll_interval = 5.0 poll_total_seconds = conn_info['logical_poll_total_seconds'] or 15 - begin_ts = datetime.datetime.now() + begin_timestamp = datetime.datetime.now() + # When data is received, send feedback to keep connection open + feedback_interval = 5.0 + feedback_timestamp = datetime.datetime.now() for s in logical_streams: sync_common.send_schema_message(s, ['lsn']) @@ -331,14 +331,14 @@ def sync_tables(conn_info, logical_streams, state, end_lsn): wal_entries_processed = 0 while True: - poll_duration = (datetime.datetime.now() - begin_ts).total_seconds() + poll_duration = (datetime.datetime.now() - begin_timestamp).total_seconds() if poll_duration > poll_total_seconds: LOGGER.info("Breaking after %s seconds of polling with no data", poll_duration) break msg = cur.read_message() if msg: - begin_ts = datetime.datetime.now() + begin_timestamp = datetime.datetime.now() if msg.data_start > end_lsn: LOGGER.info("Gone past end_lsn %s for run. breaking", end_lsn) break @@ -350,6 +350,12 @@ def sync_tables(conn_info, logical_streams, state, end_lsn): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) wal_entries_processed = 0 + # Keep connection to server alive + if datetime.datetime.now() >= (feedback_timestamp + datetime.timedelta(seconds=feedback_interval)): + LOGGER.debug("Sending keep-alive to server with NO flush_lsn") + msg.cursor.send_feedback() + feedback_timestamp = datetime.datetime.now() + else: now = datetime.datetime.now() timeout = poll_interval - (now - cur.io_timestamp).total_seconds()