[embedded-elt] Update Sling docs and examples #20028

Merged
merged 7 commits into from
Feb 29, 2024
Changes from 5 commits
7 changes: 7 additions & 0 deletions docs/content/_apidocs.mdx
@@ -458,6 +458,13 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi
</td>
<td>Provides support for storing PySpark DataFrames in DuckDB.</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-embedded-elt">Embedded ELT</a> (
<code>dagster-embedded-elt</code>)
</td>
<td>Provides support for running embedded ELT within Dagster.</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-fivetran">Fivetran</a> (
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
266 changes: 176 additions & 90 deletions docs/content/integrations/embedded-elt.mdx
@@ -5,167 +5,253 @@ description: Lightweight ELT framework for building ELT pipelines with Dagster,

# Dagster Embedded ELT

This package provides a framework for building ELT pipelines with Dagster through helpful pre-built assets and resources. It is currently in experimental development, and we'd love to hear your feedback.
This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback.

This package currently includes a single implementation using <a href="https://slingdata.io">Sling</a>, which provides a simple way to sync data between databases and file systems.
This package includes a single implementation using <a href="https://slingdata.io">Sling</a>, which provides a simple way to sync data between databases and file systems.

We plan on adding additional embedded ELT tool integrations in the future.

---

## Relevant APIs
## Overview

| Name | Description |
| --------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
| <PyObject module="dagster_embedded_elt.sling" object="build_sling_asset" /> | The core Sling asset factory for building syncs |
| <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> | The Sling Resource used for handing credentials to databases and object stores |
To get started with `dagster-embedded-elt` and Sling, first familiarize yourself with <a href="https://docs.slingdata.io/sling-cli/run/configuration/replication">Sling's replication</a> configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync. The `dagster-embedded-elt` integration uses this configuration to build the assets for both sources and destinations.

---
The typical pattern for building an ELT pipeline with Sling has three steps:

## Getting started
1. First, define a <a href="https://docs.slingdata.io/sling-cli/run/configuration/replication"> `replication.yaml`</a> file that specifies the source and target connections, as well as which streams to sync from.

To get started with `dagster-embedded-elt` and Sling, familiarize yourself with <a href="https://docs.slingdata.io/sling-cli/run">Sling</a> by reading their docs which describe how sources and targets are configured.
2. Next, create a <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> and pass a list of <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> objects, one per connection, to the `connections` parameter, ensuring each connection resource uses the same name given to the connection in the Sling configuration.

The typical pattern for building an ELT pipeline with Sling has three steps:
3. Use the <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> decorator to define an asset that will run the Sling replication job and yield from the <PyObject module="dagster_embedded_elt.sling" object="SlingResource" method="replicate" /> method to run the sync.

1. First, create a <PyObject module="dagster-embedded-elt.sling" object="SlingResource" /> which is a container for the source and the target.
2. In the <PyObject module="dagster-embedded-elt.sling" object="SlingResource" /> define both a <PyObject module="dagster-embedded-elt.sling" object="SlingSourceConnection" /> and a <PyObject module="dagster-embedded-elt.sling" object="SlingTargetConnection" /> which holds the source and target credentials that Sling will use to sync data.
3. Finally, create an asset that syncs between two connections. You can use the <PyObject module="dagster-embedded-elt.sling" object="build_sling_asset" /> factory for most use cases.
Each step is explained in detail below:

---

## Step 1: Setting up a Sling resource
## Step 1: Setting up a Sling replication configuration

A Sling resource is a Dagster resource that contains references to both a source connection and a target connection. Sling is versatile in what a source or destination can represent. You can provide arbitrary keywords to the <PyObject module="dagster-embedded-elt.sling" object="SlingSourceConnection" /> and <PyObject module="dagster-embedded-elt.sling" object="SlingTargetConnection" /> classes.
Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python.

The types and parameters for each connection are defined by [Sling's connections](https://docs.slingdata.io/connections/database-connections).
This configuration is passed to the Sling CLI to run the replication job.

The simplest connection is a file connection, which can be defined as:
Here's an example of a `replication.yaml` file:

```python
from dagster_embedded_elt.sling import SlingSourceConnection
source = SlingSourceConnection(type="file")
sling = SlingResource(source_connection=source, ...)
```
```yaml
SOURCE: MY_POSTGRES
TARGET: MY_SNOWFLAKE

Note that no path is required in the source connection, as that is provided by the asset itself.
defaults:
  mode: full-refresh
  object: "{stream_schema}_{stream_table}"

```python
asset_def = build_sling_asset(
asset_spec=AssetSpec("my_file"),
source_stream=f"file://{path_to_file}",
...
)
streams:
  public.accounts:
  public.users:
  public.finance_departments:
    object: "departments"
```

For database connections, you can provide a connection string or a dictionary of keyword arguments. For example, to connect to a SQLite database, you can provide a path to the database using the `instance` keyword, which is specified in [Sling's SQLite connection](https://docs.slingdata.io/connections/database-connections/sqlite) documentation.

```python
source = SlingSourceConnection(type="sqlite", instance="path/to/sqlite.db")
Or in Python:

```python file=/integrations/embedded_elt/replication_config.py
replication_config = {
    "SOURCE": "MY_POSTGRES",
    "TARGET": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}
```
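
To illustrate how the `object` template under `defaults` interacts with per-stream overrides, here is a minimal sketch that expands the template for each stream. The `resolve_target_objects` helper is hypothetical and is not part of `dagster-embedded-elt` or Sling. It assumes that a stream name splits into schema and table at the last dot and that only the `{stream_schema}` and `{stream_table}` variables appear in the template; Sling itself performs the real resolution when the replication runs.

```python
replication_config = {
    "SOURCE": "MY_POSTGRES",
    "TARGET": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


def resolve_target_objects(config):
    """Hypothetical helper: map each stream to the target object name it would get.

    Assumption: "schema.table" stream names, split at the last dot.
    """
    default_template = config.get("defaults", {}).get("object")
    resolved = {}
    for stream_name, stream_config in config["streams"].items():
        # A stream set to None inherits everything from `defaults`.
        template = (stream_config or {}).get("object", default_template)
        if template is None:
            continue  # no template available, nothing to expand
        schema, _, table = stream_name.rpartition(".")
        resolved[stream_name] = template.format(
            stream_schema=schema, stream_table=table
        )
    return resolved


target_objects = resolve_target_objects(replication_config)
```

Under these assumptions, `public.accounts` and `public.users` pick up the default template, while `public.finance_departments` uses its own `object` override.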

---
## Step 2: Creating a Sling resource

## Step 2: Creating a Sling sync
Next, you will need to create a <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> object that contains references to the connections specified in the replication configuration.

To create a Sling sync, once you have defined your resource, you can use the <PyObject module="dagster_embedded_elt.sling" object="build_sling_asset" /> factory to create an asset.
A <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> takes a `connections` parameter, where each <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed.

```python
The `name` parameter in the <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> should match the `SOURCE` and `TARGET` keys in the replication configuration.

sling_resource = SlingResource(
source_connection=SlingSourceConnection(type="file"),
target_connection=SlingTargetConnection(
type="sqlite", connection_string="sqlite://path/to/sqlite.db"
),
)
You may pass a connection string or arbitrary keyword arguments to the <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> to specify the connection details. See the <a href="https://docs.slingdata.io/connections/database-connections">Sling connections reference</a> for the specific connection types and parameters.

asset_spec = AssetSpec(
key=["main", "tbl"],
group_name="etl",
description="ETL Test",
deps=["foo"],
```python file=/integrations/embedded_elt/sling_connection_resources.py
from dagster_embedded_elt.sling.resources import (
    SlingConnectionResource,
    SlingResource,
)

asset_def = build_sling_asset(
asset_spec=asset_spec,
source_stream="file://path/to/file.csv",
target_object="main.dest_tbl",
mode=SlingMode.INCREMENTAL,
primary_key="id",
from dagster import EnvVar

sling_resource = SlingResource(
    connections=[
        # Using a connection string from an environment variable
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        # Using a hard-coded connection string
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
        # Using a keyword-argument constructor
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            role="REPORTING",
        ),
    ]
)
```
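
Because the connection `name` values must match the `SOURCE` and `TARGET` keys of the replication configuration, a quick pre-flight check can catch a mismatch before any sync runs. The sketch below is a hypothetical helper, not part of `dagster-embedded-elt`; it works on plain connection-name strings so that it runs without Dagster installed.

```python
replication_config = {
    "SOURCE": "MY_POSTGRES",
    "TARGET": "MY_SNOWFLAKE",
    "streams": {"public.accounts": None},
}


def missing_connection_names(config, connection_names):
    """Hypothetical check: return the SOURCE/TARGET names that have no connection."""
    required = {config["SOURCE"], config["TARGET"]}
    return sorted(required - set(connection_names))


# Names as they would be given to each connection resource's `name` parameter.
defined_names = ["MY_POSTGRES", "MY_DUCKDB"]
missing = missing_connection_names(replication_config, defined_names)
if missing:
    print(f"No connection defined for: {', '.join(missing)}")
```

Here the configuration targets `MY_SNOWFLAKE`, which is absent from the defined names, so the check reports it.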

## Step 3: Define the Sling assets

Now you can define a Sling asset using the <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> decorator. Dagster will read the replication configuration to produce Assets.

Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> object.

sling_job = build_assets_job(
"sling_job",
[asset_def],
resource_defs={"sling": sling_resource},
```python file=/integrations/embedded_elt/sling_dagster_translator.py
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingResource,
    sling_assets,
)

from dagster import Definitions, file_relative_path

replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...])  # Add connections here


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )
    for row in sling.stream_raw_logs():
        context.log.info(row)


defs = Definitions(
    assets=[
        my_assets,
    ],
    resources={
        "sling": sling_resource,
    },
)
```

---
That's it! You should now be able to view your assets in Dagster and run the replication job.

## Examples

This is an example of how to set up a Sling sync between Postgres and Snowflake:

```python file=/integrations/embedded_elt/postgres_snowflake.py
import os

from dagster_embedded_elt.sling import (
    SlingMode,
    DagsterSlingTranslator,
    SlingConnectionResource,
    SlingResource,
    SlingSourceConnection,
    SlingTargetConnection,
    build_sling_asset,
    sling_assets,
)

from dagster import AssetSpec
from dagster import EnvVar

source = SlingSourceConnection(
source = SlingConnectionResource(
name="MY_PG",
type="postgres",
host="localhost",
port=5432,
database="my_database",
user="my_user",
password=os.getenv("PG_PASS"),
password=EnvVar("PG_PASS"),
)

target = SlingTargetConnection(
target = SlingConnectionResource(
name="MY_SF",
type="snowflake",
host="hostname.snowflake",
user="username",
database="database",
password=os.getenv("SF_PASSWORD"),
password=EnvVar("SF_PASSWORD"),
role="role",
)

sling = SlingResource(source_connection=source, target_connection=target)

asset_def = build_sling_asset(
asset_spec=AssetSpec("my_asset_name"),
source_stream="public.my_table",
target_object="marts.my_table",
mode=SlingMode.INCREMENTAL,
primary_key="id",
sling = SlingResource(
    connections=[
        source,
        target,
    ]
)
replication_config = {
    "SOURCE": "MY_PG",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "public.accounts": None,
        "public.users": None,
        "public.finance_departments": {"object": "departments"},
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )
```

Similarly, you can define file/storage connections:

```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py
source = SlingSourceConnection(
source = SlingConnectionResource(
name="MY_S3",
type="s3",
bucket="sling-bucket",
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

sling = SlingResource(source_connection=source, target_connection=target)

asset_def = build_sling_asset(
asset_spec=AssetSpec("my_asset_name"),
source_stream="s3://my-bucket/my_file.parquet",
target_object="marts.my_table",
primary_key="id",
)
sling = SlingResource(connections=[source, target])

replication_config = {
    "SOURCE": "MY_S3",
    "TARGET": "MY_SF",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "s3://my-bucket/my_file.parquet": {
            "object": "marts.my_table",
            "primary_key": "id",
        },
    },
}


@sling_assets(replication_config=replication_config)
def my_assets(context, sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )
```

## Relevant APIs

| Name | Description |
| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ |
| <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> | The core Sling asset factory for building syncs |
| <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> | The Sling Resource used for handing credentials to databases and object stores |
| <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> | A translator for specifying how to map between Sling and Dagster types |
| <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> | A Sling connection resource for specifying the connection details |
@@ -10,23 +10,30 @@ provides a simple way to sync data between databases and file systems.

Related documentation pages: `embedded-elt </integrations/embedded-elt>`_.

.. currentmodule:: dagster_embedded_elt.sling

******
Sling
******
***************************
dagster-embedded-elt.sling
***************************

.. currentmodule:: dagster_embedded_elt.sling
Assets (Sling)
==============

Assets
======
.. autodecorator:: sling_assets

.. autofunction:: build_sling_asset
.. autoclass:: DagsterSlingTranslator

Resources
=========
Resources (Sling)
=================

.. autoclass:: SlingResource
:members: sync
:members: sync, replicate

.. autoclass:: SlingConnectionResource

Deprecated
-----------

.. autofunction:: build_sling_asset
.. autoclass:: dagster_embedded_elt.sling.resources.SlingSourceConnection
.. autoclass:: dagster_embedded_elt.sling.resources.SlingTargetConnection