-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[docs] Add docs for reworked Fivetran API (#26026)
## Summary & Motivation As title ## How I Tested These Changes See preview
- Loading branch information
1 parent
d442b3a
commit d51e2d8
Showing
14 changed files
with
321 additions
and
13 deletions.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
--- | ||
title: "Using Dagster with Fivetran" | ||
description: Represent your Fivetran connectors in Dagster | ||
--- | ||
|
||
# Using Dagster with Fivetran | ||
|
||
This guide provides instructions for using Dagster with Fivetran using the `dagster-fivetran` library. Your Fivetran connector tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Fivetran assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Fivetran connectors, allowing you to trigger syncs for these on a cadence or based on upstream data changes. | ||
|
||
## What you'll learn | ||
|
||
- How to represent Fivetran assets in the Dagster asset graph, including lineage to other Dagster assets. | ||
- How to customize asset definition metadata for these Fivetran assets. | ||
- How to materialize Fivetran connector tables from Dagster. | ||
- How to customize how Fivetran connector tables are materialized. | ||
|
||
<details> | ||
<summary>Prerequisites</summary> | ||
|
||
- The `dagster` and `dagster-fivetran` libraries installed in your environment | ||
- Familiarity with asset definitions and the Dagster asset graph | ||
- Familiarity with Dagster resources | ||
- Familiarity with Fivetran concepts, like connectors and connector tables | ||
- A Fivetran workspace | ||
- A Fivetran API key and API secret. For more information, see [Getting Started](https://fivetran.com/docs/rest-api/getting-started) in the Fivetran REST API documentation. | ||
|
||
</details> | ||
|
||
## Represent Fivetran assets in the asset graph | ||
|
||
To load Fivetran assets into the Dagster asset graph, you must first construct a <PyObject module="dagster_fivetran" object="FivetranWorkspace" /> resource, which allows Dagster to communicate with your Fivetran workspace. You'll need to supply your account ID, API key and API secret. See [Getting Started](https://fivetran.com/docs/rest-api/getting-started) in the Fivetran REST API documentation for more information on how to create your API key and API secret. | ||
|
||
Dagster can automatically load all connector tables from your Fivetran workspace as asset specs. Call the <PyObject module="dagster_fivetran" method="load_fivetran_asset_specs" /> function, which returns list of <PyObject object="AssetSpec" />s representing your Fivetran assets. You can then include these asset specs in your <PyObject object="Definitions" /> object: | ||
|
||
```python file=/integrations/fivetran/representing_fivetran_assets.py | ||
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace) | ||
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace}) | ||
``` | ||
|
||
### Sync and materialize Fivetran assets | ||
|
||
You can use Dagster to sync Fivetran connectors and materialize Fivetran connector tables. You can use the <PyObject module="dagster_fivetran" method="build_fivetran_assets_definitions" /> factory to create all assets definitions for your Fivetran workspace. | ||
|
||
```python file=/integrations/fivetran/sync_and_materialize_fivetran_assets.py | ||
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace) | ||
|
||
defs = dg.Definitions( | ||
assets=all_fivetran_assets, | ||
resources={"fivetran": fivetran_workspace}, | ||
) | ||
``` | ||
|
||
### Customize the materialization of Fivetran assets | ||
|
||
If you want to customize the sync of your connectors, you can use the <PyObject module="dagster_fivetran" method="fivetran_assets" /> decorator to do so. This allows you to execute custom code before and after the call to the fivetran sync. | ||
|
||
```python file=/integrations/fivetran/customize_fivetran_asset_defs.py | ||
from dagster_fivetran import FivetranWorkspace, fivetran_assets | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
|
||
@fivetran_assets( | ||
connector_id="fivetran_connector_id", | ||
name="fivetran_connector_id", | ||
group_name="fivetran_connector_id", | ||
workspace=fivetran_workspace, | ||
) | ||
def fivetran_connector_assets( | ||
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace | ||
): | ||
# Do something before the materialization... | ||
yield from fivetran.sync_and_poll(context=context) | ||
# Do something after the materialization... | ||
|
||
|
||
defs = dg.Definitions( | ||
assets=[fivetran_connector_assets], | ||
resources={"fivetran": fivetran_workspace}, | ||
) | ||
``` | ||
|
||
### Customize asset definition metadata for Fivetran assets | ||
|
||
By default, Dagster will generate asset specs for each Fivetran asset and populate default metadata. You can further customize asset properties by passing an instance of the custom <PyObject module="dagster_fivetran" object="DagsterFivetranTranslator" /> to the <PyObject module="dagster_fivetran" method="load_fivetran_asset_specs" /> function. | ||
|
||
```python file=/integrations/fivetran/customize_fivetran_translator_asset_spec.py | ||
from dagster_fivetran import ( | ||
DagsterFivetranTranslator, | ||
FivetranConnectorTableProps, | ||
FivetranWorkspace, | ||
load_fivetran_asset_specs, | ||
) | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
|
||
# A translator class lets us customize properties of the built | ||
# Fivetran assets, such as the owners or asset key | ||
class MyCustomFivetranTranslator(DagsterFivetranTranslator): | ||
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: | ||
# We create the default asset spec using super() | ||
default_spec = super().get_asset_spec(props) | ||
# We customize the metadata and asset key prefix for all assets | ||
return default_spec.replace_attributes( | ||
key=default_spec.key.with_prefix("prefix"), | ||
).merge_attributes(metadata={"custom": "metadata"}) | ||
|
||
|
||
fivetran_specs = load_fivetran_asset_specs( | ||
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator() | ||
) | ||
|
||
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace}) | ||
``` | ||
|
||
Note that `super()` is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it. | ||
|
||
You can pass an instance of the custom <PyObject module="dagster_fivetran" object="DagsterFivetranTranslator" /> to the <PyObject module="dagster_fivetran" method="fivetran_assets" /> decorator or the <PyObject module="dagster_fivetran" method="build_fivetran_assets_definitions" /> factory. | ||
|
||
### Load Fivetran assets from multiple workspaces | ||
|
||
Definitions from multiple Fivetran workspaces can be combined by instantiating multiple <PyObject module="dagster_fivetran" object="FivetranWorkspace" /> resources and merging their specs. This lets you view all your Fivetran assets in a single asset graph: | ||
|
||
```python file=/integrations/fivetran/multiple_fivetran_workspaces.py | ||
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs | ||
|
||
import dagster as dg | ||
|
||
sales_fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"), | ||
) | ||
marketing_fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"), | ||
) | ||
|
||
sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace) | ||
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace) | ||
|
||
# Merge the specs into a single set of definitions | ||
defs = dg.Definitions( | ||
assets=[*sales_fivetran_specs, *marketing_fivetran_specs], | ||
resources={ | ||
"marketing_fivetran": marketing_fivetran_workspace, | ||
"sales_fivetran": sales_fivetran_workspace, | ||
}, | ||
) | ||
``` |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
29 changes: 29 additions & 0 deletions
29
examples/docs_snippets/docs_snippets/integrations/fivetran/customize_fivetran_asset_defs.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from dagster_fivetran import FivetranWorkspace, fivetran_assets | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
|
||
@fivetran_assets( | ||
connector_id="fivetran_connector_id", | ||
name="fivetran_connector_id", | ||
group_name="fivetran_connector_id", | ||
workspace=fivetran_workspace, | ||
) | ||
def fivetran_connector_assets( | ||
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace | ||
): | ||
# Do something before the materialization... | ||
yield from fivetran.sync_and_poll(context=context) | ||
# Do something after the materialization... | ||
|
||
|
||
defs = dg.Definitions( | ||
assets=[fivetran_connector_assets], | ||
resources={"fivetran": fivetran_workspace}, | ||
) |
33 changes: 33 additions & 0 deletions
33
..._snippets/docs_snippets/integrations/fivetran/customize_fivetran_translator_asset_spec.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from dagster_fivetran import ( | ||
DagsterFivetranTranslator, | ||
FivetranConnectorTableProps, | ||
FivetranWorkspace, | ||
load_fivetran_asset_specs, | ||
) | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
|
||
# A translator class lets us customize properties of the built | ||
# Fivetran assets, such as the owners or asset key | ||
class MyCustomFivetranTranslator(DagsterFivetranTranslator): | ||
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: | ||
# We create the default asset spec using super() | ||
default_spec = super().get_asset_spec(props) | ||
# We customize the metadata and asset key prefix for all assets | ||
return default_spec.replace_attributes( | ||
key=default_spec.key.with_prefix("prefix"), | ||
).merge_attributes(metadata={"custom": "metadata"}) | ||
|
||
|
||
fivetran_specs = load_fivetran_asset_specs( | ||
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator() | ||
) | ||
|
||
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace}) |
26 changes: 26 additions & 0 deletions
26
examples/docs_snippets/docs_snippets/integrations/fivetran/multiple_fivetran_workspaces.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs | ||
|
||
import dagster as dg | ||
|
||
sales_fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"), | ||
) | ||
marketing_fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"), | ||
) | ||
|
||
sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace) | ||
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace) | ||
|
||
# Merge the specs into a single set of definitions | ||
defs = dg.Definitions( | ||
assets=[*sales_fivetran_specs, *marketing_fivetran_specs], | ||
resources={ | ||
"marketing_fivetran": marketing_fivetran_workspace, | ||
"sales_fivetran": sales_fivetran_workspace, | ||
}, | ||
) |
12 changes: 12 additions & 0 deletions
12
examples/docs_snippets/docs_snippets/integrations/fivetran/representing_fivetran_assets.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace) | ||
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace}) |
16 changes: 16 additions & 0 deletions
16
...docs_snippets/docs_snippets/integrations/fivetran/sync_and_materialize_fivetran_assets.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions | ||
|
||
import dagster as dg | ||
|
||
fivetran_workspace = FivetranWorkspace( | ||
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), | ||
api_key=dg.EnvVar("FIVETRAN_API_KEY"), | ||
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), | ||
) | ||
|
||
all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace) | ||
|
||
defs = dg.Definitions( | ||
assets=all_fivetran_assets, | ||
resources={"fivetran": fivetran_workspace}, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
d51e2d8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deploy preview for dagster-docs ready!
✅ Preview
https://dagster-docs-hicd3w2pa-elementl.vercel.app
https://master.dagster.dagster-docs.io
Built with commit d51e2d8.
This pull request is being automatically deployed with vercel-action
d51e2d8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deploy preview for dagster-docs-beta ready!
✅ Preview
https://dagster-docs-beta-e05pr2cvy-elementl.vercel.app
Built with commit d51e2d8.
This pull request is being automatically deployed with vercel-action