Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7/n][dagster-fivetran] Implement load_fivetran_asset_specs #25808

Merged
merged 5 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
FivetranResource as FivetranResource,
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
Expand Down Expand Up @@ -683,6 +684,50 @@ def fetch_fivetran_workspace_data(
return FivetranWorkspaceData.from_content_data(connectors + destinations)


@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.

Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.

Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some examples?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure - I added one in 14f49b7, more to come when I update the docs.


Examples:
Loading the asset specs for a given Fivetran workspace:

.. code-block:: python
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator_cls=dagster_fivetran_translator,
)
.build_defs()
.assets,
AssetSpec,
)


@record
class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
workspace: FivetranWorkspace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


@whitelist_for_serdes
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

Expand All @@ -42,6 +43,7 @@ class FivetranContentData:
properties: Mapping[str, Any]


@whitelist_for_serdes
@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import responses
from dagster_fivetran import FivetranWorkspace
from dagster._config.field_utils import EnvVar
from dagster._core.test_utils import environ
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -18,3 +20,28 @@ def test_fetch_fivetran_workspace_data(
actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
assert len(actual_workspace_data.destinations_by_id) == 1


def test_translator_spec(
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> None:
with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID,
api_key=EnvVar("FIVETRAN_API_KEY"),
api_secret=EnvVar("FIVETRAN_API_SECRET"),
)

all_assets = load_fivetran_asset_specs(resource)
all_assets_keys = [asset.key for asset in all_assets]

# 4 tables for the connector
assert len(all_assets) == 4
assert len(all_assets_keys) == 4

# Sanity check outputs, translator tests cover details here
first_asset_key = next(key for key in all_assets_keys)
assert first_asset_key.path == [
"schema_name_in_destination_1",
"table_name_in_destination_1",
]