diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py
index f3eb858af9287..2b28cd7347ef3 100644
--- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py
@@ -1,4 +1,5 @@
-from typing import TYPE_CHECKING, Generic, Iterator, Union
+from concurrent.futures import ThreadPoolExecutor
+from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Iterator, Union
 
 from dagster import (
     AssetExecutionContext,
@@ -9,57 +10,58 @@
 )
 from dagster._annotations import experimental, public
 from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
-from dagster._core.definitions.metadata.table import TableColumn, TableSchema
+from dagster._core.utils import imap
 from typing_extensions import TypeVar
 
+from dagster_fivetran.translator import FivetranMetadataSet
+from dagster_fivetran.utils import get_column_schema_for_columns, get_fivetran_connector_table_name
+
 if TYPE_CHECKING:
     from dagster_fivetran.resources import FivetranWorkspace
 
-
 FivetranEventType = Union[AssetMaterialization, MaterializeResult]
 T = TypeVar("T", bound=FivetranEventType)
 
+DEFAULT_MAX_THREADPOOL_WORKERS = 10
 
-def fetch_column_metadata(
+
+def _fetch_column_metadata(
+    materialization: FivetranEventType,
     fivetran_workspace: "FivetranWorkspace",
-    connector_id: str,
-    materialization: Union[AssetMaterialization, MaterializeResult],
-) -> AssetMaterialization:
-    """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
-    materialization.
-    """
-    schema_source_name = materialization.metadata["schema_source_name"].value
-    table_source_name = materialization.metadata["table_source_name"].value
+) -> Dict[str, Any]:
+    """Subroutine to fetch column metadata for a given table from the Fivetran API."""
+    materialization_metadata = check.not_none(materialization.metadata)
+    connector_id = check.not_none(
+        FivetranMetadataSet.extract(materialization_metadata).connector_id
+    )
+    schema_name = check.not_none(
+        FivetranMetadataSet.extract(materialization_metadata).destination_schema_name
+    )
+    table_name = check.not_none(
+        FivetranMetadataSet.extract(materialization_metadata).destination_table_name
+    )
+
     client = fivetran_workspace.get_client()
+    metadata = {}
     try:
