Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-483] Update bookmark when reading bookmark finished (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti authored Jan 21, 2020
1 parent 550547f commit 36231e6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='pipelinewise-tap-mysql',
version='1.1.3',
version='1.1.4',
description='Singer.io tap for extracting data from MySQL - PipelineWise compatible',
author='Stitch',
url='https://github.com/transferwise/pipelinewise-tap-mysql',
Expand Down
19 changes: 16 additions & 3 deletions tap_mysql/sync_strategies/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state):
events_skipped = 0

current_log_file, current_log_pos = fetch_current_log_file_and_pos(mysql_conn)
log_file = None
log_pos = None

for binlog_event in reader:
if isinstance(binlog_event, RotateEvent):
Expand Down Expand Up @@ -378,20 +380,31 @@ def _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state):
binlog_event.schema,
binlog_event.table)

# Update log_file and log_pos after every processed binlog event
log_file = reader.log_file
log_pos = reader.log_pos

# The iterator across python-mysql-replication's fetchone method should ultimately terminate
# upon receiving an EOF packet. There seem to be some cases when a MySQL server will not send
# one causing binlog replication to hang.
if current_log_file == reader.log_file and reader.log_pos >= current_log_pos:
if current_log_file == log_file and log_pos >= current_log_pos:
break

# Update singer bookmark and send STATE message periodically
if ((rows_saved and rows_saved % UPDATE_BOOKMARK_PERIOD == 0) or
(events_skipped and events_skipped % UPDATE_BOOKMARK_PERIOD == 0)):
state = update_bookmarks(state,
binlog_streams_map,
reader.log_file,
reader.log_pos)
log_file,
log_pos)
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))

# Update singer bookmark at the last time to point it the the last processed binlog event
state = update_bookmarks(state,
binlog_streams_map,
log_file,
log_pos)


def sync_binlog_stream(mysql_conn, config, binlog_streams, state):
binlog_streams_map = generate_streams_map(binlog_streams)
Expand Down

0 comments on commit 36231e6

Please sign in to comment.