Skip to content

Commit

Permalink
[7/n][dagster-fivetran] Implement load_fivetran_asset_specs
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 8, 2024
1 parent e0fb1eb commit f3251a5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
FivetranResource as FivetranResource,
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
Expand Down Expand Up @@ -671,6 +672,33 @@ def fetch_fivetran_workspace_data(
return FivetranWorkspaceData.from_content_data(connectors + destinations)


@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
Returns:
List[AssetSpec]: The set of assets representing the Tableau content in the workspace.
"""
with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
)
.build_defs()
.assets,
AssetSpec,
)


@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


@whitelist_for_serdes
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

Expand All @@ -37,6 +38,7 @@ class FivetranContentData:
properties: Mapping[str, Any]


@whitelist_for_serdes
@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import uuid
from typing import Callable

from dagster_fivetran import FivetranWorkspace
from dagster._config.field_utils import EnvVar
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs


def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None:
Expand All @@ -15,3 +17,31 @@ def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) ->
actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
assert len(actual_workspace_data.destinations_by_id) == 1


def test_translator_spec(workspace_data_api_mocks_fn: Callable) -> None:
account_id = "fake_account_id"
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

with environ({"FIVETRAN_API_KEY": api_key, "FIVETRAN_API_SECRET": api_secret}):
resource = FivetranWorkspace(
account_id=account_id,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

with workspace_data_api_mocks_fn(include_sync_endpoints=False):
all_assets = load_fivetran_asset_specs(resource)
all_assets_keys = [asset.key for asset in all_assets]

# 4 tables for the connector
assert len(all_assets) == 4
assert len(all_assets_keys) == 4

# Sanity check outputs, translator tests cover details here
first_asset_key = next(key for key in all_assets_keys)
assert first_asset_key.path == [
"schema_name_in_destination_1",
"table_name_in_destination_1",
]

0 comments on commit f3251a5

Please sign in to comment.