This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP-1139 dynamically infer gtid from binlog coordinates for MariaDB (#95)
Samira-El authored Mar 8, 2022
1 parent 386592e commit 53c6ccf
Showing 5 changed files with 295 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -10,7 +10,7 @@ pylint:

unit_test:
. ./venv/bin/activate ;\
nosetests -c .noserc --cover-min-percentage=42 tests/unit
nosetests -c .noserc --cover-min-percentage=47 tests/unit

integration_test:
. ./venv/bin/activate ;\
80 changes: 58 additions & 22 deletions README.md
@@ -65,24 +65,24 @@ Create a config file containing the database connection credentials, see [sample

List of config parameters:

| Parameter | type | required | default | description |
|--------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------|
| host | string | yes | - | mysql/mariadb host |
| port | int | yes | - | mysql/mariadb port |
| user | string | yes | - | db username |
| password | string | yes | - | db password |
| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL |
| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL |
| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server |
| Parameter | type | required | default | description |
|--------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------|
| host | string | yes | - | mysql/mariadb host |
| port | int | yes | - | mysql/mariadb port |
| user | string | yes | - | db username |
| password | string | yes | - | db password |
| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL |
| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL |
| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server |
| filter_db | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance |
| use_gtid | bool | False | False | Flag to enable log based replication using GTID |
| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID |
| ssl | string ("true") | No | False | Enable SSL connection |
| ssl_ca | string | No | - | for self-signed SSL |
| ssl_cert | string | No | - | for self-signed SSL |
| ssl_key | string | No | - | for self-signed SSL |
| internal_hostname | string | No | - | Override match hostname for google cloud |
| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically.|
| use_gtid | bool | False | False <br/> | Flag to enable log based replication using GTID |
| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID |
| ssl | string ("true") | No | False | Enable SSL connection |
| ssl_ca | string | No | - | for self-signed SSL |
| ssl_cert | string | No | - | for self-signed SSL |
| ssl_key | string | No | - | for self-signed SSL |
| internal_hostname | string | No | - | Override match hostname for google cloud |
| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. |
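
As a rough illustration of how the parameters above fit together, the sketch below builds a config for a MariaDB server with GTID-based replication enabled and renders it as JSON. Only the keys come from the table; the host, credentials, schema name, and the `render_config` helper are placeholder assumptions.

```python
import json

# Placeholder values for illustration; only the keys come from the parameter table.
config = {
    "host": "localhost",
    "port": 3306,
    "user": "replication_user",
    "password": "secret",
    "engine": "mariadb",        # server flavor, used for LOG_BASED with GTID
    "use_gtid": True,           # enable log based replication using GTID
    "filter_db": "example_db",  # comma separated list of schemas
}


def render_config(cfg: dict) -> str:
    """Serialize the config dict as the JSON text a config file would hold."""
    return json.dumps(cfg, indent=2)


print(render_config(config))
```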


### Discovery mode
@@ -287,14 +287,50 @@ resultant stream of JSON data can be consumed by a Singer target.

## Replication methods and state file

In the above example, we invoked `tap-mysql` without providing a _state_ file
and without specifying a replication method. The two ways to replicate a given
table are `FULL_TABLE` and `INCREMENTAL`.
In the above example, we invoked `tap-mysql` without providing a _state_ file and without specifying a replication
method. The ways to replicate a given table are `FULL_TABLE`, `LOG_BASED` and `INCREMENTAL`.

### LOG_BASED

LOG_BASED replication uses the server's binary logs (binlogs). This method works with primary servers: the tap
acts as a replica and requests that the primary stream log events. The tap then consumes events pertaining to
row changes (inserts, updates, deletes), binlog file rotation, and GTID events.

The LOG_BASED method always requires an initial sync to get a snapshot of the table and the current binlog
coordinates or GTID position.

The tap supports two ways of consuming log events: binlog coordinates or GTID. The default is binlog
coordinates. When enabling the `use_gtid` flag, you have to specify the engine flavor (mariadb/mysql), because
the GTID implementations of the two engines differ.

When the `use_gtid` flag is enabled and the state contains no GTID, the tap dynamically infers the GTID position
from the existing binlog coordinates in the state if the engine is MariaDB. If the engine is MySQL, it fails.

#### State when using binlog coordinates
```json
{
  "bookmarks": {
    "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244},
    "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42},
    "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100}
  }
}
```
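
To make the shape of this state concrete, here is a simplified sketch that collects the smallest bookmarked position for each binlog file. It is not the tap's own `get_min_log_pos_per_log_file` (which also takes the selected streams and returns a richer structure); the `min_log_pos_per_file` name and its return shape are assumptions made for illustration.

```python
from typing import Dict


def min_log_pos_per_file(state: Dict) -> Dict[str, int]:
    """Return the smallest bookmarked log_pos for every log_file in the state."""
    result: Dict[str, int] = {}
    for bookmark in state.get("bookmarks", {}).values():
        log_file = bookmark.get("log_file")
        log_pos = bookmark.get("log_pos")
        if log_file is None or log_pos is None:
            continue  # stream has no binlog bookmark yet
        if log_file not in result or log_pos < result[log_file]:
            result[log_file] = log_pos
    return result


state = {
    "bookmarks": {
        "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244},
        "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42},
        "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100},
    }
}

print(min_log_pos_per_file(state))
# {'mysql-binlog.0003': 100, 'mysql-binlog.0001': 42}
```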

#### State when using GTID
```json
{
  "bookmarks": {
    "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, "gtid": "0:364864374:599"},
    "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, "gtid": "0:364864374:375"},
    "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, "gtid": "0:364864374:399"}
  }
}
```
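
The sketch below shows one way the earliest bookmarked GTID could be picked from such a state, by comparing the last component of each GTID as a sequence number. It assumes the colon-separated `domain:server:sequence` layout of the example above and a single replication domain; the `earliest_gtid` helper is hypothetical and is not the tap's `calculate_gtid_bookmark`.

```python
from typing import Dict, Optional


def earliest_gtid(state: Dict) -> Optional[str]:
    """Return the bookmarked GTID with the lowest sequence number, if any."""
    min_gtid = None
    min_seq_no = None
    for bookmark in state.get("bookmarks", {}).values():
        gtid = bookmark.get("gtid")
        if not gtid:
            continue  # stream was bookmarked with binlog coordinates only
        # Assumption: the sequence number is the third colon-separated field.
        seq_no = int(gtid.split(":")[2])
        if min_seq_no is None or seq_no < min_seq_no:
            min_seq_no, min_gtid = seq_no, gtid
    return min_gtid


state = {
    "bookmarks": {
        "example_db-table1": {"log_file": "mysql-binlog.0003", "log_pos": 3244, "gtid": "0:364864374:599"},
        "example_db-table2": {"log_file": "mysql-binlog.0001", "log_pos": 42, "gtid": "0:364864374:375"},
        "example_db-table3": {"log_file": "mysql-binlog.0003", "log_pos": 100, "gtid": "0:364864374:399"},
    }
}

print(earliest_gtid(state))  # 0:364864374:375
```

Returning `None` when no bookmark carries a GTID leaves the caller free to decide whether to fall back to binlog coordinates or to fail.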

### Full Table

Full-table replication extracts all data from the source table each time the tap
is invoked.
Full-table replication extracts all data from the source table each time the tap is invoked.

### Incremental

47 changes: 42 additions & 5 deletions tap_mysql/sync_strategies/binlog.py
@@ -259,13 +259,15 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac


def calculate_gtid_bookmark(
        mysql_conn: MySQLConnection,
        binlog_streams_map: Dict[str, Any],
        state: Dict,
        engine: str
) -> str:
    """
    Finds the earliest bookmarked gtid in the state
    Args:
        mysql_conn: instance of MySQLConnection
        binlog_streams_map: dictionary of selected streams
        state: state dict with bookmarks
        engine: the DB flavor mysql/mariadb
@@ -298,16 +300,51 @@ def calculate_gtid_bookmark(
min_seq_no = gtid_seq_no
min_gtid = gtid


    if not min_gtid:
        raise Exception("Couldn't find any gtid in state bookmarks to resume logical replication")

    LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication',
                min_gtid)
        # MariaDB has a handy SQL function, BINLOG_GTID_POS, that infers the gtid position from given binlog
        # coordinates, so we use that. MySQL has no such function; the only available option is the CLI utility
        # mysqlbinlog, which we deemed awkward to use and do not want to make a system requirement of this tap.
        # Hence, inferring the gtid is not implemented for MySQL.

        if engine != connection.MARIADB_ENGINE:
            raise Exception("Couldn't find any gtid in state bookmarks to resume logical replication")

        LOGGER.info("Couldn't find a gtid in state, will try to infer one from binlog coordinates if they exist ..")
        log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state)

        if not (log_file and log_pos):
            raise Exception("No binlog coordinates in state to infer gtid position! Cannot resume logical replication")

        min_gtid = _find_gtid_by_binlog_coordinates(mysql_conn, log_file, log_pos)

        LOGGER.info('The inferred GTID is "%s", it will be used to resume replication',
                    min_gtid)
    else:
        LOGGER.info('The earliest bookmarked GTID found in the state is "%s", and will be used to resume replication',
                    min_gtid)

    return min_gtid


def _find_gtid_by_binlog_coordinates(mysql_conn: MySQLConnection, log_file: str, log_pos: int) -> str:
    """
    Finds the equivalent gtid position from the given binlog file and pos.
    This only works on MariaDB
    Args:
        mysql_conn: instance of MySQLConnection
        log_file: a binlog file
        log_pos: a position in the log file
    Returns: gtid position
    """
    with connect_with_backoff(mysql_conn) as open_conn:
        with open_conn.cursor() as cur:
            cur.execute(f"select BINLOG_GTID_POS('{log_file}', {log_pos});")
            return cur.fetchone()[0]
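
The lookup above interpolates the coordinates into the SQL text. As a hedged alternative sketch, not part of this commit, the same query can pass them as parameters using the `%s` placeholder style that PyMySQL cursors accept. The `find_gtid` function and the `FakeCursor` stand-in are hypothetical and exist only so the example runs without a server.

```python
from typing import Any, Optional, Tuple


def find_gtid(cursor: Any, log_file: str, log_pos: int) -> Optional[str]:
    """Ask the server for the GTID position matching the binlog coordinates."""
    # The driver escapes the parameters instead of the caller building the SQL string.
    cursor.execute("SELECT BINLOG_GTID_POS(%s, %s)", (log_file, log_pos))
    row = cursor.fetchone()
    return row[0] if row else None


class FakeCursor:
    """Minimal stand-in for a DB-API cursor; records the call and returns a canned row."""

    def __init__(self, result: Optional[str]) -> None:
        self._result = result
        self.executed: Optional[Tuple[str, Any]] = None

    def execute(self, query: str, args: Any = None) -> None:
        self.executed = (query, args)

    def fetchone(self) -> Tuple[Optional[str]]:
        return (self._result,)


cursor = FakeCursor("0-1-100")  # made-up GTID value for the demonstration
gtid = find_gtid(cursor, "mysql-binlog.0003", 3244)
print(gtid)
```

With a real connection, the cursor would come from the tap's own connection helper rather than `FakeCursor`.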


def get_min_log_pos_per_log_file(binlog_streams_map, state) -> Dict[str, Dict]:
min_log_pos_per_file = {}

@@ -808,7 +845,7 @@ def sync_binlog_stream(
log_file = log_pos = gtid = None

if config['use_gtid']:
gtid = calculate_gtid_bookmark(binlog_streams_map, state, config['engine'])
gtid = calculate_gtid_bookmark(mysql_conn, binlog_streams_map, state, config['engine'])
else:
log_file, log_pos = calculate_bookmark(mysql_conn, binlog_streams_map, state)
