From 0db0b1ba0519a9cc2391e105000fb8c4d99483a6 Mon Sep 17 00:00:00 2001 From: Samira El Aabidi <54845154+Samira-El@users.noreply.github.com> Date: Fri, 19 May 2023 13:01:27 +0100 Subject: [PATCH] AP-1477: Bump python-mysql-replication (#156) --- README.md | 1 + setup.py | 2 +- tap_mysql/binlogstream.py | 18 ------------------ tap_mysql/sync_strategies/binlog.py | 8 ++++---- tests/unit/sync_strategies/test_binlog.py | 4 ++-- 5 files changed, 8 insertions(+), 25 deletions(-) delete mode 100644 tap_mysql/binlogstream.py diff --git a/README.md b/README.md index a9136e6..b5f94e5 100644 --- a/README.md +++ b/README.md @@ -472,6 +472,7 @@ GRANT ALL PRIVILEGES ON tap_mysql_test.* TO ; export TAP_MYSQL_PORT= export TAP_MYSQL_USER= export TAP_MYSQL_PASSWORD= + export TAP_MYSQL_ENGINE= ``` 3. Install python test dependencies in a virtual env diff --git a/setup.py b/setup.py index 9a05492..0c9e14b 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ 'pendulum==2.1.2', 'pipelinewise-singer-python==1.*', 'PyMySQL==1.0.2', - 'mysql-replication==0.30', + 'mysql-replication==0.40', 'plpygis==0.2.0', 'tzlocal==2.1', ], diff --git a/tap_mysql/binlogstream.py b/tap_mysql/binlogstream.py deleted file mode 100644 index de8afa0..0000000 --- a/tap_mysql/binlogstream.py +++ /dev/null @@ -1,18 +0,0 @@ -from pymysqlreplication import BinLogStreamReader - - -class CustomBinlogStreamReader(BinLogStreamReader): - """ - Custom BinLogStreamReader to override slave capabilities - """ - def _register_slave(self): - - # When replicating from Mariadb 10.6.12 using binlog coordinates, a slave capability < 4 triggers a bug in - # Mariadb, when it tries to replace GTID events with dummy ones. Given that this library understands GTID - # events, setting the capability to 4 circumvents this error. - # If the DB is mysql, this won't have any effect so no need to run this in a condition - cur = self._stream_connection.cursor() - cur.execute("SET @mariadb_slave_capability=4") - cur.close() - - super()._register_slave() diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 853e4c8..f569ba3 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -14,6 +14,7 @@ from typing import Dict, Set, Union, Optional, Any, Tuple from plpygis import Geometry +from pymysqlreplication import BinLogStreamReader from pymysqlreplication.constants import FIELD_TYPE from pymysqlreplication.event import RotateEvent, MariadbGtidEvent, GtidEvent from pymysqlreplication.row_event import ( @@ -24,7 +25,6 @@ from singer import utils, Schema, metadata from tap_mysql import connection -from tap_mysql.binlogstream import CustomBinlogStreamReader from tap_mysql.connection import connect_with_backoff, make_connection_wrapper, MySQLConnection from tap_mysql.discover_utils import discover_catalog, desired_columns, should_run_discovery from tap_mysql.stream_utils import write_schema_message @@ -591,7 +591,7 @@ def __get_diff_in_columns_list( # pylint: disable=R1702,R0915 def _run_binlog_sync( mysql_conn: MySQLConnection, - reader: CustomBinlogStreamReader, + reader: BinLogStreamReader, binlog_streams_map: Dict, state: Dict, config: Dict, @@ -793,7 +793,7 @@ def create_binlog_stream_reader( log_file: Optional[str], log_pos: Optional[int], gtid_pos: Optional[str] -) -> CustomBinlogStreamReader: +) -> BinLogStreamReader: """ Create an instance of BinlogStreamReader with the right config @@ -851,7 +851,7 @@ def create_binlog_stream_reader( kwargs['log_pos'] = log_pos kwargs['resume_stream'] = True - return CustomBinlogStreamReader(**kwargs) + return BinLogStreamReader(**kwargs) def sync_binlog_stream( diff --git a/tests/unit/sync_strategies/test_binlog.py b/tests/unit/sync_strategies/test_binlog.py index deb959f..19da059 100644 --- a/tests/unit/sync_strategies/test_binlog.py +++ b/tests/unit/sync_strategies/test_binlog.py @@ -216,7 +216,7 @@ def test_sync_binlog_stream_with_log_file_and_pos(self, with patch('tap_mysql.sync_strategies.binlog.singer.write_message') as write_msg: write_msg.side_effect = lambda msg: singer_messages.append(msg) - with patch('tap_mysql.sync_strategies.binlog.CustomBinlogStreamReader', + with patch('tap_mysql.sync_strategies.binlog.BinLogStreamReader', autospec=True) as reader_mock: def iter_mock(_): log_files = [ @@ -996,7 +996,7 @@ def test_sync_binlog_stream_with_gtid(self, with patch('tap_mysql.sync_strategies.binlog.singer.write_message') as write_msg: write_msg.side_effect = lambda msg: singer_messages.append(msg) - with patch('tap_mysql.sync_strategies.binlog.CustomBinlogStreamReader', + with patch('tap_mysql.sync_strategies.binlog.BinLogStreamReader', autospec=True) as reader_mock: def iter_mock(_): log_files = [