Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
[AP-635] Discover only the required tables
Browse files Browse the repository at this point in the history
  • Loading branch information
koszti committed Apr 8, 2020
1 parent f19c71b commit eb5e989
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 174 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ or
"user": "my_user",
"password": "password",
"warehouse": "my_virtual_warehouse",
"filter_dbs": "database_name",
"filter_schemas": "schema1,schema2"
"tables": "db.schema.table1,db.schema.table2"
}
```

`filter_dbs` and `filter_schemas` are optional.
**Note**: `tables` is a mandatory parameter as well to avoid long running catalog discovery process.
Please specify fully qualified table and view names and only that ones that you need to extract otherwise you can
end up with very long running discovery mode of this tap. Discovery mode is analysing table structures but
Snowflake doesn't like selecting lot of rows from `INFORMATION_SCHEMA` or running `SHOW` commands that returns lot of
rows. Please be as specific as possible.

2. Run it in discovery mode to generate a `properties.json`

Expand Down
167 changes: 45 additions & 122 deletions tap_snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
'dbname',
'user',
'password',
'warehouse'
'warehouse',
'tables'
]

# Snowflake data types
Expand Down Expand Up @@ -118,138 +119,60 @@ def create_column_metadata(cols):
return metadata.to_list(mdata)


def get_databases(snowflake_conn):
"""Get snowflake databases"""
databases = snowflake_conn.query('SHOW DATABASES', max_records=SHOW_COMMAND_MAX_ROWS)
def get_table_columns(snowflake_conn, tables):
"""Get column definitions of a list of tables
# Return only the name of databases as a list
return [db['name'] for db in databases]


def get_schemas(snowflake_conn, database):
"""Get schemas of a database"""
schemas = []
try:
schemas = snowflake_conn.query(f'SHOW SCHEMAS IN DATABASE {database}', max_records=SHOW_COMMAND_MAX_ROWS)

# Get only the name of schemas as a list
schemas = [schema['name'] for schema in schemas]

# Catch exception when schema not exists and SHOW SCHEMAS throws a ProgrammingError
# Regexp to extract snowflake error code and message from the exception message
# Do nothing if schema not exists
except snowflake.connector.errors.ProgrammingError as exc:
# pylint: disable=anomalous-backslash-in-string
if re.match('.*\(02000\):.*\n.*does not exist.*', str(sys.exc_info()[1])):
pass
else:
raise exc

return schemas


def get_table_columns(snowflake_conn, database, table_schemas=None, table_name=None):
"""Get column definitions for every table in specific schemas(s)
It's using SHOW commands instead of INFORMATION_SCHEMA views bucause information_schemas views are slow
It's using SHOW commands instead of INFORMATION_SCHEMA views because information_schemas views are slow
and can cause unexpected exception of:
Information schema query returned too much data. Please repeat query with more selective predicates.
"""
table_columns = []
if table_schemas or table_name:
for schema in table_schemas:
queries = []

LOGGER.info('Getting schema information for %s.%s...', database, schema)

# Get column data types by SHOW commands
show_tables = f'SHOW TABLES IN SCHEMA {database}.{schema}'
show_views = f'SHOW TABLES IN SCHEMA {database}.{schema}'
show_columns = f'SHOW COLUMNS IN SCHEMA {database}.{schema}'

# Convert output of SHOW commands to tables and use SQL joins to get every required information
select = f"""
WITH
show_tables AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-3)))),
show_views AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-2)))),
show_columns AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-1))))
SELECT show_columns."database_name" AS table_catalog
,show_columns."schema_name" AS table_schema
,show_columns."table_name" AS table_name
,CASE
WHEN show_tables."name" IS NOT NULL THEN 'BASE TABLE'
ELSE 'VIEW'
END table_type
,show_tables."rows" AS row_count
,show_columns."column_name" AS column_name
-- ----------------------------------------------------------------------------------------
-- Character and numeric columns display their generic data type rather than their defined
-- data type (i.e. TEXT for all character types, FIXED for all fixed-point numeric types,
-- and REAL for all floating-point numeric types).
--
-- Further info at https://docs.snowflake.net/manuals/sql-reference/sql/show-columns.html
-- ----------------------------------------------------------------------------------------
,CASE PARSE_JSON(show_columns."data_type"):type::varchar
WHEN 'FIXED' THEN 'NUMBER'
WHEN 'REAL' THEN 'FLOAT'
ELSE PARSE_JSON("data_type"):type::varchar
END data_type
,PARSE_JSON(show_columns."data_type"):length::number AS character_maximum_length
,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision
,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale
FROM show_columns
LEFT JOIN show_tables
ON show_tables."database_name" = show_columns."database_name"
AND show_tables."schema_name" = show_columns."schema_name"
AND show_tables."name" = show_columns."table_name"
LEFT JOIN show_views
ON show_views."database_name" = show_columns."database_name"
AND show_views."schema_name" = show_columns."schema_name"
AND show_views."name" = show_columns."table_name"
"""
queries.extend([show_tables, show_views, show_columns, select])

# Run everything in one transaction
try:
columns = snowflake_conn.query(queries, max_records=SHOW_COMMAND_MAX_ROWS)
table_columns.extend(columns)

# Catch exception when schema not exists and SHOW COLUMNS throws a ProgrammingError
# Regexp to extract snowflake error code and message from the exception message
# Do nothing if schema not exists
except snowflake.connector.errors.ProgrammingError as exc:
# pylint: disable=anomalous-backslash-in-string
if re.match('.*\(02000\):.*\n.*does not exist.*', str(sys.exc_info()[1])):
pass
else:
raise exc
for table in tables:
queries = []

LOGGER.info('Getting column information for %s...', table)

# Get column data types by SHOW commands
show_columns = f"SHOW COLUMNS IN TABLE {table}"

# Convert output of SHOW commands to tables and use SQL joins to get every required information
select = f"""
WITH
show_columns AS (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID(-1))))
SELECT show_columns."database_name" AS table_catalog
,show_columns."schema_name" AS table_schema
,show_columns."table_name" AS table_name
,show_columns."column_name" AS column_name
-- ----------------------------------------------------------------------------------------
-- Character and numeric columns display their generic data type rather than their defined
-- data type (i.e. TEXT for all character types, FIXED for all fixed-point numeric types,
-- and REAL for all floating-point numeric types).
--
-- Further info at https://docs.snowflake.net/manuals/sql-reference/sql/show-columns.html
-- ----------------------------------------------------------------------------------------
,CASE PARSE_JSON(show_columns."data_type"):type::varchar
WHEN 'FIXED' THEN 'NUMBER'
WHEN 'REAL' THEN 'FLOAT'
ELSE PARSE_JSON("data_type"):type::varchar
END data_type
,PARSE_JSON(show_columns."data_type"):length::number AS character_maximum_length
,PARSE_JSON(show_columns."data_type"):precision::number AS numeric_precision
,PARSE_JSON(show_columns."data_type"):scale::number AS numeric_scale
FROM show_columns
"""
queries.extend([show_columns, select])

# Run everything in one transaction
columns = snowflake_conn.query(queries, max_records=SHOW_COMMAND_MAX_ROWS)
table_columns.extend(columns)

return table_columns


def discover_catalog(snowflake_conn, config):
"""Returns a Catalog describing the structure of the database."""
filter_dbs_config = config.get('filter_dbs')
filter_schemas_config = config.get('filter_schemas')
databases = []
schemas = []

# Get databases
sql_columns = []
if filter_dbs_config:
databases = filter_dbs_config.split(',')
else:
databases = get_databases(snowflake_conn)
for database in databases:

# Get schemas
if filter_schemas_config:
schemas = filter_schemas_config.split(',')
else:
schemas = get_schemas(snowflake_conn, database)

table_columns = get_table_columns(snowflake_conn, database, schemas)
sql_columns.extend(table_columns)
tables = config.get('tables').split(',')
sql_columns = get_table_columns(snowflake_conn, tables)

table_info = {}
columns = []
Expand Down
3 changes: 2 additions & 1 deletion tap_snowflake/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def validate_config(config):
'dbname',
'user',
'password',
'warehouse'
'warehouse',
'tables'
]

# Check if mandatory keys exist
Expand Down
Loading

0 comments on commit eb5e989

Please sign in to comment.