From ae3b94a6c30d0408c192edd79875ed45b87d1e27 Mon Sep 17 00:00:00 2001
From: Paul Tiplady
Date: Fri, 18 Feb 2022 08:57:54 -0800
Subject: [PATCH 1/2] Fix FULL_TABLE sync mode to comply with spec

The Singer Spec clearly states that FULL_TABLE replication mode must
replicate "all available records dating back to a `start_date`".

This tap erroneously switches to an incremental replication strategy if
it detects an auto-incrementing primary key. This is both unsafe (it
will omit updates to existing rows) and in violation of the spec.

Remove this auto-incremental-update logic from the full-table mode.
---
 tap_mysql/sync_strategies/full_table.py | 20 --------------------
 1 file changed, 20 deletions(-)

diff --git a/tap_mysql/sync_strategies/full_table.py b/tap_mysql/sync_strategies/full_table.py
index 957364c..a5cae91 100644
--- a/tap_mysql/sync_strategies/full_table.py
+++ b/tap_mysql/sync_strategies/full_table.py
@@ -141,30 +141,10 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
     if not initial_full_table_complete and not (version_exists and state_version is None):
         singer.write_message(activate_version_message)
 
-    key_props_are_auto_incrementing = pks_are_auto_incrementing(mysql_conn, catalog_entry)
-
     with connect_with_backoff(mysql_conn) as open_conn:
         with open_conn.cursor() as cur:
             select_sql = common.generate_select_sql(catalog_entry, columns)
 
-            if key_props_are_auto_incrementing:
-                LOGGER.info("Detected auto-incrementing primary key(s) - will replicate incrementally")
-                max_pk_values = singer.get_bookmark(state,
-                                                    catalog_entry.tap_stream_id,
-                                                    'max_pk_values') or get_max_pk_values(cur, catalog_entry)
-
-                if not max_pk_values:
-                    LOGGER.info("No max value for auto-incrementing PK found for table %s", catalog_entry.table)
-                else:
-                    state = singer.write_bookmark(state,
-                                                  catalog_entry.tap_stream_id,
-                                                  'max_pk_values',
-                                                  max_pk_values)
-
-                    pk_clause = generate_pk_clause(catalog_entry, state)
-
-                    select_sql += pk_clause
-
             params = {}
 
             common.sync_query(cur,
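To make the commit's safety claim concrete, here is a minimal sketch of the failure mode the removed logic creates. This is plain Python with illustrative values only (the `last_pk_fetched`/`max_pk` names mirror the tap's bookmarks, but the table contents are made up): a "full table" sync resumed with a primary-key range filter never revisits rows that changed after they were first copied.

```python
# Sketch: resuming a "full table" sync with a PK range filter loses updates.
rows = {1: 'old', 2: 'b', 3: 'c'}   # table contents when the first sync ran
last_pk_fetched, max_pk = 1, 3      # sync was interrupted after copying id 1
rows[1] = 'new'                     # row 1 is updated before the sync retries

# The removed logic effectively re-selected: WHERE id > 1 AND id <= 3
resumed = {pk: val for pk, val in rows.items() if last_pk_fetched < pk <= max_pk}

assert resumed == {2: 'b', 3: 'c'}  # the update to row 1 is silently dropped
```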
From 1ba5d77d19abed10d5c64a1f08cd69e6e8e05bfe Mon Sep 17 00:00:00 2001
From: Paul Tiplady
Date: Wed, 23 Feb 2022 10:24:37 -0800
Subject: [PATCH 2/2] Remove `max_pk_values` bookmark
---
 README.md                               |   6 +
 tap_mysql/__init__.py                   | 141 ++++++++++--------
 tap_mysql/sync_strategies/common.py     |  18 +--
 tap_mysql/sync_strategies/full_table.py |  65 +-------
 .../test_full_table_interruption.py     |  56 +++----
 5 files changed, 115 insertions(+), 171 deletions(-)

diff --git a/README.md b/README.md
index b2f51e1..f6c5ac7 100644
--- a/README.md
+++ b/README.md
@@ -432,6 +432,12 @@ GRANT ALL PRIVILEGES ON tap_mysql_test.* TO ;
 **Note**: The user and password can be anything but the database name needs to be `tap_mysql_test`.
 
+You can spin up a preconfigured test database using the provided docker-compose file:
+
+```bash
+docker-compose up
+```
+
 2. Define the environment variables that are required to run the tests:
 ```
 export TAP_MYSQL_HOST=
diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py
index f692428..5d9076c 100644
--- a/tap_mysql/__init__.py
+++ b/tap_mysql/__init__.py
@@ -57,19 +57,41 @@ def log_engine(mysql_conn, catalog_entry):
 
 
 def is_valid_currently_syncing_stream(selected_stream, state):
+    """Determine if the selected stream, marked as "currently syncing", is valid to continue incremental processing."""
     stream_metadata = metadata.to_map(selected_stream.metadata)
     replication_method = stream_metadata.get((), {}).get('replication-method')
 
-    if replication_method != 'LOG_BASED':
+    # It's never valid to be "currently syncing" a FULL_TABLE replication stream.
+    # These are always synced from the beginning.
+    if replication_method == 'FULL_TABLE':
+        return False
+
+    # It's always valid to be "currently syncing" an INCREMENTAL replication stream.
+    # These support retrying based on the replication key.
+    if replication_method == 'INCREMENTAL':
         return True
 
-    if replication_method == 'LOG_BASED' and binlog_stream_requires_historical(selected_stream, state):
+    # If we're doing LOG_BASED replication, and the stream is "currently syncing",
+    # then we must have run a sync previously.
+    # If we still require historical data, then the initial sync must have failed,
+    # in which case we cannot continue the current sync and must start over with a fresh table scan.
+    if replication_method == 'LOG_BASED':
+        if binlog_stream_requires_historical(selected_stream, state):
+            return False
+        return True
 
     return False
 
 
 def binlog_stream_requires_historical(catalog_entry, state):
+    """Check if the binlog stream still needs a FULL_TABLE sync to load historical state.
+
+    By default the LOG_BASED replication strategy will run a FULL_TABLE scan first,
+    to sweep all historical records. When this succeeds, the `initial_binlog_complete`
+    bookmark will be written. If that bookmark hasn't been written to the state,
+    then we need to load historical data.
+    """
     log_file = singer.get_bookmark(state,
                                    catalog_entry.tap_stream_id,
                                    'log_file')
@@ -78,15 +100,11 @@ def binlog_stream_requires_historical(catalog_entry, state):
                                   catalog_entry.tap_stream_id,
                                   'log_pos')
 
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
-
-    last_pk_fetched = singer.get_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched')
+    initial_binlog_complete = singer.get_bookmark(state,
+                                                  catalog_entry.tap_stream_id,
+                                                  'initial_binlog_complete')
 
-    if (log_file and log_pos) and (not max_pk_values and not last_pk_fetched):
+    if log_file and log_pos and initial_binlog_complete:
         return False
 
     return True
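The three branches above amount to a small decision table over the replication method. A minimal distillation follows; the `may_resume` name and the boolean flag are illustrative only, not part of the tap's API:

```python
def may_resume(replication_method: str, requires_historical: bool) -> bool:
    """Illustrative distillation of is_valid_currently_syncing_stream."""
    if replication_method == 'FULL_TABLE':
        return False  # full-table scans always restart from scratch
    if replication_method == 'INCREMENTAL':
        return True   # the replication-key bookmark makes resuming safe
    if replication_method == 'LOG_BASED':
        # resumable only once the initial historical scan has completed
        return not requires_historical
    return False

assert not may_resume('FULL_TABLE', False)
assert may_resume('INCREMENTAL', False)
assert not may_resume('LOG_BASED', True)
assert may_resume('LOG_BASED', False)
```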
" + "This state is being ignored and a full table sync is being performed.", + stream.tap_stream_id + ) + streams_without_state.append(stream) # If the state says we were in the middle of processing a stream, skip # to that stream. Then process streams without prior state and finally @@ -152,11 +178,13 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state): ordered_streams = streams_without_state + streams_with_state if currently_syncing: - currently_syncing_stream = list(filter( - lambda s: s.tap_stream_id == currently_syncing and is_valid_currently_syncing_stream(s, state), - streams_with_state)) - - non_currently_syncing_streams = list(filter(lambda s: s.tap_stream_id != currently_syncing, ordered_streams)) + currently_syncing_stream = [] + non_currently_syncing_streams = [] + for stream in ordered_streams: + if stream.tap_stream_id == currently_syncing and is_valid_currently_syncing_stream(stream, state): + currently_syncing_stream.append(stream) + else: + non_currently_syncing_streams.append(stream) streams_to_sync = currently_syncing_stream + non_currently_syncing_streams else: @@ -208,66 +236,51 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): if is_view: raise Exception(f"Unable to replicate stream({catalog_entry.stream}) with binlog because it is a view.") - log_file = singer.get_bookmark(state, - catalog_entry.tap_stream_id, - 'log_file') + write_schema_message(catalog_entry) - log_pos = singer.get_bookmark(state, + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + + LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id) + + state = singer.write_bookmark(state, catalog_entry.tap_stream_id, - 'log_pos') + 'initial_binlog_complete', + False) - max_pk_values = singer.get_bookmark(state, - catalog_entry.tap_stream_id, - 'max_pk_values') + current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn) + state = singer.write_bookmark(state, + catalog_entry.tap_stream_id, + 'version', + stream_version) - write_schema_message(catalog_entry) + if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry): + # We must save log_file and log_pos across FULL_TABLE syncs when using + # an incrementing PK + state = singer.write_bookmark(state, + catalog_entry.tap_stream_id, + 'log_file', + current_log_file) - stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + state = singer.write_bookmark(state, + catalog_entry.tap_stream_id, + 'log_pos', + current_log_pos) - if log_file and log_pos and max_pk_values: - LOGGER.info("Resuming initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id) full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version) else: - LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id) - + full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version) state = singer.write_bookmark(state, catalog_entry.tap_stream_id, - 'initial_binlog_complete', - False) + 'log_file', + current_log_file) - current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn) state = singer.write_bookmark(state, catalog_entry.tap_stream_id, - 'version', - stream_version) - - if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry): - # We must save log_file and log_pos across FULL_TABLE syncs when using - # an incrementing PK - state = singer.write_bookmark(state, - 
@@ -208,66 +236,51 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns):
     if is_view:
         raise Exception(f"Unable to replicate stream({catalog_entry.stream}) with binlog because it is a view.")
 
-    log_file = singer.get_bookmark(state,
-                                   catalog_entry.tap_stream_id,
-                                   'log_file')
+    write_schema_message(catalog_entry)
 
-    log_pos = singer.get_bookmark(state,
+    stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
+
+    LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
+
+    state = singer.write_bookmark(state,
                                   catalog_entry.tap_stream_id,
-                                  'log_pos')
+                                  'initial_binlog_complete',
+                                  False)
 
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
+    current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn)
+    state = singer.write_bookmark(state,
+                                  catalog_entry.tap_stream_id,
+                                  'version',
+                                  stream_version)
 
-    write_schema_message(catalog_entry)
+    if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry):
+        # We must save log_file and log_pos across FULL_TABLE syncs when using
+        # an incrementing PK
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_file',
+                                      current_log_file)
 
-    stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state)
+        state = singer.write_bookmark(state,
+                                      catalog_entry.tap_stream_id,
+                                      'log_pos',
+                                      current_log_pos)
 
-    if log_file and log_pos and max_pk_values:
-        LOGGER.info("Resuming initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
+        full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
     else:
-        LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id)
-
+        full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
         state = singer.write_bookmark(state,
                                       catalog_entry.tap_stream_id,
-                                      'initial_binlog_complete',
-                                      False)
+                                      'log_file',
+                                      current_log_file)
 
-        current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn)
         state = singer.write_bookmark(state,
                                       catalog_entry.tap_stream_id,
-                                      'version',
-                                      stream_version)
-
-        if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry):
-            # We must save log_file and log_pos across FULL_TABLE syncs when using
-            # an incrementing PK
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_file',
-                                          current_log_file)
-
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_pos',
-                                          current_log_pos)
-
-            full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
-
-        else:
-            full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version)
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_file',
-                                          current_log_file)
-
-            state = singer.write_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'log_pos',
-                                          current_log_pos)
+                                      'log_pos',
+                                      current_log_pos)
+
+    singer.write_bookmark(state, catalog_entry.tap_stream_id, 'initial_binlog_complete', True)
 
 
 def do_sync_full_table(mysql_conn, catalog_entry, state, columns):
diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py
index d8f4ba8..2c2ed1b 100644
--- a/tap_mysql/sync_strategies/common.py
+++ b/tap_mysql/sync_strategies/common.py
@@ -171,23 +171,7 @@ def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version
             stream_metadata = md_map.get((), {})
             replication_method = stream_metadata.get('replication-method')
 
-            if replication_method in {'FULL_TABLE', 'LOG_BASED'}:
-                key_properties = get_key_properties(catalog_entry)
-
-                max_pk_values = singer.get_bookmark(state,
-                                                    catalog_entry.tap_stream_id,
-                                                    'max_pk_values')
-
-                if max_pk_values:
-                    last_pk_fetched = {k: v for k, v in record_message.record.items()
-                                       if k in key_properties}
-
-                    state = singer.write_bookmark(state,
-                                                  catalog_entry.tap_stream_id,
-                                                  'last_pk_fetched',
-                                                  last_pk_fetched)
-
-            elif replication_method == 'INCREMENTAL':
+            if replication_method == 'INCREMENTAL':
                 if replication_key is not None:
                     state = singer.write_bookmark(state,
                                                   catalog_entry.tap_stream_id,
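With these changes, a LOG_BASED stream's bookmark after a successful initial sync carries only the binlog coordinates, the table version, and the completion flag; `max_pk_values` and `last_pk_fetched` no longer appear. Roughly, with illustrative values:

```python
# Sketch of the per-stream state shape after do_sync_historical_binlog
# completes (all values are made up for illustration).
state = {
    'currently_syncing': None,
    'bookmarks': {
        'tap_mysql_test-table_1': {
            'version': 1645000000000,         # table version used by ACTIVATE_VERSION
            'log_file': 'mysql-bin.000003',   # binlog position captured before the scan
            'log_pos': 154,
            'initial_binlog_complete': True,  # set to False while the scan is running
        },
    },
}
```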
diff --git a/tap_mysql/sync_strategies/full_table.py b/tap_mysql/sync_strategies/full_table.py
index a5cae91..53346a1 100644
--- a/tap_mysql/sync_strategies/full_table.py
+++ b/tap_mysql/sync_strategies/full_table.py
@@ -18,7 +18,7 @@ def generate_bookmark_keys(catalog_entry):
     stream_metadata = md_map.get((), {})
     replication_method = stream_metadata.get('replication-method')
 
-    base_bookmark_keys = {'last_pk_fetched', 'max_pk_values', 'version', 'initial_full_table_complete'}
+    base_bookmark_keys = {'last_pk_fetched', 'version', 'initial_full_table_complete'}
 
     if replication_method == 'FULL_TABLE':
         bookmark_keys = base_bookmark_keys
@@ -58,65 +58,6 @@ def pks_are_auto_incrementing(mysql_conn, catalog_entry):
     return True
 
 
-def get_max_pk_values(cursor, catalog_entry):
-    database_name = common.get_database_name(catalog_entry)
-    escaped_db = common.escape(database_name)
-    escaped_table = common.escape(catalog_entry.table)
-
-    key_properties = common.get_key_properties(catalog_entry)
-    escaped_columns = [common.escape(c) for c in key_properties]
-
-    sql = """SELECT {}
-               FROM {}.{}
-              ORDER BY {}
-              LIMIT 1
-    """
-
-    select_column_clause = ", ".join(escaped_columns)
-    order_column_clause = ", ".join([primary_key + " DESC" for primary_key in escaped_columns])
-
-    cursor.execute(sql.format(select_column_clause,
-                              escaped_db,
-                              escaped_table,
-                              order_column_clause))
-    result = cursor.fetchone()
-
-    if result:
-        max_pk_values = dict(zip(key_properties, result))
-    else:
-        max_pk_values = {}
-
-    return max_pk_values
-
-
-def generate_pk_clause(catalog_entry, state):
-    key_properties = common.get_key_properties(catalog_entry)
-    escaped_columns = [common.escape(c) for c in key_properties]
-
-    max_pk_values = singer.get_bookmark(state,
-                                        catalog_entry.tap_stream_id,
-                                        'max_pk_values')
-
-    last_pk_fetched = singer.get_bookmark(state,
-                                          catalog_entry.tap_stream_id,
-                                          'last_pk_fetched')
-
-    if last_pk_fetched:
-        pk_comparisons = ["({} > {} AND {} <= {})".format(common.escape(pk),
-                                                          last_pk_fetched[pk],
-                                                          common.escape(pk),
-                                                          max_pk_values[pk])
-                          for pk in key_properties]
-    else:
-        pk_comparisons = ["{} <= {}".format(common.escape(pk), max_pk_values[pk])
-                          for pk in key_properties]
-
-    sql = " WHERE {} ORDER BY {} ASC".format(" AND ".join(pk_comparisons),
-                                             ", ".join(escaped_columns))
-
-    return sql
-
-
 def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
     common.whitelist_bookmark_keys(generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state)
 
@@ -155,8 +96,4 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version):
                               stream_version,
                               params)
 
-    # clear max pk value and last pk fetched upon successful sync
-    singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values')
-    singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'last_pk_fetched')
-
     singer.write_message(activate_version_message)
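For context, the deleted `generate_pk_clause` used to append a resume filter of the shape sketched below; after this patch the FULL_TABLE SELECT always runs unfiltered. A standalone sketch with illustrative bookmark values:

```python
# What the removed resume filter looked like for one auto-incrementing key.
last_pk_fetched = {'id': 1}
max_pk_values = {'id': 3}

clause = " WHERE {} ORDER BY {} ASC".format(
    " AND ".join("(`{0}` > {1} AND `{0}` <= {2})".format(pk, last_pk_fetched[pk], max_pk_values[pk])
                 for pk in last_pk_fetched),
    ", ".join("`{}`".format(pk) for pk in last_pk_fetched))

assert clause == " WHERE (`id` > 1 AND `id` <= 3) ORDER BY `id` ASC"
```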
diff --git a/tests/integration/test_full_table_interruption.py b/tests/integration/test_full_table_interruption.py
index 3631b4c..64d2c52 100644
--- a/tests/integration/test_full_table_interruption.py
+++ b/tests/integration/test_full_table_interruption.py
@@ -111,18 +111,20 @@ def setUp(self):
         SINGER_MESSAGES.clear()
 
     def test_table_2_interrupted(self):
+        """Test that a LOG_BASED stream correctly recovers from an interruption in the initial table sync.
+
+        When using LOG_BASED replication, an initial FULL_TABLE load is performed first.
+        If that full-table load fails, we should re-run it.
+        """
         singer.write_message = singer_write_message_no_table_2
 
         state = {}
-        failed_syncing_table_2 = False
 
         try:
             tap_mysql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)
+            self.fail('Expected a simulated exception.')
         except Exception as ex:
-            if str(ex) == 'simulated exception':
-                failed_syncing_table_2 = True
-
-        self.assertTrue(failed_syncing_table_2)
+            self.assertEqual(str(ex), 'simulated exception')
 
         record_messages_1 = [[m.stream, m.record] for m in SINGER_MESSAGES
                              if isinstance(m, singer.RecordMessage)]
@@ -146,9 +148,6 @@ def test_table_2_interrupted(self):
         table_2_version = table_2_bookmark['version']
         self.assertIsNotNone(table_2_version)
 
-        self.assertEqual(table_2_bookmark['max_pk_values'], {'id': 3})
-        self.assertEqual(table_2_bookmark['last_pk_fetched'], {'id': 1})
-
         self.assertIsNotNone(table_2_bookmark.get('log_file'))
         self.assertIsNotNone(table_2_bookmark.get('log_pos'))
@@ -164,12 +163,14 @@ def test_table_2_interrupted(self):
         record_messages_2 = [[m.stream, m.record] for m in SINGER_MESSAGES
                              if isinstance(m, singer.RecordMessage)]
 
-        self.assertEqual(record_messages_2,
-                         [['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
-                          ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
+        self.assertEqual(record_messages_2, [
+            ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+            ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+            ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+            ['tap_mysql_test-table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}],
+            ['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
+            ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
+        ])
 
         self.assertIsNone(state['currently_syncing'])
@@ -205,12 +206,13 @@ def test_table_2_interrupted(self):
         record_messages_3 = [[m.stream, m.record] for m in SINGER_MESSAGES
                              if isinstance(m, singer.RecordMessage)]
 
-        self.assertEqual(record_messages_3,
-                         [['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
-                          ['tap_mysql_test-table_2', {'id': 4, 'bar': 'jkl', 'foo': 400}],
-                          ['tap_mysql_test-table_2', {'id': 5, 'bar': 'mno', 'foo': 500}]])
+        self.assertEqual(record_messages_3, [
+            ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+            ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+            ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+            ['tap_mysql_test-table_2', {'id': 4, 'bar': 'jkl', 'foo': 400}],
+            ['tap_mysql_test-table_2', {'id': 5, 'bar': 'mno', 'foo': 500}],
+        ])
 
         self.assertIsNone(state['currently_syncing'])
@@ -284,12 +286,14 @@ def test_table_2_interrupted(self):
         self.assertFalse(failed_syncing_table_2)
 
         record_messages_2 = [[m.stream, m.record] for m in SINGER_MESSAGES
                              if isinstance(m, singer.RecordMessage)]
-        self.assertEqual(record_messages_2,
-                         [['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
-                          ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
-                          ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
-                          ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}]])
+        self.assertEqual(record_messages_2, [
+            ['tap_mysql_test-table_1', {'id': 1, 'bar': 'abc', 'foo': 100}],
+            ['tap_mysql_test-table_1', {'id': 2, 'bar': 'def', 'foo': 200}],
+            ['tap_mysql_test-table_1', {'id': 3, 'bar': 'ghi', 'foo': 300}],
+            ['tap_mysql_test-table_2', {'id': 1, 'bar': 'ghi', 'foo': 300}],
+            ['tap_mysql_test-table_2', {'id': 2, 'bar': 'def', 'foo': 200}],
+            ['tap_mysql_test-table_2', {'id': 3, 'bar': 'abc', 'foo': 100}],
+        ])
 
         expected_state_2 = {
             'currently_syncing': None,