Skip to content

Commit

Permalink
[7/n][dagster-fivetran] Implement load_fivetran_asset_specs (#25808)
Browse files Browse the repository at this point in the history
## Summary & Motivation

This PR implements the `load_fivetran_asset_specs` function:

```python
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

# Connect to Fivetran using the credentials
fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
```

## How I Tested These Changes

Additional unit test

## Changelog

[dagster-fivetran] The `load_fivetran_asset_specs` function is added. It
can be used with the `FivetranWorkspace` resource and
`DagsterFivetranTranslator` translator to load your Fivetran connector
tables as external assets in Dagster.
  • Loading branch information
maximearmstrong authored Nov 13, 2024
1 parent e0f097e commit e500fa9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
FivetranResource as FivetranResource,
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
Expand Down Expand Up @@ -683,6 +684,50 @@ def fetch_fivetran_workspace_data(
return FivetranWorkspaceData.from_content_data(connectors + destinations)


@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Examples:
Loading the asset specs for a given Fivetran workspace:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
)
.build_defs()
.assets,
AssetSpec,
)


@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


@whitelist_for_serdes
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

Expand All @@ -42,6 +43,7 @@ class FivetranContentData:
properties: Mapping[str, Any]


@whitelist_for_serdes
@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import responses
from dagster_fivetran import FivetranWorkspace
from dagster._config.field_utils import EnvVar
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -18,3 +20,28 @@ def test_fetch_fivetran_workspace_data(
actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
assert len(actual_workspace_data.destinations_by_id) == 1


def test_translator_spec(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

all_assets = load_fivetran_asset_specs(resource)
all_assets_keys = [asset.key for asset in all_assets]

# 4 tables for the connector
assert len(all_assets) == 4
assert len(all_assets_keys) == 4

# Sanity check outputs, translator tests cover details here
first_asset_key = next(key for key in all_assets_keys)
assert first_asset_key.path == [
"schema_name_in_destination_1",
"table_name_in_destination_1",
]

0 comments on commit e500fa9

Please sign in to comment.