diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 7e04cff..5973a53 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -13,15 +13,12 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [ 3.6, 3.7, 3.8 ]
+        python-version: [ 3.7, 3.8 ]

     steps:
       - name: Checkout repository
        uses: actions/checkout@v2

-      - name: Build Mysql test container
-        run: docker-compose up -d --build test_mysql
-
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
@@ -36,14 +33,73 @@ jobs:
       - name: Unit Tests
        run: make unit_test
+
+  integration_test_mariadb:
+    name: Integration tests with MariaDB database
+    runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        python-version: [ 3.7, 3.8 ]
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      - name: Build db test container
+        run: docker-compose up -d mariadb_server1
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Setup virtual environment
+        run: make venv
+
       - name: Integration Tests
         env:
           TAP_MYSQL_PORT: 3306
-          TAP_MYSQL_USER: root
-          TAP_MYSQL_PASSWORD: my-secret-passwd
+          TAP_MYSQL_USER: replication_user
+          TAP_MYSQL_PASSWORD: secret123passwd
+          TAP_MYSQL_HOST: localhost
+          TAP_MYSQL_ENGINE: mariadb
+          LOGGING_CONF_FILE: ./sample_logging.conf
+        run: make integration_test
+
+      - name: Shutdown db test container
+        run: docker-compose down mariadb_server1
+
+  integration_test_mysql:
+    name: Integration tests with MySQL database
+    runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        python-version: [ 3.7, 3.8 ]
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v2
+
+      - name: Build db test container
+        run: docker-compose up -d mysql_server
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Setup virtual environment
+        run: make venv
+
+      - name: Integration Tests
+        env:
+          TAP_MYSQL_PORT: 3307
+          TAP_MYSQL_USER: replication_user
+          TAP_MYSQL_PASSWORD: secret123passwd
           TAP_MYSQL_HOST: localhost
+          TAP_MYSQL_ENGINE: mysql
           LOGGING_CONF_FILE: ./sample_logging.conf
         run: make integration_test
-
-      - name: Shutdown Mysql test container
-        run: docker-compose down
+      - name: Shutdown db test container
+        run: docker-compose down mysql_server
diff --git a/Makefile b/Makefile
index 919d53c..eb21705 100644
--- a/Makefile
+++ b/Makefile
@@ -10,8 +10,8 @@ pylint:

 unit_test:
 	. ./venv/bin/activate ;\
-	nosetests -c .noserc --cover-min-percentage=35 tests/unit
+	nosetests -c .noserc --cover-min-percentage=42 tests/unit

 integration_test:
 	. ./venv/bin/activate ;\
-	nosetests -c .noserc --cover-min-percentage=87 tests/integration
+	nosetests -c .noserc --cover-min-percentage=85 tests/integration
diff --git a/README.md b/README.md
index b2f51e1..09eb5fc 100644
--- a/README.md
+++ b/README.md
@@ -61,31 +61,29 @@ mysql> select * from example_db.animals;

 ### Create the configuration file

-Create a config file containing the database connection credentials, e.g.:
+Create a config file containing the database connection credentials, see [sample](config.json.sample). 
+
+List of config parameters:
+
+| Parameter         | Type                          | Required | Default                    | Description                                                                                                                |
+|-------------------|-------------------------------|----------|----------------------------|----------------------------------------------------------------------------------------------------------------------------|
+| host              | string                        | yes      | -                          | MySQL/MariaDB host                                                                                                         |
+| port              | int                           | yes      | -                          | MySQL/MariaDB port                                                                                                         |
+| user              | string                        | yes      | -                          | DB username                                                                                                                |
+| password          | string                        | yes      | -                          | DB password                                                                                                                |
+| cursorclass       | string                        | no       | `pymysql.cursors.SSCursor` | Cursor class used by PyMySQL                                                                                               |
+| database          | string                        | no       | -                          | Database to use, None to not use a particular one. Used by PyMySQL                                                        |
+| server_id         | int                           | no       | Randomly generated int     | Used as the slave id when this tap is connecting to the server                                                            |
+| filter_db         | string                        | no       | -                          | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance |
+| use_gtid          | bool                          | no       | False                      | Flag to enable log based replication using GTID                                                                           |
+| engine            | string ('mysql' or 'mariadb') | no       | 'mysql'                    | Indicates which flavor the server is; used for LOG_BASED with GTID                                                        |
+| ssl               | string ("true")               | no       | False                      | Enable SSL connection                                                                                                      |
+| ssl_ca            | string                        | no       | -                          | CA certificate, for self-signed SSL                                                                                        |
+| ssl_cert          | string                        | no       | -                          | Client certificate, for self-signed SSL                                                                                    |
+| ssl_key           | string                        | no       | -                          | Client key, for self-signed SSL                                                                                            |
+| internal_hostname | string                        | no       | -                          | Override the hostname used for SSL certificate matching, e.g. for Google Cloud SQL                                        |
+| session_sqls      | List of strings               | no       | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | List of SQL commands to run when a connection is made; allows setting session variables dynamically |

-```json
-{
-  "host": "localhost",
-  "port": "3306",
-  "user": "root",
-  "password": "password"
-}
-```
-
-These are the same basic configuration properties used by the MySQL command-line
-client (`mysql`).
-
-### Optional config parameters
-
-`filter_db`: Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance
-
-`session_sqls`: List of SQL commands to run when a connection made. This allows to set session variables dynamically, like timeouts. 
If not set then the following commands will be executed:
-```
-SET @@session.time_zone="+0:00"
-SET @@session.wait_timeout=28800
-SET @@session.net_read_timeout=3600
-SET @@session.innodb_lock_wait_timeout=3600
-```

 ### Discovery mode

