This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Draft: Fix FULL_TABLE replication to comply with Singer spec #96

Open · wants to merge 2 commits into master
6 changes: 6 additions & 0 deletions README.md

````diff
@@ -432,6 +432,12 @@ GRANT ALL PRIVILEGES ON tap_mysql_test.* TO <mysql-user>;
 
 **Note**: The user and password can be anything but the database name needs to be `tap_mysql_test`.
 
+You can get a simple preconfigured test database by using the provided docker-compose file:
+
+```bash
+docker-compose up
+```
+
 2. Define the environment variables that are required to run the tests:
    ```
    export TAP_MYSQL_HOST=<mysql-host>
````
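For reference, the compose file is expected to provide a MySQL server with the `tap_mysql_test` database already created. A minimal sketch of such a file is below — this is an illustration, not the actual `docker-compose.yml` shipped in the repo, and the image tag, port, and credentials are assumptions:

```yaml
# Hypothetical sketch only -- the repo's real docker-compose.yml may differ.
# The tests only require that the database be named tap_mysql_test.
version: "3"
services:
  mysql:
    image: mysql:8.0                       # assumed image/tag
    ports:
      - "3306:3306"                        # assumed host port
    environment:
      MYSQL_ROOT_PASSWORD: my-secret-pw    # assumed credentials
      MYSQL_DATABASE: tap_mysql_test       # matches the Note above
```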
141 changes: 77 additions & 64 deletions tap_mysql/__init__.py

```diff
@@ -57,19 +57,41 @@ def log_engine(mysql_conn, catalog_entry):
 
 
 def is_valid_currently_syncing_stream(selected_stream, state):
+    """Determine if the selected stream, marked as "currently syncing", is valid to continue incremental processing."""
     stream_metadata = metadata.to_map(selected_stream.metadata)
     replication_method = stream_metadata.get((), {}).get('replication-method')
 
-    if replication_method != 'LOG_BASED':
+    # It's never valid to be "currently syncing" a FULL_TABLE replication stream.
+    # These are always sync'd from the beginning.
+    if replication_method == 'FULL_TABLE':
+        return False
+
+    # It's always valid to be "currently syncing" an INCREMENTAL replication stream.
+    # These support retrying based on the replication key.
+    if replication_method == 'INCREMENTAL':
         return True
 
-    if replication_method == 'LOG_BASED' and binlog_stream_requires_historical(selected_stream, state):
+    # If we're doing LOG_BASED replication and the stream is "currently syncing",
+    # then we must have run a sync previously. If we still require historical data,
+    # the initial sync must have failed, in which case we cannot continue the
+    # current sync and must start over with a fresh table scan.
+    if replication_method == 'LOG_BASED':
+        if binlog_stream_requires_historical(selected_stream, state):
+            return False
+
         return True
 
     return False
 
 
 def binlog_stream_requires_historical(catalog_entry, state):
+    """Check if the binlog stream still needs a FULL_TABLE sync to load historical data.
+
+    By default the LOG_BASED replication strategy runs a FULL_TABLE scan first,
+    to sweep up all historical records. When this succeeds, the
+    `initial_binlog_complete` bookmark is written. If that bookmark hasn't been
+    written to the state, then we still need to load historical data.
+    """
     log_file = singer.get_bookmark(state,
                                    catalog_entry.tap_stream_id,
                                    'log_file')
@@ -78,15 +100,11 @@ def binlog_stream_requires_historical(catalog_entry, state):
                                   catalog_entry.tap_stream_id,
                                   'log_pos')
 
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
-
-    last_pk_fetched = singer.get_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched')
+    initial_full_table_complete = singer.get_bookmark(state,
+                                                      catalog_entry.tap_stream_id,
+                                                      'initial_binlog_complete')
 
-    if (log_file and log_pos) and (not max_pk_values and not last_pk_fetched):
+    if log_file and log_pos and initial_full_table_complete:
         return False
 
     return True
```
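To make the new resume semantics concrete, here is an illustrative sketch (not part of the diff; the stream id and positions are hypothetical) of the two state shapes `binlog_stream_requires_historical` now distinguishes:

```python
# A LOG_BASED stream whose initial full-table scan was interrupted: the binlog
# coordinates exist, but 'initial_binlog_complete' was never flipped to True,
# so binlog_stream_requires_historical() returns True and the scan restarts.
interrupted_state = {
    "currently_syncing": "mydb-users",
    "bookmarks": {
        "mydb-users": {
            "log_file": "mysql-bin.000003",
            "log_pos": 1234,
            "initial_binlog_complete": False,
        }
    }
}

# A stream whose bootstrap finished: all three bookmarks are present and
# truthy, so the tap skips the historical scan and resumes from the binlog.
completed_state = {
    "bookmarks": {
        "mydb-users": {
            "log_file": "mysql-bin.000003",
            "log_pos": 5678,
            "initial_binlog_complete": True,
        }
    }
}
```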
```diff
@@ -137,11 +155,19 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state):
                 raise Exception(
                     f"Unable to replicate stream({stream.stream}) with binlog because it is a view.")
 
-            LOGGER.info("LOG_BASED stream %s will resume its historical sync", stream.tap_stream_id)
+            LOGGER.info("LOG_BASED stream %s requires a full historical sync", stream.tap_stream_id)
 
             streams_without_state.append(stream)
-        elif stream_state and replication_method != 'LOG_BASED':
+        elif stream_state and replication_method == 'INCREMENTAL':
             streams_with_state.append(stream)
+        elif stream_state and replication_method == 'FULL_TABLE':
+            # The Singer spec requires us to do a full table sync
+            LOGGER.warning(
+                "FULL_TABLE stream %s had previous state. "
+                "This state is being ignored and a full table sync is being performed.",
+                stream.tap_stream_id
+            )
+            streams_without_state.append(stream)
 
     # If the state says we were in the middle of processing a stream, skip
     # to that stream. Then process streams without prior state and finally
@@ -152,11 +178,13 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state):
     ordered_streams = streams_without_state + streams_with_state
 
     if currently_syncing:
-        currently_syncing_stream = list(filter(
-            lambda s: s.tap_stream_id == currently_syncing and is_valid_currently_syncing_stream(s, state),
-            streams_with_state))
-
-        non_currently_syncing_streams = list(filter(lambda s: s.tap_stream_id != currently_syncing, ordered_streams))
+        currently_syncing_stream = []
+        non_currently_syncing_streams = []
+        for stream in ordered_streams:
+            if stream.tap_stream_id == currently_syncing and is_valid_currently_syncing_stream(stream, state):
+                currently_syncing_stream.append(stream)
+            else:
+                non_currently_syncing_streams.append(stream)
 
         streams_to_sync = currently_syncing_stream + non_currently_syncing_streams
     else:
```
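The loop replaces the old `filter()` pair, which built `currently_syncing_stream` only from `streams_with_state` and so could drop a currently-syncing stream entirely when it failed validation; now such a stream simply falls back into the regular queue. A small sketch (hypothetical stream ids) of the reordering it performs:

```python
# Illustrative only: the valid currently-syncing stream jumps the queue;
# every other stream keeps its existing order instead of being dropped.
ordered_streams = ["mydb-users", "mydb-orders", "mydb-items"]
currently_syncing = "mydb-orders"

def is_valid(stream_id):
    """Stand-in for is_valid_currently_syncing_stream(stream, state)."""
    return True

front = [s for s in ordered_streams if s == currently_syncing and is_valid(s)]
rest = [s for s in ordered_streams if not (s == currently_syncing and is_valid(s))]

assert front + rest == ["mydb-orders", "mydb-users", "mydb-items"]
```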
```diff
@@ -208,66 +236,51 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns):
     if is_view:
         raise Exception(f"Unable to replicate stream({catalog_entry.stream}) with binlog because it is a view.")
 
-    log_file = singer.get_bookmark(state,
-                                   catalog_entry.tap_stream_id,
-                                   'log_file')
+    write_schema_message(catalog_entry)
 
-    log_pos = singer.get_bookmark(state,
-                                  catalog_entry.tap_stream_id,
-                                  'log_pos')
+    stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
 
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
+    LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
 
-    write_schema_message(catalog_entry)
+    state = singer.write_bookmark(state,
+                                  catalog_entry.tap_stream_id,
+                                  'initial_binlog_complete',
+                                  False)
 
-    stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
+    current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn)
+    state = singer.write_bookmark(state,
+                                  catalog_entry.tap_stream_id,
+                                  'version',
+                                  stream_version)
 
-    if log_file and log_pos and max_pk_values:
-        LOGGER.info("Resuming initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
-        full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
+    if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry):
+        # We must save log_file and log_pos across FULL_TABLE syncs when using
+        # an incrementing PK
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_file',
+                                      current_log_file)
 
-    else:
-        LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_pos',
+                                      current_log_pos)
 
-        state = singer.write_bookmark(state,
-                                      catalog_entry.tap_stream_id,
-                                      'initial_binlog_complete',
-                                      False)
+        full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
+    else:
+        full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_file',
+                                      current_log_file)
 
-        current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn)
-        state = singer.write_bookmark(state,
-                                      catalog_entry.tap_stream_id,
-                                      'version',
-                                      stream_version)
-
-        if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry):
-            # We must save log_file and log_pos across FULL_TABLE syncs when using
-            # an incrementing PK
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_file',
-                                          current_log_file)
-
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_pos',
-                                          current_log_pos)
-
-            full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
-
-        else:
-            full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_file',
-                                          current_log_file)
-
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_pos',
-                                          current_log_pos)
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_pos',
+                                      current_log_pos)
+
+    singer.write_bookmark(state, catalog_entry.tap_stream_id, 'initial_binlog_complete', True)
 
 
 def do_sync_full_table(mysql_conn, catalog_entry, state, columns):
```
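Under the rewritten flow the bookmark progression for one LOG_BASED stream looks roughly like this (illustrative values; `mydb-users` is a hypothetical tap_stream_id):

```python
# State left behind by a successful do_sync_historical_binlog() run.
state_after_bootstrap = {
    "bookmarks": {
        "mydb-users": {
            "version": 1700000000000,         # stream_version written up front
            "log_file": "mysql-bin.000003",   # captured before or after the scan,
            "log_pos": 4567,                  #   depending on PK auto-increment
            "initial_binlog_complete": True,  # written last; marks success
        }
    }
}
# If the tap dies before 'initial_binlog_complete' flips to True, the next run
# sees it falsy, binlog_stream_requires_historical() returns True, and the
# full table scan starts over from scratch.
```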
18 changes: 1 addition & 17 deletions tap_mysql/sync_strategies/common.py

```diff
@@ -171,23 +171,7 @@ def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version
     stream_metadata = md_map.get((), {})
     replication_method = stream_metadata.get('replication-method')
 
-    if replication_method in {'FULL_TABLE', 'LOG_BASED'}:
-        key_properties = get_key_properties(catalog_entry)
-
-        max_pk_values = singer.get_bookmark(state,
-                                            catalog_entry.tap_stream_id,
-                                            'max_pk_values')
-
-        if max_pk_values:
-            last_pk_fetched = {k:v for k, v in record_message.record.items()
-                               if k in key_properties}
-
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched',
-                                          last_pk_fetched)
-
-    elif replication_method == 'INCREMENTAL':
+    if replication_method == 'INCREMENTAL':
         if replication_key is not None:
             state = singer.write_bookmark(state,
                                           catalog_entry.tap_stream_id,
```
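After this change, per-record bookmarks are written only for INCREMENTAL streams. A sketch of the bookmark shape that remains (hypothetical stream id, key, and value):

```python
# Each emitted record advances replication_key_value; FULL_TABLE and LOG_BASED
# historical scans no longer track max_pk_values/last_pk_fetched per record.
state = {
    "bookmarks": {
        "mydb-users": {
            "replication_key": "updated_at",
            "replication_key_value": "2024-01-15T10:00:00Z",
        }
    }
}
```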
85 changes: 1 addition & 84 deletions tap_mysql/sync_strategies/full_table.py

```diff
@@ -18,7 +18,7 @@ def generate_bookmark_keys(catalog_entry):
     stream_metadata = md_map.get((), {})
     replication_method = stream_metadata.get('replication-method')
 
-    base_bookmark_keys = {'last_pk_fetched', 'max_pk_values', 'version', 'initial_full_table_complete'}
+    base_bookmark_keys = {'last_pk_fetched', 'version', 'initial_full_table_complete'}
 
     if replication_method == 'FULL_TABLE':
         bookmark_keys = base_bookmark_keys
```

**Review comment** (Contributor Author), on `base_bookmark_keys`: TODO: Remove `last_pk_fetched` too.
```diff
@@ -58,65 +58,6 @@ def pks_are_auto_incrementing(mysql_conn, catalog_entry):
     return True
 
 
-def get_max_pk_values(cursor, catalog_entry):
-    database_name = common.get_database_name(catalog_entry)
-    escaped_db = common.escape(database_name)
-    escaped_table = common.escape(catalog_entry.table)
-
-    key_properties = common.get_key_properties(catalog_entry)
-    escaped_columns = [common.escape(c) for c in key_properties]
-
-    sql = """SELECT {}
-               FROM {}.{}
-              ORDER BY {}
-              LIMIT 1
-    """
-
-    select_column_clause = ", ".join(escaped_columns)
-    order_column_clause = ", ".join([primary_key + " DESC" for primary_key in escaped_columns])
-
-    cursor.execute(sql.format(select_column_clause,
-                              escaped_db,
-                              escaped_table,
-                              order_column_clause))
-    result = cursor.fetchone()
-
-    if result:
-        max_pk_values = dict(zip(key_properties, result))
-    else:
-        max_pk_values = {}
-
-    return max_pk_values
-
-
-def generate_pk_clause(catalog_entry, state):
-    key_properties = common.get_key_properties(catalog_entry)
-    escaped_columns = [common.escape(c) for c in key_properties]
-
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
-
-    last_pk_fetched = singer.get_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched')
-
-    if last_pk_fetched:
-        pk_comparisons = ["({} > {} AND {} <= {})".format(common.escape(pk),
-                                                          last_pk_fetched[pk],
-                                                          common.escape(pk),
-                                                          max_pk_values[pk])
-                          for pk in key_properties]
-    else:
-        pk_comparisons = ["{} <= {}".format(common.escape(pk), max_pk_values[pk])
-                          for pk in key_properties]
-
-    sql = " WHERE {} ORDER BY {} ASC".format(" AND ".join(pk_comparisons),
-                                             ", ".join(escaped_columns))
-
-    return sql
-
-
 def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
     common.whitelist_bookmark_keys(generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state)
 
@@ -141,30 +82,10 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
     if not initial_full_table_complete and not (version_exists and state_version is None):
         singer.write_message(activate_version_message)
 
-    key_props_are_auto_incrementing = pks_are_auto_incrementing(mysql_conn, catalog_entry)
-
     with connect_with_backoff(mysql_conn) as open_conn:
         with open_conn.cursor() as cur:
             select_sql = common.generate_select_sql(catalog_entry, columns)
 
-            if key_props_are_auto_incrementing:
-                LOGGER.info("Detected auto-incrementing primary key(s) - will replicate incrementally")
-                max_pk_values = singer.get_bookmark(state,
-                                                    catalog_entry.tap_stream_id,
-                                                    'max_pk_values') or get_max_pk_values(cur, catalog_entry)
-
-                if not max_pk_values:
-                    LOGGER.info("No max value for auto-incrementing PK found for table %s", catalog_entry.table)
-                else:
-                    state = singer.write_bookmark(state,
-                                                  catalog_entry.tap_stream_id,
-                                                  'max_pk_values',
-                                                  max_pk_values)
-
-                    pk_clause = generate_pk_clause(catalog_entry, state)
-
-                    select_sql += pk_clause
-
             params = {}
 
             common.sync_query(cur,
@@ -175,8 +96,4 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
                               stream_version,
                               params)
 
-    # clear max pk value and last pk fetched upon successful sync
-    singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values')
-    singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'last_pk_fetched')
-
     singer.write_message(activate_version_message)
```
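To illustrate what was removed: `generate_pk_clause` let an interrupted FULL_TABLE scan resume part-way through the key space, whereas the simplified `sync_table` always rescans from the top. A sketch with a hypothetical table and bookmark values:

```python
# Old behaviour: with last_pk_fetched = {'id': 5000} and
# max_pk_values = {'id': 100000}, the generated clause resumed mid-table.
old_resume_sql = ("SELECT `id`, `name` FROM `mydb`.`users`"
                  " WHERE (`id` > 5000 AND `id` <= 100000) ORDER BY `id` ASC")

# New behaviour: no WHERE clause -- the whole table is re-emitted under a
# fresh ACTIVATE_VERSION message, which is what the Singer spec expects of
# FULL_TABLE replication.
new_full_scan_sql = "SELECT `id`, `name` FROM `mydb`.`users`"
```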