diff --git a/docs/content/_apidocs.mdx b/docs/content/_apidocs.mdx
index 3d9ce3a746fa7..4a3a11bff9ab9 100644
--- a/docs/content/_apidocs.mdx
+++ b/docs/content/_apidocs.mdx
@@ -458,6 +458,13 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi
Fivetran (
diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx
index db38a819f4f33..5db2a7966ef7f 100644
--- a/docs/content/integrations/embedded-elt.mdx
+++ b/docs/content/integrations/embedded-elt.mdx
@@ -5,172 +5,253 @@ description: Lightweight ELT framework for building ELT pipelines with Dagster,
# Dagster Embedded ELT
-This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback.
+This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback.
-This package currently includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.
+This package includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.
We plan on adding additional embedded ELT tool integrations in the future.
---
----
-
-## Getting started
+## Overview
-To get started with `dagster-embedded-elt` and Sling, familiarize yourself with Sling's replication configuration.
+To get started with `dagster-embedded-elt` and Sling, first, familiarize yourself with Sling's replication configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The `dagser-embedded-elt` integration uses this configuration to build the assets for both sources and destinations.
The typical pattern for building an ELT pipeline with Sling has three steps:
-1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from.
-2. Next, define a
-for each connection, ensuring you name the resource using the same name given to the connection in the Sling configuration.
-3. Create a Resource object and pass all the connections from the previous step to it.
-4. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync.
+1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from.
+
+2. Next, create a and pass a list of for each connection to the `connection` parameter, ensuring you name the resource using the same name given to the connection in the Sling configuration.
+
+3. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync.
+
+Each step is explained in detail below:
-```python
---
-## Step 1: Setting up a Sling resource
+## Step 1: Setting up a Sling replication configuration
-A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the and classes.
+Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python.
-The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections).
+This configuration is passed to the Sling CLI to run the replication job.
-The simplest connection is a file connection, which can be defined as:
+Here's an example of a `replication.yaml` file:
-```python
-from dagster_embedded_elt.sling import SlingSourceConnection
-source = SlingSourceConnection(type="file")
-sling = SlingResource(source_connection=source, ...)
-```
+```yaml
+SOURCE: MY_POSTGRES
+TARGET: MY_SNOWFLAKE
-Note that no path is required in the source connection, as that is provided by the asset itself.
+defaults:
+ mode: full-refresh
+ object: "{stream_schema}_{stream_table}"
-```python
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_file"),
- source_stream=f"file://{path_to_file}",
- ...
-)
+streams:
+ public.accounts:
+ public.users:
+ public.finance_departments:
+ object: "departments"
```
-For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation.
-
-```python
-source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db")
+Or in Python:
+
+```python file=/integrations/embedded_elt/replication_config.py
+replication_config = {
+ "SOURCE": "MY_POSTGRES",
+ "TARGET": "MY_DUCKDB",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
```
----
+## Step 2: Creating a Sling resource
-## Step 2: Creating a Sling sync
+Next, you will need to create a object that contains references to the connections specified in the replication configuration.
-To create a Sling sync, once you have defined your resource, you can use the factory to create an asset.
+A takes a `connections` parameter, where each represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed.
-```python
+The `name` parameter in the should match the `SOURCE` and `TARGET` keys in the replication configuration.
-sling_resource = SlingResource(
- source_connection=SlingSourceConnection(type="file"),
- target_connection=SlingTargetConnection(
- type="sqlite", connection_string="sqlite://path/to/sqlite.db"
- ),
-)
+You may pass a connection string or arbitrary keyword arguments to the to specify the connection details. See the Sling connections reference for the specific connection types and parameters.
-asset_spec = AssetSpec(
- key=["main", "tbl"],
- group_name="etl",
- description="ETL Test",
- deps=["foo"],
+```python file=/integrations/embedded_elt/sling_connection_resources.py
+from dagster_embedded_elt.sling.resources import (
+ SlingConnectionResource,
+ SlingResource,
)
-asset_def = build_sling_asset(
- asset_spec=asset_spec,
- source_stream="file://path/to/file.csv",
- target_object="main.dest_tbl",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+from dagster import EnvVar
+
+sling_resource = SlingResource(
+ connections=[
+ # Using a connection string from an environment variable
+ SlingConnectionResource(
+ name="MY_POSTGRES",
+ type="postgres",
+ connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
+ ),
+ # Using a hard-coded connection string
+ SlingConnectionResource(
+ name="MY_DUCKDB",
+ type="duckdb",
+ connection_string="duckdb:///var/tmp/duckdb.db",
+ ),
+ # Using a keyword-argument constructor
+ SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host=EnvVar("SNOWFLAKE_HOST"),
+ user=EnvVar("SNOWFLAKE_USER"),
+ role="REPORTING",
+ ),
+ ]
)
+```
+
+## Step 3: Define the Sling assets
+
+Now you can define a Sling asset using the decorator. Dagster will read the replication configuration to produce Assets.
-sling_job = build_assets_job(
- "sling_job",
- [asset_def],
- resource_defs={"sling": sling_resource},
+Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom object.
+
+```python file=/integrations/embedded_elt/sling_dagster_translator.py
+from dagster_embedded_elt import sling
+from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingResource,
+ sling_assets,
)
+from dagster import Definitions, file_relative_path
+
+replication_config = file_relative_path(__file__, "../sling_replication.yaml")
+sling_resource = SlingResource(connections=[...]) # Add connections here
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
+ for row in sling.stream_raw_logs():
+ context.log.info(row)
+
+
+defs = Definitions(
+ assets=[
+ my_assets,
+ ],
+ resources={
+ "sling": sling_resource,
+ },
+)
```
----
+That's it! You should now be able to view your assets in Dagster and run the replication job.
## Examples
This is an example of how to setup a Sling sync between Postgres and Snowflake:
```python file=/integrations/embedded_elt/postgres_snowflake.py
-import os
-
from dagster_embedded_elt.sling import (
- SlingMode,
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-source = SlingSourceConnection(
+source = SlingConnectionResource()(
+ name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
- password=os.getenv("PG_PASS"),
+ password=EnvVar("PG_PASS"),
)
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="public.my_table",
- target_object="marts.my_table",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+sling = SlingResource(
+ connections=[
+ source,
+ target,
+ ]
)
+replication_config = {
+ "SOURCE": "MY_PG",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
```
Similarily, you can define file/storage connections:
```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py
-source = SlingSourceConnection(
+source = SlingConnectionResource()(
+ name="MY_S3",
type="s3",
bucket="sling-bucket",
- access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
- secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="s3://my-bucket/my_file.parquet",
- target_object="marts.my_table",
- primary_key="id",
-)
+sling = SlingResource(connections=[source, target])
+
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://my-bucket/my_file.parquet": {
+ "object": "marts.my_table",
+ "primary_key": "id",
+ },
+ },
+}
+
+
+@sling_assets
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
```
+
## Relevant APIs
| Name | Description |
| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
-| | The core Sling asset factory for building syncs |
+| | The core Sling asset factory for building syncs |
| | The Sling Resource used for handing credentials to databases and object stores |
-| | The core Sling asset decorator for running a Sling replication job |
| | A translator for specifying how to map between Sling and Dagster types |
-
+| | A Sling connection resource for specifying the connection details |
diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
index 6cc1d23e1660b..69b5fcdb03277 100644
--- a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
+++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
@@ -10,23 +10,30 @@ provides a simple way to sync data between databases and file systems.
Related documentation pages: `embedded-elt `_.
+.. currentmodule:: dagster_embedded_elt.sling
-******
-Sling
-******
+***************************
+dagster-embedded-elt.sling
+***************************
-.. currentmodule:: dagster_embedded_elt.sling
+Assets (Sling)
+==============
-Assets
-======
+.. autodecorator:: sling_assets
-.. autofunction:: build_sling_asset
+.. autoclass:: DagsterSlingTranslator
-Resources
-=========
+Resources (Sling)
+=================
.. autoclass:: SlingResource
- :members: sync
+ :members: sync, replicate
+.. autoclass:: SlingConnectionResource
+
+Deprecated
+-----------
+
+.. autofunction:: build_sling_asset
.. autoclass:: dagster_embedded_elt.sling.resources.SlingSourceConnection
.. autoclass:: dagster_embedded_elt.sling.resources.SlingTargetConnection
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
index cb667468d6c27..cca09ed35d820 100644
--- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none
-import os
-
from dagster_embedded_elt.sling import (
- SlingMode,
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-source = SlingSourceConnection(
+source = SlingConnectionResource(
+ name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
- password=os.getenv("PG_PASS"),
+ password=EnvVar("PG_PASS"),
)
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="public.my_table",
- target_object="marts.my_table",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+sling = SlingResource(
+ connections=[
+ source,
+ target,
+ ]
)
+replication_config = {
+ "SOURCE": "MY_PG",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py
new file mode 100644
index 0000000000000..b2a5abca43540
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py
@@ -0,0 +1,10 @@
+replication_config = {
+ "SOURCE": "MY_POSTGRES",
+ "TARGET": "MY_DUCKDB",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
index c8f8a66580f2c..229afeda16ecc 100644
--- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none
-import os
-
from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
# start_storage_config
-source = SlingSourceConnection(
+source = SlingConnectionResource(
+ name="MY_S3",
type="s3",
bucket="sling-bucket",
- access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
- secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
-sling = SlingResource(source_connection=source, target_connection=target)
+sling = SlingResource(connections=[source, target])
+
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://my-bucket/my_file.parquet": {
+ "object": "marts.my_table",
+ "primary_key": "id",
+ },
+ },
+}
+
+
+@sling_assets
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="s3://my-bucket/my_file.parquet",
- target_object="marts.my_table",
- primary_key="id",
-)
# end_storage_config
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py
new file mode 100644
index 0000000000000..bd6c76014e188
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py
@@ -0,0 +1,32 @@
+# pyright: reportCallIssue=none
+from dagster_embedded_elt.sling.resources import (
+ SlingConnectionResource,
+ SlingResource,
+)
+
+from dagster import EnvVar
+
+sling_resource = SlingResource(
+ connections=[
+ # Using a connection string from an environment variable
+ SlingConnectionResource(
+ name="MY_POSTGRES",
+ type="postgres",
+ connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
+ ),
+ # Using a hard-coded connection string
+ SlingConnectionResource(
+ name="MY_DUCKDB",
+ type="duckdb",
+ connection_string="duckdb:///var/tmp/duckdb.db",
+ ),
+ # Using a keyword-argument constructor
+ SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host=EnvVar("SNOWFLAKE_HOST"),
+ user=EnvVar("SNOWFLAKE_USER"),
+ role="REPORTING",
+ ),
+ ]
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py
new file mode 100644
index 0000000000000..09226414221d5
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py
@@ -0,0 +1,31 @@
+from dagster_embedded_elt import sling
+from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingResource,
+ sling_assets,
+)
+
+from dagster import Definitions, file_relative_path
+
+replication_config = file_relative_path(__file__, "../sling_replication.yaml")
+sling_resource = SlingResource(connections=[...]) # Add connections here
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
+ for row in sling.stream_raw_logs():
+ context.log.info(row)
+
+
+defs = Definitions(
+ assets=[
+ my_assets,
+ ],
+ resources={
+ "sling": sling_resource,
+ },
+)
diff --git a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
index 8dde56f5a2e43..955598028019b 100644
--- a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
+++ b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
@@ -1,11 +1,15 @@
from docs_snippets.integrations.embedded_elt.postgres_snowflake import (
- asset_def as asset_def_postgres,
+ my_assets as asset_def_postgres,
)
from docs_snippets.integrations.embedded_elt.s3_snowflake import (
- asset_def as asset_def_s3,
+ my_assets as asset_def_s3,
+)
+from docs_snippets.integrations.embedded_elt.sling_connection_resources import (
+ sling_resource,
)
-def test_asset_defs():
+def test_asset_defs() -> None:
assert asset_def_postgres
assert asset_def_s3
+ assert sling_resource
diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py
index 4dd0c4e813e07..1952d46ee1fa8 100644
--- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py
+++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py
@@ -39,7 +39,7 @@ def sling_assets(
Args:
replication_config: Union[Mapping[str, Any], str, Path]: A path to a Sling replication config, or a dictionary
- of a replication config.
+ of a replication config.
dagster_sling_translator: DagsterSlingTranslator: Allows customization of how to map a Sling stream to a Dagster
AssetKey.
partitions_def: Optional[PartitionsDefinition]: The partitions definition for this asset.
diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py
index 27962adbda73a..2df39a75a6da6 100644
--- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py
+++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/dagster_sling_translator.py
@@ -49,10 +49,12 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
For example:
- stream_definition = {"public.users":
- {'sql': 'select all_user_id, name from public."all_Users"',
- 'object': 'public.all_users'}
- }
+ .. code-block:: python
+
+ stream_definition = {"public.users":
+ {'sql': 'select all_user_id, name from public."all_Users"',
+ 'object': 'public.all_users'}
+ }
By default, this returns the class's target_prefix paramater concatenated with the stream name.
A stream named "public.accounts" will create an AssetKey named "target_public_accounts".
@@ -78,11 +80,13 @@ def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
Examples:
Using a custom mapping for streams:
- class CustomSlingTranslator(DagsterSlingTranslator):
- @classmethod
- def get_asset_key_for_target(self, stream_definition) -> AssetKey:
- map = {"stream1": "asset1", "stream2": "asset2"}
- return AssetKey(map[stream_name])
+ .. code-block:: python
+
+ class CustomSlingTranslator(DagsterSlingTranslator):
+ @classmethod
+ def get_asset_key_for_target(self, stream_definition) -> AssetKey:
+ map = {"stream1": "asset1", "stream2": "asset2"}
+ return AssetKey(map[stream_name])
"""
config = stream_definition.get("config", {}) or {}
object_key = config.get("object")
@@ -131,12 +135,13 @@ def get_deps_asset_key(cls, stream_definition: Mapping[str, Any]) -> Iterable[As
Examples:
Using a custom mapping for streams:
- class CustomSlingTranslator(DagsterSlingTranslator):
- @classmethod
- def get_deps_asset_key(cls, stream_name: str) -> AssetKey:
- map = {"stream1": "asset1", "stream2": "asset2"}
- return AssetKey(map[stream_name])
+ .. code-block:: python
+ class CustomSlingTranslator(DagsterSlingTranslator):
+ @classmethod
+ def get_deps_asset_key(cls, stream_name: str) -> AssetKey:
+ map = {"stream1": "asset1", "stream2": "asset2"}
+ return AssetKey(map[stream_name])
"""
config = stream_definition.get("config", {}) or {}
diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py
index d7afbeefdfcd9..803086dd99a2d 100644
--- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py
+++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py
@@ -18,7 +18,7 @@
PermissiveConfig,
get_dagster_logger,
)
-from dagster._annotations import deprecated, experimental
+from dagster._annotations import deprecated, experimental, public
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field
@@ -33,6 +33,7 @@
DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections."
+@public
class SlingMode(str, Enum):
"""The mode to use when syncing.
@@ -123,6 +124,7 @@ class SlingTargetConnection(PermissiveConfig):
)
+@public
class SlingConnectionResource(PermissiveConfig):
"""A representation a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync.
@@ -318,6 +320,7 @@ def sync(
yield from self._exec_sling_cmd(cmd, encoding=encoding)
+ @public
def replicate(
self,
*,
@@ -325,6 +328,7 @@ def replicate(
dagster_sling_translator: DagsterSlingTranslator,
debug: bool = False,
):
+ """Docs go here."""
replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)
|