Skip to content

Commit

Permalink
Sped up catalog discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Lloyd committed Apr 22, 2023
1 parent eb5bbf6 commit 87e8891
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 79 deletions.
195 changes: 122 additions & 73 deletions tap_mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,29 @@

from __future__ import annotations

from typing import cast
import collections
import datetime
import itertools
from typing import Iterator, cast

import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from sqlalchemy import text

# Row shape produced by the information_schema.columns discovery query;
# one instance per column, carrying type, nullability, and key metadata.
Column = collections.namedtuple(
    "Column",
    "table_schema table_name column_name column_type is_nullable column_key",
)


class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""
Expand Down Expand Up @@ -73,67 +88,20 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
# You may delete this method if overrides are not needed.
return SQLConnector.to_sql_type(jsonschema_type)

def get_instance_schema(self) -> dict:
    """Build a nested description of every user schema, table, and column.

    Runs a single ``information_schema`` query (skipping MySQL's built-in
    system schemas) and folds the flat result rows into::

        {db_schema: {table: {"table_type": ..., "columns": [column_def, ...]}}}

    where each ``column_def`` carries ``name``, ``type``, ``nullable`` and
    ``key_type`` entries.

    Returns:
        dict: Mapping of schema name -> table name -> table definition.
    """
    with self._engine.connect() as connection:
        # Join on BOTH table_schema and table_name: joining on table_name
        # alone duplicates rows (and can mismatch table_type) whenever the
        # same table name exists in more than one schema.
        query = text(
            """
            SELECT c.table_schema,
                   c.table_name,
                   t.table_type,
                   c.column_name,
                   c.data_type,
                   c.is_nullable,
                   c.column_key
            FROM information_schema.columns AS c
            LEFT JOIN information_schema.tables AS t
                ON c.table_schema = t.table_schema
                AND c.table_name = t.table_name
            WHERE c.table_schema NOT IN (
                'information_schema'
                , 'performance_schema'
                , 'mysql'
                , 'sys'
            )
            ORDER BY table_schema, table_name, column_name
            """
        )
        result = connection.execute(query).fetchall()

    instance_schema: dict = {}

    # Fold the flat row set into the nested schema -> table -> columns shape.
    for row in result:
        db_schema = row[0]
        table = row[1]
        column_def = {
            "name": row[3],
            "type": row[4],
            # information_schema reports nullability as the strings YES/NO.
            "nullable": row[5] == "YES",
            # "PRI" marks primary-key columns (used for key_properties later).
            "key_type": row[6],
        }
        table_map = instance_schema.setdefault(db_schema, {})
        if table not in table_map:
            table_map[table] = {"table_type": row[2], "columns": [column_def]}
        else:
            table_map[table]["columns"].append(column_def)

    return instance_schema

def create_catalog_entry(
self, db_schema_name: str, table_name: str, table_def: dict
self,
db_schema_name: str,
table_name: str,
table_def: dict,
columns: Iterator[Column],
) -> CatalogEntry:
"""Create `CatalogEntry` object for the given table or a view.
Args:
db_schema_name: Name of the MySQL Schema being cataloged.
table_name: Name of the MySQL Table being cataloged.
table_def: A dict defining relevant elements of the table.
columns: list of named tuples describing the column.
Returns:
CatalogEntry
Expand All @@ -148,23 +116,19 @@ def create_catalog_entry(
# Detect column properties
primary_keys: list[str] = []
table_schema = th.PropertiesList()
for column_def in table_def["columns"]:
column_name = column_def["name"]
key_type = column_def["key_type"]
is_nullable = column_def["nullable"]

if key_type == "PRI":
primary_keys.append(column_name)
for col in columns:
if col.column_key == "PRI":
primary_keys.append(col.column_name)

# Initialize columns list
jsonschema_type: dict = self.to_jsonschema_type(
cast(sqlalchemy.types.TypeEngine, column_def["type"]),
cast(sqlalchemy.types.TypeEngine, col.column_type),
)
table_schema.append(
th.Property(
name=column_name,
name=col.column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
required=not col.is_nullable,
),
)
schema = table_schema.to_dict()
Expand All @@ -176,7 +140,7 @@ def create_catalog_entry(
table=table_name,
key_properties=primary_keys,
schema=Schema.from_dict(schema),
is_view=table_def["table_type"] == "VIEW",
is_view=table_def[db_schema_name][table_name]["is_view"] == "VIEW",
replication_method=replication_method, # Can be defined by user
metadata=MetadataMapping.get_standard_metadata(
schema_name=db_schema_name,
Expand All @@ -199,16 +163,101 @@ def discover_catalog_entries(self) -> list[dict]:
Returns:
The discovered catalog entries as a list.
"""
result: list[dict] = []
instance_schema = self.get_instance_schema()
self.logger.info(f"discover catalog start: {datetime.datetime.now()}")
entries: list[dict] = []

for schema_name, schema_def in instance_schema.items():
# Iterate through each tables and views
for table_name, table_def in schema_def.items():
entry = self.create_catalog_entry(schema_name, table_name, table_def)
result.append(entry.to_dict())
with self._engine.connect() as connection:
table_query = text(
"""
SELECT
table_schema
, table_name
, table_type
FROM information_schema.tables
WHERE table_schema NOT IN (
'information_schema'
, 'performance_schema'
, 'mysql'
, 'sys'
)
ORDER BY table_schema, table_name
"""
)
self.logger.info(f"start table query: {datetime.datetime.now()}")
table_results = connection.execute(table_query).fetchall()
self.logger.info(f" end table query: {datetime.datetime.now()}")
table_defs: dict = {}