diff --git a/config.json.sample b/config.json.sample
new file mode 100644
index 0000000..1c90bea
--- /dev/null
+++ b/config.json.sample
@@ -0,0 +1,23 @@
+{
+  "host": "",
+  "port": "",
+  "user": "",
+  "password": "",
+  "filter_db": "db1,db2",
+  "engine": "",
+  "use_gtid": false,
+  "cursorclass": "",
+  "server_id": "",
+  "database": "",
+  "ssl": "",
+  "ssl_ca": "",
+  "ssl_cert": "",
+  "ssl_key": "",
+  "internal_hostname": "",
+  "session_sqls": [
+    "SET @@session.time_zone=\"+0:00\"",
+    "SET @@session.wait_timeout=28800",
+    "SET @@session.net_read_timeout=3600",
+    "SET @@session.innodb_lock_wait_timeout=3600"
+  ]
+}
\ No newline at end of file
diff --git a/dev/init.sql b/dev/init.sql
new file mode 100644
index 0000000..52c0d88
--- /dev/null
+++ b/dev/init.sql
@@ -0,0 +1,38 @@
+# configure replication user
+grant replication client on *.* to 'replication_user'@'%';
+grant replication slave on *.* to 'replication_user'@'%';
+flush privileges;
+
+use tap_mysql_test;
+
+# create objects
+create table r1 (
+    i1 int auto_increment primary key,
+    c1 varchar(100),
+    d1 datetime default current_timestamp()
+);
+
+select * from r1;
+
+insert into r1 (c1) values ('#1'),('#2'),('#3'),('#4'),('#5'),('#6'),('#7');
+insert into r1 (c1) values ('#8'),('#9'),('#10'),('#11'),('#12'),('#13'),('#14');
+insert into r1 (c1) values ('#15'),('#16'),('#17'),('#18');
+
+update r1 set c1=concat(c1, '- updated 1') where i1 < 10;
+
+create table r2 (
+    i2 int primary key,
+    d2 datetime
+) ;
+insert into r2 (i2, d2) values (1, now()), (2, now()), (3, now()), (4, now());
+
+update r1 set c1=concat(c1, '- update 2') where i1 >= 10;
+
+select * from r2;
+
+delete from r1 where i1 < 4;
+
+drop table r2;
+
+alter table r1 add column b1 bool default False;
+insert into r1 (c1, b1) values ('#8', True);
diff --git a/dev/mariadb_logical_replication_notes.md b/dev/mariadb_logical_replication_notes.md
new file mode 100644
index 0000000..d767d6b
--- /dev/null
+++ b/dev/mariadb_logical_replication_notes.md
@@ -0,0 +1,75 @@
+# Guide for how to set up MariaDB logical replication
+
+## Create a replica server
+
+1. Create a new MariaDB server by adding the following bits to [docker-compose](../docker-compose.yml):
+```yaml
+  mariadb_server2:
+    container_name: "mariadb_server2"
+    image: mariadb:10.6
+    environment:
+      MYSQL_ROOT_PASSWORD: my-secret-passwd
+      MYSQL_USER: replication_user
+      MYSQL_PASSWORD: secret123passwd
+    ports:
+      - "3307:3306"
+    command: |
+      --server-id=2
+      --default-authentication-plugin=mysql_native_password
+      --binlog-format=row
+    networks:
+      - mariadb-cluster
+```
+
+2. Freeze all activity on server 1 (primary) and get the current binlog file and position
+```mariadb
+FLUSH TABLES WITH READ LOCK;
+
+show master status ;
+```
+
+Save the binlog file and position somewhere.
+
+3. Export all objects/data from the primary into a file:
+
+   * Log into the container: `docker exec -it mariadb_server1 bash`
+   * Export objects/data: `mysqldump -B mydb --master-data --gtid > masterfull.sql -p`
+   * Copy the export file to your host: `docker cp mariadb_server1:/masterfull.sql .`
+
+2. Import the exported file into server 2
+   * Copy the exported file into server 2: `docker cp masterfull.sql mariadb_server2:/`
+   * Log into the container: `docker exec -it mariadb_server2 bash`
+   * Import objects/data: `mysql -p < masterfull.sql`
+
+3. 
Setup replica +```mariadb + +stop slave; +reset master; + +CHANGE MASTER TO + MASTER_HOST='mariadb_primary', + MASTER_USER='replication_user', + MASTER_PASSWORD='secret123passwd', + MASTER_LOG_FILE='', + MASTER_LOG_POS=, + MASTER_PORT=3306, + MASTER_CONNECT_RETRY=10 +; + +SELECT @@gtid_slave_pos; -- check this is set + +start slave; + +show slave status; -- should say Slave is waiting for master to send events +``` +4. Unfreeze activity on primary +```mariadb +UNLOCK TABLES; +``` + +5. Test replication is working fine by writing some data into an existing table: +```mariadb +Insert into table_x(id) values(1); +``` +Check replica has this new row in `table_x` diff --git a/docker-compose.yml b/docker-compose.yml index 8e3a3a2..c36160a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,12 +1,54 @@ version: '3.7' services: - test_mysql: - container_name: "pipelinewise-tap-mysql-db" + mariadb_server1: + container_name: "mariadb_server1" image: mariadb:10.6 + volumes: + - ./dev/init.sql:/docker-entrypoint-initdb.d/init.sql environment: MYSQL_ROOT_PASSWORD: my-secret-passwd MYSQL_DATABASE: tap_mysql_test + MYSQL_USER: replication_user + MYSQL_PASSWORD: secret123passwd ports: - - 3306:3306 - command: --default-authentication-plugin=mysql_native_password --log-bin=mysql-bin --binlog-format=ROW + - "3306:3306" + command: + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=mysql-bin + --binlog-format=ROW + networks: + - mariadb-cluster + + mysql_server: + container_name: 'mysql_server' + image: mysql:5.7 + ports: + - "3307:3306" + environment: + MYSQL_ROOT_PASSWORD: master1 + MYSQL_DATABASE: tap_mysql_test + MYSQL_USER: replication_user + MYSQL_PASSWORD: secret123passwd + volumes: + - ./dev/init.sql:/docker-entrypoint-initdb.d/init.sql + command: + - --server-id=1 + - --gtid-mode=ON + - --enforce-gtid-consistency=ON + - --master-info-repository=TABLE + - --relay-log-info-repository=TABLE + - --binlog-checksum=NONE + - --log-slave-updates=ON + - --log-bin=binlog + - --binlog-format=ROW + networks: + - mariadb-cluster + +networks: + mariadb-cluster: + ipam: + driver: default + config: + - subnet: 172.100.0.0/24 diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index f692428..63641ad 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -1,22 +1,21 @@ -#!/usr/bin/env python3 # pylint: disable=missing-docstring,too-many-locals import copy import pymysql import singer -from singer import metrics +from typing import Dict from singer import metadata, get_logger +from singer import metrics from singer.catalog import Catalog +from tap_mysql.connection import connect_with_backoff, MySQLConnection, fetch_server_id, MYSQL_ENGINE +from tap_mysql.discover_utils import discover_catalog, resolve_catalog +from tap_mysql.stream_utils import write_schema_message from tap_mysql.sync_strategies import binlog from tap_mysql.sync_strategies import common from tap_mysql.sync_strategies import full_table from tap_mysql.sync_strategies import incremental -from tap_mysql.connection import connect_with_backoff, MySQLConnection -from tap_mysql.discover_utils import discover_catalog, resolve_catalog -from tap_mysql.stream_utils import write_schema_message - LOGGER = get_logger('tap_mysql') REQUIRED_CONFIG_KEYS = [ @@ -78,6 +77,10 @@ def binlog_stream_requires_historical(catalog_entry, state): catalog_entry.tap_stream_id, 'log_pos') + gtid = singer.get_bookmark(state, + catalog_entry.tap_stream_id, + 'gtid') + max_pk_values = singer.get_bookmark(state, 
catalog_entry.tap_stream_id, 'max_pk_values') @@ -86,7 +89,7 @@ def binlog_stream_requires_historical(catalog_entry, state): catalog_entry.tap_stream_id, 'last_pk_fetched') - if (log_file and log_pos) and (not max_pk_values and not last_pk_fetched): + if ((log_file and log_pos) or gtid) and (not max_pk_values and not last_pk_fetched): return False return True @@ -200,9 +203,13 @@ def do_sync_incremental(mysql_conn, catalog_entry, state, columns): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) -def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): +# pylint: disable=too-many-arguments +def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns, use_gtid: bool, engine: str): binlog.verify_binlog_config(mysql_conn) + if use_gtid and engine == MYSQL_ENGINE: + binlog.verify_gtid_config(mysql_conn) + is_view = common.get_is_view(catalog_entry) if is_view: @@ -216,6 +223,12 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): catalog_entry.tap_stream_id, 'log_pos') + gtid = None + if use_gtid: + gtid = singer.get_bookmark(state, + catalog_entry.tap_stream_id, + 'gtid') + max_pk_values = singer.get_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values') @@ -224,10 +237,9 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) - if log_file and log_pos and max_pk_values: + if max_pk_values and ((use_gtid and gtid) or (log_file and log_pos)): LOGGER.info("Resuming initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id) full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version) - else: LOGGER.info("Performing initial full table sync for LOG_BASED stream %s", catalog_entry.tap_stream_id) @@ -237,13 +249,18 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): False) current_log_file, current_log_pos = binlog.fetch_current_log_file_and_pos(mysql_conn) + + current_gtid = None + if use_gtid: + current_gtid = binlog.fetch_current_gtid_pos(mysql_conn, engine) + state = singer.write_bookmark(state, catalog_entry.tap_stream_id, 'version', stream_version) if full_table.pks_are_auto_incrementing(mysql_conn, catalog_entry): - # We must save log_file and log_pos across FULL_TABLE syncs when using + # We must save log_file, log_pos, gtid across FULL_TABLE syncs when using # an incrementing PK state = singer.write_bookmark(state, catalog_entry.tap_stream_id, @@ -255,6 +272,12 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): 'log_pos', current_log_pos) + if current_gtid: + state = singer.write_bookmark(state, + catalog_entry.tap_stream_id, + 'gtid', + current_gtid) + full_table.sync_table(mysql_conn, catalog_entry, state, columns, stream_version) else: @@ -269,6 +292,12 @@ def do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns): 'log_pos', current_log_pos) + if current_gtid: + state = singer.write_bookmark(state, + catalog_entry.tap_stream_id, + 'gtid', + current_gtid) + def do_sync_full_table(mysql_conn, catalog_entry, state, columns): LOGGER.info("Stream %s is using full table replication", catalog_entry.stream) @@ -290,7 +319,7 @@ def do_sync_full_table(mysql_conn, catalog_entry, state, columns): singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) -def sync_non_binlog_streams(mysql_conn, non_binlog_catalog, state): +def sync_non_binlog_streams(mysql_conn, non_binlog_catalog, state, use_gtid, engine): 
for catalog_entry in non_binlog_catalog.streams: columns = list(catalog_entry.schema.properties.keys()) @@ -318,7 +347,7 @@ def sync_non_binlog_streams(mysql_conn, non_binlog_catalog, state): if replication_method == 'INCREMENTAL': do_sync_incremental(mysql_conn, catalog_entry, state, columns) elif replication_method == 'LOG_BASED': - do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns) + do_sync_historical_binlog(mysql_conn, catalog_entry, state, columns, use_gtid, engine) elif replication_method == 'FULL_TABLE': do_sync_full_table(mysql_conn, catalog_entry, state, columns) else: @@ -340,10 +369,19 @@ def sync_binlog_streams(mysql_conn, binlog_catalog, config, state): def do_sync(mysql_conn, config, catalog, state): + + config['use_gtid'] = config.get('use_gtid', False) + config['engine'] = config.get('engine', MYSQL_ENGINE).lower() + non_binlog_catalog = get_non_binlog_streams(mysql_conn, catalog, config, state) binlog_catalog = get_binlog_streams(mysql_conn, catalog, config, state) - sync_non_binlog_streams(mysql_conn, non_binlog_catalog, state) + sync_non_binlog_streams(mysql_conn, + non_binlog_catalog, + state, + config['use_gtid'], + config['engine'] + ) sync_binlog_streams(mysql_conn, binlog_catalog, config, state) @@ -394,7 +432,7 @@ def main_impl(): state = args.state or {} do_sync(mysql_conn, args.config, catalog, state) else: - LOGGER.info("No properties were selected") + raise ValueError("Hmm I don't know what to do! Neither discovery nor sync mode was selected.") def main(): diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index cfd59dc..46ef001 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -14,12 +14,15 @@ # We need to hold onto this for self-signed SSL MATCH_HOSTNAME = ssl.match_hostname +MARIADB_ENGINE = 'mariadb' +MYSQL_ENGINE = 'mysql' DEFAULT_SESSION_SQLS = ['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600'] + @backoff.on_exception(backoff.expo, (pymysql.err.OperationalError), max_tries=5, @@ -156,3 +159,35 @@ def __init__(self, *args, **kwargs): # pylint: disable=unused-argument connect_with_backoff(self) return ConnectionWrapper + + +def fetch_server_id(mysql_conn: MySQLConnection) -> int: + """ + Finds server ID + Args: + mysql_conn: Mysql connection instance + + Returns: server ID + """ + with connect_with_backoff(mysql_conn) as open_conn: + with open_conn.cursor() as cur: + cur.execute("SELECT @@server_id") + server_id = cur.fetchone()[0] + + return server_id + + +def fetch_server_uuid(mysql_conn: MySQLConnection) -> str: + """ + Finds server UUID + Args: + mysql_conn: Mysql connection instance + + Returns: server UUID + """ + with connect_with_backoff(mysql_conn) as open_conn: + with open_conn.cursor() as cur: + cur.execute("SELECT @@server_uuid") + server_uuid = cur.fetchone()[0] + + return server_uuid diff --git a/tap_mysql/discover_utils.py b/tap_mysql/discover_utils.py index 143fa7a..1cdd789 100644 --- a/tap_mysql/discover_utils.py +++ b/tap_mysql/discover_utils.py @@ -9,7 +9,7 @@ from singer import metadata, Schema, get_logger from singer.catalog import Catalog, CatalogEntry -from tap_mysql.connection import connect_with_backoff +from tap_mysql.connection import connect_with_backoff, MySQLConnection from tap_mysql.sync_strategies import common LOGGER = get_logger('tap_mysql') @@ -120,7 +120,7 @@ def should_run_discovery(column_names: Set[str], md_map: Dict[Tuple, Dict]) -> b return False -def 
discover_catalog(mysql_conn: Dict, dbs: str = None, tables: Optional[str] = None): +def discover_catalog(mysql_conn: MySQLConnection, dbs: str = None, tables: Optional[str] = None): """Returns a Catalog describing the structure of the database.""" if dbs: diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 8fa0690..939c706 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -1,22 +1,25 @@ -#!/usr/bin/env python3 # pylint: disable=missing-function-docstring,too-many-arguments,too-many-branches import codecs import copy import datetime import json +import random import re +import socket import pymysql.connections import pymysql.err import pytz import singer import tzlocal -from typing import Dict, Set, Union, Optional, Any +import tap_mysql.sync_strategies.common as common +import tap_mysql.connection as connection + +from typing import Dict, Set, Union, Optional, Any, Tuple from plpygis import Geometry from pymysqlreplication import BinLogStreamReader from pymysqlreplication.constants import FIELD_TYPE -from pymysqlreplication.event import RotateEvent -import tap_mysql.sync_strategies.common as common +from pymysqlreplication.event import RotateEvent, MariadbGtidEvent, GtidEvent from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, @@ -31,10 +34,8 @@ LOGGER = singer.get_logger('tap_mysql') SDC_DELETED_AT = "_sdc_deleted_at" - UPDATE_BOOKMARK_PERIOD = 1000 - -BOOKMARK_KEYS = {'log_file', 'log_pos', 'version'} +BOOKMARK_KEYS = {'log_file', 'log_pos', 'version', 'gtid'} MYSQL_TIMESTAMP_TYPES = { FIELD_TYPE.TIMESTAMP, @@ -78,22 +79,21 @@ def verify_binlog_config(mysql_conn): f"not set to 'FULL': {binlog_row_image}.") -def verify_log_file_exists(mysql_conn, log_file, log_pos): +def verify_gtid_config(mysql_conn: MySQLConnection): + """ + Checks if GTID is enabled, raises exception if it's not + Args: + mysql_conn: instance of MySQLConnection + + Returns: None if gtid is enabled + """ with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SHOW BINARY LOGS") - result = cur.fetchall() - - existing_log_file = list(filter(lambda log: log[0] == log_file, result)) - - if not existing_log_file: - raise Exception(f"Unable to replicate binlog stream because log file {log_file} does not exist.") - - current_log_pos = existing_log_file[0][1] + cur.execute("select @@gtid_mode;") + binlog_format = cur.fetchone()[0] - if log_pos > current_log_pos: - raise Exception(f"Unable to replicate binlog stream because requested position ({log_pos}) " - f"for log file {log_file} is greater than current position ({current_log_pos}). ") + if binlog_format != 'ON': + raise Exception('Unable to replicate binlog stream because GTID mode is not enabled.') def fetch_current_log_file_and_pos(mysql_conn): @@ -111,13 +111,74 @@ def fetch_current_log_file_and_pos(mysql_conn): return current_log_file, current_log_pos -def fetch_server_id(mysql_conn): +def fetch_current_gtid_pos( + mysql_conn: MySQLConnection, + engine: str +) -> str: + """ + Find the given server's current GTID position. + + The sever we're connected to can have a comma separated list of gtids (e.g from past server migrations), + the right gtid is the one with the same server ID as the given server ID. 
+ + Args: + mysql_conn: Mysql connection instance + engine: DB engine (mariadb/mysql) + + Returns: Gtid position if found, otherwise raises exception + """ + + if engine == connection.MARIADB_ENGINE: + server = str(connection.fetch_server_id(mysql_conn)) + else: + server = connection.fetch_server_uuid(mysql_conn) + with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SELECT @@server_id") - server_id = cur.fetchone()[0] - return server_id + if engine != connection.MARIADB_ENGINE: + cur.execute('select @@GLOBAL.gtid_executed;') + else: + cur.execute('select @@gtid_current_pos;') + + result = cur.fetchone() + + if result is None: + raise Exception("GTID is not present on this server!") + + gtids = result[0] + LOGGER.debug('Found GTID(s): %s in server %s', gtids, server) + + gtid_to_use = None + + for gtid in gtids.split(','): + gtid = gtid.strip() + + if not gtid: + continue + + if engine != connection.MARIADB_ENGINE: + gtid_parts = gtid.split(':') + + if len(gtid_parts) != 2: + continue + + if gtid_parts[0] == server: + gtid_to_use = gtid + else: + gtid_parts = gtid.split('-') + + if len(gtid_parts) != 3: + continue + + if gtid_parts[1] == server: + gtid_to_use = gtid + + if gtid_to_use: + LOGGER.info('Using GTID %s for state bookmark', gtid_to_use) + return gtid_to_use + + raise Exception(f'No suitable GTID was found for server {server}.') def json_bytes_to_string(data): @@ -127,6 +188,7 @@ def json_bytes_to_string(data): if isinstance(data, list): return list(map(json_bytes_to_string, data)) return data + # pylint: disable=too-many-locals def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extracted): row_to_persist = {} @@ -196,7 +258,57 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac time_extracted=time_extracted) -def get_min_log_pos_per_log_file(binlog_streams_map, state): +def calculate_gtid_bookmark( + binlog_streams_map: Dict[str, Any], + state: Dict, + engine: str +) -> str: + """ + Finds the earliest bookmarked gtid in the state + Args: + binlog_streams_map: dictionary of selected streams + state: state dict with bookmarks + engine: the DB flavor mysql/mariadb + + Returns: Min Gtid + """ + min_gtid = None + min_seq_no = None + + for tap_stream_id, bookmark in state.get('bookmarks', {}).items(): + stream = binlog_streams_map.get(tap_stream_id) + + if not stream: + continue + + gtid = bookmark.get('gtid') + + if gtid: + if engine == connection.MARIADB_ENGINE: + gtid_seq_no = int(gtid.split('-')[2]) + else: + gtid_interval = gtid.split(':')[1] + + if '-' in gtid_interval: + gtid_seq_no = int(gtid_interval.split('-')[1]) + else: + gtid_seq_no = int(gtid_interval) + + if min_seq_no is None or gtid_seq_no < min_seq_no: + min_seq_no = gtid_seq_no + min_gtid = gtid + + + if not min_gtid: + raise Exception("Couldn't find any gtid in state bookmarks to resume logical replication") + + LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication', + min_gtid) + + return min_gtid + + +def get_min_log_pos_per_log_file(binlog_streams_map, state) -> Dict[str, Dict]: min_log_pos_per_file = {} for tap_stream_id, bookmark in state.get('bookmarks', {}).items(): @@ -224,7 +336,7 @@ def get_min_log_pos_per_log_file(binlog_streams_map, state): return min_log_pos_per_file -def calculate_bookmark(mysql_conn, binlog_streams_map, state): +def calculate_bookmark(mysql_conn, binlog_streams_map, state) -> Tuple[str, int]: min_log_pos_per_file = 
get_min_log_pos_per_log_file(binlog_streams_map, state) with connect_with_backoff(mysql_conn) as open_conn: @@ -249,7 +361,29 @@ def calculate_bookmark(mysql_conn, binlog_streams_map, state): raise Exception("Unable to replicate binlog stream because no binary logs exist on the server.") -def update_bookmarks(state, binlog_streams_map, log_file, log_pos): +def update_bookmarks( + state: Dict, + binlog_streams_map: Dict, + log_file: str, + log_pos: int, + gtid: Optional[str]) -> Dict: + """ + Updates the state bookmarks with the given binlog file & position or GTID + Args: + state: state to update + binlog_streams_map: dictionary of log based streams + log_file: new binlog file + log_pos: new binlog pos + gtid: new gtid pos + + Returns: updated state + """ + LOGGER.debug('Updating state bookmark to binlog file and pos and GTID: %s, %d, %s', log_file, log_pos, gtid) + + if log_file and not log_pos: + raise ValueError("binlog_file is present but binlog_pos is null! Please provide a binlog position " + "to properly update the state") + for tap_stream_id in binlog_streams_map.keys(): state = singer.write_bookmark(state, tap_stream_id, @@ -261,6 +395,13 @@ def update_bookmarks(state, binlog_streams_map, log_file, log_pos): 'log_pos', log_pos) + # update gtid only if it's not null + if gtid: + state = singer.write_bookmark(state, + tap_stream_id, + 'gtid', + gtid) + return state @@ -283,7 +424,7 @@ def handle_write_rows_event(event, catalog_entry, state, columns, rows_saved, ti time_extracted) singer.write_message(record_message) - rows_saved = rows_saved + 1 + rows_saved += 1 return rows_saved @@ -304,7 +445,7 @@ def handle_update_rows_event(event, catalog_entry, state, columns, rows_saved, t singer.write_message(record_message) - rows_saved = rows_saved + 1 + rows_saved += 1 return rows_saved @@ -331,7 +472,7 @@ def handle_delete_rows_event(event, catalog_entry, state, columns, rows_saved, t singer.write_message(record_message) - rows_saved = rows_saved + 1 + rows_saved += 1 return rows_saved @@ -380,6 +521,7 @@ def __get_diff_in_columns_list( return set(binlog_columns_filtered).difference(schema_properties) + # pylint: disable=R1702,R0915 def _run_binlog_sync( mysql_conn: MySQLConnection, @@ -389,13 +531,13 @@ def _run_binlog_sync( config: Dict, end_log_file: str, end_log_pos: int): - time_extracted = utils.now() - rows_saved = 0 + processed_rows_events = 0 events_skipped = 0 log_file = None log_pos = None + gtid_pos = reader.auto_position # initial gtid, we set this when we created the reader's instance # A set to hold all columns that are detected as we sync but should be ignored cuz they are unsupported types. 
# Saving them here to avoid doing the check if we should ignore a column over and over again @@ -427,11 +569,40 @@ def _run_binlog_sync( break if isinstance(binlog_event, RotateEvent): + LOGGER.debug('RotateEvent: log_file=%s, log_pos=%d', + binlog_event.next_binlog, + binlog_event.position) + state = update_bookmarks(state, binlog_streams_map, binlog_event.next_binlog, - binlog_event.position) + binlog_event.position, + gtid_pos + ) + + elif isinstance(binlog_event, MariadbGtidEvent) or isinstance(binlog_event, GtidEvent): + gtid_pos = binlog_event.gtid + + LOGGER.debug('%s: gtid=%s', + binlog_event.__class__.__name__, + gtid_pos) + + state = update_bookmarks(state, + binlog_streams_map, + log_file, + log_pos, + gtid_pos + ) + + # There is strange behavior happening when using GTID in the pymysqlreplication lib, + # explained here: https://github.com/noplay/python-mysql-replication/issues/367 + # Fix: Updating the reader's auto-position to the newly encountered gtid means we won't have to restart + # consuming binlog from old GTID pos when connection to server is lost. + reader.auto_position = gtid_pos + else: + time_extracted = utils.now() + tap_stream_id = common.generate_tap_stream_id(binlog_event.schema, binlog_event.table) streams_map_entry = binlog_streams_map.get(tap_stream_id, {}) catalog_entry = streams_map_entry.get('catalog_entry') @@ -443,7 +614,7 @@ def _run_binlog_sync( if events_skipped % UPDATE_BOOKMARK_PERIOD == 0: LOGGER.debug("Skipped %s events so far as they were not for selected tables; %s rows extracted", events_skipped, - rows_saved) + processed_rows_events) else: # Compare event's columns to the schema properties diff = __get_diff_in_columns_list(binlog_event, @@ -502,50 +673,119 @@ def _run_binlog_sync( columns = new_columns if isinstance(binlog_event, WriteRowsEvent): - rows_saved = handle_write_rows_event(binlog_event, - catalog_entry, - state, - columns, - rows_saved, - time_extracted) + processed_rows_events = handle_write_rows_event(binlog_event, + catalog_entry, + state, + columns, + processed_rows_events, + time_extracted) elif isinstance(binlog_event, UpdateRowsEvent): - rows_saved = handle_update_rows_event(binlog_event, - catalog_entry, - state, - columns, - rows_saved, - time_extracted) + processed_rows_events = handle_update_rows_event(binlog_event, + catalog_entry, + state, + columns, + processed_rows_events, + time_extracted) elif isinstance(binlog_event, DeleteRowsEvent): - rows_saved = handle_delete_rows_event(binlog_event, - catalog_entry, - state, - columns, - rows_saved, - time_extracted) + processed_rows_events = handle_delete_rows_event(binlog_event, + catalog_entry, + state, + columns, + processed_rows_events, + time_extracted) else: LOGGER.debug("Skipping event for table %s.%s as it is not an INSERT, UPDATE, or DELETE", binlog_event.schema, binlog_event.table) # Update singer bookmark and send STATE message periodically - if ((rows_saved and rows_saved % UPDATE_BOOKMARK_PERIOD == 0) or + if ((processed_rows_events and processed_rows_events % UPDATE_BOOKMARK_PERIOD == 0) or (events_skipped and events_skipped % UPDATE_BOOKMARK_PERIOD == 0)): state = update_bookmarks(state, binlog_streams_map, log_file, - log_pos) + log_pos, + gtid_pos + ) singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - LOGGER.info('Processed %s rows', rows_saved) + LOGGER.info('Processed %s rows', processed_rows_events) - # Update singer bookmark at the last time to point it the the last processed binlog event + # Update singer bookmark at the last time 
to point it the last processed binlog event if log_file and log_pos: state = update_bookmarks(state, binlog_streams_map, log_file, - log_pos) + log_pos, + gtid_pos) + + +def create_binlog_stream_reader( + config: Dict, + log_file: Optional[str], + log_pos: Optional[int], + gtid_pos: Optional[str] +) -> BinLogStreamReader: + """ + Create an instance of BinlogStreamReader with the right config + + Args: + config: dictionary of the content of tap config.json + log_file: binlog file name to start replication from (Optional if using gtid) + log_pos: binlog pos to start replication from (Optional if using gtid) + gtid_pos: GTID pos to start replication from (Optional if using log_file & pos) + + Returns: Instance of BinlogStreamReader + """ + if config.get('server_id'): + server_id = int(config.get('server_id')) + LOGGER.info("Using provided server_id=%s", server_id) + else: + server_id = random.randint(1, 2 ^ 32) # generate random server id for this slave + LOGGER.info("Using randomly generated server_id=%s", server_id) + + engine = config['engine'] + + kwargs = { + 'connection_settings': {}, + 'pymysql_wrapper': make_connection_wrapper(config), + 'is_mariadb': connection.MARIADB_ENGINE == engine, + 'server_id': server_id, # slave server ID + 'report_slave': socket.gethostname() or 'pipelinewise', # this is so this slave appears in SHOW SLAVE HOSTS; + 'only_events': [WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent], + } + + # only fetch events pertaining to the schemas in filter db. + if config.get('filter_db'): + kwargs['only_schemas'] = config['filter_db'].split(',') + + if config['use_gtid']: + + if not gtid_pos: + raise ValueError(f'gtid_pos is empty "{gtid_pos}"! Cannot start logical replication from empty gtid.') + + LOGGER.info("Starting logical replication from GTID '%s' on engine '%s'", gtid_pos, engine) + + # When using GTID, we want to listen in for GTID events and start from given gtid pos + kwargs['only_events'].extend([GtidEvent, MariadbGtidEvent]) + kwargs['auto_position'] = gtid_pos + + else: + if not log_file or not log_pos or log_pos < 0: + raise ValueError(f'log file or pos is empty ("{log_file}", "{log_pos}")! 
' + f'Cannot start logical replication from invalid log file/pos.') + + LOGGER.info("Starting logical replication from binlog file ['%s', %d]", log_file, log_pos) + + # When not using GTID, we want to listen in for rotate events, and start from given log position and file + kwargs['only_events'].append(RotateEvent) + kwargs['log_file'] = log_file + kwargs['log_pos'] = log_pos + kwargs['resume_stream'] = True + + return BinLogStreamReader(**kwargs) def sync_binlog_stream( @@ -562,45 +802,38 @@ def sync_binlog_stream( binlog_streams_map: tables to stream using binlog state: the current state """ - for tap_stream_id in binlog_streams_map: common.whitelist_bookmark_keys(BOOKMARK_KEYS, tap_stream_id, state) - log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state) + log_file = log_pos = gtid = None - verify_log_file_exists(mysql_conn, log_file, log_pos) - - if config.get('server_id'): - server_id = int(config.get('server_id')) - LOGGER.info("Using provided server_id=%s", server_id) + if config['use_gtid']: + gtid = calculate_gtid_bookmark(binlog_streams_map, state, config['engine']) else: - server_id = fetch_server_id(mysql_conn) - LOGGER.info("No server_id provided, will use global server_id=%s", server_id) + log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state) - connection_wrapper = make_connection_wrapper(config) reader = None + try: - reader = BinLogStreamReader( - connection_settings={}, - server_id=server_id, - slave_uuid=f'pipelinewise-slave-{server_id}', - log_file=log_file, - log_pos=log_pos, - resume_stream=True, - only_events=[RotateEvent, WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent], - pymysql_wrapper=connection_wrapper - ) - - LOGGER.info("Starting binlog replication with log_file=%s, log_pos=%s", log_file, log_pos) + reader = create_binlog_stream_reader(config, log_file, log_pos, gtid) end_log_file, end_log_pos = fetch_current_log_file_and_pos(mysql_conn) LOGGER.info('Current Master binlog file and pos: %s %s', end_log_file, end_log_pos) _run_binlog_sync(mysql_conn, reader, binlog_streams_map, state, config, end_log_file, end_log_pos) + except pymysql.err.OperationalError as ex: + if ex.args[0] == 1236: + LOGGER.error('Cannot resume logical replication from given GTID %s! 
This GTID might date back to before ' + 'the new primary has been setup, connect to old primary and consume all binlog events to get ' + 'a newer GTID then switch back.', gtid) + + raise + finally: # BinLogStreamReader doesn't implement the `with` methods # So, try/finally will close the chain from the top - reader.close() + if reader: + reader.close() singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) diff --git a/tests/integration/test_tap_mysql.py b/tests/integration/test_tap_mysql.py index b7c459f..aaa02fa 100644 --- a/tests/integration/test_tap_mysql.py +++ b/tests/integration/test_tap_mysql.py @@ -1,3 +1,4 @@ +import os import unittest from unittest.mock import patch @@ -7,7 +8,7 @@ import tap_mysql import tap_mysql.discover_utils -from tap_mysql.connection import connect_with_backoff, MySQLConnection +from tap_mysql.connection import connect_with_backoff, MySQLConnection, fetch_server_id, MYSQL_ENGINE try: import tests.integration.utils as test_utils @@ -573,10 +574,13 @@ def setUp(self): with connect_with_backoff(self.conn) as open_conn: with open_conn.cursor() as cursor: - cursor.execute('CREATE TABLE incremental (val int, updated datetime)') - cursor.execute('INSERT INTO incremental (val, updated) VALUES (1, \'2017-06-01\')') - cursor.execute('INSERT INTO incremental (val, updated) VALUES (2, \'2017-06-20\')') - cursor.execute('INSERT INTO incremental (val, updated) VALUES (3, \'2017-09-22\')') + cursor.execute('CREATE TABLE incremental (val int, updated datetime, ctime time)') + cursor.execute('INSERT INTO incremental (val, updated, ctime) VALUES (1, \'2017-06-01\', ' + 'current_time())') + cursor.execute('INSERT INTO incremental (val, updated, ctime) VALUES (2, \'2017-06-20\', ' + 'current_time())') + cursor.execute('INSERT INTO incremental (val, updated, ctime) VALUES (3, \'2017-09-22\', ' + 'current_time())') cursor.execute('CREATE TABLE integer_incremental (val int, updated int)') cursor.execute('INSERT INTO integer_incremental (val, updated) VALUES (1, 1)') cursor.execute('INSERT INTO integer_incremental (val, updated) VALUES (2, 2)') @@ -699,16 +703,6 @@ def test_version_not_cleared_from_state_after_incremental_success(self): class TestBinlogReplication(unittest.TestCase): - # def tearDown(self) -> None: - # with connect_with_backoff(self.conn) as open_conn: - # with open_conn.cursor() as cursor: - # cursor.execute('DROP TABLE binlog_1;') - # cursor.execute('DROP TABLE binlog_2;') - # - # open_conn.commit() - # - # self.conn = None - def setUp(self): self.maxDiff = None self.state = {} @@ -718,14 +712,24 @@ def setUp(self): with connect_with_backoff(self.conn) as open_conn: with open_conn.cursor() as cursor: - cursor.execute('CREATE TABLE binlog_1 (id int, updated datetime)') - cursor.execute('CREATE TABLE binlog_2 (id int, updated datetime)') - cursor.execute('INSERT INTO binlog_1 (id, updated) VALUES (1, \'2017-06-01\')') - cursor.execute('INSERT INTO binlog_1 (id, updated) VALUES (2, \'2017-06-20\')') - cursor.execute('INSERT INTO binlog_1 (id, updated) VALUES (3, \'2017-09-22\')') - cursor.execute('INSERT INTO binlog_2 (id, updated) VALUES (1, \'2017-10-22\')') - cursor.execute('INSERT INTO binlog_2 (id, updated) VALUES (2, \'2017-11-10\')') - cursor.execute('INSERT INTO binlog_2 (id, updated) VALUES (3, \'2017-12-10\')') + cursor.execute('CREATE TABLE binlog_1 (id int, updated datetime, ' + 'created_date Date)') + cursor.execute(""" + CREATE TABLE binlog_2 (id int, + updated datetime, + is_good bool default False, + ctime time, + cjson json) + """) 
+ cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (1, \'2017-06-01\', current_date())') + cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (2, \'2017-06-20\', current_date())') + cursor.execute('INSERT INTO binlog_1 (id, updated, created_date) VALUES (3, \'2017-09-22\', current_date())') + cursor.execute('INSERT INTO binlog_2 (id, updated, ctime, cjson) VALUES (1, \'2017-10-22\', ' + 'current_time(), \'[{"key1": "A", "key2": ["B", 2], "key3": {}}]\')') + cursor.execute('INSERT INTO binlog_2 (id, updated, ctime, cjson) VALUES (2, \'2017-11-10\', ' + 'current_time(), \'[{"key1": "A", "key2": ["B", 2], "key3": {}}]\')') + cursor.execute('INSERT INTO binlog_2 (id, updated, ctime, cjson) VALUES (3, \'2017-12-10\', ' + 'current_time(), \'[{"key1": "A", "key2": ["B", 2], "key3": {}}]\')') cursor.execute('UPDATE binlog_1 set updated=\'2018-06-18\' WHERE id = 3') cursor.execute('UPDATE binlog_2 set updated=\'2018-06-18\' WHERE id = 2') cursor.execute('DELETE FROM binlog_1 WHERE id = 2') @@ -766,12 +770,15 @@ def setUp(self): 'version', singer.utils.now()) + def tearDown(self) -> None: + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + def test_initial_full_table(self): state = {} - binlog.fetch_current_log_file_and_pos(self.conn) global SINGER_MESSAGES - SINGER_MESSAGES.clear() + tap_mysql.do_sync(self.conn, {}, self.catalog, state) message_types = [type(m) for m in SINGER_MESSAGES] @@ -801,11 +808,13 @@ def test_initial_full_table(self): lambda m: isinstance(m, singer.ActivateVersionMessage) and m.stream == 'tap_mysql_test-binlog_2', SINGER_MESSAGES))[0] - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file')) - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_1', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_1', 'log_pos')) + self.assertIsNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_1', 'gtid')) - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file')) - self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_2', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_2', 'log_pos')) + self.assertIsNone(singer.get_bookmark(state, 'tap_mysql_test-binlog_2', 'gtid')) self.assertEqual(singer.get_bookmark(state, 'tap_mysql_test-binlog_1', 'version'), activate_version_message_1.version) @@ -853,7 +862,6 @@ def test_fail_if_log_file_does_not_exist(self): def test_binlog_stream(self): global SINGER_MESSAGES - SINGER_MESSAGES.clear() config = test_utils.get_db_config() config['server_id'] = "100" @@ -909,7 +917,6 @@ def test_binlog_stream(self): def test_binlog_stream_with_alteration(self): global SINGER_MESSAGES - SINGER_MESSAGES.clear() config = test_utils.get_db_config() config['server_id'] = "100" @@ -999,6 +1006,68 @@ def test_binlog_stream_with_alteration(self): self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file')) self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) + def test_binlog_stream_with_gtid(self): + global SINGER_MESSAGES + + engine = os.getenv('TAP_MYSQL_ENGINE', MYSQL_ENGINE) + gtid = binlog.fetch_current_gtid_pos(self.conn, os.environ['TAP_MYSQL_ENGINE']) + + config = 
test_utils.get_db_config() + config['use_gtid'] = True + config['engine'] = engine + + self.state = singer.write_bookmark(self.state, + 'tap_mysql_test-binlog_1', + 'gtid', + gtid) + + self.state = singer.write_bookmark(self.state, + 'tap_mysql_test-binlog_2', + 'gtid', + gtid) + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + cursor.execute('INSERT INTO binlog_1 (id, updated) VALUES (4, \'2022-06-20\')') + cursor.execute('INSERT INTO binlog_1 (id, updated) VALUES (5, \'2022-09-21\')') + cursor.execute('INSERT INTO binlog_2 (id, updated) VALUES (4, \'2017-12-10\')') + cursor.execute('delete from binlog_1 WHERE id = 3') + + open_conn.commit() + + tap_mysql.do_sync(self.conn, config, self.catalog, self.state) + + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) + + message_types = [type(m) for m in SINGER_MESSAGES] + self.assertEqual(message_types, + [singer.StateMessage, + singer.SchemaMessage, + singer.SchemaMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.RecordMessage, + singer.StateMessage, + ]) + + self.assertEqual([('tap_mysql_test-binlog_1', 4, False), + ('tap_mysql_test-binlog_1', 5, False), + ('tap_mysql_test-binlog_2', 4, False), + ('tap_mysql_test-binlog_1', 3, True)], + [(m.stream, + m.record['id'], + m.record.get(binlog.SDC_DELETED_AT) is not None) + for m in record_messages]) + + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_1', 'gtid')) + + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_file')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'log_pos')) + self.assertIsNotNone(singer.get_bookmark(self.state, 'tap_mysql_test-binlog_2', 'gtid')) + class TestViews(unittest.TestCase): def setUp(self): @@ -1313,9 +1382,3 @@ def tearDown(self) -> None: with connect_with_backoff(self.conn) as open_conn: with open_conn.cursor() as cursor: cursor.execute('DROP TABLE bit_booleans_table;') - - -# if __name__ == "__main__": -# test1 = TestBinlogReplication() -# test1.setUp() -# test1.test_binlog_stream() diff --git a/tests/unit/sync_strategies/test_binlog.py b/tests/unit/sync_strategies/test_binlog.py index c9648ef..d1b5ec4 100644 --- a/tests/unit/sync_strategies/test_binlog.py +++ b/tests/unit/sync_strategies/test_binlog.py @@ -1,16 +1,22 @@ import datetime +import socket + import pytz import os from collections import namedtuple from typing import Dict from unittest import TestCase -from unittest.mock import patch, Mock, call +from unittest.mock import patch, Mock, call, MagicMock + +from pymysql import InternalError +from pymysql.cursors import Cursor from pymysqlreplication.constants import FIELD_TYPE -from pymysqlreplication.event import RotateEvent +from pymysqlreplication.event import RotateEvent, MariadbGtidEvent, GtidEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent from singer import CatalogEntry, Schema, Catalog, RecordMessage, StateMessage, SchemaMessage +from tap_mysql import connection from tap_mysql.connection import MySQLConnection from tap_mysql.sync_strategies import binlog @@ -56,7 +62,6 @@ def test_add_automatic_properties(self): self.assertListEqual(['x', binlog.SDC_DELETED_AT], columns) - 
@patch('tap_mysql.sync_strategies.binlog.verify_log_file_exists') @patch('tap_mysql.sync_strategies.binlog.calculate_bookmark', return_value=('binlog0001', 50)) @patch('tap_mysql.sync_strategies.binlog.fetch_current_log_file_and_pos', @@ -65,17 +70,19 @@ def test_add_automatic_properties(self): tzinfo=pytz.UTC)) @patch('tap_mysql.sync_strategies.binlog.discover_catalog') @patch('tap_mysql.sync_strategies.binlog.make_connection_wrapper') - def test_sync_binlog_stream_1(self, - make_connection_wrapper_mock, - discover_catalog_mock, - *args): + def test_sync_binlog_stream_with_log_file_and_pos(self, + make_connection_wrapper_mock, + discover_catalog_mock, + *args): # we're dealing with local datetimes, so tests passing depend on the local timezone # pin the TZ to EET to avoid flakiness os.environ['TZ'] = 'EET' config = { - 'server_id': '123' + 'server_id': '123', + 'use_gtid': False, + 'engine': connection.MYSQL_ENGINE, } mysql_con = Mock(spec_set=MySQLConnection) @@ -467,6 +474,7 @@ def iter_mock(_): yield x reader_mock.close.return_value = 'Closing' + reader_mock.return_value.auto_position = None reader_mock.return_value.__iter__ = iter_mock @@ -800,35 +808,1237 @@ def iter_mock(_): ]) self.assertListEqual([msg for msg in singer_messages if isinstance(msg, StateMessage)], - [ - StateMessage(value={ - 'bookmarks': { - 'my_db-stream1': { - 'log_file': 'binlog0003', - 'log_pos': 999, - 'version': 1 - }, - 'my_db-stream2': { - 'log_file': 'binlog0003', - 'log_pos': 999, - 'version': 1 - }, - - } - }), - ]) + [ + StateMessage(value={ + 'bookmarks': { + 'my_db-stream1': { + 'log_file': 'binlog0003', + 'log_pos': 999, + 'version': 1 + }, + 'my_db-stream2': { + 'log_file': 'binlog0003', + 'log_pos': 999, + 'version': 1 + }, + + } + }), + ]) reader_mock.assert_called_once_with( **{ 'connection_settings': {}, + 'pymysql_wrapper': make_connection_wrapper_mock.return_value, + 'is_mariadb': False, 'server_id': 123, - 'slave_uuid': 'pipelinewise-slave-123', + 'report_slave': socket.gethostname(), + 'only_events': [WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, RotateEvent], 'log_file': 'binlog0001', 'log_pos': 50, 'resume_stream': True, - 'only_events': [RotateEvent, WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent], - 'pymysql_wrapper': make_connection_wrapper_mock.return_value } ) self.assertEqual(1, reader_mock.return_value.close.call_count) + + @patch('tap_mysql.sync_strategies.binlog.calculate_gtid_bookmark', + return_value='0-123-555') + @patch('tap_mysql.sync_strategies.binlog.fetch_current_log_file_and_pos', + return_value=('binlog0003', 1000)) + @patch('tap_mysql.sync_strategies.binlog.utils.now', return_value=datetime.datetime(2020, 10, 13, 8, 29, 58, + tzinfo=pytz.UTC)) + @patch('tap_mysql.sync_strategies.binlog.discover_catalog') + @patch('tap_mysql.sync_strategies.binlog.make_connection_wrapper') + def test_sync_binlog_stream_with_gtid(self, + make_connection_wrapper_mock, + discover_catalog_mock, + *args): + + # we're dealing with local datetimes, so tests passing depend on the local timezone + # pin the TZ to EET to avoid flakiness + os.environ['TZ'] = 'EET' + + config = { + 'server_id': '123', + 'use_gtid': True, + 'engine': 'mariadb', + } + mysql_con = Mock(spec_set=MySQLConnection) + + catalog = { + 'my_db-stream1': { + 'catalog_entry': CatalogEntry( + table='stream1', + stream='my_db-stream1', + tap_stream_id='my_db-stream1', + schema=Schema( + properties={ + 'c_int': Schema(inclusion='available', type=['null', 'integer']), + 'c_varchar': Schema(inclusion='available', type=['null', 
'string']), + 'c_timestamp': Schema(inclusion='available', type=['null', 'string'], format='date-time'), + } + ), + metadata=[ + { + 'breadcrumb': [], + 'metadata': { + 'selected-by-default': False, + 'database-name': 'my_db', + 'row-count': 1, + 'replication-method': 'LOG_BASED', + 'selected': True, + 'is-view': False, + 'table-key-properties': ['c_int'] + } + }, + { + 'breadcrumb': ['properties', 'c_int'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'int(11)', + 'datatype': 'int' + } + }, + { + 'breadcrumb': ['properties', 'c_varchar'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'varchar(100)', + 'datatype': 'varchar' + } + }, + { + 'breadcrumb': ['properties', 'c_blob'], + 'metadata': { + 'selected-by-default': False, + 'sql-datatype': 'blob', + 'datatype': 'blob' + } + }, + { + 'breadcrumb': ['properties', 'c_timestamp'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'timestamp', + 'datatype': 'timestamp' + } + } + ] + ), + 'desired_columns': {'c_int', 'c_varchar', 'c_timestamp'} + }, + 'my_db-stream2': { + 'catalog_entry': CatalogEntry( + table='stream2', + stream='my_db-stream2', + tap_stream_id='my_db-stream2', + schema=Schema( + properties={ + 'c_bool': Schema(inclusion='available', type=['null', 'bool']), + 'c_double': Schema(inclusion='available', type=['null', 'number']), + 'c_time': Schema(inclusion='available', type=['null', 'string'], format='time'), + } + ), + metadata=[ + { + 'breadcrumb': [], + 'metadata': { + 'selected-by-default': False, + 'database-name': 'my_db', + 'row-count': 1, + 'replication-method': 'LOG_BASED', + 'selected': True, + 'is-view': False, + 'table-key-properties': [] + } + }, + { + 'breadcrumb': ['properties', 'c_bool'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'tinyint(1)' + } + }, + { + 'breadcrumb': ['properties', 'c_double'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'double' + } + }, + { + 'breadcrumb': ['properties', 'c_time'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'time' + } + } + ] + ), + 'desired_columns': {'c_bool', 'c_double', 'c_time'} + } + } + + singer_messages = [] + + state = { + 'bookmarks': { + 'my_db-stream1': { + 'version': 1 + }, + 'my_db-stream2': { + 'version': 1 + } + } + } + + with patch('tap_mysql.sync_strategies.binlog.singer.write_message') as write_msg: + write_msg.side_effect = lambda msg: singer_messages.append(msg) + + with patch('tap_mysql.sync_strategies.binlog.BinLogStreamReader', + autospec=True) as reader_mock: + def iter_mock(_): + log_files = [ + 'binlog0001', + 'binlog0001', + 'binlog0002', + 'binlog0002', + 'binlog0002', + 'binlog0002', + 'binlog0002', + 'binlog0003', + 'binlog0003', + 'binlog0003', + 'binlog0003', + 'binlog0003', + 'binlog0003' + ] + + log_positions = [ + 50, + 300, + 520, + 4, + 100, + 250, + 7, + 14, + 20, + 140, + 300, + 470, + 999, + ] + + for idx, x in enumerate([ + get_binlogevent(WriteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream2', + 'columns': [ + Column('c_bool', FIELD_TYPE.TINY), + Column('c_time', FIELD_TYPE.TIME2), + Column('c_double', FIELD_TYPE.DOUBLE), + ], + 'rows': [ + {'values': { + 'c_bool': True, + 'c_time': datetime.time(20, 1, 14), + 'c_double': 19.44 + }}, + {'values': { + 'c_bool': False, + 'c_time': datetime.time(9, 10, 24), + 'c_double': 0.54 + }}, + ] + }), + get_binlogevent(UpdateRowsEvent, { + 'schema': 'my_db', + 'table': 'stream2', + 'columns': [ + Column('c_bool', FIELD_TYPE.TINY), + Column('c_time', FIELD_TYPE.TIME2), + 
Column('c_double', FIELD_TYPE.DOUBLE), + ], + 'rows': [ + {'after_values': { + 'c_bool': False, + 'c_time': datetime.time(8, 13, 12), + 'c_double': 100.22 + }}, + {'after_values': { + 'c_bool': False, + 'c_time': datetime.time(0, 10, 59, 44), + 'c_double': 0.54344 + }}, + {'after_values': { + 'c_bool': False, + 'c_time': datetime.time(0, 0, 0, 38), + 'c_double': 1.565667 + }}, + ] + }), + get_binlogevent(MariadbGtidEvent, { + 'gtid': '0-123-556', + }), + get_binlogevent(UpdateRowsEvent, { + 'schema': 'my_db', + 'table': 'stream1', + 'columns': [ + Column('c_int', FIELD_TYPE.INT24), + Column('c_varchar', FIELD_TYPE.VARCHAR), + Column('c_timestamp', FIELD_TYPE.TIMESTAMP2), + Column('c_blob', FIELD_TYPE.BLOB), + ], + 'rows': [ + {'after_values': { + 'c_int': 1, + 'c_timestamp': datetime.datetime(2021, 3, 24, 12, 12, 56), + 'c_varchar': 'varchar 1', + 'c_blob': b'dfhdfhsdhf' + }}, + {'after_values': { + 'c_int': 2, + 'c_timestamp': datetime.datetime(2019, 12, 24, 5, 1, 6), + 'c_varchar': 'varchar 2', + 'c_blob': b'dflldskjf' + }} + ] + }), + get_binlogevent(DeleteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream3', + 'columns': [] + }), + get_binlogevent(MariadbGtidEvent, { + 'gtid': '0-123-556', + }), + get_binlogevent(WriteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream2', + 'columns': [ + Column('c_bool', FIELD_TYPE.TINY), + Column('c_double', FIELD_TYPE.DOUBLE), + Column('c_time', FIELD_TYPE.TIME2), + ], + 'rows': [ + {'values': { + 'c_bool': False, + 'c_time': datetime.time(0, 0, 0, 38), + 'c_double': 10000.234 + }} + ] + }), + get_binlogevent(WriteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream1', + 'columns': [ + Column('c_int', FIELD_TYPE.INT24), + Column('c_varchar', FIELD_TYPE.VARCHAR), + Column('__dropped_col_2__', FIELD_TYPE.TIMESTAMP2), + Column('c_blob', FIELD_TYPE.BLOB), + ], + 'rows': [ + {'values': { + 'c_int': 3, + 'c_varchar': 'varchar 3', + 'c_blob': b'dfhdfhsdhf' + }}, + {'values': { + 'c_int': 4, + 'c_varchar': 'varchar 4', + 'c_blob': b'32fgdf243' + }} + ] + }), + get_binlogevent(DeleteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream1', + 'timestamp': datetime.datetime.timestamp(datetime.datetime(2021, 1, 1, 10, 20, 55, + tzinfo=pytz.UTC)), + 'columns': [ + Column('c_int', FIELD_TYPE.INT24), + Column('c_varchar', FIELD_TYPE.VARCHAR), + Column('c_datetime', FIELD_TYPE.TIMESTAMP2), + Column('c_blob', FIELD_TYPE.BLOB), + ], + 'rows': [ + {'values': { + 'c_int': 5, + 'c_datetime': datetime.datetime(2002, 8, 20, 8, 5, 9), + 'c_varchar': 'varchar 5', + 'c_blob': b'dfhdfhsdhf' + }}, + {'values': { + 'c_int': 6, + 'c_datetime': datetime.datetime(2020, 1, 1, 0, 0, 0), + 'c_varchar': 'varchar 6', + 'c_blob': b'32fgdf243' + }}, + {'values': { + 'c_int': 7, + 'c_datetime': datetime.datetime(2021, 1, 1, 3, 4, 0, 67483), + 'c_varchar': 'varchar 7', + 'c_blob': None + }} + ] + }), + get_binlogevent(MariadbGtidEvent, { + 'gtid': '0-123-557', + }), + get_binlogevent(WriteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream2', + 'columns': [ + Column('c_bool', FIELD_TYPE.TINY), + Column('c_double', FIELD_TYPE.DOUBLE), + Column('c_time', FIELD_TYPE.TIME2), + Column('c_json', FIELD_TYPE.JSON), + ], + 'rows': [ + {'values': { + 'c_bool': False, + 'c_time': datetime.time(12, 30, 0, 354676), + 'c_double': 10000, + 'c_json': {'a': 1, 'b': 2} + }}, + {'values': { + 'c_bool': True, + 'c_time': datetime.time(12, 30, 0, 0), + 'c_double': 10.40, + 'c_json': [{}, {}] + }}, + {'values': { + 'c_bool': True, + 'c_time': None, + 'c_double': -457.10, + 'c_json': None + }} + ] + }), + 
get_binlogevent(DeleteRowsEvent, { + 'schema': 'my_db', + 'table': 'stream1', + 'timestamp': datetime.datetime.timestamp(datetime.datetime(2021, 1, 1, 20, 0, 0, + tzinfo=pytz.UTC)), + 'columns': [ + Column('c_int', FIELD_TYPE.INT24), + Column('c_varchar', FIELD_TYPE.VARCHAR), + Column('c_datetime', FIELD_TYPE.TIMESTAMP2), + Column('c_tiny_blob', FIELD_TYPE.TINY_BLOB), + Column('c_blob', FIELD_TYPE.BLOB), + ], + 'rows': [ + {'values': { + 'c_int': 8, + 'c_datetime': datetime.datetime(2002, 8, 20, 3, 5, 9), + 'c_varchar': 'varchar 8', + 'c_blob': b'464thh', + 'c_tiny_blob': b'1' + }}, + {'values': { + 'c_int': 9, + 'c_datetime': None, + 'c_varchar': 'varchar 9', + 'c_blob': b'32fgdf243', + 'c_tiny_blob': None + }}, + {'values': { + 'c_int': 10, + 'c_datetime': None, + 'c_varchar': 'varchar 10', + 'c_blob': None, + 'c_tiny_blob': b'1' + }} + ] + }), + get_binlogevent(MariadbGtidEvent, { + 'gtid': '0-123-558', + }), + ]): + reader_mock.return_value.log_file = log_files[idx] + reader_mock.return_value.log_pos = log_positions[idx] + yield x + + reader_mock.close.return_value = 'Closing' + reader_mock.return_value.auto_position = None + + reader_mock.return_value.__iter__ = iter_mock + + discover_catalog_mock.side_effect = [ + Catalog([ + CatalogEntry( + table='stream1', + stream='my_db-stream1', + tap_stream_id='my_db-stream1', + schema=Schema( + properties={ + 'c_int': Schema(inclusion='available', type=['null', 'integer']), + 'c_varchar': Schema(inclusion='available', type=['null', 'string']), + 'c_datetime': Schema(inclusion='available', type=['null', 'string'], + format='date-time'), + } + ), + metadata=[ + { + 'breadcrumb': [], + 'metadata': { + 'selected-by-default': False, + 'database-name': 'my_db', + 'row-count': 1, + 'replication-method': 'LOG_BASED', + 'selected': True, + 'is-view': False, + 'table-key-properties': ['c_int'] + } + }, + { + 'breadcrumb': ['properties', 'c_int'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'int(11)', + 'datatype': 'int' + } + }, + { + 'breadcrumb': ['properties', 'c_varchar'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'varchar(100)', + 'datatype': 'varchar' + } + }, + { + 'breadcrumb': ['properties', 'c_blob'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'blob', + 'datatype': 'blob' + } + }, + { + 'breadcrumb': ['properties', 'c_tiny_blob'], + 'metadata': { + 'selected-by-default': False, + 'sql-datatype': 'blob', + 'datatype': 'blob' + } + }, + { + 'breadcrumb': ['properties', 'c_datetime'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'timestamp', + 'datatype': 'timestamp' + } + } + ] + ), + ]), + Catalog([ + CatalogEntry( + table='stream2', + stream='my_db-stream2', + tap_stream_id='my_db-stream2', + schema=Schema( + properties={ + 'c_bool': Schema(inclusion='available', type=['null', 'bool']), + 'c_double': Schema(inclusion='available', type=['null', 'number']), + 'c_time': Schema(inclusion='available', type=['null', 'string'], format='time'), + 'c_json': Schema(inclusion='available', type=['null', 'string']), + } + ), + metadata=[ + { + 'breadcrumb': [], + 'metadata': { + 'selected-by-default': False, + 'database-name': 'my_db', + 'row-count': 1, + 'replication-method': 'LOG_BASED', + 'selected': True, + 'is-view': False, + 'table-key-properties': [] + } + }, + { + 'breadcrumb': ['properties', 'c_bool'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'tinyint(1)' + } + }, + { + 'breadcrumb': ['properties', 'c_double'], + 'metadata': { + 
'selected-by-default': True, + 'sql-datatype': 'double' + } + }, + { + 'breadcrumb': ['properties', 'c_time'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'time' + } + }, + { + 'breadcrumb': ['properties', 'c_json'], + 'metadata': { + 'selected-by-default': True, + 'sql-datatype': 'longtext' + } + } + ] + ) + ]) + + ] + + binlog.sync_binlog_stream( + mysql_con, + config, + catalog, + state + ) + + discover_catalog_mock.assert_has_calls([ + call(mysql_con, None, 'stream1'), + call(mysql_con, None, 'stream2'), + ], any_order=False) + + self.assertListEqual([type(msg) for msg in singer_messages], [ + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + SchemaMessage, + RecordMessage, + RecordMessage, + RecordMessage, + SchemaMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + RecordMessage, + StateMessage, + ]) + + self.assertListEqual([msg for msg in singer_messages if isinstance(msg, RecordMessage)], [ + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': True, + 'c_time': datetime.time(20, 1, 14), + 'c_double': 19.44 + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(9, 10, 24), + 'c_double': 0.54 + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(8, 13, 12), + 'c_double': 100.22 + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(0, 10, 59, 44), + 'c_double': 0.54344 + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(0, 0, 0, 38), + 'c_double': 1.565667 + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 1, + 'c_timestamp': '2021-03-24T10:12:56+00:00', + 'c_varchar': 'varchar 1' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 2, + 'c_timestamp': '2019-12-24T03:01:06+00:00', + 'c_varchar': 'varchar 2' + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(0, 0, 0, 38), + 'c_double': 10000.234 + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 3, + 'c_varchar': 'varchar 3', + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 4, + 'c_varchar': 'varchar 4', + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 5, + 'c_datetime': '2002-08-20T05:05:09+00:00', + 'c_varchar': 'varchar 5', + 
'_sdc_deleted_at': '2021-01-01T10:20:55+00:00' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 6, + 'c_datetime': '2019-12-31T22:00:00+00:00', + 'c_varchar': 'varchar 6', + '_sdc_deleted_at': '2021-01-01T10:20:55+00:00' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 7, + 'c_datetime': '2021-01-01T01:04:00.067483+00:00', + 'c_varchar': 'varchar 7', + '_sdc_deleted_at': '2021-01-01T10:20:55+00:00' + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': False, + 'c_time': datetime.time(12, 30, 0, 354676), + 'c_double': 10000, + 'c_json': '{"a": 1, "b": 2}' + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': True, + 'c_time': datetime.time(12, 30, 0), + 'c_double': 10.40, + 'c_json': '[{}, {}]' + }), + RecordMessage(stream='my_db-stream2', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_bool': True, + 'c_time': None, + 'c_double': -457.10, + 'c_json': 'null' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 8, + 'c_datetime': '2002-08-20T00:05:09+00:00', + 'c_varchar': 'varchar 8', + '_sdc_deleted_at': '2021-01-01T20:00:00+00:00' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 9, + 'c_datetime': None, + 'c_varchar': 'varchar 9', + '_sdc_deleted_at': '2021-01-01T20:00:00+00:00' + }), + RecordMessage(stream='my_db-stream1', + time_extracted=datetime.datetime(2020, 10, 13, 8, 29, 58, tzinfo=pytz.UTC), + version=1, + record={ + 'c_int': 10, + 'c_datetime': None, + 'c_varchar': 'varchar 10', + '_sdc_deleted_at': '2021-01-01T20:00:00+00:00' + }) + ]) + + self.assertListEqual([msg for msg in singer_messages if isinstance(msg, StateMessage)], + [ + StateMessage(value={ + 'bookmarks': { + 'my_db-stream1': { + 'gtid': '0-123-558', + 'log_file': 'binlog0003', + 'log_pos': 999, + 'version': 1, + }, + 'my_db-stream2': { + 'gtid': '0-123-558', + 'log_file': 'binlog0003', + 'log_pos': 999, + 'version': 1 + }, + + } + }), + ]) + + reader_mock.assert_called_once_with( + **{ + 'connection_settings': {}, + 'pymysql_wrapper': make_connection_wrapper_mock.return_value, + 'is_mariadb': True, + 'server_id': 123, + 'report_slave': socket.gethostname(), + 'only_events': [WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, GtidEvent, MariadbGtidEvent], + 'auto_position': '0-123-555', + } + ) + + self.assertEqual(1, reader_mock.return_value.close.call_count) + + @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff') + def test_verify_binlog_config_success(self, connect_with_backoff): + + mysql_con = MagicMock(spec_set=MySQLConnection).return_value + + cur_mock = MagicMock(spec_set=Cursor).return_value + cur_mock.__enter__.return_value.fetchone.side_effect = [ + ['ROW'], + ['FULL'] + ] + + mysql_con.__enter__.return_value.cursor.return_value = cur_mock + + connect_with_backoff.return_value = mysql_con + + binlog.verify_binlog_config(mysql_con) + + connect_with_backoff.assert_called_with(mysql_con) + 
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@binlog_format'),
+                call('SELECT @@binlog_row_image'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_verify_binlog_config_fail_if_not_FULL(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['ROW'],
+            ['Not-FULL']
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        with self.assertRaises(Exception) as ex:
+            binlog.verify_binlog_config(mysql_con)
+
+        self.assertEqual("Unable to replicate binlog stream because binlog_row_image is "
+                         "not set to 'FULL': Not-FULL.",
+                         str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@binlog_format'),
+                call('SELECT @@binlog_row_image'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_verify_binlog_config_fail_if_not_ROW(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['Not-ROW'],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        with self.assertRaises(Exception) as ex:
+            binlog.verify_binlog_config(mysql_con)
+
+        self.assertEqual("Unable to replicate binlog stream because binlog_format is "
+                         "not set to 'ROW': Not-ROW.",
+                         str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@binlog_format'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_verify_binlog_config_fail_if_binlog_row_image_not_supported(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['ROW'],
+        ]
+
+        cur_mock.__enter__.return_value.execute.side_effect = [
+            None,
+            InternalError(1193)
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        with self.assertRaises(Exception) as ex:
+            binlog.verify_binlog_config(mysql_con)
+
+        self.assertEqual("Unable to replicate binlog stream because binlog_row_image "
+                         "system variable does not exist. MySQL version must be at "
+                         "least 5.6.2 to use binlog replication.",
+                         str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@binlog_format'),
+                call('SELECT @@binlog_row_image'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_verify_gtid_config_success(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['ON'],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        binlog.verify_gtid_config(mysql_con)
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@gtid_mode;'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_verify_gtid_config_fail_if_not_on(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['OFF'],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        with self.assertRaises(Exception) as ex:
+            binlog.verify_gtid_config(mysql_con)
+
+        self.assertEqual("Unable to replicate binlog stream because GTID mode is not enabled.", str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@gtid_mode;'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_log_file_and_pos_success(self, connect_with_backoff):
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['binlog.000033', 345, ''],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        result = binlog.fetch_current_log_file_and_pos(mysql_con)
+
+        self.assertEqual(result, ('binlog.000033', 345))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SHOW MASTER STATUS'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_log_file_and_pos_fail_if_no_result(self, connect_with_backoff):
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            None
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        with self.assertRaises(Exception) as ex:
+            binlog.fetch_current_log_file_and_pos(mysql_con)
+
+        self.assertEqual('MySQL binary logging is not enabled.', str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SHOW MASTER STATUS'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_uuid')
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_gtid_pos_for_mysql_not_found_expect_exception(
+            self, connect_with_backoff, fetch_server_uuid):
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['3E11FA47-71CA-11E1-9E21-C80AA9429562:1,3E11FA47-71BB-11E1-9E33-C80AA9429562:2:143,0-3-1123,,'
+             '3E11FA47-71CA-11E1-9E33-C80AA9429562:2:332'],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+        fetch_server_uuid.return_value = '3E11FA47-71CA-11E1-9E33-C80AA9429562'
+
+        with self.assertRaises(Exception):
+            binlog.fetch_current_gtid_pos(mysql_con, connection.MYSQL_ENGINE)
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        fetch_server_uuid.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@GLOBAL.gtid_executed;'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_uuid')
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_gtid_pos_for_mysql_succeeds(
+            self, connect_with_backoff, fetch_server_uuid):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['3E11FA47-71CA-11E1-9E33-C80AA9429562:1,3E11FA47-71BB-11E1-9E33-C80AA9429562:2:143,0-3-1123,,'
+             '3E11FA47-71CA-11E1-9E33-C80AA9429562:2:332'],
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+        fetch_server_uuid.return_value = '3E11FA47-71CA-11E1-9E33-C80AA9429562'
+
+        result = binlog.fetch_current_gtid_pos(mysql_con, connection.MYSQL_ENGINE)
+
+        self.assertEqual('3E11FA47-71CA-11E1-9E33-C80AA9429562:1', result)
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        fetch_server_uuid.assert_called_with(mysql_con)
+
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@GLOBAL.gtid_executed;'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_id')
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_gtid_pos_for_mariadb_no_gtid_found_expect_exception(
+            self, connect_with_backoff, fetch_server_id):
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            None
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+        fetch_server_id.return_value = 2
+
+        with self.assertRaises(Exception) as ex:
+            binlog.fetch_current_gtid_pos(mysql_con, connection.MARIADB_ENGINE)
+
+        self.assertIn('No suitable GTID was found for server', str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        fetch_server_id.assert_called_with(mysql_con)
+
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@gtid_current_pos;'),
+            ]
+        )
+
+    @patch('tap_mysql.sync_strategies.binlog.connection.fetch_server_id')
+    @patch('tap_mysql.sync_strategies.binlog.connect_with_backoff')
+    def test_fetch_current_gtid_pos_no_gtid_found_for_given_server_expect_exception(
+            self, connect_with_backoff, fetch_server_id):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.side_effect = [
+            ['0, 0-4-222,']
+        ]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        fetch_server_id.return_value = 2
+
+        with self.assertRaises(Exception) as ex:
+            binlog.fetch_current_gtid_pos(mysql_con, connection.MARIADB_ENGINE)
+
+        self.assertIn('No suitable GTID was found for server', str(ex.exception))
+
+        connect_with_backoff.assert_called_with(mysql_con)
+        fetch_server_id.assert_called_with(mysql_con)
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('select @@gtid_current_pos;'),
+            ]
+        )
+
+    def test_calculate_gtid_bookmark_for_mariadb_returns_earliest(self):
+
+        binlog_streams = {
+            'stream1': {'schema': {}},
+            'stream2': {'schema': {}},
+            'stream3': {'schema': {}},
+        }
+
+        state = {
+            'bookmarks': {
+                'stream1': {'gtid': '0-3-165'},
+                'stream2': {'gtid': '0-20-12'},
+                'stream3': {'gtid': '0-12-43'},
+                'stream4': {'gtid': '0-1-1'},
+                'stream6': {'gtid': '0-3-4'},
+                'stream5': {},
+            }
+        }
+
+        result = binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MARIADB_ENGINE)
+
+        self.assertEqual(result, '0-20-12')
+
+    def test_calculate_gtid_bookmark_for_mariadb_no_gtid_found_expect_exception(self):
+
+        binlog_streams = {
+            'stream1': {'schema': {}},
+            'stream2': {'schema': {}},
+            'stream3': {'schema': {}},
+        }
+
+        state = {
+            'bookmarks': {
+            }
+        }
+
+        with self.assertRaises(Exception) as ex:
+            binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MARIADB_ENGINE)
+
+        self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", str(ex.exception))
+
+    def test_calculate_gtid_bookmark_for_mysql_returns_earliest(self):
+
+        binlog_streams = {
+            'stream1': {'schema': {}},
+            'stream2': {'schema': {}},
+            'stream3': {'schema': {}},
+            'stream4': {'schema': {}},
+        }
+
+        state = {
+            'bookmarks': {
+                'stream1': {'gtid': '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-165'},
+                'stream2': {'gtid': '3E11FA47-71CA-11E1-9E33-C80AA9429562:12'},
+                'stream3': {'gtid': '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-43'},
+                'stream4': {'gtid': '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-2'},
+                'stream6': {'gtid': '3E11FA47-71CA-11E1-9E33-C80AA9429562:1'},
+                'stream5': {},
+            }
+        }
+
+        result = binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MYSQL_ENGINE)
+
+        self.assertEqual(result, '3E11FA47-71CA-11E1-9E33-C80AA9429562:1-2')
+
+    def test_calculate_gtid_bookmark_for_mysql_no_gtid_found_expect_exception(self):
+
+        binlog_streams = {
+            'stream1': {'schema': {}},
+            'stream2': {'schema': {}},
+            'stream3': {'schema': {}},
+        }
+
+        state = {
+            'bookmarks': {
+            }
+        }
+
+        with self.assertRaises(Exception) as ex:
+            binlog.calculate_gtid_bookmark(binlog_streams, state, connection.MYSQL_ENGINE)
+
+        self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication", str(ex.exception))
diff --git a/tests/unit/test__init__.py b/tests/unit/test__init__.py
new file mode 100644
index 0000000..67e9b31
--- /dev/null
+++ b/tests/unit/test__init__.py
@@ -0,0 +1,104 @@
+import unittest
+
+from singer import CatalogEntry
+
+from tap_mysql import binlog_stream_requires_historical
+
+
+class TestTapMysql(unittest.TestCase):
+
+    def test_binlog_stream_requires_historical_with_log_coordinates_returns_false(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {'log_file': 'binlog.0001', 'log_pos': 1123},
+                'stream_2': {},
+            }
+        }
+
+        self.assertFalse(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
+
+    def test_binlog_stream_requires_historical_with_partial_log_coordinates_returns_true(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {'log_pos': 1123},
+                'stream_2': {},
+            }
+        }
+
+        self.assertTrue(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
+
+    def test_binlog_stream_requires_historical_with_gtid_returns_false(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {'gtid': '0-3834-222'},
+                'stream_2': {},
+            }
+        }
+
+        self.assertFalse(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
+
+    def test_binlog_stream_requires_historical_with_no_log_coordinates_returns_true(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {},
+                'stream_2': {},
+            }
+        }
+
+        self.assertTrue(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
+
+    def test_binlog_stream_requires_historical_with_log_coordinates_and_max_value_returns_true(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {'log_file': 'binlog.0001', 'log_pos': 1123, 'max_pk_values': '111'},
+                'stream_2': {},
+            }
+        }
+
+        self.assertTrue(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
+
+    def test_binlog_stream_requires_historical_with_log_coordinates_and_last_pk_value_returns_true(self):
+
+        catalog = CatalogEntry(tap_stream_id='stream_1', schema={})
+
+        state = {
+            'bookmarks': {
+                'stream_1': {'log_file': 'binlog.0001', 'log_pos': 1123, 'last_pk_fetched': '111'},
+                'stream_2': {},
+            }
+        }
+
+        self.assertTrue(binlog_stream_requires_historical(
+            catalog,
+            state
+        ))
diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py
new file mode 100644
index 0000000..7fb3ec0
--- /dev/null
+++ b/tests/unit/test_connection.py
@@ -0,0 +1,59 @@
+import unittest
+
+from unittest.mock import patch, MagicMock, call
+
+from pymysql.cursors import Cursor
+from singer import CatalogEntry
+
+from tap_mysql.connection import MySQLConnection, fetch_server_id, fetch_server_uuid
+
+import tap_mysql.connection
+
+
+class TestConnection(unittest.TestCase):
+
+    @patch('tap_mysql.connection.connect_with_backoff')
+    def test_fetch_server_id(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.return_value = [111]
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        result = fetch_server_id(mysql_con)
+
+        self.assertEqual(111, result)
+
+        connect_with_backoff.assert_called_with(mysql_con)
+
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@server_id'),
+            ]
+        )
+
+    @patch('tap_mysql.connection.connect_with_backoff')
+    def test_fetch_server_uuid(self, connect_with_backoff):
+
+        mysql_con = MagicMock(spec_set=MySQLConnection).return_value
+        cur_mock = MagicMock(spec_set=Cursor).return_value
+        cur_mock.__enter__.return_value.fetchone.return_value = ['dkfhdsf0-ejr-dfbsf-dnfnsbdmfbdf']
+
+        mysql_con.__enter__.return_value.cursor.return_value = cur_mock
+
+        connect_with_backoff.return_value = mysql_con
+
+        result = fetch_server_uuid(mysql_con)
+
+        self.assertEqual('dkfhdsf0-ejr-dfbsf-dnfnsbdmfbdf', result)
+
+        connect_with_backoff.assert_called_with(mysql_con)
+
+        cur_mock.__enter__.return_value.execute.assert_has_calls(
+            [
+                call('SELECT @@server_uuid'),
+            ]
+        )