From 57cf5711d2050edc21da5870ce9a4e2d48dbb251 Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Sun, 10 Mar 2024 22:33:04 -0700 Subject: [PATCH] [embedded-elt] Add additional docs to resources (#20384) ## Summary & Motivation Adds missing docs for Sling's replication ## How I Tested These Changes --- .../dagster_embedded_elt/sling/resources.py | 100 ++++++++++++------ 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 56cfcae8a49bb..bcc8491b86c75 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -126,63 +126,94 @@ class SlingTargetConnection(PermissiveConfig): @public class SlingConnectionResource(PermissiveConfig): - """A representation a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync. + """A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync. - This resource is responsible for the managing how Sling connects to a resource. To manage how Sling uses this connection (as a source or target), see the specific source_options or target_options in the `build_assets_from_sling_stream` function. + Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections + + The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication + You may provide either a connection string or keyword arguments for the connection. 
Examples: Creating a Sling Connection for a file, such as CSV or JSON: .. code-block:: python - source = SlingConnectionResource(type="file") + source = SlingConnectionResource(name="MY_FILE", type="file") Create a Sling Connection for a Postgres database, using a connection string: .. code-block:: python - source = SlingConnectionResource(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")) - source = SlingConnectionResource(type="mysql", connection_string="mysql://user:password@host:port/schema") + postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")) + mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema") - Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments, as described here: - https://docs.slingdata.io/connections/database-connections/postgres + Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments: .. code-block::python - source = SlingConnectionResource(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD")) - source = SlingConnectionResource(type="snowflake", host=EnvVar("SNOWFLAKE_HOST"), user=EnvVar("SNOWFLAKE_USER"), database=EnvVar("SNOWFLAKE_DATABASE"), password=EnvVar("SNOWFLAKE_PASSWORD"), role=EnvVar("SNOWFLAKE_ROLE")) + postgres_conn = SlingConnectionResource( + name="MY_OTHER_POSTGRES", + type="postgres", + host="host", + user="hunter42", + password=EnvVar("POSTGRES_PASSWORD") + ) + + snowflake_conn = SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + database=EnvVar("SNOWFLAKE_DATABASE"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + role=EnvVar("SNOWFLAKE_ROLE") + ) """ - name: str = Field(description="The name of the connection.") - type: str = Field(description="Type of the source connection. 
Use 'file' for local storage.") + name: str = Field( + description="The name of the connection, must match the name in your Sling replication configuration." + ) + type: str = Field( + description="Type of the source connection, must match the Sling connection types. Use 'file' for local storage." + ) connection_string: Optional[str] = Field( - description="The connection string for the source database.", + description="The optional connection string for the source database, if not using keyword arguments.", default=None, ) @experimental class SlingResource(ConfigurableResource): - """Resource for interacting with the Sling package. + """Resource for interacting with the Sling package. This resource can be used to run Sling replications. + + Args: + connections (List[SlingConnectionResource]): A list of connections to use for the replication. + source_connection (Optional[SlingSourceConnection]): Deprecated, use `connections` instead. + target_connection (Optional[SlingTargetConnection]): Deprecated, use `connections` instead. Examples: .. 
code-block:: python - from dagster_etl.sling import SlingResource + from dagster_embedded_elt.sling import SlingResource, SlingConnectionResource + sling_resource = SlingResource( - source_connection=SlingSourceConnection( - type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING") - ), - target_connection=SlingTargetConnection( - type="snowflake", - host="host", - user="user", - database="database", - password="password", - role="role", - ), + connections=[ + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), + ), + SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + database=EnvVar("SNOWFLAKE_DATABASE"), + password=EnvVar("SNOWFLAKE_PASSWORD"), + role=EnvVar("SNOWFLAKE_ROLE"), + ), + ] ) - """ source_connection: Optional[SlingSourceConnection] = None @@ -283,7 +314,7 @@ def sync( encoding: str = "utf8", ) -> Generator[str, None, None]: """Runs a Sling sync from the given source table to the given destination table. Generates - output lines from the Sling CLI. + output lines from the Sling CLI. Deprecated, use `replicate` instead. """ if ( self.source_connection @@ -331,8 +362,17 @@ def replicate( replication_config: SlingReplicationParam, dagster_sling_translator: DagsterSlingTranslator, debug: bool = False, - ): - """Docs go here.""" + ) -> Generator[MaterializeResult, None, None]: + """Runs a Sling replication from the given replication config. + + Args: + replication_config: The Sling replication config to use for the replication. + dagster_sling_translator: The translator to use for the replication. + debug: Whether to run the replication in debug mode. 
+ + Returns: + Generator[MaterializeResult, None, None]: A generator of MaterializeResult + """ replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config) @@ -375,7 +415,7 @@ def replicate( ) def stream_raw_logs(self) -> Generator[str, None, None]: - """Returns the logs from the Sling CLI.""" + """Returns a generator of raw logs from the Sling CLI.""" yield from self._stdout