
Commit

remove unused timestamps (#41)
* remove unused timestamps
* bump version
louis-pie authored Feb 27, 2020
1 parent 03db5d1 commit 486ebd6
Showing 2 changed files with 14 additions and 14 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()

 setup(name='pipelinewise-tap-postgres',
-      version='1.4.0',
+      version='1.4.1',
       description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
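The setup.py change is only the version bump to 1.4.1 that accompanies the logging cleanup. A quick way to confirm which version ends up installed (a minimal sketch; assumes the package was installed with pip):

    # Print the installed tap version; pkg_resources ships with setuptools.
    import pkg_resources

    version = pkg_resources.get_distribution('pipelinewise-tap-postgres').version
    print(version)  # expected to print 1.4.1 after this release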
26 changes: 13 additions & 13 deletions tap_postgres/sync_strategies/logical_replication.py
@@ -393,7 +393,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
     cur = conn.cursor()

     try:
-        LOGGER.info("{} : Request wal streaming from {} to {} (slot {})".format(datetime.datetime.utcnow(), int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
+        LOGGER.info("Request wal streaming from {} to {} (slot {})".format(int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
         # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
         cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
     except psycopg2.ProgrammingError:
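For context, cur here is a replication cursor: the tap opens its connection with psycopg2's LogicalReplicationConnection, which is what makes start_replication and the periodic keep-alive feedback available. A minimal sketch of that setup, with placeholder connection parameters and slot name (not values from this repo):

    # Open a logical replication connection and start streaming from a slot.
    import psycopg2
    from psycopg2.extras import LogicalReplicationConnection

    conn = psycopg2.connect(
        "host=localhost dbname=mydb user=replicator",  # placeholder DSN
        connection_factory=LogicalReplicationConnection,
    )
    cur = conn.cursor()

    # decode=True makes psycopg2 hand back decoded text payloads;
    # status_interval (psycopg2 >= 2.8.3) is how often feedback/keep-alives are sent.
    cur.start_replication(
        slot_name='my_slot',  # placeholder; assumes a wal2json slot already exists
        decode=True,
        start_lsn=0,
        status_interval=10,
        options={'write-in-chunks': 1},
    )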
@@ -407,22 +407,22 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
         # needs to be long enough to wait for the largest single wal payload to avoid unplanned timeouts
         poll_duration = (datetime.datetime.utcnow() - lsn_received_timestamp).total_seconds()
         if poll_duration > logical_poll_total_seconds:
-            LOGGER.info("{} : Breaking - {} seconds of polling with no data".format(datetime.datetime.utcnow(), poll_duration))
+            LOGGER.info("Breaking - {} seconds of polling with no data".format(poll_duration))
             break

         try:
             msg = cur.read_message()
         except Exception as e:
-            LOGGER.error("{} : {}".format(datetime.datetime.utcnow(), e))
+            LOGGER.error("{}".format(e))
             raise

         if msg:
             if (break_at_end_lsn) and (msg.data_start > end_lsn):
-                LOGGER.info("{} : Breaking - latest wal message {} is past end_lsn {}".format(datetime.datetime.utcnow(), int_to_lsn(msg.data_start), int_to_lsn(end_lsn)))
+                LOGGER.info("Breaking - latest wal message {} is past end_lsn {}".format(int_to_lsn(msg.data_start), int_to_lsn(end_lsn)))
                 break

             if datetime.datetime.utcnow() >= (start_run_timestamp + datetime.timedelta(seconds=max_run_seconds)):
-                LOGGER.info("{} : Breaking - reached max_run_seconds of {}".format(datetime.datetime.utcnow(), max_run_seconds))
+                LOGGER.info("Breaking - reached max_run_seconds of {}".format(max_run_seconds))
                 break

             state = consume_message(logical_streams, state, msg, time_extracted, conn_info, end_lsn)
@@ -431,12 +431,12 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
             # This is to ensure we only flush to lsn that has completed entirely
             if lsn_currently_processing is None:
                 lsn_currently_processing = msg.data_start
-                LOGGER.info("{} : First wal message received is {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_currently_processing)))
+                LOGGER.info("First wal message received is {}".format(int_to_lsn(lsn_currently_processing)))

                 # Flush Postgres wal up to lsn comitted in previous run, or first lsn received in this run
                 lsn_to_flush = lsn_comitted
                 if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
-                LOGGER.info("{} : Confirming write up to {}, flush to {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
+                LOGGER.info("Confirming write up to {}, flush to {}".format(int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
                 cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True, force=True)

             elif (int(msg.data_start) > lsn_currently_processing):
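Two details worth noting in this hunk: send_feedback(flush_lsn=...) is what actually tells Postgres that WAL up to that LSN may be recycled, and the LSNs being logged are plain 64-bit integers that int_to_lsn renders in Postgres's X/Y notation (high and low 32 bits in hex). A sketch of that conversion and its inverse, assuming the standard representation (not copied from this repo):

    def int_to_lsn(lsn):
        # Postgres prints an LSN as two hex halves, e.g. 16/B374D848
        return "{:X}/{:X}".format(lsn >> 32, lsn & 0xFFFFFFFF)

    def lsn_to_int(lsn):
        # Inverse: parse "16/B374D848" back into a 64-bit integer
        file_part, offset = lsn.split('/')
        return (int(file_part, 16) << 32) + int(offset, 16)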
@@ -445,7 +445,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
                 lsn_received_timestamp = datetime.datetime.utcnow()
                 lsn_processed_count = lsn_processed_count + 1
                 if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
-                    LOGGER.debug("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
+                    LOGGER.debug("Updating bookmarks for all streams to lsn = {} ({})".format(lsn_last_processed, int_to_lsn(lsn_last_processed)))
                     for s in logical_streams:
                         state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
                     singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
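The write_bookmark/write_message pair above emits a Singer STATE record on stdout so the runner can persist the replication position. Roughly what that looks like end to end (stream id and LSN value are placeholders):

    import copy

    import singer

    state = {}
    state = singer.write_bookmark(state, 'public-users', 'lsn', 97648380000)
    singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
    # stdout: {"type": "STATE", "value": {"bookmarks": {"public-users": {"lsn": 97648380000}}}}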
@@ -454,19 +454,19 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
         # Every poll_interval, update latest comitted lsn position from the state_file
         if datetime.datetime.utcnow() >= (poll_timestamp + datetime.timedelta(seconds=poll_interval)):
             if lsn_currently_processing is None:
-                LOGGER.info("{} : Waiting for first wal message".format(datetime.datetime.utcnow()))
+                LOGGER.info("Waiting for first wal message")
             else:
-                LOGGER.info("{} : Lastest wal message received was {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed)))
+                LOGGER.info("Lastest wal message received was {}".format(int_to_lsn(lsn_last_processed)))
                 try:
                     state_comitted_file = open(state_file)
                     state_comitted = json.load(state_comitted_file)
                 except:
-                    LOGGER.info("{} : Unable to open and parse {}".format(datetime.datetime.utcnow(), state_file))
+                    LOGGER.info("Unable to open and parse {}".format(state_file))
                 finally:
                     lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
                     if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush):
                         lsn_to_flush = lsn_comitted
-                        LOGGER.info("{} : Confirming write up to {}, flush to {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
+                        LOGGER.info("Confirming write up to {}, flush to {}".format(int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
                         cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True, force=True)

             poll_timestamp = datetime.datetime.utcnow()
@@ -480,7 +480,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
         lsn_last_processed = lsn_comitted
         LOGGER.info("Current lsn_last_processed {} is older than lsn_comitted {}".format(int_to_lsn(lsn_last_processed), int_to_lsn(lsn_comitted)))

-    LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
+    LOGGER.info("Updating bookmarks for all streams to lsn = {} ({})".format(lsn_last_processed, int_to_lsn(lsn_last_processed)))
     for s in logical_streams:
         state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)

