From 36231e6199d56fe4c0f2b16215a0807e21c4c34e Mon Sep 17 00:00:00 2001 From: Peter Kosztolanyi Date: Tue, 21 Jan 2020 09:49:00 +0000 Subject: [PATCH] [AP-483] Update bookmark when reading bookmark finished (#14) --- setup.py | 2 +- tap_mysql/sync_strategies/binlog.py | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 4cf4d06..5a4fe8d 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='pipelinewise-tap-mysql', - version='1.1.3', + version='1.1.4', description='Singer.io tap for extracting data from MySQL - PipelineWise compatible', author='Stitch', url='https://github.com/transferwise/pipelinewise-tap-mysql', diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 5195e50..b670e25 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -328,6 +328,8 @@ def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state): events_skipped = 0 current_log_file, current_log_pos = fetch_current_log_file_and_pos(mysql_conn) + log_file = None + log_pos = None for binlog_event in reader: if isinstance(binlog_event, RotateEvent): @@ -378,20 +380,31 @@ def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state): binlog_event.schema, binlog_event.table) + # Update log_file and log_pos after every processed binlog event + log_file = reader.log_file + log_pos = reader.log_pos + # The iterator across python-mysql-replication's fetchone method should ultimately terminate # upon receiving an EOF packet. There seem to be some cases when a MySQL server will not send # one causing binlog replication to hang. - if current_log_file == reader.log_file and reader.log_pos >= current_log_pos: + if current_log_file == log_file and log_pos >= current_log_pos: break + # Update singer bookmark and send STATE message periodically if ((rows_saved and rows_saved % UPDATE_BOOKMARK_PERIOD == 0) or (events_skipped and events_skipped % UPDATE_BOOKMARK_PERIOD == 0)): state = update_bookmarks(state, binlog_streams_map, - reader.log_file, - reader.log_pos) + log_file, + log_pos) singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + # Update singer bookmark at the last time to point it the the last processed binlog event + state = update_bookmarks(state, + binlog_streams_map, + log_file, + log_pos) + def sync_binlog_stream(mysql_conn, config, binlog_streams, state): binlog_streams_map = generate_streams_map(binlog_streams)