Skip to content

Commit

Permalink
[embedded-elt] Add additional docs to resources (#20384)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Adds missing docs for Sling's replication

## How I Tested These Changes
  • Loading branch information
PedramNavid authored Mar 11, 2024
1 parent 0562933 commit 57cf571
Showing 1 changed file with 70 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,63 +126,94 @@ class SlingTargetConnection(PermissiveConfig):

@public
class SlingConnectionResource(PermissiveConfig):
"""A representation a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync.
"""A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling syncs.
This resource is responsible for the managing how Sling connects to a resource. To manage how Sling uses this connection (as a source or target), see the specific source_options or target_options in the `build_assets_from_sling_stream` function.
Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections
The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication
You may provide either a connection string or keyword arguments for the connection.
Examples:
Creating a Sling Connection for a file, such as CSV or JSON:
.. code-block:: python
source = SlingConnectionResource(type="file")
source = SlingConnectionResource(name="MY_FILE", type="file")
Create a Sling Connection for a Postgres database, using a connection string:
.. code-block:: python
source = SlingConnectionResource(type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
source = SlingConnectionResource(type="mysql", connection_string="mysql://user:password@host:port/schema")
postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")
Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments, as described here:
https://docs.slingdata.io/connections/database-connections/postgres
Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments:
.. code-block::python
source = SlingConnectionResource(type="postgres", host="host", user="hunter42", password=EnvVar("POSTGRES_PASSWORD"))
source = SlingConnectionResource(type="snowflake", host=EnvVar("SNOWFLAKE_HOST"), user=EnvVar("SNOWFLAKE_USER"), database=EnvVar("SNOWFLAKE_DATABASE"), password=EnvVar("SNOWFLAKE_PASSWORD"), role=EnvVar("SNOWFLAKE_ROLE"))
postgres_conn = SlingConnectionResource(
name="MY_OTHER_POSRGRES",
type="postgres",
host="host",
user="hunter42",
password=EnvVar("POSTGRES_PASSWORD")
)
snowflake_conn = SlingConnectionResource(
name="MY_SNOWFLAKE",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
database=EnvVar("SNOWFLAKE_DATABASE"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE")
)
"""

name: str = Field(description="The name of the connection.")
type: str = Field(description="Type of the source connection. Use 'file' for local storage.")
name: str = Field(
description="The name of the connection, must match the name in your Sling replication configuration."
)
type: str = Field(
description="Type of the source connection, must match the Sling connection types. Use 'file' for local storage."
)
connection_string: Optional[str] = Field(
description="The connection string for the source database.",
description="The optional connection string for the source database, if not using keyword arguments.",
default=None,
)


@experimental
class SlingResource(ConfigurableResource):
"""Resource for interacting with the Sling package.
"""Resource for interacting with the Sling package. This resource can be used to run Sling replications.
Args:
connections (List[SlingConnectionResource]): A list of connections to use for the replication.
source_connection (Optional[SlingSourceConnection]): Deprecated, use `connections` instead.
target_connection (Optional[SlingTargetConnection]): Deprecated, use `connections` instead.
Examples:
.. code-block:: python
from dagster_etl.sling import SlingResource
from dagster_etl.sling import SlingResource, SlingConnectionResource
sling_resource = SlingResource(
source_connection=SlingSourceConnection(
type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING")
),
target_connection=SlingTargetConnection(
type="snowflake",
host="host",
user="user",
database="database",
password="password",
role="role",
),
connections=[
SlingConnectionResource(
name="MY_POSTGRES",
type="postgres",
connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
),
SlingConnectionResource(
name="MY_SNOWFLAKE",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
database=EnvVar("SNOWFLAKE_DATABASE"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE"),
),
]
)
"""

source_connection: Optional[SlingSourceConnection] = None
Expand Down Expand Up @@ -283,7 +314,7 @@ def sync(
encoding: str = "utf8",
) -> Generator[str, None, None]:
"""Runs a Sling sync from the given source table to the given destination table. Generates
output lines from the Sling CLI.
output lines from the Sling CLI. Deprecated, use `replicate` instead.
"""
if (
self.source_connection
Expand Down Expand Up @@ -331,8 +362,17 @@ def replicate(
replication_config: SlingReplicationParam,
dagster_sling_translator: DagsterSlingTranslator,
debug: bool = False,
):
"""Docs go here."""
) -> Generator[MaterializeResult, None, None]:
"""Runs a Sling replication from the given replication config.
Args:
replication_config: The Sling replication config to use for the replication.
dagster_sling_translator: The translator to use for the replication.
debug: Whether to run the replication in debug mode.
Returns:
Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult
"""
replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)

Expand Down Expand Up @@ -375,7 +415,7 @@ def replicate(
)

def stream_raw_logs(self) -> Generator[str, None, None]:
"""Returns the logs from the Sling CLI."""
"""Returns a generator of raw logs from the Sling CLI."""
yield from self._stdout


Expand Down

0 comments on commit 57cf571

Please sign in to comment.