From eb5e989e8cee5cf1154f1f8598344469ec97687c Mon Sep 17 00:00:00 2001
From: Peter Kosztolanyi
Date: Wed, 8 Apr 2020 09:24:40 +0100
Subject: [PATCH] [AP-635] Discover only the required tables

---
 README.md                               |   9 +-
 tap_snowflake/__init__.py               | 167 +++++++-----------------
 tap_snowflake/connection.py             |   3 +-
 tests/integration/test_tap_snowflake.py | 125 ++++++++++++++----
 tests/integration/utils.py              |  30 ++---
 5 files changed, 160 insertions(+), 174 deletions(-)

diff --git a/README.md b/README.md
index 07ea65c..26b622c 100644
--- a/README.md
+++ b/README.md
@@ -47,12 +47,15 @@ or
     "user": "my_user",
     "password": "password",
     "warehouse": "my_virtual_warehouse",
-    "filter_dbs": "database_name",
-    "filter_schemas": "schema1,schema2"
+    "tables": "db.schema.table1,db.schema.table2"
   }
 ```
 
-`filter_dbs` and `filter_schemas` are optional.
+**Note**: `tables` is a mandatory parameter, required to avoid a long-running catalog discovery process.
+Please specify fully qualified table and view names, and only the ones you need to extract; otherwise
+discovery can take a very long time to run. Discovery mode analyses table structures, but Snowflake
+performs poorly when selecting many rows from `INFORMATION_SCHEMA` or when `SHOW` commands return a
+large number of rows. Please be as specific as possible.
 
 2. Run it in discovery mode to generate a `properties.json`
 
diff --git a/tap_snowflake/__init__.py b/tap_snowflake/__init__.py
index 89f446f..aeec60d 100644
--- a/tap_snowflake/__init__.py
+++ b/tap_snowflake/__init__.py
@@ -47,7 +47,8 @@
     'dbname',
     'user',
     'password',
-    'warehouse'
+    'warehouse',
+    'tables'
 ]
 
 # Snowflake data types
@@ -118,138 +119,60 @@ def create_column_metadata(cols):
     return metadata.to_list(mdata)
 
 
-def get_databases(snowflake_conn):
-    """Get snowflake databases"""
-    databases = snowflake_conn.query('SHOW DATABASES', max_records=SHOW_COMMAND_MAX_ROWS)
+def get_table_columns(snowflake_conn, tables):
+    """Get column definitions for a list of tables
 
-    # Return only the name of databases as a list
-    return [db['name'] for db in databases]
-
-
-def get_schemas(snowflake_conn, database):
-    """Get schemas of a database"""
-    schemas = []
-    try:
-        schemas = snowflake_conn.query(f'SHOW SCHEMAS IN DATABASE {database}', max_records=SHOW_COMMAND_MAX_ROWS)
-
-        # Get only the name of schemas as a list
-        schemas = [schema['name'] for schema in schemas]
-
-    # Catch exception when schema not exists and SHOW SCHEMAS throws a ProgrammingError
-    # Regexp to extract snowflake error code and message from the exception message
-    # Do nothing if schema not exists
-    except snowflake.connector.errors.ProgrammingError as exc:
-        # pylint: disable=anomalous-backslash-in-string
-        if re.match('.*\(02000\):.*\n.*does not exist.*', str(sys.exc_info()[1])):
-            pass
-        else:
-            raise exc
-
-    return schemas
-
-
-def get_table_columns(snowflake_conn, database, table_schemas=None, table_name=None):
-    """Get column definitions for every table in specific schemas(s)
-
-    It's using SHOW commands instead of INFORMATION_SCHEMA views bucause information_schemas views are slow
+    It uses SHOW commands instead of INFORMATION_SCHEMA views because INFORMATION_SCHEMA views are slow
     and can cause unexpected exception of:
         Information schema query returned too much data. Please repeat query with more selective predicates.
""" table_columns = [] - if table_schemas or table_name: - for schema in table_schemas: - queries = [] - - LOGGER.info('Getting schema information for %s.%s...', database, schema) - - # Get column data types by SHOW commands - show_tables = f'SHOW TABLES IN SCHEMA {database}.{schema}' - show_views = f'SHOW TABLES IN SCHEMA {database}.{schema}' - show_columns = f'SHOW COLUMNS IN SCHEMA {database}.{schema}' - - # Convert output of SHOW commands to tables and use SQL joins to get every required information - select = f""" - WITH - show_tables AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-3)))), - show_views AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-2)))), - show_columns AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-1)))) - SELECT show_columns."database_name" AS table_catalog - ,show_columns."schema_name" AS table_schema - ,show_columns."table_name" AS table_name - ,CASE - WHEN show_tables."name" IS NOT NULL THEN 'BASE TABLE' - ELSE 'VIEW' - END table_type - ,show_tables."rows" AS row_count - ,show_columns."column_name" AS column_name - -- ---------------------------------------------------------------------------------------- - -- Character and numeric columns display their generic data type rather than their defined - -- data type (i.e. TEXT for all character types, FIXED for all fixed-point numeric types, - -- and REAL for all floating-point numeric types). - -- - -- Further info at https://docs.snowflake.net/manuals/sql-reference/sql/show-columns.html - -- ---------------------------------------------------------------------------------------- - ,CASE PARSE_JSON(show_columns."data_type"):type::varchar - WHEN 'FIXED' THEN 'NUMBER' - WHEN 'REAL' THEN 'FLOAT' - ELSE PARSE_JSON("data_type"):type::varchar - END data_type - ,PARSE_JSON(show_columns."data_type"):length::number AS character_maximum_length - ,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision - ,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale - FROM show_columns - LEFT JOIN show_tables - ON show_tables."database_name" = show_columns."database_name" - AND show_tables."schema_name" = show_columns."schema_name" - AND show_tables."name" = show_columns."table_name" - LEFT JOIN show_views - ON show_views."database_name" = show_columns."database_name" - AND show_views."schema_name" = show_columns."schema_name" - AND show_views."name" = show_columns."table_name" - """ - queries.extend([show_tables, show_views, show_columns, select]) - - # Run everything in one transaction - try: - columns = snowflake_conn.query(queries, max_records=SHOW_COMMAND_MAX_ROWS) - table_columns.extend(columns) - - # Catch exception when schema not exists and SHOW COLUMNS throws a ProgrammingError - # Regexp to extract snowflake error code and message from the exception message - # Do nothing if schema not exists - except snowflake.connector.errors.ProgrammingError as exc: - # pylint: disable=anomalous-backslash-in-string - if re.match('.*\(02000\):.*\n.*does not exist.*', str(sys.exc_info()[1])): - pass - else: - raise exc + for table in tables: + queries = [] + + LOGGER.info('Getting column information for %s...', table) + + # Get column data types by SHOW commands + show_columns = f"SHOW COLUMNS IN TABLE {table}" + + # Convert output of SHOW commands to tables and use SQL joins to get every required information + select = f""" + WITH + show_columns AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-1)))) + SELECT show_columns."database_name" AS table_catalog + ,show_columns."schema_name" AS 
+              ,show_columns."table_name" AS table_name
+              ,show_columns."column_name" AS column_name
+              -- ----------------------------------------------------------------------------------------
+              -- Character and numeric columns display their generic data type rather than their defined
+              -- data type (i.e. TEXT for all character types, FIXED for all fixed-point numeric types,
+              -- and REAL for all floating-point numeric types).
+              --
+              -- Further info at https://docs.snowflake.net/manuals/sql-reference/sql/show-columns.html
+              -- ----------------------------------------------------------------------------------------
+              ,CASE PARSE_JSON(show_columns."data_type"):type::varchar
+                 WHEN 'FIXED' THEN 'NUMBER'
+                 WHEN 'REAL' THEN 'FLOAT'
+                 ELSE PARSE_JSON("data_type"):type::varchar
+               END data_type
+              ,PARSE_JSON(show_columns."data_type"):length::number AS character_maximum_length
+              ,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision
+              ,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale
+          FROM show_columns
+        """
+        queries.extend([show_columns, select])
+
+        # Run everything in one session; RESULT_SCAN(LAST_QUERY_ID()) can only see the SHOW output there
+        columns = snowflake_conn.query(queries, max_records=SHOW_COMMAND_MAX_ROWS)
+        table_columns.extend(columns)
 
     return table_columns
 
 
 def discover_catalog(snowflake_conn, config):
     """Returns a Catalog describing the structure of the database."""
-    filter_dbs_config = config.get('filter_dbs')
-    filter_schemas_config = config.get('filter_schemas')
-    databases = []
-    schemas = []
-
-    # Get databases
-    sql_columns = []
-    if filter_dbs_config:
-        databases = filter_dbs_config.split(',')
-    else:
-        databases = get_databases(snowflake_conn)
-
-    for database in databases:
-
-        # Get schemas
-        if filter_schemas_config:
-            schemas = filter_schemas_config.split(',')
-        else:
-            schemas = get_schemas(snowflake_conn, database)
-
-        table_columns = get_table_columns(snowflake_conn, database, schemas)
-        sql_columns.extend(table_columns)
+    tables = config.get('tables').split(',')
+    sql_columns = get_table_columns(snowflake_conn, tables)
 
     table_info = {}
     columns = []
diff --git a/tap_snowflake/connection.py b/tap_snowflake/connection.py
index eb52e80..c97894a 100644
--- a/tap_snowflake/connection.py
+++ b/tap_snowflake/connection.py
@@ -36,7 +36,8 @@ def validate_config(config):
         'dbname',
         'user',
         'password',
-        'warehouse'
+        'warehouse',
+        'tables'
     ]
 
     # Check if mandatory keys exist
diff --git a/tests/integration/test_tap_snowflake.py b/tests/integration/test_tap_snowflake.py
index 4f59a50..a964956 100644
--- a/tests/integration/test_tap_snowflake.py
+++ b/tests/integration/test_tap_snowflake.py
@@ -1,6 +1,8 @@
+import os
 import unittest
 import json
 import singer
+import snowflake.connector
 
 import tap_snowflake
 import tap_snowflake.sync_strategies.common as common
@@ -68,110 +70,175 @@ def setUpClass(cls):
             ,HEX_ENCODE('varbinary')
         '''.format(SCHEMA_NAME))
 
-        catalog = test_utils.discover_catalog(cls.snowflake_conn)
-        cls.stream = catalog.streams[0]
-        cls.schema = catalog.streams[0].schema
-        cls.metadata = catalog.streams[0].metadata
+        cur.execute('''
+        CREATE TABLE {}.empty_table_1 (
+            c_pk INTEGER PRIMARY KEY,
+            c_int INT
+        )'''.format(SCHEMA_NAME))
+
+        cur.execute('''
+        CREATE TABLE {}.empty_table_2 (
+            c_pk INTEGER PRIMARY KEY,
+            c_int INT
+        )'''.format(SCHEMA_NAME))
 
-    def get_metadata_for_column(self, colName):
-        return next(md for md in self.metadata if md['breadcrumb'] == ('properties', colName))['metadata']
+        cur.execute('''
+        CREATE VIEW {}.empty_view_1 AS
+            SELECT c_pk, c_int FROM {}.empty_table_1
+        '''.format(SCHEMA_NAME, SCHEMA_NAME))
+
+        # Discover catalog including only the TEST_TYPE_MAPPING table to run the detailed type tests later
+        cls.dt_catalog = test_utils.discover_catalog(
+            cls.snowflake_conn,
+            {'tables': f'{SCHEMA_NAME}.test_type_mapping'})
+
+        cls.dt_stream = cls.dt_catalog.streams[0]
+        cls.dt_schema = cls.dt_catalog.streams[0].schema
+        cls.dt_metadata = cls.dt_catalog.streams[0].metadata
+
+    def get_dt_metadata_for_column(self, col_name):
+        """Helper function to get a metadata entry from the catalog of the TEST_TYPE_MAPPING table"""
+        return next(md for md in self.dt_metadata if md['breadcrumb'] == ('properties', col_name))['metadata']
+
+    def test_discover_catalog_with_multiple_tables(self):
+        """Validate that discovering the catalog with the tables option works as expected for multiple tables"""
+        # Create config to discover three tables
+        catalog = test_utils.discover_catalog(
+            self.snowflake_conn,
+            {'tables': f'{SCHEMA_NAME}.empty_table_1,{SCHEMA_NAME}.empty_table_2,{SCHEMA_NAME}.test_type_mapping'})
+
+        # Three tables should be discovered
+        tap_stream_ids = [s.tap_stream_id for s in catalog.streams]
+        self.assertCountEqual(tap_stream_ids,
+                              ['ANALYTICS_DB_TEST-TAP_SNOWFLAKE_TEST-EMPTY_TABLE_1',
+                               'ANALYTICS_DB_TEST-TAP_SNOWFLAKE_TEST-EMPTY_TABLE_2',
+                               'ANALYTICS_DB_TEST-TAP_SNOWFLAKE_TEST-TEST_TYPE_MAPPING'])
+
+    def test_discover_catalog_with_single_table(self):
+        """Validate that discovering the catalog with the tables option works as expected for a single table"""
+        # Create config to discover only one table
+        catalog = test_utils.discover_catalog(
+            self.snowflake_conn, {'tables': f'{SCHEMA_NAME}.empty_table_2'})
+
+        # Only one table should be discovered
+        tap_stream_ids = [s.tap_stream_id for s in catalog.streams]
+        self.assertCountEqual(tap_stream_ids,
+                              ['ANALYTICS_DB_TEST-TAP_SNOWFLAKE_TEST-EMPTY_TABLE_2'])
+
+    def test_discover_catalog_with_not_existing_table(self):
+        """Validate that discovering the catalog raises an exception when a table does not exist"""
+        # Create config to discover a non-existing table
+        with self.assertRaises(snowflake.connector.errors.ProgrammingError):
+            test_utils.discover_catalog(
+                self.snowflake_conn, {'tables': f'{SCHEMA_NAME}.empty_table_2,{SCHEMA_NAME}.not_existing_table'})
+
+    def test_discover_catalog_with_view(self):
+        """Validate that discovering the catalog with the tables option works as expected for views"""
+        # Create config to discover only one view
+        catalog = test_utils.discover_catalog(
+            self.snowflake_conn, {'tables': f'{SCHEMA_NAME}.empty_view_1'})
+
+        # Only one view should be discovered
+        tap_stream_ids = [s.tap_stream_id for s in catalog.streams]
+        self.assertCountEqual(tap_stream_ids,
+                              ['ANALYTICS_DB_TEST-TAP_SNOWFLAKE_TEST-EMPTY_VIEW_1'])
 
     def test_decimal(self):
-        self.assertEqual(self.schema.properties['C_DECIMAL'],
+        self.assertEqual(self.dt_schema.properties['C_DECIMAL'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_DECIMAL'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_DECIMAL'),
                          {'selected-by-default': True, 'sql-datatype': 'number'})
 
     def test_decimal_with_defined_scale_and_precision(self):
-        self.assertEqual(self.schema.properties['C_DECIMAL_2'],
+        self.assertEqual(self.dt_schema.properties['C_DECIMAL_2'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_DECIMAL_2'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_DECIMAL_2'),
                          {'selected-by-default': True, 'sql-datatype': 'number'})
 
     def test_smallint(self):
-        self.assertEqual(self.schema.properties['C_SMALLINT'],
+        self.assertEqual(self.dt_schema.properties['C_SMALLINT'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_SMALLINT'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_SMALLINT'),
                          {'selected-by-default': True, 'sql-datatype': 'number'})
 
     def test_int(self):
-        self.assertEqual(self.schema.properties['C_INT'],
+        self.assertEqual(self.dt_schema.properties['C_INT'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_INT'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_INT'),
                          {'selected-by-default': True, 'sql-datatype': 'number'})
 
     def test_bigint(self):
-        self.assertEqual(self.schema.properties['C_BIGINT'],
+        self.assertEqual(self.dt_schema.properties['C_BIGINT'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_BIGINT'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_BIGINT'),
                          {'selected-by-default': True, 'sql-datatype': 'number'})
 
     def test_float(self):
-        self.assertEqual(self.schema.properties['C_FLOAT'],
+        self.assertEqual(self.dt_schema.properties['C_FLOAT'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_FLOAT'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_FLOAT'),
                          {'selected-by-default': True, 'sql-datatype': 'float'})
 
    def test_double(self):
-        self.assertEqual(self.schema.properties['C_DOUBLE'],
+        self.assertEqual(self.dt_schema.properties['C_DOUBLE'],
                          Schema(['null', 'number'], inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_DOUBLE'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_DOUBLE'),
                          {'selected-by-default': True, 'sql-datatype': 'float'})
 
     def test_date(self):
-        self.assertEqual(self.schema.properties['C_DATE'],
+        self.assertEqual(self.dt_schema.properties['C_DATE'],
                          Schema(['null', 'string'], format='date-time', inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_DATE'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_DATE'),
                          {'selected-by-default': True, 'sql-datatype': 'date'})
 
     def test_time(self):
-        self.assertEqual(self.schema.properties['C_TIME'],
+        self.assertEqual(self.dt_schema.properties['C_TIME'],
                          Schema(['null', 'string'], format='time', inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_TIME'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_TIME'),
                          {'selected-by-default': True, 'sql-datatype': 'time'})
 
     def test_binary(self):
-        self.assertEqual(self.schema.properties['C_BINARY'],
+        self.assertEqual(self.dt_schema.properties['C_BINARY'],
                          Schema(['null', 'string'], format='binary', inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_BINARY'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_BINARY'),
                          {'selected-by-default': True, 'sql-datatype': 'binary'})
 
     def test_varbinary(self):
-        self.assertEqual(self.schema.properties['C_VARBINARY'],
+        self.assertEqual(self.dt_schema.properties['C_VARBINARY'],
                          Schema(['null', 'string'], format='binary', inclusion='available'))
-        self.assertEqual(self.get_metadata_for_column('C_VARBINARY'),
+        self.assertEqual(self.get_dt_metadata_for_column('C_VARBINARY'),
                          {'selected-by-default': True, 'sql-datatype': 'binary'})
 
     def test_row_to_singer_record(self):
         """Select every supported data type from snowflake, generate the singer JSON output message
         and compare to expected JSON"""
-        catalog_entry = self.stream
+        catalog_entry = self.dt_stream
         columns = list(catalog_entry.schema.properties.keys())
         select_sql = common.generate_select_sql(catalog_entry, columns)
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index a71d12a..61684db 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -7,23 +7,16 @@
 SCHEMA_NAME='tap_snowflake_test'
 
-def get_db_config():
-    config = {}
-    config['account'] = os.environ.get('TAP_SNOWFLAKE_ACCOUNT')
-    config['dbname'] = os.environ.get('TAP_SNOWFLAKE_DBNAME')
-    config['user'] = os.environ.get('TAP_SNOWFLAKE_USER')
-    config['password'] = os.environ.get('TAP_SNOWFLAKE_PASSWORD')
-    config['warehouse'] = os.environ.get('TAP_SNOWFLAKE_WAREHOUSE')
-
-    return config
-
-def get_tap_config():
-    config = {}
-    config['filter_dbs'] = os.environ.get('TAP_SNOWFLAKE_DBNAME')
-    config['filter_schemas'] = SCHEMA_NAME
-
-    return config
+def get_db_config():
+    return {
+        'account': os.environ.get('TAP_SNOWFLAKE_ACCOUNT'),
+        'dbname': os.environ.get('TAP_SNOWFLAKE_DBNAME'),
+        'user': os.environ.get('TAP_SNOWFLAKE_USER'),
+        'password': os.environ.get('TAP_SNOWFLAKE_PASSWORD'),
+        'warehouse': os.environ.get('TAP_SNOWFLAKE_WAREHOUSE'),
+        'tables': 'FAKE_TABLES'
+    }
 
 
 def get_test_connection():
@@ -41,9 +34,8 @@ def get_test_connection():
 
     return snowflake_conn
 
-def discover_catalog(snowflake_conn):
-    tap_config = get_tap_config()
-    catalog = tap_snowflake.discover_catalog(snowflake_conn, tap_config)
+def discover_catalog(snowflake_conn, config):
+    catalog = tap_snowflake.discover_catalog(snowflake_conn, config)
 
     streams = []
 
     for stream in catalog.streams:
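
Below is a minimal usage sketch, not part of the patch, showing how the new `tables`-based discovery is driven end to end, mirroring `tests/integration/utils.py` above. All credential values are placeholders, and `SnowflakeConnection` is assumed to be the connection class exposed by `tap_snowflake/connection.py`:

```python
# Illustrative sketch only: exercise catalog discovery restricted to two tables.
import tap_snowflake
from tap_snowflake.connection import SnowflakeConnection

config = {
    'account': 'my_account',
    'dbname': 'db',
    'user': 'my_user',
    'password': 'password',
    'warehouse': 'my_virtual_warehouse',
    # Mandatory: fully qualified names; discovery runs SHOW COLUMNS
    # against these objects only, so keep the list as specific as possible
    'tables': 'db.schema.table1,db.schema.table2'
}

snowflake_conn = SnowflakeConnection(config)
catalog = tap_snowflake.discover_catalog(snowflake_conn, config)

# Each discovered table or view becomes one catalog stream
for stream in catalog.streams:
    print(stream.tap_stream_id)
```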