Skip to content

Commit

Permalink
[dagster-fivetran] Use Fivetran translator instance in load specs fn …
Browse files Browse the repository at this point in the history
…and state-backed defs
  • Loading branch information
maximearmstrong committed Nov 25, 2024
1 parent 9f2b4ff commit c4d02f2
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Optional, Type
from typing import Any, Callable, Optional

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental
Expand Down Expand Up @@ -107,7 +107,8 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator()
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -924,10 +924,9 @@ def load_asset_specs(
fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace})
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
workspace=self,
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)

def sync_and_poll(
Expand All @@ -939,14 +938,15 @@ def sync_and_poll(
@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Expand All @@ -972,7 +972,7 @@ def load_fivetran_asset_specs(
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)
.build_defs()
.assets,
Expand All @@ -983,7 +983,7 @@ def load_fivetran_asset_specs(
@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
translator_cls: Type[DagsterFivetranTranslator]
translator: DagsterFivetranTranslator

@property
def defs_key(self) -> str:
Expand All @@ -993,10 +993,8 @@ def fetch_state(self) -> FivetranWorkspaceData:
return self.workspace.fetch_fivetran_workspace_data()

def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions:
translator = self.translator_cls()

all_asset_specs = [
translator.get_asset_spec(props)
self.translator.get_asset_spec(props)
for props in state.to_fivetran_connector_table_props_data()
]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
load_fivetran_asset_specs,
)
from dagster_fivetran.asset_defs import build_fivetran_assets_definitions
from dagster_fivetran.translator import FivetranMetadataSet

Expand Down Expand Up @@ -112,3 +118,38 @@ def test_cached_load_spec_with_asset_factory(
# then load_fivetran_asset_specs is called once per connector ID in fivetran_assets
build_fivetran_assets_definitions(workspace=resource)
assert len(fetch_workspace_data_api_mocks.calls) == 4


class MyCustomTranslator(DagsterFivetranTranslator):
    """Translator used by the tests to check that a custom translator is honored.

    Starts from the spec produced by the parent translator, prefixes its asset
    key with ``"prefix"`` and adds a ``"custom": "metadata"`` metadata entry.
    """

    # The parameter is named ``props`` so that it agrees with the name used
    # where the translator is invoked (``get_asset_spec(props)``).
    # NOTE(review): presumably the base class also names it ``props`` -- confirm.
    def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
        """Return the parent's spec with a prefixed key and one extra metadata entry."""
        default_spec = super().get_asset_spec(props)
        return replace_attributes(
            default_spec,
            key=default_spec.key.with_prefix("prefix"),
            metadata={**default_spec.metadata, "custom": "metadata"},
        )


def test_translator_custom_metadata(
    fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
    """Check that ``load_fivetran_asset_specs`` uses the translator instance it is given.

    Loads the specs with a ``MyCustomTranslator`` instance and verifies that the
    first returned spec carries the custom key prefix and metadata entry.

    Args:
        fetch_workspace_data_api_mocks: Fixture requested for its setup; it is
            not referenced in the body. Presumably it registers the mocked
            Fivetran API responses -- confirm against conftest.
    """
    credentials = {"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}
    with environ(credentials):
        fivetran_workspace = FivetranWorkspace(
            account_id=TEST_ACCOUNT_ID,
            api_key=EnvVar("FIVETRAN_API_KEY"),
            api_secret=EnvVar("FIVETRAN_API_SECRET"),
        )

        loaded_specs = load_fivetran_asset_specs(
            workspace=fivetran_workspace,
            dagster_fivetran_translator=MyCustomTranslator(),
        )
        # Only the first spec is inspected.
        first_spec = next(iter(loaded_specs))

        expected_key_path = [
            "prefix",
            "schema_name_in_destination_1",
            "table_name_in_destination_1",
        ]

        assert "custom" in first_spec.metadata
        assert first_spec.metadata["custom"] == "metadata"
        assert first_spec.key.path == expected_key_path
        assert "dagster/kind/fivetran" in first_spec.tags
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from typing import Callable

from dagster_fivetran import FivetranWorkspace
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.test_utils import environ
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
)

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -23,3 +31,40 @@ def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"


class MyCustomTranslator(DagsterFivetranTranslator):
    """Translator used by the tests: prefixes asset keys and adds custom metadata."""

    def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
        """Return the parent's spec with a ``"prefix"`` key prefix and a ``"custom"`` metadata entry."""
        base_spec = super().get_asset_spec(props)
        prefixed_key = base_spec.key.with_prefix("prefix")
        extended_metadata = {**base_spec.metadata, "custom": "metadata"}
        return replace_attributes(
            base_spec,
            key=prefixed_key,
            metadata=extended_metadata,
        )


def test_translator_custom_metadata(
    fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
    """Check that ``MyCustomTranslator.get_asset_spec`` customizes the default spec.

    Fetches the workspace data, converts it to connector table props, translates
    the first entry with ``MyCustomTranslator`` and verifies the key prefix,
    the custom metadata entry and the Fivetran kind tag.

    Args:
        fetch_workspace_data_api_mocks: Fixture requested for its setup; it is
            not referenced in the body. Presumably it registers the mocked
            Fivetran API responses -- confirm against conftest.
    """
    credentials = {"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}
    with environ(credentials):
        fivetran_workspace = FivetranWorkspace(
            account_id=TEST_ACCOUNT_ID,
            api_key=EnvVar("FIVETRAN_API_KEY"),
            api_secret=EnvVar("FIVETRAN_API_SECRET"),
        )

        workspace_data = fivetran_workspace.fetch_fivetran_workspace_data()
        all_table_props = workspace_data.to_fivetran_connector_table_props_data()

        # Only the first table's props are translated and inspected.
        first_props = next(iter(all_table_props))

        translated_spec = MyCustomTranslator().get_asset_spec(first_props)

        expected_key_path = [
            "prefix",
            "schema_name_in_destination_1",
            "table_name_in_destination_1",
        ]

        assert "custom" in translated_spec.metadata
        assert translated_spec.metadata["custom"] == "metadata"
        assert translated_spec.key.path == expected_key_path
        assert "dagster/kind/fivetran" in translated_spec.tags

0 comments on commit c4d02f2

Please sign in to comment.