diff --git a/Makefile b/Makefile index eb21705..9173fff 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ pylint: unit_test: . ./venv/bin/activate ;\ - nosetests -c .noserc --cover-min-percentage=42 tests/unit + nosetests -c .noserc --cover-min-percentage=47 tests/unit integration_test: . ./venv/bin/activate ;\ diff --git a/README.md b/README.md index 09eb5fc..e691f11 100644 --- a/README.md +++ b/README.md @@ -65,24 +65,24 @@ Create a config file containing the database connection credentials, see [sample List of config parameters: -| Parameter | type | required | default | description | -|--------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------| -| host | string | yes | - | mysql/mariadb host | -| port | int | yes | - | mysql/mariadb port | -| user | string | yes | - | db username | -| password | string | yes | - | db password | -| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL | -| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL | -| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server | +| Parameter | type | required | default | description | +|--------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------| +| host | string | yes | - | mysql/mariadb host | +| port | int | yes | - | mysql/mariadb port | +| user | string | yes | - | db username | +| password | string | yes | - | db password | +| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL | +| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL | +| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server | | filter_db | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance | -| use_gtid | bool | False | False | Flag to enable log based replication using GTID | -| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID | -| ssl | string ("true") | No | False | Enable SSL connection | -| ssl_ca | string | No | - | for self-signed SSL | -| ssl_cert | string | No | - | for self-signed SSL | -| ssl_key | string | No | - | for self-signed SSL | -| internal_hostname | string | No | - | Override match hostname for google cloud | -| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically.| +| use_gtid | bool | False | False
| Flag to enable log based replication using GTID | +| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID | +| ssl | string ("true") | No | False | Enable SSL connection | +| ssl_ca | string | No | - | for self-signed SSL | +| ssl_cert | string | No | - | for self-signed SSL | +| ssl_key | string | No | - | for self-signed SSL | +| internal_hostname | string | No | - | Override match hostname for google cloud | +| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. | ### Discovery mode @@ -287,14 +287,50 @@ resultant stream of JSON data can be consumed by a Singer target. ## Replication methods and state file -In the above example, we invoked `tap-mysql` without providing a _state_ file -and without specifying a replication method. The two ways to replicate a given -table are `FULL_TABLE` and `INCREMENTAL`. +In the above example, we invoked `tap-mysql` without providing a _state_ file and without specifying a replication +method. The ways to replicate a given table are `FULL_TABLE`, `LOG_BASED` and `INCREMENTAL`. + +### LOG_BASED + +LOG_BASED replication makes use of the server's binary logs (binlogs), this method can work with primary +servers, the tap acts as a replica and requests the primary to stream log events,the tap then consumes events +pertaining to row changes (inserts, updates, deletes), binlog file rotate and gtid events. + +Log_based method always requires an initial sync to get a snapshot of the table and current binlog coordinates/gtid +position. + +The tap support two ways of consuming log events: using binlog coordinates or GTID, the default behavior is using +binlog coordinates, when turning the `use_gtid` flag, you have to specify the engine flavor (mariadb/mysql) due to +how different are the GTID implementations in these two engines. + +When enabling the `use_gtid` flag and the engine is MariaDB, the tap will dynamically infer the GTID pos from +existing binlog coordinate in the state, if the engine is mysql, it will fail. + +#### State when using binlog coordinates +```json +{ + "bookmarks": { + "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244}, + "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42}, + "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100} + } +} +``` + +#### State when using GTID +```json +{ + "bookmarks": { + "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, "gtid": "0:364864374:599"}, + "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, "gtid": "0:364864374:375"}, + "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, "gtid": "0:364864374:399"} + } +} +``` ### Full Table -Full-table replication extracts all data from the source table each time the tap -is invoked. +Full-table replication extracts all data from the source table each time the tap is invoked. ### Incremental diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 939c706..9e40fa1 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -259,6 +259,7 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac def calculate_gtid_bookmark( + mysql_conn: MySQLConnection, binlog_streams_map: Dict[str, Any], state: Dict, engine: str @@ -266,6 +267,7 @@ def calculate_gtid_bookmark( """ Finds the earliest bookmarked gtid in the state Args: + mysql_conn: instance of MySqlConnection binlog_streams_map: dictionary of selected streams state: state dict with bookmarks engine: the DB flavor mysql/mariadb @@ -298,16 +300,51 @@ def calculate_gtid_bookmark( min_seq_no = gtid_seq_no min_gtid = gtid - if not min_gtid: - raise Exception("Couldn't find any gtid in state bookmarks to resume logical replication") - LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication', - min_gtid) + # Mariadb has a handy sql function BINLOG_GTID_POS to infer the gtid position from given binlog coordinates so + # we will use that, as for mysql, there is no such thing, the only available option is using the cli utility + # mysqlbinlog which we deemed as not nice to use, and we don't wanna make it a system requirement of this tap, + # hence, this functionality of inferring gtid is not implemented for it. + + if engine != connection.MARIADB_ENGINE: + raise Exception("Couldn't find any gtid in state bookmarks to resume logical replication") + + LOGGER.info("Couldn't find a gtid in state, will try to infer one from binlog coordinates if they exist ..") + log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state) + + if not (log_file and log_pos): + raise Exception("No binlog coordinates in state to infer gtid position! Cannot resume logical replication") + + min_gtid = _find_gtid_by_binlog_coordinates(mysql_conn, log_file, log_pos) + + LOGGER.info('The inferred GTID is "%s", it will be used to resume replication', + min_gtid) + else: + LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication', + min_gtid) return min_gtid +def _find_gtid_by_binlog_coordinates(mysql_conn: MySQLConnection, log_file: str, log_pos: int) -> str: + """ + Finds the equivalent gtid position from the given binlog file and pos. + This only works on MariaDB + + Args: + mysql_conn: instance of MySQLConnection + log_file: a binlog file + log_pos: a position in the log file + + Returns: gtid position + """ + with connect_with_backoff(mysql_conn) as open_conn: + with open_conn.cursor() as cur: + cur.execute(f"select BINLOG_GTID_POS('{log_file}', {log_pos});") + return cur.fetchone()[0] + + def get_min_log_pos_per_log_file(binlog_streams_map, state) -> Dict[str, Dict]: min_log_pos_per_file = {} @@ -808,7 +845,7 @@ def sync_binlog_stream( log_file = log_pos = gtid = None if config['use_gtid']: - gtid = calculate_gtid_bookmark(binlog_streams_map, state, config['engine']) + gtid = calculate_gtid_bookmark(mysql_conn, binlog_streams_map, state, config['engine']) else: log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state) diff --git a/tests/integration/test_tap_mysql.py b/tests/integration/test_tap_mysql.py index aaa02fa..0af047d 100644 --- a/tests/integration/test_tap_mysql.py +++ b/tests/integration/test_tap_mysql.py @@ -8,7 +8,7 @@ import tap_mysql import tap_mysql.discover_utils -from tap_mysql.connection import connect_with_backoff, MySQLConnection, fetch_server_id, MYSQL_ENGINE +from tap_mysql.connection import connect_with_backoff, MySQLConnection, fetch_server_id, MYSQL_ENGINE, MARIADB_ENGINE try: import tests.integration.utils as test_utils @@ -595,7 +595,7 @@ def setUp(self): 'selected': True, 'table-key-properties': [], 'database-name': 'tap_mysql_test' - }}, + }}, {'breadcrumb': ('properties', 'val'), 'metadata': {'selected': True}} ] @@ -721,9 +721,12 @@ def setUp(self): ctime time, cjson json) """) - cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (1, \'2017-06-01\', current_date())') - cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (2, \'2017-06-20\', current_date())') - cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (3, \'2017-09-22\', current_date())') + cursor.execute( + 'INSERT INTO binlog_1 (id, updated, created_date) VALUES (1, \'2017-06-01\', current_date())') + cursor.execute( + 'INSERT INTO binlog_1 (id, updated, created_date) VALUES (2, \'2017-06-20\', current_date())') + cursor.execute( + 'INSERT INTO binlog_1 (id, updated, created_date) VALUES (3, \'2017-09-22\', current_date())') cursor.execute('INSERT INTO binlog_2 (id, updated, ctime, cjson) VALUES (1, \'2017-10-22\', ' 'current_time(), \'[{"key1": "A", "key2": ["B", 2], "key3": {}}]\')') cursor.execute('INSERT INTO binlog_2 (id, updated, ctime, cjson) VALUES (2, \'2017-11-10\', ' @@ -748,7 +751,7 @@ def setUp(self): 'selected': True, 'database-name': 'tap_mysql_test', 'table-key-properties': ['id'] - }}, + }}, {'breadcrumb': ('properties', 'id'), 'metadata': {'selected': True}}, {'breadcrumb': ('properties', 'updated'), 'metadata': {'selected': True}} ] @@ -829,13 +832,13 @@ def test_fail_on_view(self): state = {} - expected_exception_message = "Unable to replicate stream(tap_mysql_test-{}) with binlog because it is a view.".\ + expected_exception_message = "Unable to replicate stream(tap_mysql_test-{}) with binlog because it is a view.". \ format(self.catalog.streams[0].stream) with self.assertRaises(Exception) as context: tap_mysql.do_sync(self.conn, {}, self.catalog, state) - self.assertEqual(expected_exception_message, str(context.exception)) + self.assertEqual(expected_exception_message, str(context.exception)) def test_fail_if_log_file_does_not_exist(self): log_file = 'chicken' @@ -850,15 +853,13 @@ def test_fail_if_log_file_does_not_exist(self): } } - expected_exception_message = "Unable to replicate stream({}) with binlog because log file {} does not exist.".format( - stream, - log_file - ) + expected_exception_message = "Unable to replicate binlog stream because the following binary log(s) no " \ + "longer exist: {}".format(log_file) with self.assertRaises(Exception) as context: tap_mysql.do_sync(self.conn, {}, self.catalog, state) - self.assertEqual(expected_exception_message, str(context.exception)) + self.assertEqual(expected_exception_message, str(context.exception)) def test_binlog_stream(self): global SINGER_MESSAGES @@ -927,7 +928,8 @@ def test_binlog_stream_with_alteration(self): with open_conn.cursor() as cursor: cursor.execute('ALTER TABLE binlog_1 add column data blob;') cursor.execute('ALTER TABLE binlog_1 add column is_cancelled boolean;') - cursor.execute('INSERT INTO binlog_1 (id, updated, is_cancelled, data) VALUES (2, \'2017-06-20\', true, \'blob content\')') + cursor.execute( + 'INSERT INTO binlog_1 (id, updated, is_cancelled, data) VALUES (2, \'2017-06-20\', true, \'blob content\')') cursor.execute('INSERT INTO binlog_1 (id, updated, is_cancelled) VALUES (3, \'2017-09-21\', false)') cursor.execute('INSERT INTO binlog_2 (id, updated) VALUES (3, \'2017-12-10\')') cursor.execute('ALTER TABLE binlog_1 change column updated date_updated datetime;') @@ -1068,6 +1070,97 @@ def test_binlog_stream_with_gtid(self): self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')) + def test_binlog_stream_switching_from_binlog_to_gtid_with_mysql_fails(self): + global SINGER_MESSAGES + + engine = os.getenv('TAP_MYSQL_ENGINE', MYSQL_ENGINE) + + if engine != MYSQL_ENGINE: + self.skipTest('This test is only meant for Mysql flavor') + + log_file, log_pos = binlog.fetch_current_log_file_and_pos(self.conn) + + self.state = singer.write_bookmark(self.state, + 'tap_mysql_test-binlog_1', + 'log_file', + log_file) + + self.state = singer.write_bookmark(self.state, + 'tap_mysql_test-binlog_2', + 'log_pos', + log_pos) + + config = test_utils.get_db_config() + + config['use_gtid'] = True + config['engine'] = engine + + with self.assertRaises(Exception) as context: + tap_mysql.do_sync(self.conn, config, self.catalog, self.state) + + self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", + str(context.exception)) + + def test_binlog_stream_switching_from_binlog_to_gtid_with_mariadb_success(self): + global SINGER_MESSAGES + + engine = os.getenv('TAP_MYSQL_ENGINE', MYSQL_ENGINE) + + if engine != MARIADB_ENGINE: + self.skipTest('This test is only meant for Mariadb flavor') + + config = test_utils.get_db_config() + + config['use_gtid'] = True + config['engine'] = engine + + tap_mysql.do_sync(self.conn, config, self.catalog, self.state) + + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) + + message_types = [type(m) for m in SINGER_MESSAGES] + self.assertEqual(message_types, + [singer.StateMessage, + singer.SchemaMessage, + singer.SchemaMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.StateMessage, + ]) + + self.assertEqual([ + ('tap_mysql_test-binlog_1', 1, False), + ('tap_mysql_test-binlog_1', 2, False), + ('tap_mysql_test-binlog_1', 3, False), + ('tap_mysql_test-binlog_2', 1, False), + ('tap_mysql_test-binlog_2', 2, False), + ('tap_mysql_test-binlog_2', 3, False), + ('tap_mysql_test-binlog_1', 3, False), + ('tap_mysql_test-binlog_2', 2, False), + ('tap_mysql_test-binlog_1', 2, True), + ('tap_mysql_test-binlog_2', 1, True), + ], + [(m.stream, + m.record['id'], + m.record.get(binlog.SDC_DELETED_AT) is not None) + for m in record_messages]) + + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid')) + + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')) + class TestViews(unittest.TestCase): def setUp(self): @@ -1133,7 +1226,7 @@ def setUp(self): 'selected': True, 'table-key-properties': [], 'database-name': 'tap_mysql_test' - }}, + }}, {'breadcrumb': ('properties', 'b c'), 'metadata': {'selected': True}} ] @@ -1209,7 +1302,6 @@ def test_primary_key_is_in_metadata(self): self.assertEqual(primary_keys, {'good_pk_tab': ['good_pk']}) def test_sync_messages_are_correct(self): - self.catalog.streams[0] = test_utils.set_replication_method_and_key(self.catalog.streams[0], 'LOG_BASED', None) self.catalog.streams[0] = test_utils.set_selected(self.catalog.streams[0], True) @@ -1360,7 +1452,6 @@ def setUp(self): self.catalog = test_utils.discover_catalog(self.conn, {}) def test_sync_messages_are_correct(self): - self.catalog.streams[0] = test_utils.set_replication_method_and_key(self.catalog.streams[0], 'FULL_TABLE', None) self.catalog.streams[0] = test_utils.set_selected(self.catalog.streams[0], True) diff --git a/tests/unit/sync_strategies/test_binlog.py b/tests/unit/sync_strategies/test_binlog.py index d1b5ec4..82c92ab 100644 --- a/tests/unit/sync_strategies/test_binlog.py +++ b/tests/unit/sync_strategies/test_binlog.py @@ -7,7 +7,7 @@ from collections import namedtuple from typing import Dict from unittest import TestCase -from unittest.mock import patch, Mock, call, MagicMock +from unittest.mock import patch, Mock, call, MagicMock, PropertyMock from pymysql import InternalError from pymysql.cursors import Cursor @@ -1670,12 +1670,11 @@ def test_verify_binlog_config_fail_if_not_FULL(self, connect_with_backoff): connect_with_backoff.return_value = mysql_con - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.verify_binlog_config(mysql_con) - self.assertEqual("Unable to replicate binlog stream because binlog_row_image is " - f"not set to 'FULL': Not-FULL.", - str()) + self.assertEqual("Unable to replicate binlog stream because binlog_row_image is not set to 'FULL': " + "Not-FULL.", str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1699,12 +1698,11 @@ def test_verify_binlog_config_fail_if_not_ROW(self, connect_with_backoff): connect_with_backoff.return_value = mysql_con - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.verify_binlog_config(mysql_con) - self.assertEqual("Unable to replicate binlog stream because binlog_format is " - f"not set to 'ROW': Not-ROW.", - str()) + self.assertEqual("Unable to replicate binlog stream because binlog_format is not set to 'ROW': Not-ROW.", + str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1732,13 +1730,13 @@ def test_verify_binlog_config_fail_if_binlog_row_image_not_supported(self, conne connect_with_backoff.return_value = mysql_con - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.verify_binlog_config(mysql_con) - self.assertEqual("Unable to replicate binlog stream because binlog_row_image " - "system variable does not exist. MySQL version must be at " - "least 5.6.2 to use binlog replication.", - str()) + self.assertEqual("Unable to replicate binlog stream because binlog_row_image " + "system variable does not exist. MySQL version must be at " + "least 5.6.2 to use binlog replication.", + str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1785,10 +1783,10 @@ def test_verify_gtid_config_fail_if_not_on(self, connect_with_backoff): connect_with_backoff.return_value = mysql_con - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.verify_gtid_config(mysql_con) - self.assertEqual("Unable to replicate binlog stream because GTID mode is not enabled.") + self.assertEqual("Unable to replicate binlog stream because GTID mode is not enabled.", str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1832,10 +1830,10 @@ def test_fetch_current_log_file_and_pos_fail_if_no_result(self, connect_with_bac connect_with_backoff.return_value = mysql_con - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.fetch_current_log_file_and_pos(mysql_con) - self.assertEqual('MySQL binary logging is not enabled.', str(ex)) + self.assertEqual('MySQL binary logging is not enabled.', str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) cur_mock.__enter__.return_value.execute.assert_has_calls( @@ -1916,10 +1914,10 @@ def test_fetch_current_gtid_pos_for_mariadb_no_gtid_found_expect_exception( connect_with_backoff.return_value = mysql_con fetch_server_id.return_value = 2 - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.fetch_current_gtid_pos(mysql_con, connection.MARIADB_ENGINE) - self.assertIn('No suitable GTID was found for server', str(ex)) + self.assertIn('GTID is not present on this server!', str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) fetch_server_id.assert_called_with(mysql_con) @@ -1947,10 +1945,10 @@ def test_fetch_current_gtid_pos_no_gtid_found_for_given_server_expect_exception( fetch_server_id.return_value = 2 - with self.assertRaises(Exception) as ex: + with self.assertRaises(Exception) as context: binlog.fetch_current_gtid_pos(mysql_con, connection.MARIADB_ENGINE) - self.assertIn('No suitable GTID was found for server', str(ex)) + self.assertIn('No suitable GTID was found for server', str(context.exception)) connect_with_backoff.assert_called_with(mysql_con) fetch_server_id.assert_called_with(mysql_con) @@ -1979,11 +1977,14 @@ def test_calculate_gtid_bookmark_for_mariadb_returns_earliest(self): } } - result = binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MARIADB_ENGINE) + mysql_conn = Mock(spec_set=MySQLConnection) + + result = binlog.calculate_gtid_bookmark(mysql_conn, binlog_streams, state, connection.MARIADB_ENGINE) self.assertEqual(result, '0-20-12') - def test_calculate_gtid_bookmark_for_mariadb_no_gtid_found_expect_exception(self): + @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff') + def test_calculate_gtid_bookmark_for_mariadb_no_gtid_found_would_infer_from_binlog(self, connect_with_backoff): binlog_streams = { 'stream1': {'schema': {}}, @@ -1993,13 +1994,63 @@ def test_calculate_gtid_bookmark_for_mariadb_no_gtid_found_expect_exception(self state = { 'bookmarks': { + 'stream1': {'log_file': 'binlog.040', 'log_pos': 138}, + 'stream2': {'log_file': 'binlog.040', 'log_pos': 50}, + 'stream3': {'log_file': 'binlog.032', 'log_pos': 14}, } } + mysql_con = MagicMock(spec_set=MySQLConnection).return_value + cur_mock = MagicMock(spec_set=Cursor).return_value + cur_mock.__enter__.return_value.fetchone.return_value = [ + ['0-4-222'], + ] + cur_mock.__enter__.return_value.fetchall.return_value = [ + ('binlog.030',), + ('binlog.031',), + ('binlog.032',), + ('binlog.033',), + ('binlog.034',), + ('binlog.040',), + ('binlog.041',), + ] + + mysql_con.__enter__.return_value.cursor.return_value = cur_mock + + connect_with_backoff.return_value = mysql_con + + result = binlog.calculate_gtid_bookmark(mysql_con, binlog_streams, state, connection.MARIADB_ENGINE) + + cur_mock.__enter__.return_value.execute.assert_has_calls( + [ + call('SHOW BINARY LOGS'), + call("select BINLOG_GTID_POS('binlog.032', 14);"), + ] + ) + + @patch('tap_mysql.sync_strategies.binlog.calculate_bookmark') + @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff') + def test_calculate_gtid_bookmark_for_mariadb_no_gtid_nor_binlog_found_expect_exception(self, + connect_with_backoff, + calculate_bookmark): + + binlog_streams = { + 'stream1': {'schema': {}}, + 'stream2': {'schema': {}}, + 'stream3': {'schema': {}}, + } + + state = { + 'bookmarks': {} + } + mysql_conn = Mock(spec_set=MySQLConnection) + connect_with_backoff.return_value = mysql_conn + calculate_bookmark.return_value = None, None - with self.assertRaises(Exception) as ex: - binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MARIADB_ENGINE) + with self.assertRaises(Exception) as context: + binlog.calculate_gtid_bookmark(mysql_conn, binlog_streams, state, connection.MARIADB_ENGINE) - self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", str(ex)) + self.assertEqual("No binlog coordinates in state to infer gtid position! Cannot resume logical replication", + str(context.exception)) def test_calculate_gtid_bookmark_for_mysql_returns_earliest(self): @@ -2020,8 +2071,8 @@ def test_calculate_gtid_bookmark_for_mysql_returns_earliest(self): 'stream5': {}, } } - - result = binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MYSQL_ENGINE) + mysql_conn = Mock(spec_set=MySQLConnection) + result = binlog.calculate_gtid_bookmark(mysql_conn, binlog_streams, state, connection.MYSQL_ENGINE) self.assertEqual(result, '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-2') @@ -2034,11 +2085,12 @@ def test_calculate_gtid_bookmark_for_mysql_no_gtid_found_expect_exception(self): } state = { - 'bookmarks': { - } + 'bookmarks': {} } + mysql_conn = Mock(spec_set=MySQLConnection) - with self.assertRaises(Exception) as ex: - binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MYSQL_ENGINE) + with self.assertRaises(Exception) as context: + binlog.calculate_gtid_bookmark(mysql_conn, binlog_streams, state, connection.MYSQL_ENGINE) - self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", str(ex)) + self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", + str(context.exception))