-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs: update embedded ELT and api documentation
- Corrected the description of the embedded ELT feature in _apidocs.mdx - Added details on configuration, credential management and resource creation for the Sling integration in embedded-elt.mdx
- Loading branch information
1 parent
b20e510
commit 5b4f3c8
Showing
3 changed files
with
3 additions
and
264 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,261 +0,0 @@ | ||
--- | ||
title: "Dagster Embedded ELT" | ||
description: Lightweight ELT framework for building ELT pipelines with Dagster, through helpful pre-built assets and resources | ||
--- | ||
|
||
# Dagster Embedded ELT | ||
|
||
This package provides a framework for building ELT pipelines with Dagster through helpful asset decorators and resources. It is in experimental development, and we'd love to hear your feedback. | ||
|
||
This package includes a single implementation using <a href="https://slingdata.io">Sling</a>, which provides a simple way to sync data between databases and file systems. | ||
|
||
We plan on adding additional embedded ELT tool integrations in the future. | ||
|
||
--- | ||
|
||
## Overview | ||
|
||
To get started with `dagster-embedded-elt` and Sling, first, familiarize yourself with <a href="https://docs.slingdata.io/sling-cli/run/configuration/replication">Sling's replication</a> configuration. The replication configuration is a YAML file that specifies the source and target connections, as well as which streams to sync from. The `dagser-embedded-elt` integration uses this configuration to build the assets for both sources and destinations. | ||
|
||
The typical pattern for building an ELT pipeline with Sling has three steps: | ||
|
||
1. First, define a <a href="https://docs.slingdata.io/sling-cli/run/configuration/replication"> `replication.yaml`</a> file that specifies the source and target connections, as well as which streams to sync from. | ||
|
||
2. Next, create a <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> and pass a list of <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> for each connection to the `connection` parameter, ensuring you name the resource using the same name given to the connection in the Sling configuration. | ||
|
||
3. Use the <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> decorator to define an asset that will run the Sling replication job and yield from the <PyObject module="dagster_embedded_elt.sling" object="SlingResource" method="replicate" /> method to run the sync. | ||
|
||
Each step is explained in detail below: | ||
|
||
--- | ||
|
||
## Step 1: Setting up a Sling replication configuration | ||
|
||
Dagster's Sling integration is built around Sling's replication configuration. You may provide either a path to an existing `replication.yaml` file, or construct a dictionary that represents the configuration in Python. | ||
|
||
This configuration is passed to the Sling CLI to run the replication job, and as such needs to be able to be read by Dagster. | ||
The simplest way is to colocate the code with your existing Dagster code and use the `file_relative_path` function to point to the `replication.yaml` file, however you may choose to store this in an external system like S3 and read from there instead. | ||
|
||
Here's an example of a `replication.yaml` file: | ||
|
||
```yaml | ||
SOURCE: MY_POSTGRES | ||
TARGET: MY_SNOWFLAKE | ||
|
||
defaults: | ||
mode: full-refresh | ||
object: "{stream_schema}_{stream_table}" | ||
|
||
streams: | ||
public.accounts: | ||
public.users: | ||
public.finance_departments: | ||
object: "departments" | ||
``` | ||
Or in Python: | ||
```python file=/integrations/embedded_elt/replication_config.py | ||
replication_config = { | ||
"SOURCE": "MY_POSTGRES", | ||
"TARGET": "MY_DUCKDB", | ||
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, | ||
"streams": { | ||
"public.accounts": None, | ||
"public.users": None, | ||
"public.finance_departments": {"object": "departments"}, | ||
}, | ||
} | ||
``` | ||
|
||
## Step 2: Creating a Sling resource | ||
|
||
Sling requires credentials in order to access your databases. While the CLI uses a `env.yaml` file to read credentials, the | ||
Dagster integration uses `Resources` to manage credentials instead. These are passed as environment variables to the Sling CLI. | ||
|
||
You will need to create a <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> object that contains references to the connections specified in the replication configuration. | ||
|
||
A <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> takes a `connections` parameter, where each <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> represents a connection to a source or target database. You may provide as many connections to the `SlingResource` as needed. | ||
|
||
The `name` parameter in the <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> should match the `SOURCE` and `TARGET` keys in the replication configuration. | ||
|
||
You may pass a connection string or arbitrary keyword arguments to the <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> to specify the connection details. See the <a href="https://docs.slingdata.io/connections/database-connections">Sling connections reference</a> for the specific connection types and parameters. | ||
|
||
```python file=/integrations/embedded_elt/sling_connection_resources.py | ||
from dagster_embedded_elt.sling.resources import ( | ||
SlingConnectionResource, | ||
SlingResource, | ||
) | ||
|
||
from dagster import EnvVar | ||
|
||
sling_resource = SlingResource( | ||
connections=[ | ||
# Using a connection string from an environment variable | ||
SlingConnectionResource( | ||
name="MY_POSTGRES", | ||
type="postgres", | ||
connection_string=EnvVar("POSTGRES_CONNECTION_STRING"), | ||
), | ||
# Using a hard-coded connection string | ||
SlingConnectionResource( | ||
name="MY_DUCKDB", | ||
type="duckdb", | ||
connection_string="duckdb:///var/tmp/duckdb.db", | ||
), | ||
# Using a keyword-argument constructor | ||
SlingConnectionResource( | ||
name="MY_SNOWFLAKE", | ||
type="snowflake", | ||
host=EnvVar("SNOWFLAKE_HOST"), | ||
user=EnvVar("SNOWFLAKE_USER"), | ||
role="REPORTING", | ||
), | ||
] | ||
) | ||
``` | ||
|
||
## Step 3: Define the Sling assets | ||
|
||
Now you can define a Sling asset using the <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> decorator. Dagster will read the replication configuration to produce Assets. | ||
|
||
Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> object. | ||
|
||
```python file=/integrations/embedded_elt/sling_dagster_translator.py | ||
from dagster_embedded_elt import sling | ||
from dagster_embedded_elt.sling import ( | ||
DagsterSlingTranslator, | ||
SlingResource, | ||
sling_assets, | ||
) | ||
|
||
from dagster import Definitions, file_relative_path | ||
|
||
replication_config = file_relative_path(__file__, "../sling_replication.yaml") | ||
sling_resource = SlingResource(connections=[...]) # Add connections here | ||
|
||
|
||
@sling_assets(replication_config=replication_config) | ||
def my_assets(context, sling: SlingResource): | ||
yield from sling.replicate( | ||
replication_config=replication_config, | ||
dagster_sling_translator=DagsterSlingTranslator(), | ||
) | ||
for row in sling.stream_raw_logs(): | ||
context.log.info(row) | ||
|
||
|
||
defs = Definitions( | ||
assets=[ | ||
my_assets, | ||
], | ||
resources={ | ||
"sling": sling_resource, | ||
}, | ||
) | ||
``` | ||
|
||
That's it! You should now be able to view your assets in Dagster and run the replication job. | ||
|
||
## Examples | ||
|
||
This is an example of how to setup a Sling sync between Postgres and Snowflake: | ||
|
||
```python file=/integrations/embedded_elt/postgres_snowflake.py | ||
from dagster_embedded_elt.sling import ( | ||
DagsterSlingTranslator, | ||
SlingConnectionResource, | ||
SlingResource, | ||
sling_assets, | ||
) | ||
|
||
from dagster import EnvVar | ||
|
||
source = SlingConnectionResource()( | ||
name="MY_PG", | ||
type="postgres", | ||
host="localhost", | ||
port=5432, | ||
database="my_database", | ||
user="my_user", | ||
password=EnvVar("PG_PASS"), | ||
) | ||
|
||
target = SlingConnectionResource( | ||
name="MY_SF", | ||
type="snowflake", | ||
host="hostname.snowflake", | ||
user="username", | ||
database="database", | ||
password=EnvVar("SF_PASSWORD"), | ||
role="role", | ||
) | ||
|
||
sling = SlingResource( | ||
connections=[ | ||
source, | ||
target, | ||
] | ||
) | ||
replication_config = { | ||
"SOURCE": "MY_PG", | ||
"TARGET": "MY_SF", | ||
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, | ||
"streams": { | ||
"public.accounts": None, | ||
"public.users": None, | ||
"public.finance_departments": {"object": "departments"}, | ||
}, | ||
} | ||
|
||
|
||
@sling_assets(replication_config=replication_config) | ||
def my_assets(context, sling: SlingResource): | ||
yield from sling.replicate( | ||
replication_config=replication_config, | ||
dagster_sling_translator=DagsterSlingTranslator(), | ||
) | ||
``` | ||
|
||
Similarily, you can define file/storage connections: | ||
|
||
```python startafter=start_storage_config endbefore=end_storage_config file=/integrations/embedded_elt/s3_snowflake.py | ||
source = SlingConnectionResource()( | ||
name="MY_S3", | ||
type="s3", | ||
bucket="sling-bucket", | ||
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"), | ||
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"), | ||
) | ||
|
||
sling = SlingResource(connections=[source, target]) | ||
|
||
replication_config = { | ||
"SOURCE": "MY_S3", | ||
"TARGET": "MY_SF", | ||
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, | ||
"streams": { | ||
"s3://my-bucket/my_file.parquet": { | ||
"object": "marts.my_table", | ||
"primary_key": "id", | ||
}, | ||
}, | ||
} | ||
|
||
|
||
@sling_assets | ||
def my_assets(context, sling: SlingResource): | ||
yield from sling.replicate( | ||
replication_config=replication_config, | ||
dagster_sling_translator=DagsterSlingTranslator(), | ||
) | ||
``` | ||
|
||
## Relevant APIs | ||
|
||
| Name | Description | | ||
| --------------------------------------------------------------------------------- | ------------------------------------------------------------------------------ | | ||
| <PyObject module="dagster_embedded_elt.sling" object="sling_assets" /> | The core Sling asset factory for building syncs | | ||
| <PyObject module="dagster_embedded_elt.sling" object="SlingResource" /> | The Sling Resource used for handing credentials to databases and object stores | | ||
| <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> | A translator for specifying how to map between Sling and Dagster types | | ||
| <PyObject module="dagster_embedded_elt.sling" object="SlingConnectionResource" /> | A Sling connection resource for specifying the connection details | | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters