diff --git a/docs/content/_apidocs.mdx b/docs/content/_apidocs.mdx
index 3d9ce3a746fa7..0f98dfd3f46bb 100644
--- a/docs/content/_apidocs.mdx
+++ b/docs/content/_apidocs.mdx
@@ -458,6 +458,13 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi
Fivetran (
diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx
index 8424f6addfa6b..5db2a7966ef7f 100644
--- a/docs/content/integrations/embedded-elt.mdx
+++ b/docs/content/integrations/embedded-elt.mdx
@@ -5,171 +5,253 @@ description: Lightweight ELT framework for building ELT pipelines with Dagster,
# Dagster Embedded ELT
-This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback.
+This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback.
-This package currently includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.
+This package includes a single implementation using Sling, which provides a simple way to sync data between databases and file systems.
We plan on adding additional embedded ELT tool integrations in the future.
---
-## Getting started
+## Overview
-To get started with `dagster-embedded-elt` and Sling, familiarize yourself with Sling's replication configuration.
+To get started with `dagster-embedded-elt` and Sling, first, familiarize yourself with Sling's replication configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The `dagser-embedded-elt` integration uses this configuration to build the assets for both sources and destinations.
The typical pattern for building an ELT pipeline with Sling has three steps:
1. First, define a `replication.yaml` file that specifies the source and target connections, as well as which streams to sync from.
-2. Next, define a for each connection, ensuring you name the resource using the same name given to the connection in the Sling configuration.
+2. Next, create a and pass a list of for each connection to the `connection` parameter, ensuring you name the resource using the same name given to the connection in the Sling configuration.
-3. Create a Resource object and pass all the connections from the previous step to it.
+3. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync.
-4. Use the decorator to define an asset that will run the Sling replication job and yield from the method to run the sync.
+Each step is explained in detail below:
-````python
---
-## Step 1: Setting up a Sling resource
+## Step 1: Setting up a Sling replication configuration
-A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the and classes.
+Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python.
-The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections).
+This configuration is passed to the Sling CLI to run the replication job.
-The simplest connection is a file connection, which can be defined as:
+Here's an example of a `replication.yaml` file:
-```python
-from dagster_embedded_elt.sling import SlingSourceConnection
-source = SlingSourceConnection(type="file")
-sling = SlingResource(source_connection=source, ...)
-````
+```yaml
+SOURCE: MY_POSTGRES
+TARGET: MY_SNOWFLAKE
-Note that no path is required in the source connection, as that is provided by the asset itself.
+defaults:
+ mode: full-refresh
+ object: "{stream_schema}_{stream_table}"
-```python
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_file"),
- source_stream=f"file://{path_to_file}",
- ...
-)
+streams:
+ public.accounts:
+ public.users:
+ public.finance_departments:
+ object: "departments"
```
-For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation.
-
-```python
-source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db")
+Or in Python:
+
+```python file=/integrations/embedded_elt/replication_config.py
+replication_config = {
+ "SOURCE": "MY_POSTGRES",
+ "TARGET": "MY_DUCKDB",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
```
----
+## Step 2: Creating a Sling resource
-## Step 2: Creating a Sling sync
+Next, you will need to create a object that contains references to the connections specified in the replication configuration.
-To create a Sling sync, once you have defined your resource, you can use the factory to create an asset.
+A takes a `connections` parameter, where each represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed.
-```python
+The `name` parameter in the should match the `SOURCE` and `TARGET` keys in the replication configuration.
-sling_resource = SlingResource(
- source_connection=SlingSourceConnection(type="file"),
- target_connection=SlingTargetConnection(
- type="sqlite", connection_string="sqlite://path/to/sqlite.db"
- ),
-)
+You may pass a connection string or arbitrary keyword arguments to the to specify the connection details. See the Sling connections reference for the specific connection types and parameters.
-asset_spec = AssetSpec(
- key=["main", "tbl"],
- group_name="etl",
- description="ETL Test",
- deps=["foo"],
+```python file=/integrations/embedded_elt/sling_connection_resources.py
+from dagster_embedded_elt.sling.resources import (
+ SlingConnectionResource,
+ SlingResource,
)
-asset_def = build_sling_asset(
- asset_spec=asset_spec,
- source_stream="file://path/to/file.csv",
- target_object="main.dest_tbl",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+from dagster import EnvVar
+
+sling_resource = SlingResource(
+ connections=[
+ # Using a connection string from an environment variable
+ SlingConnectionResource(
+ name="MY_POSTGRES",
+ type="postgres",
+ connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
+ ),
+ # Using a hard-coded connection string
+ SlingConnectionResource(
+ name="MY_DUCKDB",
+ type="duckdb",
+ connection_string="duckdb:///var/tmp/duckdb.db",
+ ),
+ # Using a keyword-argument constructor
+ SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host=EnvVar("SNOWFLAKE_HOST"),
+ user=EnvVar("SNOWFLAKE_USER"),
+ role="REPORTING",
+ ),
+ ]
)
+```
+
+## Step 3: Define the Sling assets
-sling_job = build_assets_job(
- "sling_job",
- [asset_def],
- resource_defs={"sling": sling_resource},
+Now you can define a Sling asset using the decorator. Dagster will read the replication configuration to produce Assets.
+
+Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom object.
+
+```python file=/integrations/embedded_elt/sling_dagster_translator.py
+from dagster_embedded_elt import sling
+from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingResource,
+ sling_assets,
)
+from dagster import Definitions, file_relative_path
+
+replication_config = file_relative_path(__file__, "../sling_replication.yaml")
+sling_resource = SlingResource(connections=[...]) # Add connections here
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
+ for row in sling.stream_raw_logs():
+ context.log.info(row)
+
+
+defs = Definitions(
+ assets=[
+ my_assets,
+ ],
+ resources={
+ "sling": sling_resource,
+ },
+)
```
----
+That's it! You should now be able to view your assets in Dagster and run the replication job.
## Examples
This is an example of how to setup a Sling sync between Postgres and Snowflake:
```python file=/integrations/embedded_elt/postgres_snowflake.py
-import os
-
from dagster_embedded_elt.sling import (
- SlingMode,
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-source = SlingSourceConnection(
+source = SlingConnectionResource()(
+ name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
- password=os.getenv("PG_PASS"),
+ password=EnvVar("PG_PASS"),
)
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="public.my_table",
- target_object="marts.my_table",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+sling = SlingResource(
+ connections=[
+ source,
+ target,
+ ]
)
+replication_config = {
+ "SOURCE": "MY_PG",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
```
Similarily, you can define file/storage connections:
```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py
-source = SlingSourceConnection(
+source = SlingConnectionResource()(
+ name="MY_S3",
type="s3",
bucket="sling-bucket",
- access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
- secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="s3://my-bucket/my_file.parquet",
- target_object="marts.my_table",
- primary_key="id",
-)
+sling = SlingResource(connections=[source, target])
+
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://my-bucket/my_file.parquet": {
+ "object": "marts.my_table",
+ "primary_key": "id",
+ },
+ },
+}
+
+
+@sling_assets
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
```
## Relevant APIs
-| Name | Description |
-| -------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
-| | The core Sling asset factory for building syncs |
-| | The Sling Resource used for handing credentials to databases and object stores |
-| | A translator for specifying how to map between Sling and Dagster types |
+| Name | Description |
+| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
+| | The core Sling asset factory for building syncs |
+| | The Sling Resource used for handing credentials to databases and object stores |
+| | A translator for specifying how to map between Sling and Dagster types |
+| | A Sling connection resource for specifying the connection details |
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
index cb667468d6c27..cca09ed35d820 100644
--- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none
-import os
-
from dagster_embedded_elt.sling import (
- SlingMode,
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-source = SlingSourceConnection(
+source = SlingConnectionResource(
+ name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
- password=os.getenv("PG_PASS"),
+ password=EnvVar("PG_PASS"),
)
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
-sling = SlingResource(source_connection=source, target_connection=target)
-
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="public.my_table",
- target_object="marts.my_table",
- mode=SlingMode.INCREMENTAL,
- primary_key="id",
+sling = SlingResource(
+ connections=[
+ source,
+ target,
+ ]
)
+replication_config = {
+ "SOURCE": "MY_PG",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py
new file mode 100644
index 0000000000000..b2a5abca43540
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/replication_config.py
@@ -0,0 +1,10 @@
+replication_config = {
+ "SOURCE": "MY_POSTGRES",
+ "TARGET": "MY_DUCKDB",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "public.accounts": None,
+ "public.users": None,
+ "public.finance_departments": {"object": "departments"},
+ },
+}
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
index c8f8a66580f2c..9d13fe66d1496 100644
--- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py
@@ -1,42 +1,56 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none
-import os
-
from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingConnectionResource,
SlingResource,
- SlingSourceConnection,
- SlingTargetConnection,
- build_sling_asset,
+ sling_assets,
)
-from dagster import AssetSpec
+from dagster import EnvVar
-target = SlingTargetConnection(
+target = SlingConnectionResource(
+ name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
- password=os.getenv("SF_PASSWORD"),
+ password=EnvVar("SF_PASSWORD"),
role="role",
)
# start_storage_config
-source = SlingSourceConnection(
+source = SlingConnectionResource()(
+ name="MY_S3",
type="s3",
bucket="sling-bucket",
- access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
- secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
+ access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
+ secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
-sling = SlingResource(source_connection=source, target_connection=target)
+sling = SlingResource(connections=[source, target])
+
+replication_config = {
+ "SOURCE": "MY_S3",
+ "TARGET": "MY_SF",
+ "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
+ "streams": {
+ "s3://my-bucket/my_file.parquet": {
+ "object": "marts.my_table",
+ "primary_key": "id",
+ },
+ },
+}
+
+
+@sling_assets
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
-asset_def = build_sling_asset(
- asset_spec=AssetSpec("my_asset_name"),
- source_stream="s3://my-bucket/my_file.parquet",
- target_object="marts.my_table",
- primary_key="id",
-)
# end_storage_config
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py
new file mode 100644
index 0000000000000..bd6c76014e188
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_connection_resources.py
@@ -0,0 +1,32 @@
+# pyright: reportCallIssue=none
+from dagster_embedded_elt.sling.resources import (
+ SlingConnectionResource,
+ SlingResource,
+)
+
+from dagster import EnvVar
+
+sling_resource = SlingResource(
+ connections=[
+ # Using a connection string from an environment variable
+ SlingConnectionResource(
+ name="MY_POSTGRES",
+ type="postgres",
+ connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
+ ),
+ # Using a hard-coded connection string
+ SlingConnectionResource(
+ name="MY_DUCKDB",
+ type="duckdb",
+ connection_string="duckdb:///var/tmp/duckdb.db",
+ ),
+ # Using a keyword-argument constructor
+ SlingConnectionResource(
+ name="MY_SNOWFLAKE",
+ type="snowflake",
+ host=EnvVar("SNOWFLAKE_HOST"),
+ user=EnvVar("SNOWFLAKE_USER"),
+ role="REPORTING",
+ ),
+ ]
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py
new file mode 100644
index 0000000000000..09226414221d5
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py
@@ -0,0 +1,31 @@
+from dagster_embedded_elt import sling
+from dagster_embedded_elt.sling import (
+ DagsterSlingTranslator,
+ SlingResource,
+ sling_assets,
+)
+
+from dagster import Definitions, file_relative_path
+
+replication_config = file_relative_path(__file__, "../sling_replication.yaml")
+sling_resource = SlingResource(connections=[...]) # Add connections here
+
+
+@sling_assets(replication_config=replication_config)
+def my_assets(context, sling: SlingResource):
+ yield from sling.replicate(
+ replication_config=replication_config,
+ dagster_sling_translator=DagsterSlingTranslator(),
+ )
+ for row in sling.stream_raw_logs():
+ context.log.info(row)
+
+
+defs = Definitions(
+ assets=[
+ my_assets,
+ ],
+ resources={
+ "sling": sling_resource,
+ },
+)
diff --git a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
index 8dde56f5a2e43..955598028019b 100644
--- a/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
+++ b/examples/docs_snippets/docs_snippets_tests/integrations_tests/test_embedded_elt.py
@@ -1,11 +1,15 @@
from docs_snippets.integrations.embedded_elt.postgres_snowflake import (
- asset_def as asset_def_postgres,
+ my_assets as asset_def_postgres,
)
from docs_snippets.integrations.embedded_elt.s3_snowflake import (
- asset_def as asset_def_s3,
+ my_assets as asset_def_s3,
+)
+from docs_snippets.integrations.embedded_elt.sling_connection_resources import (
+ sling_resource,
)
-def test_asset_defs():
+def test_asset_defs() -> None:
assert asset_def_postgres
assert asset_def_s3
+ assert sling_resource
|