From ed33058492f17963764acc91595d3d2eb4cc85ba Mon Sep 17 00:00:00 2001 From: Derek Visch Date: Fri, 23 Feb 2024 15:29:40 -0500 Subject: [PATCH] feat: planet scale working (#45) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * WIP Discovery working, need to get tables working, and then refactor * Working with access issues fitlered out * Works with 100k+ tables * Fix decimals * Dynamic PlanetScale detection * Readme updated, planet scale working * Swap config name for sqlalchemy options * Add is_vitess configuration * Merge main's poetry.lock file in * Make linter happy * Fix config validation, and messup with connect function call * Passes all tests, squashed some bugs! * make mypy happy * PlanetScale tap pointer * Fix sqlalchemy_options documentation * Fix vitess config check * Update README.md Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> * Update README.md Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> * Apply Edgars suggestions * Match tap.py config docs with README * A bit cleaner is_vitess check * config key doesn't get set when value is None. I didn't expect that! --------- Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com> --- README.md | 41 ++++ meltano.yml | 20 ++ plugins/loaders/target-jsonl--andyh1203.lock | 34 +++ tap_mysql/client.py | 220 ++++++++++++++++++- tap_mysql/tap.py | 47 +++- 5 files changed, 346 insertions(+), 16 deletions(-) create mode 100644 plugins/loaders/target-jsonl--andyh1203.lock diff --git a/README.md b/README.md index eca039e..1f34f90 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,10 @@ sudo apt-get install package-cfg libmysqlclient-dev | user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. | | database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. | +| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified MySQL schemas and ignore others. If left blank, the tap automatically processes ALL available MySQL schemas. | +| is_vitess | False | None | By default we'll check if the database is a Vitess instance. If you'd rather not automatically check, set this to `False`. See Vitess/ PlanetScale documentation below for more information. | +| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified MySQL schemas and ignore others. If left blank, the tap automatically determines ALL available MySQL schemas. | +| sqlalchemy_options | False | None | This needs to be passed in as a JSON Object. sqlalchemy_url options (also called the query), to connect to PlanetScale you must turn on SSL. See the PlanetScale section below for details. Note: if `sqlalchemy_url` is set this will be ignored. | | sqlalchemy_url | False | None | Example mysql://[username]:[password]@localhost:3306/[db_name] | | ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object | | ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion host), see the other ssh_tunnel.* properties for more details. @@ -125,6 +129,7 @@ After everything has been configured, be sure to indicate your use of an ssh tun You can easily run `tap-mysql` by itself or in a pipeline using [Meltano](https://meltano.com/). + ### Executing the Tap Directly ```bash @@ -133,6 +138,42 @@ tap-mysql --help tap-mysql --config CONFIG --discover > ./catalog.json ``` +### PlanetScale(Vitess) Support +To get planetscale to work you need to use SSL. + +config example in meltano.yml +```yaml + host: aws.connect.psdb.cloud + user: 01234fdsoi99 + database: tap-mysql + sql_options: + ssl_ca: "/etc/ssl/certs/ca-certificates.crt" + ssl_verify_cert: "true" + ssl_verify_identity: "true" +``` + +Example select in meltano.yml (Which excludes tables that will fail) +```yaml + select: + - "*.*" + - "!information_schema-PROFILING.*" + - "!performance_schema-log_status.*" +``` + +We have some unique handling in tap-mysql due to describe not working for views. Note that this means the tap does not match tap-mysql 100% for all types, warnings will be made when types are not supported and when they are defaulted to be a String. Two example of this are enum, and set types. + +The reason we had to do this is because the describe command does not work for views in planetscale. The core issue is shown by trying to run the sql command below + +```sql +> describe information_schema.collations; +ERROR 1049 (42000): VT05003: unknown database 'information_schema' in vschema +``` + +#### PlanetScale Supported Tap +Note that PlanetScale has a singer tap that they support. It's located here https://github.com/planetscale/singer-tap/ +It's written in Go, and it also supports Log Based replication. +This is a great alternative to this tap if you're using PlanetScale. + ## Developer Resources Follow these instructions to contribute to this project. diff --git a/meltano.yml b/meltano.yml index 43f9471..e39dc89 100644 --- a/meltano.yml +++ b/meltano.yml @@ -22,16 +22,36 @@ plugins: - name: user - name: password kind: password + sensitive: true - name: database + - name: options + kind: object - name: sqlalchemy_url kind: password + sensitive: true - name: ssh_tunnel.private_key kind: password + sensitive: true - name: ssh_tunnel.private_key_password kind: password + sensitive: true - name: ssh_tunnel.host - name: ssh_tunnel.username - name: ssh_tunnel.port + config: + host: aws.connect.psdb.cloud + user: 0fiqne6txvcqtjbdywan + database: tap-mysql + sqlalchemy_options: + ssl_ca: "/etc/ssl/certs/ca-certificates.crt" + ssl_verify_cert: "true" + ssl_verify_identity: "true" + select: + - "*.*" + - "!information_schema-PROFILING.*" + - "!mysql-time_zone.*" + - "!mysql-time_zone_transition.*" + - "!performance_schema-log_status.*" loaders: - name: target-jsonl variant: andyh1203 diff --git a/plugins/loaders/target-jsonl--andyh1203.lock b/plugins/loaders/target-jsonl--andyh1203.lock new file mode 100644 index 0000000..11fa0ba --- /dev/null +++ b/plugins/loaders/target-jsonl--andyh1203.lock @@ -0,0 +1,34 @@ +{ + "plugin_type": "loaders", + "name": "target-jsonl", + "namespace": "target_jsonl", + "variant": "andyh1203", + "label": "JSON Lines (JSONL)", + "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203", + "repo": "https://github.com/andyh1203/target-jsonl", + "pip_url": "target-jsonl", + "description": "JSONL loader", + "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png", + "settings": [ + { + "name": "destination_path", + "kind": "string", + "value": "output", + "label": "Destination Path", + "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n" + }, + { + "name": "do_timestamp_file", + "kind": "boolean", + "value": false, + "label": "Include Timestamp in File Names", + "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n" + }, + { + "name": "custom_name", + "kind": "string", + "label": "Custom File Name Override", + "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n" + } + ] +} diff --git a/tap_mysql/client.py b/tap_mysql/client.py index 62cd194..777b848 100644 --- a/tap_mysql/client.py +++ b/tap_mysql/client.py @@ -8,6 +8,7 @@ import sqlalchemy from singer_sdk import SQLConnector, SQLStream from singer_sdk import typing as th +from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.helpers._typing import TypeConformanceLevel if TYPE_CHECKING: @@ -23,9 +24,9 @@ def patched_conform( elem: Any, # noqa: ANN401 property_schema: dict, ) -> Any: # noqa: ANN401 - """Override Singer SDK type conformance to prevent dates turning into datetimes. + """Override type conformance to prevent dates turning into datetimes. - Converts a primitive (i.e. not object or array) to a json compatible type. + Converts a primitive to a json compatible type. Returns: The appropriate json compatible type. @@ -41,6 +42,44 @@ def patched_conform( class MySQLConnector(SQLConnector): """Connects to the MySQL SQL source.""" + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Initialize the SQL connector. + + This method initializes the SQL connector with the provided arguments. + It can accept variable-length arguments and keyword arguments to + customize the connection settings. + + Args: + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + """ + super().__init__(*args, **kwargs) + self.is_vitess = self.config.get("is_vitess") + + if self.is_vitess is None: + self.logger.info( + "No is_vitess configuration provided, dynamically checking if " + "we are using a Vitess instance." + ) + with self._connect() as conn: + output = conn.execute( + "select variable_value from " + "performance_schema.global_variables where " + "variable_name='version_comment' and variable_value like " + "'PlanetScale%%'" + ) + rows = output.fetchall() + if len(rows) > 0: + self.logger.info( + "Instance has been detected to be a " + "Vitess (PlanetScale) instance, using Vitess " + "configuration." + ) + self.is_vitess = True + self.logger.info( + "Instance is not a Vitess instance, using standard configuration." + ) + @staticmethod def to_jsonschema_type( sql_type: str # noqa: ANN401 @@ -52,15 +91,16 @@ def to_jsonschema_type( Overridden from SQLConnector to correctly handle JSONB and Arrays. - By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy - types. + By default will call `typing.to_jsonschema_type()` for strings and + SQLAlchemy types. Args: sql_type: The string representation of the SQL type, a SQLAlchemy TypeEngine class or object, or a custom-specified object. Raises: - ValueError: If the type received could not be translated to jsonschema. + ValueError: If the type received could not be translated to + jsonschema. Returns: The JSON Schema representation of the provided type. @@ -170,6 +210,167 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: return self.config["filter_schemas"] return super().get_schema_names(engine, inspected) + def discover_catalog_entry( # noqa: PLR0913 + self, + engine: Engine, + inspected: Inspector, + schema_name: str, + table_name: str, + is_view: bool, # noqa: FBT001 + ) -> CatalogEntry: + """Overrode to support Vitess as DESCRIBE is not supported for views. + + Create `CatalogEntry` object for the given table or a view. + + Args: + engine: SQLAlchemy engine + inspected: SQLAlchemy inspector instance for engine + schema_name: Schema name to inspect + table_name: Name of the table or a view + is_view: Flag whether this object is a view, returned by `get_object_names` + + Returns: + `CatalogEntry` object for the given table or a view + """ + if self.is_vitess is False or is_view is False: + return super().discover_catalog_entry( + engine, inspected, schema_name, table_name, is_view + ) + # For vitess views, we can't use DESCRIBE as it's not supported for + # views so we do the below. + unique_stream_id = self.get_fully_qualified_name( + db_name=None, + schema_name=schema_name, + table_name=table_name, + delimiter="-", + ) + + # Initialize columns list + table_schema = th.PropertiesList() + with self._connect() as conn: + columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`") + for column in columns: + column_name = column["Field"] + is_nullable = column["Null"] == "YES" + jsonschema_type: dict = self.to_jsonschema_type(column["Type"]) + table_schema.append( + th.Property( + name=column_name, + wrapped=th.CustomType(jsonschema_type), + required=not is_nullable, + ), + ) + schema = table_schema.to_dict() + + # Initialize available replication methods + addl_replication_methods: list[str] = [""] # By default an empty list. + # Notes regarding replication methods: + # - 'INCREMENTAL' replication must be enabled by the user by specifying + # a replication_key value. + # - 'LOG_BASED' replication must be enabled by the developer, according + # to source-specific implementation capabilities. + replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods])) + + # Create the catalog entry object + return CatalogEntry( + tap_stream_id=unique_stream_id, + stream=unique_stream_id, + table=table_name, + key_properties=None, + schema=Schema.from_dict(schema), + is_view=is_view, + replication_method=replication_method, + metadata=MetadataMapping.get_standard_metadata( + schema_name=schema_name, + schema=schema, + replication_method=replication_method, + key_properties=None, + valid_replication_keys=None, # Must be defined by user + ), + database=None, # Expects single-database context + row_count=None, + stream_alias=None, + replication_key=None, # Must be defined by user + ) + + def get_sqlalchemy_type(self, col_meta_type: str) -> sqlalchemy.Column: + """Return a SQLAlchemy type object for the given SQL type. + + Used ischema_names so we don't have to manually map all types. + """ + dialect = sqlalchemy.dialects.mysql.base.dialect() # type: ignore[attr-defined] + ischema_names = dialect.ischema_names + # Example varchar(97) + type_info = col_meta_type.split("(") + base_type_name = type_info[0].split(" ")[0] # bigint unsigned + type_args = ( + type_info[1].split(" ")[0].rstrip(")") if len(type_info) > 1 else None + ) # decimal(25,4) unsigned should work + + if base_type_name in {"enum", "set"}: + self.logger.warning( + "Enum and Set types not supported for col_meta_type=%s. " + "Using varchar instead.", + col_meta_type, + ) + base_type_name = "varchar" + type_args = None + + type_class = ischema_names.get(base_type_name.lower()) + + try: + # Create an instance of the type class with parameters if they exist + if type_args: + return type_class( + *map(int, type_args.split(",")) + ) # Want to create a varchar(97) if asked for + return type_class() + except Exception: + self.logger.exception( + "Error creating sqlalchemy type for col_meta_type=%s", col_meta_type + ) + raise + + def get_table_columns( + self, + full_table_name: str, + column_names: list[str] | None = None, + ) -> dict[str, sqlalchemy.Column]: + """Overrode to support Vitess as DESCRIBE is not supported for views. + + Return a list of table columns. + + Args: + full_table_name: Fully qualified table name. + column_names: A list of column names to filter to. + + Returns: + An ordered list of column objects. + """ + if self.is_vitess is False: + return super().get_table_columns(full_table_name, column_names) + # If Vitess Instance then we can't use DESCRIBE as it's not supported + # for views so we do below + if full_table_name not in self._table_cols_cache: + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + with self._connect() as conn: + columns = conn.execute( + f"SHOW columns from `{schema_name}`.`{table_name}`" + ) + self._table_cols_cache[full_table_name] = { + col_meta["Field"]: sqlalchemy.Column( + col_meta["Field"], + self.get_sqlalchemy_type(col_meta["Type"]), + nullable=col_meta["Null"] == "YES", + ) + for col_meta in columns + if not column_names + or col_meta["Field"].casefold() + in {col.casefold() for col in column_names} + } + + return self._table_cols_cache[full_table_name] + class MySQLStream(SQLStream): """Stream class for MySQL streams.""" @@ -219,5 +420,10 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: if start_val: query = query.filter(replication_key_col >= start_val) - for row in self.connector.connection.execute(query): - yield dict(row) + with self.connector._connect() as conn: # noqa: SLF001 + if self.connector.is_vitess: # type: ignore[attr-defined] + conn.exec_driver_sql( + "set workload=olap" + ) # See https://github.com/planetscale/discussion/discussions/190 + for row in conn.execute(query): + yield dict(row) diff --git a/tap_mysql/tap.py b/tap_mysql/tap.py index 0582382..4382a87 100644 --- a/tap_mysql/tap.py +++ b/tap_mysql/tap.py @@ -35,15 +35,21 @@ def __init__( See https://github.com/meltano/sdk/pull/1525 """ super().__init__(*args, **kwargs) - assert (self.config.get("sqlalchemy_url") is not None) or ( # noqa: S101 - self.config.get("host") is not None - and self.config.get("port") is not None - and self.config.get("user") is not None - and self.config.get("password") is not None - ), ( - "Need either the sqlalchemy_url to be set or host, port, user," - " and password to be set" + sql_alchemy_url_exists = self.config.get("sqlalchemy_url") is not None + individual_url_params_exist = all( + [ + self.config.get("host") is not None, + self.config.get("port") is not None, + self.config.get("user") is not None, + self.config.get("password") is not None, + ] ) + if not (sql_alchemy_url_exists or individual_url_params_exist): + msg = ( + "Need either the sqlalchemy_url to be set or host, port, " + "user, and password to be set" + ) + raise ValueError(msg) config_jsonschema = th.PropertiesList( th.Property( @@ -87,12 +93,23 @@ def __init__( "Database name. Note if sqlalchemy_url is set this will be ignored." ), ), + th.Property( + "sqlalchemy_options", + th.ObjectType(additional_properties=th.StringType), + description=( + "sqlalchemy_url options (also called the query), to connect to " + "PlanetScale you must turn on SSL see PlanetScale information " + "below. Note if sqlalchemy_url is set this will be ignored." + ), + ), th.Property( "sqlalchemy_url", th.StringType, secret=True, description=( - "Example mysql://[username]:[password]@localhost:3306/[db_name]" + "Example pymysql://[username]:[password]@localhost:3306/[db_name][?options] " # noqa: E501 + "see https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pymysql " # noqa: E501 + "for more information" ), ), th.Property( @@ -104,6 +121,17 @@ def __init__( "tap automatically determines ALL available MySQL schemas." ), ), + th.Property( + "is_vitess", + th.BooleanType, + default=None, + description=( + "By default we'll check if the database is a Vitess instance. " + "If you would rather not automatically check, set this to " + "`False`. See Vitess/PlanetScale documentation below for more " + "information." + ), + ), th.Property( "ssh_tunnel", th.ObjectType( @@ -178,6 +206,7 @@ def get_sqlalchemy_url(self, config: Mapping[str, Any]) -> str: host=config["host"], port=config["port"], database=config["database"], + query=config.get("sqlalchemy_options"), # type: ignore[arg-type] ) return cast(str, sqlalchemy_url)