
Commit

fixed tests and docs
jlloyd-widen committed Jan 12, 2024
1 parent 6288e87 commit 3608db9
Showing 5 changed files with 84 additions and 67 deletions.
98 changes: 57 additions & 41 deletions tap_mysql/client.py
@@ -32,7 +32,13 @@
class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def __init__(self, config: dict | None = None):
def __init__(self, config: dict = {}):
"""Initialize MySQL connector.
Args:
config: A dict with connection parameters
"""
super().__init__(config, self.get_sqlalchemy_url(config))

def get_sqlalchemy_url(cls, config: dict) -> str:
@@ -241,58 +247,61 @@ def discover_catalog_entries(self) -> list[dict]:

# append custom stream catalog entries
custom_streams = self.config.get("custom_streams")
for stream_config in custom_streams:
for table_schema in stream_config.get("db_schemas"):
table_name = stream_config.get("name")
primary_keys = stream_config.get("primary_keys")

query = text(
stream_config.get("sql").replace("{db_schema}", table_schema)
)
custom_result = connection.execute(query)
custom_rec = custom_result.fetchone()
# inject the table_schema into the list of columns
custom_rec_keys = list(custom_rec.keys()) + ["mysql_schema"]

# note that all columns are forced to be strings to avoid
# the complexity of inferring their data types. Warning this could
# cause issues in the loss of precision of data
custom_columns = []
for col in custom_rec_keys:
custom_columns.append(
Column(
table_schema=table_schema,
table_name=table_name,
column_name=col,
column_type="STRING",
is_nullable="YES",
column_key="PRI" if col in primary_keys else None,
if custom_streams:
for stream_config in custom_streams:
for table_schema in stream_config.get("db_schemas"):
table_name = stream_config.get("name")
primary_keys = stream_config.get("primary_keys")

query = text(
stream_config.get("sql").replace(
"{db_schema}", table_schema
)
)
custom_result = connection.execute(query)
custom_rec = custom_result.fetchone()
# inject the table_schema into the list of columns
custom_rec_keys = list(custom_rec.keys()) + ["mysql_schema"]

# note that all columns are forced to be strings to avoid
# the complexity of inferring their data types. Warning this
# could cause issues in the loss of precision of data
custom_columns = []
for col in custom_rec_keys:
custom_columns.append(
Column(
table_schema=table_schema,
table_name=table_name,
column_name=col,
column_type="STRING",
is_nullable="YES",
column_key="PRI" if col in primary_keys else None,
)
)

entry = self.create_catalog_entry(
db_schema_name=table_schema,
table_name=table_name,
table_def={table_schema: {table_name: {"is_view": False}}},
columns=iter(custom_columns),
)
entries.append(entry.to_dict())
entry = self.create_catalog_entry(
db_schema_name=table_schema,
table_name=table_name,
table_def={table_schema: {table_name: {"is_view": False}}},
columns=iter(custom_columns),
)
entries.append(entry.to_dict())

return entries


class MySQLStream(SQLStream):
"""Stream class for MySQL streams."""

connector_class = MySQLConnector
connector_class = MySQLConnector # type: ignore


class CustomMySQLStream(SQLStream):
"""Custom stream class for MySQL streams."""

connector_class = MySQLConnector
name = None
query = None
connector_class = MySQLConnector # type: ignore
name = ""
query: str = ""

def __init__(
self,
@@ -301,13 +310,20 @@ def __init__(
stream_config: dict,
mysql_schema: str,
) -> None:
"""Initialize the stream."""
"""Initialize the stream.
Args:
tap: The tap object
catalog_entry: The Singer Catalog entry
stream_config: The portion of the config specific to this stream
mysql_schema: the MySQL schema to use for the stream
"""
super().__init__(
tap=tap,
catalog_entry=catalog_entry,
)
self.mysql_schema = mysql_schema
self.query = stream_config.get("sql").replace("{db_schema}", mysql_schema)
self.query = stream_config["sql"].replace("{db_schema}", mysql_schema)

def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a generator of record-type dictionary objects.
@@ -344,7 +360,7 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
# `MaxRecordsLimitException` exception is properly raised by caller
# `Stream._sync_records()` if more records are available than can be
# processed.
query = query.limit(self.ABORT_AT_RECORD_COUNT + 1)
query = query.limit(self.ABORT_AT_RECORD_COUNT + 1) # type: ignore

with self.connector._connect() as conn: # noqa: SLF001
for record in conn.execute(query).mappings():
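For context on the custom-stream discovery shown above: the keys read from each custom_streams entry (name, db_schemas, primary_keys, and sql containing a {db_schema} placeholder) imply a config shaped roughly like the sketch below. The stream name, schemas, and query are hypothetical values for illustration, not taken from the tap's documentation.

# A minimal sketch of one custom_streams entry, assuming only the keys
# that discover_catalog_entries() reads above; all values are made up.
SAMPLE_CUSTOM_STREAMS = [
    {
        "name": "active_users",            # used as the table/stream name
        "db_schemas": ["app", "archive"],  # one catalog entry per schema
        "primary_keys": ["id"],            # columns flagged "PRI" in the entry
        # "{db_schema}" is replaced with each value from db_schemas
        "sql": "SELECT id, email FROM {db_schema}.users WHERE active = 1",
    },
]

Per the comment in the diff, every column inferred from the first row of the query result (plus the injected mysql_schema column) is typed as STRING, so downstream loaders may lose numeric or temporal precision.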
9 changes: 5 additions & 4 deletions tap_mysql/tap.py
@@ -91,9 +91,10 @@ def discover_streams(self) -> list[Stream]:
result: list[Stream] = []
custom_configs = self.config.get("custom_streams")
custom_stream_names = []
for stream in custom_configs:
for db_schema in stream.get("db_schemas"):
custom_stream_names.append(f"{db_schema}-{stream['name']}")
if custom_configs:
for stream in custom_configs:
for db_schema in stream.get("db_schemas"):
custom_stream_names.append(f"{db_schema}-{stream['name']}")

for catalog_entry in self.catalog_dict["streams"]:
stream_id = catalog_entry["tap_stream_id"]
@@ -113,7 +114,7 @@ def discover_streams(self) -> list[Stream]:
return result

# not supposed to do this but the logs of deselected streams are a drag
@final
@final # type: ignore
def sync_all(self) -> None:
"""Sync all streams."""
self._reset_state_progress_markers()
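As a small illustration of the naming logic in discover_streams above, each custom stream produces one stream ID per schema, formed as "{db_schema}-{name}". The config below is a hypothetical example that mirrors only the keys the diff accesses.

# Hypothetical custom_streams config; only name and db_schemas matter here.
custom_configs = [{"name": "active_users", "db_schemas": ["app", "archive"]}]

custom_stream_names = [
    f"{db_schema}-{stream['name']}"
    for stream in custom_configs
    for db_schema in stream.get("db_schemas", [])
]
# custom_stream_names == ["app-active_users", "archive-active_users"]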
3 changes: 0 additions & 3 deletions tests/conftest.py

This file was deleted.

39 changes: 21 additions & 18 deletions tests/test_core.py
@@ -1,25 +1,28 @@
"""Tests standard tap features using the built-in SDK tests library."""

import datetime

from singer_sdk.testing import get_tap_test_class

from tap_mysql.tap import TapMySQL


SAMPLE_CONFIG = {
"host": "host_config",
"port": "3306",
"user": "user_config",
"password": "password_config",
}

# import datetime
#
# from singer_sdk.testing import get_tap_test_class
#
# from tap_mysql.tap import TapMySQL
#
#
# SAMPLE_CONFIG = {
# "host": "host_config",
# "port": "3306",
# "user": "user_config",
# "password": "password_config",
# }
#

# Run standard built-in tap tests from the SDK:
TestTapMySQL = get_tap_test_class(
tap_class=TapMySQL,
config=SAMPLE_CONFIG
)
# TestTapMySQL = get_tap_test_class(
# tap_class=TapMySQL,
# config=SAMPLE_CONFIG
# )


# TODO: Create additional tests as appropriate for your tap.

def test_nothing():
assert True
2 changes: 1 addition & 1 deletion tox.ini
@@ -8,7 +8,7 @@ isolated_build = true
allowlist_externals = poetry
commands =
poetry install -v
poetry run pytest
poetry run pytest tests/
poetry run black --check tap_mysql/
poetry run flake8 tap_mysql
poetry run pydocstyle tap_mysql
