Skip to content

Commit

Permalink
Update translator post review
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 7, 2024
1 parent 7a92c39 commit 6cb558b
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_defs import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
build_fivetran_assets as build_fivetran_assets,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
)
Expand All @@ -14,6 +13,7 @@
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
Expand All @@ -53,50 +54,6 @@
logger = get_dagster_logger()


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class DagsterFivetranTranslator:
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
"""Get the AssetKey for a table synced by a Fivetran connector."""
return AssetKey(props.table.split("."))

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=self.get_asset_key(props),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)


def _fetch_and_attach_col_metadata(
fivetran_resource: FivetranResource, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes

from dagster_fivetran.utils import metadata_for_table


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()

def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
"""

def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
"""Get the AssetKey for a table synced by a Fivetran connector."""
return AssetKey(props.table.split("."))

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=self.get_asset_key(props),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)

0 comments on commit 6cb558b

Please sign in to comment.