Skip to content

Commit

Permalink
Decouple schema config from connector, implement connector properties
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 14, 2024
1 parent 8af9618 commit d840327
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
DagsterFivetranTranslator,
FivetranConnector,
FivetranDestination,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
Expand Down Expand Up @@ -632,8 +633,8 @@ def fetch_fivetran_workspace_data(
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
"""
connectors_by_id = {}

destinations_by_id = {}
schema_configs_by_connector_id = {}

client = self.get_client()
groups = client.get_groups()["items"]
Expand All @@ -649,19 +650,28 @@ def fetch_fivetran_workspace_data(

connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
for connector_details in connectors_details:
connector = FivetranConnector.from_api_details(
connector = FivetranConnector.from_connector_details(
connector_details=connector_details,
destination_details=destination_details,
schema_config_details=client.get_schema_config_for_connector(
connector_id=connector_details["id"]
),
)

if not connector.has_bad_status:
connectors_by_id[connector.id] = connector
if connector.has_bad_setup_state:
continue

connectors_by_id[connector.id] = connector

schema_config_details = client.get_schema_config_for_connector(
connector_id=connector.id
)
schema_config = FivetranSchemaConfig.from_schema_config_details(
schema_config_details=schema_config_details
)

schema_configs_by_connector_id[connector.id] = schema_config

return FivetranWorkspaceData(
connectors_by_id=connectors_by_id, destinations_by_id=destinations_by_id
connectors_by_id=connectors_by_id,
destinations_by_id=destinations_by_id,
schema_configs_by_connector_id=schema_configs_by_connector_id,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


@whitelist_for_serdes
class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

Expand All @@ -36,34 +37,32 @@ class FivetranConnector:
id: str
name: str
service: str
schema_config: "FivetranSchemaConfig"
destination_id: str
has_bad_status: bool
group_id: str
setup_state: FivetranConnectorSetupStateType

@property
def url(self) -> str:
return f"https://fivetran.com/dashboard/connectors/{self.service}/{self.name}"

@property
def destination_id(self) -> str:
return self.group_id

@property
def has_bad_setup_state(self) -> bool:
return self.setup_state is not FivetranConnectorSetupStateType.CONNECTED

@classmethod
def from_api_details(
def from_connector_details(
cls,
connector_details: Mapping[str, Any],
destination_details: Mapping[str, Any],
schema_config_details: Mapping[str, Any],
) -> "FivetranConnector":
return cls(
id=connector_details["id"],
name=connector_details["schema"],
service=connector_details["service"],
schema_config=FivetranSchemaConfig.from_schema_config_details(
schema_config_details=schema_config_details
),
destination_id=destination_details["id"],
has_bad_status=connector_details["status"]["setup_state"]
in (
FivetranConnectorSetupStateType.INCOMPLETE,
FivetranConnectorSetupStateType.BROKEN,
),
group_id=connector_details["group_id"],
setup_state=FivetranConnectorSetupStateType(connector_details["status"]["setup_state"]),
)


Expand Down Expand Up @@ -155,6 +154,7 @@ class FivetranWorkspaceData:

connectors_by_id: Mapping[str, FivetranConnector]
destinations_by_id: Mapping[str, FivetranDestination]
schema_configs_by_connector_id: Mapping[str, FivetranSchemaConfig]

@cached_method
def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
Expand All @@ -163,10 +163,11 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
"""
data: List[FivetranConnectorTableProps] = []

for connector_id, connector in self.connectors_by_id.items():
for connector in self.connectors_by_id.values():
destination = self.destinations_by_id[connector.destination_id]
schema_config = self.schema_configs_by_connector_id[connector.id]

for schema in connector.schema_config.schemas.values():
for schema in schema_config.schemas.values():
if schema.enabled:
for table in schema.tables.values():
if table.enabled:
Expand All @@ -176,10 +177,10 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
schema_name=schema.name_in_destination,
table_name=table.name_in_destination,
),
connector_id=connector_id,
connector_id=connector.id,
name=connector.name,
connector_url=connector.url,
schema_config=connector.schema_config,
schema_config=schema_config,
database=destination.database,
service=destination.service,
)
Expand Down

0 comments on commit d840327

Please sign in to comment.