Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

[NO-REF] Support tables config in get_binlog_streams and get_non_binlog_streams #154

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,25 @@ Create a config file containing the database connection credentials, see [sample

List of config parameters:

| Parameter | type | required | default | description |
|-------------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------|
| host | string | yes | - | mysql/mariadb host |
| port | int | yes | - | mysql/mariadb port |
| user | string | yes | - | db username |
| password | string | yes | - | db password |
| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL |
| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL |
| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server |
| filter_dbs | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance |
| use_gtid | bool | False | False <br/> | Flag to enable log based replication using GTID |
| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID |
| ssl | string ("true") | No | False | Enable SSL connection |
| ssl_ca | string | No | - | for self-signed SSL |
| ssl_cert | string | No | - | for self-signed SSL |
| ssl_key | string | No | - | for self-signed SSL |
| internal_hostname | string | No | - | Override match hostname for google cloud |
| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. |
| Parameter | type | required | default | description |
|-------------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------|
| host | string | yes | - | mysql/mariadb host |
| port | int | yes | - | mysql/mariadb port |
| user | string | yes | - | db username |
| password | string | yes | - | db password |
| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL |
| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL |
| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server |
| filter_dbs | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance|
| tables | string | False | - | Comma separated list of tables to extract from the databases |
| use_gtid | bool | False | False <br/> | Flag to enable log based replication using GTID |
| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID |
| ssl | string ("true") | No | False | Enable SSL connection |
| ssl_ca | string | No | - | for self-signed SSL |
| ssl_cert | string | No | - | for self-signed SSL |
| ssl_key | string | No | - | for self-signed SSL |
| internal_hostname | string | No | - | Override match hostname for google cloud |
| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. |


### Discovery mode
Expand Down
18 changes: 15 additions & 3 deletions tap_mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@


def do_discover(mysql_conn, config):
discover_catalog(mysql_conn, config.get('filter_dbs')).dump()
discover_catalog(
mysql_conn=mysql_conn,
dbs=config.get('filter_dbs'),
tables=config.get('tables')
).dump()


def log_engine(mysql_conn, catalog_entry):
Expand Down Expand Up @@ -116,7 +120,11 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state):
3. any streams that do not have a replication method of LOG_BASED

"""
discovered = discover_catalog(mysql_conn, config.get('filter_dbs'))
discovered = discover_catalog(
mysql_conn=mysql_conn,
dbs=config.get('filter_dbs'),
tables=config.get('tables')
)

# Filter catalog to include only selected streams
selected_streams = list(filter(common.stream_is_selected, catalog.streams))
Expand Down Expand Up @@ -170,7 +178,11 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state):


def get_binlog_streams(mysql_conn, catalog, config, state):
discovered = discover_catalog(mysql_conn, config.get('filter_dbs'))
discovered = discover_catalog(
mysql_conn=mysql_conn,
dbs=config.get('filter_dbs'),
tables=config.get('tables')
)

selected_streams = list(filter(common.stream_is_selected, catalog.streams))
binlog_streams = []
Expand Down
66 changes: 66 additions & 0 deletions tests/integration/test_tap_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,3 +1473,69 @@ def tearDown(self) -> None:
with connect_with_backoff(self.conn) as open_conn:
with open_conn.cursor() as cursor:
cursor.execute('DROP TABLE bit_booleans_table;')


class TestTableSelector(unittest.TestCase):
    """Integration tests for passing ``tables`` to ``discover_catalog``.

    Two tables are created, but only ``table_1`` is requested from
    ``discover_catalog``. The tests check that the resulting catalog, and an
    INCREMENTAL sync driven by it, contain ``table_1`` only.
    """

    def setUp(self):
        """Create ``table_1`` and ``table_2`` and insert one row into each."""
        self.conn = test_utils.get_test_connection()

        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                # unittest does not call tearDown when setUp raises, so a run
                # that failed part-way could leave these tables behind and
                # make every later CREATE TABLE fail. Start from a clean slate.
                cursor.execute('DROP TABLE IF EXISTS table_1;')
                cursor.execute('DROP TABLE IF EXISTS table_2;')

                cursor.execute("""
                    CREATE TABLE table_1 (
                    `id` int,
                    a INTEGER)
                """)
                cursor.execute("""
                    CREATE TABLE table_2 (
                    `id_2` int,
                    b INTEGER)
                """)
                cursor.execute("""
                    INSERT INTO table_1(`id`,`a`) VALUES (1, 69)
                """)
                cursor.execute("""
                    INSERT INTO table_2(`id_2`,`b`) VALUES (1, 70)
                """)

    def testSelectTable1Catalog(self):
        """Discovery limited to ``table_1`` yields exactly that one stream."""
        catalog = tap_mysql.discover_catalog(
            mysql_conn=self.conn,
            tables='table_1'
        )

        self.assertEqual(len(catalog.streams), 1)
        self.assertEqual(catalog.streams[0].table, 'table_1')

    def testIncrementalSync(self):
        """An INCREMENTAL sync of the filtered catalog emits only ``table_1`` rows."""
        catalog = tap_mysql.discover_catalog(
            mysql_conn=self.conn,
            tables='table_1'
        )

        self.assertEqual(len(catalog.streams), 1)
        catalog.streams[0] = test_utils.set_replication_method_and_key(
            stream=catalog.streams[0],
            r_method='INCREMENTAL',
            r_key='id'
        )
        catalog.streams[0] = test_utils.set_selected(catalog.streams[0], True)

        # The module-level list is only mutated in place, never rebound, so no
        # ``global`` declaration is needed here.
        SINGER_MESSAGES.clear()

        tap_mysql.do_sync(self.conn, {}, catalog, {})

        record_messages = [
            message for message in SINGER_MESSAGES
            if isinstance(message, singer.RecordMessage)
        ]

        self.assertEqual(len(record_messages), 1)
        self.assertListEqual([
            {'id': 1, 'a': 69},
        ], [rec.record for rec in record_messages])

    def tearDown(self) -> None:
        """Drop both tables created by ``setUp``."""
        with connect_with_backoff(self.conn) as open_conn:
            with open_conn.cursor() as cursor:
                # IF EXISTS keeps cleanup from masking the real test failure
                # with a secondary "unknown table" error.
                cursor.execute('DROP TABLE IF EXISTS table_1;')
                cursor.execute('DROP TABLE IF EXISTS table_2;')