Skip to content

Commit

Permalink
[dagster-fivetran] Use Fivetran translator instance in load specs fn …
Browse files Browse the repository at this point in the history
…and state-backed defs (#26133)

## Summary & Motivation

Updates load_fivetran_asset_specs() and state-backed definitions to
accept an instance of `DagsterFivetranTranslator`.

See more about the motivation in the original thread
[here](#25944 (comment)).

## How I Tested These Changes

Additional unit tests to test custom translators with BK

## Changelog

[dagster-fivetran] `load_fivetran_asset_specs` is updated to accept an
instance of `DagsterFivetranTranslator` or custom subclass.
  • Loading branch information
maximearmstrong authored Dec 5, 2024
1 parent c1950dc commit 11b0bff
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -924,10 +924,9 @@ def load_asset_specs(
fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
workspace=self,
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)

def sync_and_poll(
Expand All @@ -939,14 +938,15 @@ def sync_and_poll(
@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Expand All @@ -972,7 +972,7 @@ def load_fivetran_asset_specs(
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)
.build_defs()
.assets,
Expand All @@ -983,7 +983,7 @@ def load_fivetran_asset_specs(
@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
translator_cls: Type[DagsterFivetranTranslator]
translator: DagsterFivetranTranslator

@property
def defs_key(self) -> str:
Expand All @@ -993,10 +993,8 @@ def fetch_state(self) -> FivetranWorkspaceData:
return self.workspace.fetch_fivetran_workspace_data()

def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions:
translator = self.translator_cls()

all_asset_specs = [
translator.get_asset_spec(props)
self.translator.get_asset_spec(props)
for props in state.to_fivetran_connector_table_props_data()
]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
load_fivetran_asset_specs,
)
from dagster_fivetran.asset_defs import build_fivetran_assets_definitions
from dagster_fivetran.translator import FivetranMetadataSet

Expand Down Expand Up @@ -112,3 +118,37 @@ def test_cached_load_spec_with_asset_factory(
# then load_fivetran_asset_specs is called once per connector ID in fivetran_assets
build_fivetran_assets_definitions(workspace=resource)
assert len(fetch_workspace_data_api_mocks.calls) == 4


class MyCustomTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, data: FivetranConnectorTableProps) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
workspace = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

all_asset_specs = load_fivetran_asset_specs(
workspace=workspace, dagster_fivetran_translator=MyCustomTranslator()
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == [
"prefix",
"schema_name_in_destination_1",
"table_name_in_destination_1",
]
assert "dagster/kind/fivetran" in asset_spec.tags
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from typing import Callable

from dagster_fivetran import FivetranWorkspace
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.test_utils import environ
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
)

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -23,3 +31,39 @@ def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"


class MyCustomTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
default_spec = super().get_asset_spec(props)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()

first_table_props_data = next(props for props in table_props_data)

asset_spec = MyCustomTranslator().get_asset_spec(first_table_props_data)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == [
"prefix",
"schema_name_in_destination_1",
"table_name_in_destination_1",
]
assert "dagster/kind/fivetran" in asset_spec.tags

0 comments on commit 11b0bff

Please sign in to comment.