return result
for mysql_schema, table, table_type in table_results:
if mysql_schema not in table_defs:
table_defs[mysql_schema] = {}

table_defs[mysql_schema][table] = {"is_view": table_type == "VIEW"}

col_query = text(
"""
SELECT
table_schema
, table_name
, column_name
, column_type
, is_nullable
, column_key
FROM information_schema.columns
WHERE table_schema NOT IN (
'information_schema'
, 'performance_schema'
, 'mysql'
, 'sys'
)
ORDER BY table_schema, table_name, column_name
-- LIMIT 40
"""
)
self.logger.info(f"start col query: {datetime.datetime.now()}")
col_result = connection.execute(col_query)
self.logger.info(f" end col query: {datetime.datetime.now()}")

# Parse data into useable python objects
columns = []
rec = col_result.fetchone()
while rec is not None:
columns.append(Column(*rec))
rec = col_result.fetchone()

for k, cols in itertools.groupby(
columns, lambda c: (c.table_schema, c.table_name)
):
# cols = list(cols)
mysql_schema, table_name = k

entry = self.create_catalog_entry(
db_schema_name=mysql_schema,
table_name=table_name,
table_def=table_defs,
columns=cols,
)
entries.append(entry.to_dict())

# for row in result:
# db_schema = row[0]
# table = row[1]
# column_def = {
# "name": row[3],
# "type": row[4],
# "nullable": row[5] == "YES",
# "key_type": row[6],
# }
# table_def = {"table_type": row[2], "columns": [column_def]}
# if db_schema not in instance_schema:
# instance_schema[db_schema] = {table: table_def}
# elif table not in instance_schema[db_schema]:
# instance_schema[db_schema][table] = table_def
# else:
# instance_schema[db_schema][table]["columns"].append(column_def)

self.logger.info(f"discover catalog end : {datetime.datetime.now()}")
return entries


class MySQLStream(SQLStream):
Expand Down
33 changes: 32 additions & 1 deletion tap_mysql/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

from __future__ import annotations

from singer_sdk import SQLTap
from typing import final

from singer_sdk import SQLTap, Stream
from singer_sdk import typing as th

from tap_mysql.client import MySQLStream
Expand Down Expand Up @@ -42,6 +44,35 @@ class TapMySQL(SQLTap):
),
).to_dict()

# NOTE(review): overriding the SDK's @final ``sync_all`` is not supposed to
# be done, but the stock implementation's logs for deselected streams are a
# drag; this copy emits those skip messages at DEBUG level instead.
@final
def sync_all(self) -> None:
    """Sync all selected streams, then log per-stream sync costs.

    Mirrors the singer-sdk ``Tap.sync_all`` flow with quieter skip logging:
    deselected streams and child streams (which are invoked by their
    parents) are skipped with a DEBUG message.
    """
    self._reset_state_progress_markers()
    self._set_compatible_replication_methods()
    stream: Stream
    for stream in self.streams.values():
        # Skip streams the user deselected, unless a descendant is selected.
        if not stream.selected and not stream.has_selected_descendents:
            self.logger.debug(f"Skipping deselected stream '{stream.name}'.")
            continue

        if stream.parent_stream_type:
            # Child streams are synced by their parent stream, not directly.
            self.logger.debug(
                f"Child stream '{type(stream).__name__}' is expected to be called "
                f"by parent stream '{stream.parent_stream_type.__name__}'. "
                "Skipping direct invocation.",
            )
            continue

        stream.sync()
        stream.finalize_state_progress_markers()
        # NOTE(review): relies on a private SDK method to emit the final
        # STATE message — may break across singer-sdk versions; confirm.
        stream._write_state_message()

    # this second loop is needed for all streams to print out their costs
    # including child streams which are otherwise skipped in the loop above
    for stream in self.streams.values():
        stream.log_sync_costs()


# Script entry point: launch the tap's command-line interface when this
# module is executed directly.
if __name__ == "__main__":
    TapMySQL.cli()
6 changes: 4 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@


# Minimal tap configuration for the test suite. The diff overlay fused the
# removed `start_date` lines with the new keys into an invalid literal; this
# is the post-commit form. Values are placeholders, not a live server.
SAMPLE_CONFIG = {
    "host": "host_config",
    "port": "3306",
    "user": "user_config",
    "password": "password_config",
}


Expand Down
7 changes: 4 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ commands =
# To execute, run `tox -e lint`
commands =
poetry install -v
poetry run black --check --diff tap_mysql/
poetry run isort --check tap_mysql
poetry run black tap_mysql/
poetry run isort tap_mysql
poetry run flake8 tap_mysql
poetry run pydocstyle tap_mysql
# refer to mypy.ini for specific settings
poetry run mypy . --exclude='tests'
poetry run mypy tap_mysql --exclude='tap_mysql/tests'

[flake8]
docstring-convention = google
Expand Down

0 comments on commit 87e8891

Please sign in to comment.