-        table_conn_data = client.get_columns_for_table(
+        table_conn_data = client.get_columns_config_for_table(
             connector_id=connector_id,
-            schema_name=schema_source_name,
-            table_name=table_source_name,
+            schema_name=schema_name,
+            table_name=table_name,
         )
+
         columns = check.dict_elem(table_conn_data, "columns")
-        table_columns = sorted(
-            [
-                TableColumn(name=col["name_in_destination"], type="")
-                for col in columns.values()
-                if "name_in_destination" in col and col.get("enabled")
-            ],
-            key=lambda col: col.name,
-        )
-        return materialization.with_metadata(
-            {
-                **materialization.metadata,
-                **TableMetadataSet(column_schema=TableSchema(table_columns)),
-            }
-        )
+        metadata = {**TableMetadataSet(column_schema=get_column_schema_for_columns(columns))}
     except Exception as e:
         client._log.warning(  # noqa
-            "An error occurred while fetching column metadata for table %s",
+            f"An error occurred while fetching column metadata for table "
+            f"{get_fivetran_connector_table_name(schema_name=schema_name, table_name=table_name)}."
+            "Column metadata will not be included in the event.\n\n"
+            f"Exception: {e}",
             exc_info=True,
         )
-    return materialization
+    return metadata
 
 
 class FivetranEventIterator(Generic[T], Iterator):
@@ -86,17 +88,62 @@ def __iter__(self) -> "FivetranEventIterator[T]":
     @experimental
     @public
     def fetch_column_metadata(self) -> "FivetranEventIterator":
-        """Fetches column metadata for each table synced by the Sling CLI.
+        """Fetches column metadata for each table synced with the Fivetran API.
 
-        Retrieves the column schema and lineage for each target table.
+        Retrieves the column schema for each destination table.
 
         Returns:
-            SlingEventIterator: An iterator of Dagster events with column metadata attached.
+            FivetranEventIterator: An iterator of Dagster events with column metadata attached.
         """
+        fetch_metadata_fn: Callable[
+            [FivetranEventType],
+            Dict[str, Any],
+        ] = lambda materialization: _fetch_column_metadata(
+            materialization=materialization,
+            fivetran_workspace=self._fivetran_workspace,
+        )
+
+        return self._attach_metadata(fetch_metadata_fn)
+
+    def _attach_metadata(
+        self,
+        fn: Callable[[FivetranEventType], Dict[str, Any]],
+    ) -> "FivetranEventIterator":
+        """Runs a threaded task to attach metadata to each event in the iterator.
 
-        def _fetch_column_metadata() -> Iterator[T]:
-            raise NotImplementedError()
+        Args:
+            fn (Callable[[Union[AssetMaterialization, MaterializeResult]], Dict[str, Any]]):
+                A function which takes a FivetranEventType and returns
+                a dictionary of metadata to attach to the event.
+
+        Returns:
+            Iterator[Union[AssetMaterialization, MaterializeResult]]:
+                A set of corresponding Dagster events for Fivetran tables, with any metadata output
+                by the function attached, yielded in the order they are emitted by the Fivetran API.
+        """
 
-        return FivetranEventIterator[T](
-            _fetch_column_metadata(), self._fivetran_workspace, self._context
+        def _map_fn(event: FivetranEventType) -> FivetranEventType:
+            return event._replace(metadata={**check.is_dict(event.metadata), **fn(event)})
+
+        def _threadpool_wrap_map_fn() -> Iterator[FivetranEventType]:
+            assets_def = self._context.assets_def
+            connector_id = next(
+                check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
+                for spec in assets_def.specs
+            )
+
+            with ThreadPoolExecutor(
+                max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
+                thread_name_prefix=f"fivetran_{connector_id}",
+            ) as executor:
+                yield from imap(
+                    executor=executor,
+                    iterable=self._inner_iterator,
+                    func=_map_fn,
+                )
+
+        return FivetranEventIterator(
+            events=_threadpool_wrap_map_fn(),
+            fivetran_workspace=self._fivetran_workspace,
+            context=self._context,
         )
diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py
index 9f8782c857f4c..82b36a45ad1bd 100644
--- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py
@@ -4,7 +4,7 @@
 import time
 from datetime import datetime, timedelta
 from functools import partial
-from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
+from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
 from urllib.parse import urljoin
 
 import requests
@@ -60,8 +60,6 @@
 FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
 FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"
 
-DEFAULT_MAX_THREADPOOL_WORKERS = 10
-
 # default polling interval (in seconds)
 DEFAULT_POLL_INTERVAL = 10
 
@@ -970,11 +968,11 @@ def _generate_materialization(
             schema_config_details=fivetran_output.schema_config
         )
 
-        for schema_source_name, schema in schema_config.schemas.items():
+        for schema in schema_config.schemas.values():
             if not schema.enabled:
                 continue
 
-            for table_source_name, table in schema.tables.items():
+            for table in schema.tables.values():
                 if not table.enabled:
                     continue
 
@@ -1007,14 +1005,17 @@ def _generate_materialization(
                         schema=schema.name_in_destination,
                         table=table.name_in_destination,
                     ),
-                    "schema_source_name": schema_source_name,
-                    "table_source_name": table_source_name,
+                    **FivetranMetadataSet(
+                        connector_id=connector.id,
+                        destination_schema_name=schema.name_in_destination,
+                        destination_table_name=table.name_in_destination,
+                    ),
                 },
             )
 
     def sync_and_poll(
         self, context: Union[OpExecutionContext, AssetExecutionContext]
-    ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
+    ) -> FivetranEventIterator[Union[AssetMaterialization, MaterializeResult]]:
         """Executes a sync and poll process to materialize Fivetran assets.
 
         Args:
diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py
index 3a35d335703b5..054b6cee51301 100644
--- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py
@@ -251,6 +251,8 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
 
 class FivetranMetadataSet(NamespacedMetadataSet):
     connector_id: Optional[str] = None
+    destination_schema_name: Optional[str] = None
+    destination_table_name: Optional[str] = None
 
     @classmethod
     def namespace(cls) -> str:
@@ -284,7 +286,14 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
             table=table_name,
         )
 
