diff --git a/setup.py b/setup.py
index 453468f3..d73b0b06 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = f.read()
 
 setup(name='pipelinewise-tap-postgres',
-      version='1.4.0',
+      version='1.4.1',
       description='Singer.io tap for extracting data from PostgresSQL - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/tap_postgres/sync_strategies/logical_replication.py b/tap_postgres/sync_strategies/logical_replication.py
index 3d57afe4..4ddc5497 100644
--- a/tap_postgres/sync_strategies/logical_replication.py
+++ b/tap_postgres/sync_strategies/logical_replication.py
@@ -393,7 +393,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
     cur = conn.cursor()
 
     try:
-        LOGGER.info("{} : Request wal streaming from {} to {} (slot {})".format(datetime.datetime.utcnow(), int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
+        LOGGER.info("Request wal streaming from {} to {} (slot {})".format(int_to_lsn(start_lsn), int_to_lsn(end_lsn), slot))
         # psycopg2 2.8.4 will send a keep-alive message to postgres every status_interval
         cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, status_interval=poll_interval, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
     except psycopg2.ProgrammingError:
@@ -407,22 +407,22 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
         # needs to be long enough to wait for the largest single wal payload to avoid unplanned timeouts
         poll_duration = (datetime.datetime.utcnow() - lsn_received_timestamp).total_seconds()
         if poll_duration > logical_poll_total_seconds:
-            LOGGER.info("{} : Breaking - {} seconds of polling with no data".format(datetime.datetime.utcnow(), poll_duration))
+            LOGGER.info("Breaking - {} seconds of polling with no data".format(poll_duration))
             break
 
         try:
             msg = cur.read_message()
         except Exception as e:
-            LOGGER.error("{} : {}".format(datetime.datetime.utcnow(), e))
+            LOGGER.error("{}".format(e))
             raise
 
         if msg:
             if (break_at_end_lsn) and (msg.data_start > end_lsn):
-                LOGGER.info("{} : Breaking - latest wal message {} is past end_lsn {}".format(datetime.datetime.utcnow(), int_to_lsn(msg.data_start), int_to_lsn(end_lsn)))
+                LOGGER.info("Breaking - latest wal message {} is past end_lsn {}".format(int_to_lsn(msg.data_start), int_to_lsn(end_lsn)))
                 break
 
             if datetime.datetime.utcnow() >= (start_run_timestamp + datetime.timedelta(seconds=max_run_seconds)):
-                LOGGER.info("{} : Breaking - reached max_run_seconds of {}".format(datetime.datetime.utcnow(), max_run_seconds))
+                LOGGER.info("Breaking - reached max_run_seconds of {}".format(max_run_seconds))
                 break
 
             state = consume_message(logical_streams, state, msg, time_extracted, conn_info, end_lsn)
@@ -431,12 +431,12 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
             # This is to ensure we only flush to lsn that has completed entirely
             if lsn_currently_processing is None:
                 lsn_currently_processing = msg.data_start
-                LOGGER.info("{} : First wal message received is {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_currently_processing)))
+                LOGGER.info("First wal message received is {}".format(int_to_lsn(lsn_currently_processing)))
 
                 # Flush Postgres wal up to lsn comitted in previous run, or first lsn received in this run
                 lsn_to_flush = lsn_comitted
                 if lsn_currently_processing < lsn_to_flush: lsn_to_flush = lsn_currently_processing
-                LOGGER.info("{} : Confirming write up to {}, flush to {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
+                LOGGER.info("Confirming write up to {}, flush to {}".format(int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
                 cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True, force=True)
 
             elif (int(msg.data_start) > lsn_currently_processing):
@@ -445,7 +445,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
                 lsn_received_timestamp = datetime.datetime.utcnow()
                 lsn_processed_count = lsn_processed_count + 1
                 if lsn_processed_count >= UPDATE_BOOKMARK_PERIOD:
-                    LOGGER.debug("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
+                    LOGGER.debug("Updating bookmarks for all streams to lsn = {} ({})".format(lsn_last_processed, int_to_lsn(lsn_last_processed)))
                     for s in logical_streams:
                         state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)
                     singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
@@ -454,19 +454,19 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
         # Every poll_interval, update latest comitted lsn position from the state_file
         if datetime.datetime.utcnow() >= (poll_timestamp + datetime.timedelta(seconds=poll_interval)):
             if lsn_currently_processing is None:
-                LOGGER.info("{} : Waiting for first wal message".format(datetime.datetime.utcnow()))
+                LOGGER.info("Waiting for first wal message")
             else:
-                LOGGER.info("{} : Lastest wal message received was {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_last_processed)))
+                LOGGER.info("Lastest wal message received was {}".format(int_to_lsn(lsn_last_processed)))
                 try:
                     state_comitted_file = open(state_file)
                     state_comitted = json.load(state_comitted_file)
                 except:
-                    LOGGER.info("{} : Unable to open and parse {}".format(datetime.datetime.utcnow(), state_file))
+                    LOGGER.info("Unable to open and parse {}".format(state_file))
                 finally:
                     lsn_comitted = min([get_bookmark(state_comitted, s['tap_stream_id'], 'lsn') for s in logical_streams])
                     if (lsn_currently_processing > lsn_comitted) and (lsn_comitted > lsn_to_flush):
                         lsn_to_flush = lsn_comitted
-                        LOGGER.info("{} : Confirming write up to {}, flush to {}".format(datetime.datetime.utcnow(), int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
+                        LOGGER.info("Confirming write up to {}, flush to {}".format(int_to_lsn(lsn_to_flush), int_to_lsn(lsn_to_flush)))
                         cur.send_feedback(write_lsn=lsn_to_flush, flush_lsn=lsn_to_flush, reply=True, force=True)
 
             poll_timestamp = datetime.datetime.utcnow()
@@ -480,7 +480,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
             lsn_last_processed = lsn_comitted
             LOGGER.info("Current lsn_last_processed {} is older than lsn_comitted {}".format(int_to_lsn(lsn_last_processed), int_to_lsn(lsn_comitted)))
 
-    LOGGER.info("{} : Updating bookmarks for all streams to lsn = {} ({})".format(datetime.datetime.utcnow(), lsn_last_processed, int_to_lsn(lsn_last_processed)))
+    LOGGER.info("Updating bookmarks for all streams to lsn = {} ({})".format(lsn_last_processed, int_to_lsn(lsn_last_processed)))
     for s in logical_streams:
         state = singer.write_bookmark(state, s['tap_stream_id'], 'lsn', lsn_last_processed)