Skip to content

Commit

Permalink
feat: planet scale working (#45)
Browse files Browse the repository at this point in the history
* WIP Discovery working, need to get tables working, and then refactor

* Working with access issues fitlered out

* Works with 100k+ tables

* Fix decimals

* Dynamic PlanetScale detection

* Readme updated, planet scale working

* Swap config name for sqlalchemy options

* Add is_vitess configuration

* Merge main's poetry.lock file in

* Make linter happy

* Fix config validation, and messup with connect function call

* Passes all tests, squashed some bugs!

* make mypy happy

* PlanetScale tap pointer

* Fix sqlalchemy_options documentation

* Fix vitess config check

* Update README.md

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>

* Update README.md

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>

* Apply Edgars suggestions

* Match tap.py config docs with README

* A bit cleaner is_vitess check

* config key doesn't get set when value is None. I didn't expect that!

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
  • Loading branch information
visch and edgarrmondragon authored Feb 23, 2024
1 parent d740d72 commit ed33058
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 16 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ sudo apt-get install package-cfg libmysqlclient-dev
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified MySQL schemas and ignore others. If left blank, the tap automatically processes ALL available MySQL schemas. |
| is_vitess | False | None | By default we'll check if the database is a Vitess instance. If you'd rather not automatically check, set this to `False`. See Vitess/ PlanetScale documentation below for more information. |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified MySQL schemas and ignore others. If left blank, the tap automatically determines ALL available MySQL schemas. |
| sqlalchemy_options | False | None | This needs to be passed in as a JSON Object. sqlalchemy_url options (also called the query), to connect to PlanetScale you must turn on SSL. See the PlanetScale section below for details. Note: if `sqlalchemy_url` is set this will be ignored. |
| sqlalchemy_url | False | None | Example mysql://[username]:[password]@localhost:3306/[db_name] |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details.
Expand Down Expand Up @@ -125,6 +129,7 @@ After everything has been configured, be sure to indicate your use of an ssh tun
You can easily run `tap-mysql` by itself or in a pipeline using [Meltano](https://meltano.com/).
### Executing the Tap Directly
```bash
Expand All @@ -133,6 +138,42 @@ tap-mysql --help
tap-mysql --config CONFIG --discover > ./catalog.json
```

### PlanetScale(Vitess) Support
To get planetscale to work you need to use SSL.

config example in meltano.yml
```yaml
host: aws.connect.psdb.cloud
user: 01234fdsoi99
database: tap-mysql
sql_options:
ssl_ca: "/etc/ssl/certs/ca-certificates.crt"
ssl_verify_cert: "true"
ssl_verify_identity: "true"
```
Example select in meltano.yml (Which excludes tables that will fail)
```yaml
select:
- "*.*"
- "!information_schema-PROFILING.*"
- "!performance_schema-log_status.*"
```
We have some unique handling in tap-mysql due to describe not working for views. Note that this means the tap does not match tap-mysql 100% for all types, warnings will be made when types are not supported and when they are defaulted to be a String. Two example of this are enum, and set types.
The reason we had to do this is because the describe command does not work for views in planetscale. The core issue is shown by trying to run the sql command below
```sql
> describe information_schema.collations;
ERROR 1049 (42000): VT05003: unknown database 'information_schema' in vschema
```
#### PlanetScale Supported Tap
Note that PlanetScale has a singer tap that they support. It's located here https://github.com/planetscale/singer-tap/
It's written in Go, and it also supports Log Based replication.
This is a great alternative to this tap if you're using PlanetScale.
## Developer Resources
Follow these instructions to contribute to this project.
Expand Down
20 changes: 20 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,36 @@ plugins:
- name: user
- name: password
kind: password
sensitive: true
- name: database
- name: options
kind: object
- name: sqlalchemy_url
kind: password
sensitive: true
- name: ssh_tunnel.private_key
kind: password
sensitive: true
- name: ssh_tunnel.private_key_password
kind: password
sensitive: true
- name: ssh_tunnel.host
- name: ssh_tunnel.username
- name: ssh_tunnel.port
config:
host: aws.connect.psdb.cloud
user: 0fiqne6txvcqtjbdywan
database: tap-mysql
sqlalchemy_options:
ssl_ca: "/etc/ssl/certs/ca-certificates.crt"
ssl_verify_cert: "true"
ssl_verify_identity: "true"
select:
- "*.*"
- "!information_schema-PROFILING.*"
- "!mysql-time_zone.*"
- "!mysql-time_zone_transition.*"
- "!performance_schema-log_status.*"
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"plugin_type": "loaders",
"name": "target-jsonl",
"namespace": "target_jsonl",
"variant": "andyh1203",
"label": "JSON Lines (JSONL)",
"docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
"repo": "https://github.com/andyh1203/target-jsonl",
"pip_url": "target-jsonl",
"description": "JSONL loader",
"logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
"settings": [
{
"name": "destination_path",
"kind": "string",
"value": "output",
"label": "Destination Path",
"description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
},
{
"name": "do_timestamp_file",
"kind": "boolean",
"value": false,
"label": "Include Timestamp in File Names",
"description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
},
{
"name": "custom_name",
"kind": "string",
"label": "Custom File Name Override",
"description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
}
]
}
220 changes: 213 additions & 7 deletions tap_mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._typing import TypeConformanceLevel

if TYPE_CHECKING:
Expand All @@ -23,9 +24,9 @@ def patched_conform(
elem: Any, # noqa: ANN401
property_schema: dict,
) -> Any: # noqa: ANN401
"""Override Singer SDK type conformance to prevent dates turning into datetimes.
"""Override type conformance to prevent dates turning into datetimes.
Converts a primitive (i.e. not object or array) to a json compatible type.
Converts a primitive to a json compatible type.
Returns:
The appropriate json compatible type.
Expand All @@ -41,6 +42,44 @@ def patched_conform(
class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the SQL connector.
This method initializes the SQL connector with the provided arguments.
It can accept variable-length arguments and keyword arguments to
customize the connection settings.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self.is_vitess = self.config.get("is_vitess")

if self.is_vitess is None:
self.logger.info(
"No is_vitess configuration provided, dynamically checking if "
"we are using a Vitess instance."
)
with self._connect() as conn:
output = conn.execute(
"select variable_value from "
"performance_schema.global_variables where "
"variable_name='version_comment' and variable_value like "
"'PlanetScale%%'"
)
rows = output.fetchall()
if len(rows) > 0:
self.logger.info(
"Instance has been detected to be a "
"Vitess (PlanetScale) instance, using Vitess "
"configuration."
)
self.is_vitess = True
self.logger.info(
"Instance is not a Vitess instance, using standard configuration."
)

@staticmethod
def to_jsonschema_type(
sql_type: str # noqa: ANN401
Expand All @@ -52,15 +91,16 @@ def to_jsonschema_type(
Overridden from SQLConnector to correctly handle JSONB and Arrays.
By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.
By default will call `typing.to_jsonschema_type()` for strings and
SQLAlchemy types.
Args:
sql_type: The string representation of the SQL type, a SQLAlchemy
TypeEngine class or object, or a custom-specified object.
Raises:
ValueError: If the type received could not be translated to jsonschema.
ValueError: If the type received could not be translated to
jsonschema.
Returns:
The JSON Schema representation of the provided type.
Expand Down Expand Up @@ -170,6 +210,167 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)

def discover_catalog_entry( # noqa: PLR0913
self,
engine: Engine,
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool, # noqa: FBT001
) -> CatalogEntry:
"""Overrode to support Vitess as DESCRIBE is not supported for views.
Create `CatalogEntry` object for the given table or a view.
Args:
engine: SQLAlchemy engine
inspected: SQLAlchemy inspector instance for engine
schema_name: Schema name to inspect
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`
Returns:
`CatalogEntry` object for the given table or a view
"""
if self.is_vitess is False or is_view is False:
return super().discover_catalog_entry(
engine, inspected, schema_name, table_name, is_view
)
# For vitess views, we can't use DESCRIBE as it's not supported for
# views so we do the below.
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
table_name=table_name,
delimiter="-",
)

# Initialize columns list
table_schema = th.PropertiesList()
with self._connect() as conn:
columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`")
for column in columns:
column_name = column["Field"]
is_nullable = column["Null"] == "YES"
jsonschema_type: dict = self.to_jsonschema_type(column["Type"])
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
schema = table_schema.to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
# - 'INCREMENTAL' replication must be enabled by the user by specifying
# a replication_key value.
# - 'LOG_BASED' replication must be enabled by the developer, according
# to source-specific implementation capabilities.
replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

# Create the catalog entry object
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=None,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=None,
valid_replication_keys=None, # Must be defined by user
),
database=None, # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=None, # Must be defined by user
)

def get_sqlalchemy_type(self, col_meta_type: str) -> sqlalchemy.Column:
"""Return a SQLAlchemy type object for the given SQL type.
Used ischema_names so we don't have to manually map all types.
"""
dialect = sqlalchemy.dialects.mysql.base.dialect() # type: ignore[attr-defined]
ischema_names = dialect.ischema_names
# Example varchar(97)
type_info = col_meta_type.split("(")
base_type_name = type_info[0].split(" ")[0] # bigint unsigned
type_args = (
type_info[1].split(" ")[0].rstrip(")") if len(type_info) > 1 else None
) # decimal(25,4) unsigned should work

if base_type_name in {"enum", "set"}:
self.logger.warning(
"Enum and Set types not supported for col_meta_type=%s. "
"Using varchar instead.",
col_meta_type,
)
base_type_name = "varchar"
type_args = None

type_class = ischema_names.get(base_type_name.lower())

try:
# Create an instance of the type class with parameters if they exist
if type_args:
return type_class(
*map(int, type_args.split(","))
) # Want to create a varchar(97) if asked for
return type_class()
except Exception:
self.logger.exception(
"Error creating sqlalchemy type for col_meta_type=%s", col_meta_type
)
raise

def get_table_columns(
self,
full_table_name: str,
column_names: list[str] | None = None,
) -> dict[str, sqlalchemy.Column]:
"""Overrode to support Vitess as DESCRIBE is not supported for views.
Return a list of table columns.
Args:
full_table_name: Fully qualified table name.
column_names: A list of column names to filter to.
Returns:
An ordered list of column objects.
"""
if self.is_vitess is False:
return super().get_table_columns(full_table_name, column_names)
# If Vitess Instance then we can't use DESCRIBE as it's not supported
# for views so we do below
if full_table_name not in self._table_cols_cache:
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
with self._connect() as conn:
columns = conn.execute(
f"SHOW columns from `{schema_name}`.`{table_name}`"
)
self._table_cols_cache[full_table_name] = {
col_meta["Field"]: sqlalchemy.Column(
col_meta["Field"],
self.get_sqlalchemy_type(col_meta["Type"]),
nullable=col_meta["Null"] == "YES",
)
for col_meta in columns
if not column_names
or col_meta["Field"].casefold()
in {col.casefold() for col in column_names}
}

return self._table_cols_cache[full_table_name]


class MySQLStream(SQLStream):
"""Stream class for MySQL streams."""
Expand Down Expand Up @@ -219,5 +420,10 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
if start_val:
query = query.filter(replication_key_col >= start_val)

for row in self.connector.connection.execute(query):
yield dict(row)
with self.connector._connect() as conn: # noqa: SLF001
if self.connector.is_vitess: # type: ignore[attr-defined]
conn.exec_driver_sql(
"set workload=olap"
) # See https://github.com/planetscale/discussion/discussions/190
for row in conn.execute(query):
yield dict(row)
Loading

0 comments on commit ed33058

Please sign in to comment.