-        augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}
+        augmented_metadata = {
+            **metadata,
+            **FivetranMetadataSet(
+                connector_id=props.connector_id,
+                destination_schema_name=schema_name,
+                destination_table_name=table_name,
+            ),
+        }
 
         return AssetSpec(
             key=AssetKey(props.table.split(".")),
diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py
index 13c67b76135b0..0ec34676aadd8 100644
--- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py
@@ -61,15 +61,7 @@ def metadata_for_table(
         table_name = None
 
     if table_data.get("columns"):
         columns = check.dict_elem(table_data, "columns")
-        table_columns = sorted(
-            [
-                TableColumn(name=col["name_in_destination"], type="")
-                for col in columns.values()
-                if "name_in_destination" in col and col.get("enabled")
-            ],
-            key=lambda col: col.name,
-        )
-        column_schema = TableSchema(columns=table_columns)
+        column_schema = get_column_schema_for_columns(columns=columns)
         if include_column_info:
             metadata["column_info"] = MetadataValue.json(columns)
@@ -84,6 +76,18 @@ def metadata_for_table(
     return metadata
 
 
+def get_column_schema_for_columns(columns: Mapping[str, Any]):
+    table_columns = sorted(
+        [
+            TableColumn(name=col["name_in_destination"], type="")
+            for col in columns.values()
+            if "name_in_destination" in col and col.get("enabled")
+        ],
+        key=lambda col: col.name,
+    )
+    return TableSchema(columns=table_columns)
+
+
 def _table_data_to_materialization(
     fivetran_output: FivetranOutput,
     asset_key_prefix: Sequence[str],
diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py
index 76df67e6416e3..a28903099a709 100644
--- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py
@@ -20,6 +20,8 @@
 
 TEST_SCHEMA_NAME = "schema_name_in_destination_1"
 TEST_TABLE_NAME = "table_name_in_destination_1"
+TEST_SECOND_SCHEMA_NAME = "schema_name_in_destination_2"
+TEST_SECOND_TABLE_NAME = "table_name_in_destination_2"
 TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1"
 
 # Taken from Fivetran API documentation
@@ -343,7 +345,7 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]
                         "is_primary_key": True,
                     },
                     "property2": {
-                        "name_in_destination": "column_name_in_destination_1",
+                        "name_in_destination": "column_name_in_destination_2",
                         "enabled": True,
                         "hashed": False,
                         "enabled_patch_settings": {
diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py
new file mode 100644
index 0000000000000..a929e7e3f6e21
--- /dev/null
+++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py
@@ -0,0 +1,141 @@
+import re
+from unittest.mock import MagicMock
+
+import pytest
+import responses
+from dagster import AssetExecutionContext, AssetKey, TableColumn, TableSchema
+from dagster._config.field_utils import EnvVar
+from dagster._core.definitions.materialize import materialize
+from dagster._core.definitions.metadata import TableMetadataSet
+from dagster._core.definitions.metadata.table import TableColumnConstraints, TableConstraints
+from dagster._core.test_utils import environ
+from dagster_fivetran import FivetranWorkspace, fivetran_assets
+
+from dagster_fivetran_tests.experimental.conftest import (
+    SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG,
+    TEST_ACCOUNT_ID,
+    TEST_API_KEY,
+    TEST_API_SECRET,
+    TEST_SCHEMA_NAME,
+    TEST_SECOND_SCHEMA_NAME,
+    TEST_SECOND_TABLE_NAME,
+    TEST_TABLE_NAME,
+    get_fivetran_connector_api_url,
+)
+
+
+def test_column_schema(
+    connector_id: str,
+    fetch_workspace_data_api_mocks: responses.RequestsMock,
+    sync_and_poll: MagicMock,
+    capsys: pytest.CaptureFixture,
+) -> None:
+    with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}):
+        test_connector_api_url = get_fivetran_connector_api_url(connector_id)
+        for schema_name, table_name in [
+            (TEST_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+        ]:
+            fetch_workspace_data_api_mocks.add(
+                method=responses.GET,
+                url=f"{test_connector_api_url}/schemas/{schema_name}/tables/{table_name}/columns",
+                json=SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG,
+                status=200,
+            )
+
+        workspace = FivetranWorkspace(
+            account_id=TEST_ACCOUNT_ID,
+            api_key=EnvVar("FIVETRAN_API_KEY"),
+            api_secret=EnvVar("FIVETRAN_API_SECRET"),
+        )
+
+        @fivetran_assets(connector_id=connector_id, workspace=workspace, name=connector_id)
+        def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace):
+            yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()
+
+        for schema_name, table_name in [
+            (TEST_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+        ]:
+            table_spec = my_fivetran_assets.get_asset_spec(
+                AssetKey(
+                    [
+                        schema_name,
+                        table_name,
+                    ]
+                )
+            )
+            spec_table_schema = TableMetadataSet.extract(table_spec.metadata).column_schema
+
+            expected_spec_table_schema = TableSchema(
+                columns=[
+                    TableColumn(
+                        name="column_name_in_destination_1",
+                        type="",
+                        description=None,
+                        constraints=TableColumnConstraints(nullable=True, unique=False, other=[]),
+                        tags={},
+                    ),
+                    TableColumn(
+                        name="column_name_in_destination_2",
+                        type="",
+                        description=None,
+                        constraints=TableColumnConstraints(nullable=True, unique=False, other=[]),
+                        tags={},
+                    ),
+                ],
+                constraints=TableConstraints(other=[]),
+            )
+
+            assert spec_table_schema == expected_spec_table_schema
+
+        result = materialize(
+            [my_fivetran_assets],
+            resources={"fivetran": workspace},
+        )
+        assert result.success
+
+        for schema_name, table_name in [
+            (TEST_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_TABLE_NAME),
+            (TEST_SECOND_SCHEMA_NAME, TEST_SECOND_TABLE_NAME),
+        ]:
+            table_schema_by_asset_key = {
+                event.materialization.asset_key: TableMetadataSet.extract(
+                    event.materialization.metadata
+                ).column_schema
+                for event in result.get_asset_materialization_events()
+                if event.materialization.asset_key
+                == AssetKey(
+                    [
+                        schema_name,
+                        table_name,
+                    ]
+                )
+            }
+            expected_table_schema_by_asset_key = {
+                AssetKey(
+                    [
+                        schema_name,
+                        table_name,
+                    ]
+                ): TableSchema(
+                    columns=[
+                        TableColumn("column_name_in_destination_1", type=""),
+                        TableColumn("column_name_in_destination_2", type=""),
+                    ]
+                ),
+            }
+
+            assert table_schema_by_asset_key == expected_table_schema_by_asset_key
+
+        captured = capsys.readouterr()
+        assert not re.search(
+            r"dagster - WARNING - (?s:.)+ - An error occurred while fetching column metadata for table",
+            captured.err,
+        )