Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-fivetran] Use Fivetran translator instance in load specs fn and state-backed defs #26133

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -924,10 +924,9 @@ def load_asset_specs(
fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
workspace=self,
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)

def sync_and_poll(
Expand All @@ -939,14 +938,15 @@ def sync_and_poll(
@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Expand All @@ -972,7 +972,7 @@ def load_fivetran_asset_specs(
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)
.build_defs()
.assets,
Expand All @@ -983,7 +983,7 @@ def load_fivetran_asset_specs(
@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
translator_cls: Type[DagsterFivetranTranslator]
translator: DagsterFivetranTranslator

@property
def defs_key(self) -> str:
Expand All @@ -993,10 +993,8 @@ def fetch_state(self) -> FivetranWorkspaceData:
return self.workspace.fetch_fivetran_workspace_data()

def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions:
translator = self.translator_cls()

all_asset_specs = [
translator.get_asset_spec(props)
self.translator.get_asset_spec(props)
for props in state.to_fivetran_connector_table_props_data()
]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
load_fivetran_asset_specs,
)
from dagster_fivetran.asset_defs import build_fivetran_assets_definitions
from dagster_fivetran.translator import FivetranMetadataSet

Expand Down Expand Up @@ -112,3 +118,37 @@ def test_cached_load_spec_with_asset_factory(
# then load_fivetran_asset_specs is called once per connector ID in fivetran_assets
build_fivetran_assets_definitions(workspace=resource)
assert len(fetch_workspace_data_api_mocks.calls) == 4


class MyCustomTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, data: FivetranConnectorTableProps) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
workspace = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

all_asset_specs = load_fivetran_asset_specs(
workspace=workspace, dagster_fivetran_translator=MyCustomTranslator()
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == [
"prefix",
"schema_name_in_destination_1",
"table_name_in_destination_1",
]
assert "dagster/kind/fivetran" in asset_spec.tags
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from typing import Callable

from dagster_fivetran import FivetranWorkspace
import responses
from dagster._config.field_utils import EnvVar
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.test_utils import environ
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
)

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -23,3 +31,39 @@ def test_fivetran_workspace_data_to_fivetran_connector_table_props_data(
assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2"
assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1"
assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2"


class MyCustomTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
default_spec = super().get_asset_spec(props)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

actual_workspace_data = resource.fetch_fivetran_workspace_data()
table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data()

first_table_props_data = next(props for props in table_props_data)

asset_spec = MyCustomTranslator().get_asset_spec(first_table_props_data)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == [
"prefix",
"schema_name_in_destination_1",
"table_name_in_destination_1",
]
assert "dagster/kind/fivetran" in asset_spec.tags