From 0d09a7b06632c93adf1a5f07915b7e4b8ffcbdde Mon Sep 17 00:00:00 2001 From: Pedram Navid <1045990+PedramNavid@users.noreply.github.com> Date: Sat, 24 Feb 2024 19:31:00 -0800 Subject: [PATCH] docs: improve embedded-elt and sling APIs documentation Refactor documentation for the 'embedded-elt' and 'sling' APIs for improved readability and clarity. Add code snippets and API usage examples, correct hyperlink references, and adjust API definitions to match their actual usage within the Python modules. Also, deprecate the 'build_sling_asset' function and use '@public' decorators to indicate public methods. docs: update documentation for Dagster Embedded ELT Updated the documentation for the Sling implementation of the Embedded ELT package to reflect changes to the way connections and replication configurations are set up. Also simplified examples and provided more detailed instructions for setting up resources, assets, and replication configurations. Improved inline code examples by adhering to code standards and removed the use of `os` to get environment variables, instead using `EnvVar`.
docs: update embedded ELT and api documentation - Corrected the description of the embedded ELT feature in _apidocs.mdx - Added details on configuration, credential management and resource creation for the Sling integration in embedded-elt.mdx docs: update embedded ELT and api documentation - Corrected the description of the embedded ELT feature in _apidocs.mdx - Added details on configuration, credential management and resource creation for the Sling integration in embedded-elt.mdx Revert --- docs/content/_apidocs.mdx | 7 + docs/content/integrations/embedded-elt.mdx | 263 ++++++++++++------ .../libraries/dagster-embedded-elt.rst | 27 +- .../embedded_elt/postgres_snowflake.py | 52 ++-- .../embedded_elt/replication_config.py | 10 + .../integrations/embedded_elt/s3_snowflake.py | 50 ++-- .../sling_connection_resources.py | 32 +++ .../embedded_elt/sling_dagster_translator.py | 31 +++ .../integrations_tests/test_embedded_elt.py | 10 +- .../sling/asset_decorator.py | 2 +- .../sling/dagster_sling_translator.py | 33 ++- .../dagster_embedded_elt/sling/resources.py | 6 +- 12 files changed, 366 insertions(+), 157 deletions(-) create mode 100644 examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py diff --git a/docs/content/_apidocs.mdx b/docs/content/_apidocs.mdx index 3d9ce3a746fa7..4a3a11bff9ab9 100644 --- a/docs/content/_apidocs.mdx +++ b/docs/content/_apidocs.mdx @@ -458,6 +458,13 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi Provides support for storing PySpark DataFrames in DuckDB. 
+ + + Embedded ELT ( + dagster-embedded-elt) + + Provides support for running embedded ELT within Dagster + Fivetran ( diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index db38a819f4f33..5db2a7966ef7f 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -5,172 +5,253 @@ description: Lightweight ELT framework for building ELT pipelines with Dagster, # Dagster Embedded ELT -This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback. +This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback. -This package currently includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems. +This package includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems. We plan on adding additional embedded ELT tool integrations in the future. --- ---- - -## Getting started +## Overview -To get started with `dagster-embedded-elt` and Sling, familiarize yourself with Sling's replication configuration. +To get started with `dagster-embedded-elt` and Sling, first, familiarize yourself with Sling's replication configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The `dagster-embedded-elt` integration uses this configuration to build the assets for both sources and destinations. The typical pattern for building an ELT pipeline with Sling has three steps: -1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from. -2.
Next, define a -for each connection, ensuring you name the resource using the same name given to the connection in the Sling configuration. -3. Create a Resource object and pass all the connections from the previous step to it. -4. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync. +1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from. + +2. Next, create a `SlingResource` and pass a list of `SlingConnectionResource` objects, one for each connection, to the `connections` parameter, ensuring you name each connection resource using the same name given to the connection in the Sling configuration. + +3. Use the `@sling_assets` decorator to define an asset that will run the Sling replication job and yield from the `SlingResource.replicate` method to run the sync. + +Each step is explained in detail below: -```python --- -## Step 1: Setting up a Sling resource +## Step 1: Setting up a Sling replication configuration -A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the and classes. +Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python. -The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections). +This configuration is passed to the Sling CLI to run the replication job. -The simplest connection is a file connection, which can be defined as: +Here's an example of a `replication.yaml` file: -```python -from dagster_embedded_elt.sling import SlingSourceConnection -source = SlingSourceConnection(type="file") -sling = SlingResource(source_connection=source, ...)
-``` +```yaml +SOURCE: MY_POSTGRES +TARGET: MY_DUCKDB -Note that no path is required in the source connection, as that is provided by the asset itself. +defaults: + mode: full-refresh + object: "{stream_schema}_{stream_table}" -```python -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_file"), - source_stream=f"file://{path_to_file}", - ... -) +streams: + public.accounts: + public.users: + public.finance_departments: + object: "departments" ``` -For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation. - -```python -source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db") +Or in Python: + +```python file=/integrations/embedded_elt/replication_config.py +replication_config = { + "SOURCE": "MY_POSTGRES", + "TARGET": "MY_DUCKDB", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} ``` ---- +## Step 2: Creating a Sling resource -## Step 2: Creating a Sling sync +Next, you will need to create a `SlingResource` object that contains references to the connections specified in the replication configuration. -To create a Sling sync, once you have defined your resource, you can use the factory to create an asset. +A `SlingResource` takes a `connections` parameter, where each `SlingConnectionResource` represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed. -```python +The `name` parameter in the `SlingConnectionResource` should match the values of the `SOURCE` and `TARGET` keys in the replication configuration.
-sling_resource = SlingResource( - source_connection=SlingSourceConnection(type="file"), - target_connection=SlingTargetConnection( - type="sqlite", connection_string="sqlite://path/to/sqlite.db" - ), -) +You may pass a connection string or arbitrary keyword arguments to the `SlingConnectionResource` to specify the connection details. See the Sling connections reference for the specific connection types and parameters. -asset_spec = AssetSpec( - key=["main", "tbl"], - group_name="etl", - description="ETL Test", - deps=["foo"], +```python file=/integrations/embedded_elt/sling_connection_resources.py +from dagster_embedded_elt.sling.resources import ( + SlingConnectionResource, + SlingResource, ) -asset_def = build_sling_asset( - asset_spec=asset_spec, - source_stream="file://path/to/file.csv", - target_object="main.dest_tbl", - mode=SlingMode.INCREMENTAL, - primary_key="id", +from dagster import EnvVar + +sling_resource = SlingResource( + connections=[ + # Using a connection string from an environment variable + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), + ), + # Using a hard-coded connection string + SlingConnectionResource( + name="MY_DUCKDB", + type="duckdb", + connection_string="duckdb:///var/tmp/duckdb.db", + ), + # Using a keyword-argument constructor + SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + role="REPORTING", + ), + ] ) +``` + +## Step 3: Define the Sling assets + +Now you can define a Sling asset using the `@sling_assets` decorator. Dagster will read the replication configuration to produce Assets. -sling_job = build_assets_job( - "sling_job", - [asset_def], - resource_defs={"sling": sling_resource}, +Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom `DagsterSlingTranslator` object.
+ +```python file=/integrations/embedded_elt/sling_dagster_translator.py +from dagster_embedded_elt import sling +from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingResource, + sling_assets, ) +from dagster import Definitions, file_relative_path + +replication_config = file_relative_path(__file__, "../sling_replication.yaml") +sling_resource = SlingResource(connections=[...]) # Add connections here + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) + for row in sling.stream_raw_logs(): + context.log.info(row) + + +defs = Definitions( + assets=[ + my_assets, + ], + resources={ + "sling": sling_resource, + }, +) ``` ---- +That's it! You should now be able to view your assets in Dagster and run the replication job. ## Examples This is an example of how to setup a Sling sync between Postgres and Snowflake: ```python file=/integrations/embedded_elt/postgres_snowflake.py -import os - from dagster_embedded_elt.sling import ( - SlingMode, + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -source = SlingSourceConnection( +source = SlingConnectionResource( + name="MY_PG", type="postgres", host="localhost", port=5432, database="my_database", user="my_user", - password=os.getenv("PG_PASS"), + password=EnvVar("PG_PASS"), ) -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( -
asset_spec=AssetSpec("my_asset_name"), - source_stream="public.my_table", - target_object="marts.my_table", - mode=SlingMode.INCREMENTAL, - primary_key="id", +sling = SlingResource( + connections=[ + source, + target, + ] ) +replication_config = { + "SOURCE": "MY_PG", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) ``` Similarily, you can define file/storage connections: ```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py -source = SlingSourceConnection( +source = SlingConnectionResource( + name="MY_S3", type="s3", bucket="sling-bucket", - access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="s3://my-bucket/my_file.parquet", - target_object="marts.my_table", - primary_key="id", -) +sling = SlingResource(connections=[source, target]) + +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://my-bucket/my_file.parquet": { + "object": "marts.my_table", + "primary_key": "id", + }, + }, +} + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, +
dagster_sling_translator=DagsterSlingTranslator(), + ) ``` + ## Relevant APIs | Name | Description | | --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ | -| | The core Sling asset factory for building syncs | +| | The core Sling asset factory for building syncs | | | The Sling Resource used for handing credentials to databases and object stores | -| | The core Sling asset decorator for running a Sling replication job | | | A translator for specifying how to map between Sling and Dagster types | - +| | A Sling connection resource for specifying the connection details | diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst index 6cc1d23e1660b..69b5fcdb03277 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst @@ -10,23 +10,30 @@ provides a simple way to sync data between databases and file systems. Related documentation pages: `embedded-elt `_. +.. currentmodule:: dagster_embedded_elt.sling -****** -Sling -****** +*************************** +dagster-embedded-elt.sling +*************************** -.. currentmodule:: dagster_embedded_elt.sling +Assets (Sling) +============== -Assets -====== +.. autodecorator:: sling_assets -.. autofunction:: build_sling_asset +.. autoclass:: DagsterSlingTranslator -Resources -========= +Resources (Sling) +================= .. autoclass:: SlingResource - :members: sync + :members: sync, replicate +.. autoclass:: SlingConnectionResource + +Deprecated +----------- + +.. autofunction:: build_sling_asset .. autoclass:: dagster_embedded_elt.sling.resources.SlingSourceConnection .. 
autoclass:: dagster_embedded_elt.sling.resources.SlingTargetConnection diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index cb667468d6c27..cca09ed35d820 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -1,42 +1,56 @@ # pyright: reportCallIssue=none # pyright: reportOptionalMemberAccess=none -import os - from dagster_embedded_elt.sling import ( - SlingMode, + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -source = SlingSourceConnection( +source = SlingConnectionResource( + name="MY_PG", type="postgres", host="localhost", port=5432, database="my_database", user="my_user", - password=os.getenv("PG_PASS"), + password=EnvVar("PG_PASS"), ) -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) -sling = SlingResource(source_connection=source, target_connection=target) - -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="public.my_table", - target_object="marts.my_table", - mode=SlingMode.INCREMENTAL, - primary_key="id", +sling = SlingResource( + connections=[ + source, + target, + ] ) +replication_config = { + "SOURCE": "MY_PG", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} + + 
+@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py new file mode 100644 index 0000000000000..b2a5abca43540 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py @@ -0,0 +1,10 @@ +replication_config = { + "SOURCE": "MY_POSTGRES", + "TARGET": "MY_DUCKDB", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": None, + "public.finance_departments": {"object": "departments"}, + }, +} diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index c8f8a66580f2c..229afeda16ecc 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -1,42 +1,56 @@ # pyright: reportCallIssue=none # pyright: reportOptionalMemberAccess=none -import os - from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingConnectionResource, SlingResource, - SlingSourceConnection, - SlingTargetConnection, - build_sling_asset, + sling_assets, ) -from dagster import AssetSpec +from dagster import EnvVar -target = SlingTargetConnection( +target = SlingConnectionResource( + name="MY_SF", type="snowflake", host="hostname.snowflake", user="username", database="database", - password=os.getenv("SF_PASSWORD"), + password=EnvVar("SF_PASSWORD"), role="role", ) # start_storage_config -source = SlingSourceConnection( +source = SlingConnectionResource( + name="MY_S3", 
type="s3", bucket="sling-bucket", - access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), + access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), + secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), ) -sling = SlingResource(source_connection=source, target_connection=target) +sling = SlingResource(connections=[source, target]) + +replication_config = { + "SOURCE": "MY_S3", + "TARGET": "MY_SF", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "s3://my-bucket/my_file.parquet": { + "object": "marts.my_table", + "primary_key": "id", + }, + }, +} + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) -asset_def = build_sling_asset( - asset_spec=AssetSpec("my_asset_name"), - source_stream="s3://my-bucket/my_file.parquet", - target_object="marts.my_table", - primary_key="id", -) # end_storage_config diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py new file mode 100644 index 0000000000000..bd6c76014e188 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py @@ -0,0 +1,32 @@ +# pyright: reportCallIssue=none +from dagster_embedded_elt.sling.resources import ( + SlingConnectionResource, + SlingResource, +) + +from dagster import EnvVar + +sling_resource = SlingResource( + connections=[ + # Using a connection string from an environment variable + SlingConnectionResource( + name="MY_POSTGRES", + type="postgres", + connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), + ), + # Using a hard-coded connection string + SlingConnectionResource( + name="MY_DUCKDB", + type="duckdb", + connection_string="duckdb:///var/tmp/duckdb.db", + ), + # Using a
keyword-argument constructor + SlingConnectionResource( + name="MY_SNOWFLAKE", + type="snowflake", + host=EnvVar("SNOWFLAKE_HOST"), + user=EnvVar("SNOWFLAKE_USER"), + role="REPORTING", + ), + ] +) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py new file mode 100644 index 0000000000000..09226414221d5 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -0,0 +1,31 @@ +from dagster_embedded_elt import sling +from dagster_embedded_elt.sling import ( + DagsterSlingTranslator, + SlingResource, + sling_assets, +) + +from dagster import Definitions, file_relative_path + +replication_config = file_relative_path(__file__, "../sling_replication.yaml") +sling_resource = SlingResource(connections=[...]) # Add connections here + + +@sling_assets(replication_config=replication_config) +def my_assets(context, sling: SlingResource): + yield from sling.replicate( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), + ) + for row in sling.stream_raw_logs(): + context.log.info(row) + + +defs = Definitions( + assets=[ + my_assets, + ], + resources={ + "sling": sling_resource, + }, +) diff --git a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py index 8dde56f5a2e43..955598028019b 100644 --- a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py +++ b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py @@ -1,11 +1,15 @@ from docs_snippets.integrations.embedded_elt.postgres_snowflake import ( - asset_def as asset_def_postgres, + my_assets as asset_def_postgres, ) from docs_snippets.integrations.embedded_elt.s3_snowflake import ( - asset_def as asset_def_s3, + 
my_assets as asset_def_s3, +) +from docs_snippets.integrations.embedded_elt.sling_connection_resources import ( + sling_resource, ) -def test_asset_defs(): +def test_asset_defs() -> None: assert asset_def_postgres assert asset_def_s3 + assert sling_resource diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 4dd0c4e813e07..1952d46ee1fa8 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -39,7 +39,7 @@ def sling_assets( Args: replication_config: Union[Mapping[str, Any], str, Path]: A path to a Sling replication config, or a dictionary - of a replication config. + of a replication config. dagster_sling_translator: DagsterSlingTranslator: Allows customization of how to map a Sling stream to a Dagster AssetKey. partitions_def: Optional[PartitionsDefinition]: The partitions definition for this asset. diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py index 27962adbda73a..2df39a75a6da6 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py @@ -49,10 +49,12 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: For example: - stream_definition = {"public.users": - {'sql': 'select all_user_id, name from public."all_Users"', - 'object': 'public.all_users'} - } + .. 
code-block:: python + + stream_definition = {"public.users": + {'sql': 'select all_user_id, name from public."all_Users"', + 'object': 'public.all_users'} + } By default, this returns the class's target_prefix paramater concatenated with the stream name. A stream named "public.accounts" will create an AssetKey named "target_public_accounts". @@ -78,11 +80,13 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey: Examples: Using a custom mapping for streams: - class CustomSlingTranslator(DagsterSlingTranslator): - @classmethod - def get_asset_key_for_target(self, stream_definition) -> AssetKey: - map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_name]) + .. code-block:: python + + class CustomSlingTranslator(DagsterSlingTranslator): + @classmethod + def get_asset_key_for_target(self, stream_definition) -> AssetKey: + map = {"stream1": "asset1", "stream2": "asset2"} + return AssetKey(map[stream_name]) """ config = stream_definition.get("config", {}) or {} object_key = config.get("object") @@ -131,12 +135,13 @@ def get_deps_asset_key(cls, stream_definition: Mapping[str, Any]) -> Iterable[As Examples: Using a custom mapping for streams: - class CustomSlingTranslator(DagsterSlingTranslator): - @classmethod - def get_deps_asset_key(cls, stream_name: str) -> AssetKey: - map = {"stream1": "asset1", "stream2": "asset2"} - return AssetKey(map[stream_name]) + .. 
code-block:: python + class CustomSlingTranslator(DagsterSlingTranslator): + @classmethod + def get_deps_asset_key(cls, stream_name: str) -> AssetKey: + map = {"stream1": "asset1", "stream2": "asset2"} + return AssetKey(map[stream_name]) """ config = stream_definition.get("config", {}) or {} diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index d7afbeefdfcd9..803086dd99a2d 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -18,7 +18,7 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import deprecated, experimental +from dagster._annotations import deprecated, experimental, public from dagster._utils.env import environ from dagster._utils.warnings import deprecation_warning from pydantic import Field @@ -33,6 +33,7 @@ DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections." +@public class SlingMode(str, Enum): """The mode to use when syncing. @@ -123,6 +124,7 @@ class SlingTargetConnection(PermissiveConfig): ) +@public class SlingConnectionResource(PermissiveConfig): """A representation a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync. @@ -318,6 +320,7 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) + @public def replicate( self, *, @@ -325,6 +328,7 @@ def replicate( dagster_sling_translator: DagsterSlingTranslator, debug: bool = False, ): + """Run the Sling replication described by ``replication_config``.""" replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config)