diff --git a/README.md b/README.md index a9136e6..803238c 100644 --- a/README.md +++ b/README.md @@ -65,24 +65,25 @@ Create a config file containing the database connection credentials, see [sample List of config parameters: -| Parameter | type | required | default | description | -|-------------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------| -| host | string | yes | - | mysql/mariadb host | -| port | int | yes | - | mysql/mariadb port | -| user | string | yes | - | db username | -| password | string | yes | - | db password | -| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL | -| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL | -| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server | -| filter_dbs | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance | -| use_gtid | bool | False | False
| Flag to enable log based replication using GTID | -| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID | -| ssl | string ("true") | No | False | Enable SSL connection | -| ssl_ca | string | No | - | for self-signed SSL | -| ssl_cert | string | No | - | for self-signed SSL | -| ssl_key | string | No | - | for self-signed SSL | -| internal_hostname | string | No | - | Override match hostname for google cloud | -| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. | +| Parameter | type | required | default | description | +|-------------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------| +| host | string | yes | - | mysql/mariadb host | +| port | int | yes | - | mysql/mariadb port | +| user | string | yes | - | db username | +| password | string | yes | - | db password | +| cursorclass | string | No | `pymysql.cursors.SSCursor` | set cursorclass used by PyMYSQL | +| database | string | No | - | Database to use, None to not use a particular one. Used by PyMYSQL | +| server_id | int | False | Randomly generated int | Used as the slave id when this tap is connecting to the server | +| filter_dbs | string | False | - | Comma separated list of schemas to extract tables only from particular schemas and to improve data extraction performance| +| tables | string | False |
| Comma separated list of tables to extract from a databases | +| use_gtid | bool | False | False
| Flag to enable log based replication using GTID | +| engine | string ('mysql' or 'mariadb') | False | 'mysql' | Indicate which flavor the server is, used for LOG_BASED with GTID | +| ssl | string ("true") | No | False | Enable SSL connection | +| ssl_ca | string | No | - | for self-signed SSL | +| ssl_cert | string | No | - | for self-signed SSL | +| ssl_key | string | No | - | for self-signed SSL | +| internal_hostname | string | No | - | Override match hostname for google cloud | +| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. | ### Discovery mode diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index 63641ad..b12fad2 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -27,7 +27,11 @@ def do_discover(mysql_conn, config): - discover_catalog(mysql_conn, config.get('filter_dbs')).dump() + discover_catalog( + mysql_conn=mysql_conn, + dbs=config.get('filter_dbs'), + tables=config.get('tables') + ).dump() def log_engine(mysql_conn, catalog_entry): @@ -116,7 +120,11 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state): 3. any streams that do not have a replication method of LOG_BASED """ - discovered = discover_catalog(mysql_conn, config.get('filter_dbs')) + discovered = discover_catalog( + mysql_conn=mysql_conn, + dbs=config.get('filter_dbs'), + tables=config.get('tables') + ) # Filter catalog to include only selected streams selected_streams = list(filter(common.stream_is_selected, catalog.streams)) @@ -170,7 +178,11 @@ def get_non_binlog_streams(mysql_conn, catalog, config, state): def get_binlog_streams(mysql_conn, catalog, config, state): - discovered = discover_catalog(mysql_conn, config.get('filter_dbs')) + discovered = discover_catalog( + mysql_conn=mysql_conn, + dbs=config.get('filter_dbs'), + tables=config.get('tables') + ) selected_streams = list(filter(common.stream_is_selected, catalog.streams)) binlog_streams = [] diff --git a/tests/integration/test_tap_mysql.py b/tests/integration/test_tap_mysql.py index 0af047d..0f86686 100644 --- a/tests/integration/test_tap_mysql.py +++ b/tests/integration/test_tap_mysql.py @@ -1473,3 +1473,69 @@ def tearDown(self) -> None: with connect_with_backoff(self.conn) as open_conn: with open_conn.cursor() as cursor: cursor.execute('DROP TABLE bit_booleans_table;') + + +class TestTableSelector(unittest.TestCase): + + def setUp(self): + self.conn = test_utils.get_test_connection() + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + cursor.execute(""" + CREATE TABLE table_1 ( + `id` int, + a INTEGER) + """) + cursor.execute(""" + CREATE TABLE table_2 ( + `id_2` int, + b INTEGER) + """) + cursor.execute(""" + INSERT INTO table_1(`id`,`a`) VALUES (1, 69) + """) + cursor.execute(""" + INSERT INTO table_2(`id_2`,`b`) VALUES (1, 70) + """) + + def testSelectTable1Catalog(self): + catalog = tap_mysql.discover_catalog( + mysql_conn=self.conn, + tables='table_1' + ) + + self.assertEqual(len(catalog.streams), 1) + self.assertEqual(catalog.streams[0].table, 'table_1') + + def testIncrementalSync(self): + catalog = tap_mysql.discover_catalog( + mysql_conn=self.conn, + tables='table_1' + ) + + self.assertEqual(len(catalog.streams), 1) + catalog.streams[0] = test_utils.set_replication_method_and_key( + stream=catalog.streams[0], + r_method='INCREMENTAL', + r_key='id' + ) + catalog.streams[0] = test_utils.set_selected(catalog.streams[0], True) + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + + tap_mysql.do_sync(self.conn, {}, catalog, {}) + + record_messages = list(filter(lambda m: isinstance(m, singer.RecordMessage), SINGER_MESSAGES)) + + self.assertEqual(len(record_messages), 1) + self.assertListEqual([ + {'id': 1, 'a': 69}, + ], [rec.record for rec in record_messages]) + + def tearDown(self) -> None: + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + cursor.execute('DROP TABLE table_1;') + cursor.execute('DROP TABLE table_2;')