From f6a914ba95f24f02b4689c259d92e5951c6b38f2 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:13:37 -0500 Subject: [PATCH] [11/n][dagster-fivetran] Implement materialization method in FivetranWorkspace (#25961) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary & Motivation This PR implements `FivetranWorkspace.sync_and_poll`, the materialization method for Fivetran assets. This method: - calls `FivetranClient.sync_and_poll` - takes the FivetranOutput returned by `FivetranClient.sync_and_poll` and generates the asset materializations - yields `MaterializeResult` for each expected asset and `AssetMaterialization` for each unexpected asset - a connector table that was not in the connector at definitions loading time can be in the FivetranOutput. E.g., the table was added after definitions loading time and before sync. - logs a warning listing the expected tables that were not materialized - a connector table can be present at definitions loading time, but can be missing from the FivetranOutput. E.g., the table was deleted after definitions loading time and before sync.
Can be leveraged like: ```python from dagster_fivetran import FivetranWorkspace, fivetran_assets import dagster as dg fivetran_workspace = FivetranWorkspace( account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), api_key=dg.EnvVar("FIVETRAN_API_KEY"), api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), ) @fivetran_assets( connector_id="connector_id", name="connector_id", group_name="connector_id", workspace=fivetran_workspace, ) def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): yield from fivetran.sync_and_poll(context=context) defs = dg.Definitions( assets=[fivetran_connector_assets], resources={"fivetran": fivetran_workspace}, ) ``` ## How I Tested These Changes Additional tests with BK Tested with a live Fivetran instance: Screenshot 2024-11-26 at 5 46 31 PM ## Changelog [dagster-fivetran] --- .../dagster_fivetran/asset_decorator.py | 8 +- .../dagster_fivetran/resources.py | 121 ++++++- .../dagster_fivetran/utils.py | 29 +- .../experimental/conftest.py | 316 ++++++++++-------- .../experimental/test_resources.py | 84 ++++- 5 files changed, 405 insertions(+), 153 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index 86c8929421dac..9c276d552b070 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -5,6 +5,7 @@ from dagster_fivetran.resources import FivetranWorkspace from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet +from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY @experimental @@ -100,15 +101,18 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + return multi_asset( name=name, 
group_name=group_name, can_subset=True, specs=[ - spec + spec.merge_attributes( + metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} + ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator - or DagsterFivetranTranslator() ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index effbc8432d286..a2c82215d0e46 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,15 +4,17 @@ import time from datetime import datetime, timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union from urllib.parse import urljoin import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, Definitions, Failure, InitResourceContext, + MaterializeResult, MetadataValue, OpExecutionContext, __version__, @@ -25,7 +27,7 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._record import record +from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr @@ -36,12 +38,20 @@ DagsterFivetranTranslator, FivetranConnector, FivetranConnectorScheduleType, + FivetranConnectorTableProps, FivetranDestination, + FivetranMetadataSet, FivetranSchemaConfig, FivetranWorkspaceData, ) from dagster_fivetran.types import FivetranOutput -from 
dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url +from dagster_fivetran.utils import ( + get_fivetran_connector_table_name, + get_fivetran_connector_url, + get_fivetran_logs_url, + get_translator_from_fivetran_assets, + metadata_for_table, +) FIVETRAN_API_BASE = "https://api.fivetran.com" FIVETRAN_API_VERSION = "v1" @@ -832,6 +842,7 @@ class FivetranWorkspace(ConfigurableResource): _client: FivetranClient = PrivateAttr(default=None) + @cached_method def get_client(self) -> FivetranClient: return FivetranClient( api_key=self.api_key, @@ -929,10 +940,108 @@ def load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) - def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + def _generate_materialization( + self, + fivetran_output: FivetranOutput, + dagster_fivetran_translator: DagsterFivetranTranslator, ): - raise NotImplementedError() + connector = FivetranConnector.from_connector_details( + connector_details=fivetran_output.connector_details + ) + schema_config = FivetranSchemaConfig.from_schema_config_details( + schema_config_details=fivetran_output.schema_config + ) + + for schema_source_name, schema in schema_config.schemas.items(): + if not schema.enabled: + continue + + for table_source_name, table in schema.tables.items(): + if not table.enabled: + continue + + asset_key = dagster_fivetran_translator.get_asset_spec( + props=FivetranConnectorTableProps( + table=get_fivetran_connector_table_name( + schema_name=schema.name_in_destination, + table_name=table.name_in_destination, + ), + connector_id=connector.id, + name=connector.name, + connector_url=connector.url, + schema_config=schema_config, + database=None, + service=None, + ) + ).key + + yield AssetMaterialization( + asset_key=asset_key, + description=( + f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}" + ), + metadata={ + 
**metadata_for_table( + as_dict(table), + get_fivetran_connector_url(fivetran_output.connector_details), + include_column_info=True, + database=None, + schema=schema.name_in_destination, + table=table.name_in_destination, + ), + "schema_source_name": schema_source_name, + "table_source_name": table_source_name, + }, + ) + + def sync_and_poll( + self, context: Union[OpExecutionContext, AssetExecutionContext] + ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]: + """Executes a sync and poll process to materialize Fivetran assets. + + Args: + context (Union[OpExecutionContext, AssetExecutionContext]): The execution context + from within `@fivetran_assets`. If an AssetExecutionContext is passed, + its underlying OpExecutionContext will be used. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. + """ + assets_def = context.assets_def + dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator + ): + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. + if materialization.asset_key in context.selected_asset_keys: + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}. " + f"Yielding a materialization event." 
+ ) + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") @experimental diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py index 62589c4deb5b6..13c67b76135b0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py @@ -1,13 +1,23 @@ -from typing import Any, Dict, Iterator, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence import dagster._check as check -from dagster import AssetMaterialization, MetadataValue +from dagster import ( + AssetMaterialization, + AssetsDefinition, + DagsterInvariantViolationError, + MetadataValue, +) from dagster._core.definitions.metadata import RawMetadataMapping from dagster._core.definitions.metadata.metadata_set import TableMetadataSet from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster_fivetran.types import FivetranOutput +if TYPE_CHECKING: + from dagster_fivetran import DagsterFivetranTranslator + +DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator" + def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str: service = connector_details["service"] @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str: return f"{schema_name}.{table_name}" +def get_translator_from_fivetran_assets( + fivetran_assets: AssetsDefinition, +) -> "DagsterFivetranTranslator": + metadata_by_key = fivetran_assets.metadata_by_key or {} + first_asset_key = next(iter(fivetran_assets.metadata_by_key.keys())) + first_metadata = metadata_by_key.get(first_asset_key, {}) + dagster_fivetran_translator = 
first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY) + if dagster_fivetran_translator is None: + raise DagsterInvariantViolationError( + f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()}," + " but did not. Did you pass in assets that weren't generated by @fivetran_assets?" + ) + return dagster_fivetran_translator + + def metadata_for_table( table_data: Mapping[str, Any], connector_url: str, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 7fcdc69cc9cc2..3d8c5db724e0c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,5 @@ from typing import Any, Iterator, Mapping +from unittest.mock import patch import pytest import responses @@ -7,6 +8,7 @@ FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT, ) +from dagster_fivetran.types import FivetranOutput TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" @@ -232,168 +234,180 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[ # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config -SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { - "code": "Success", - "message": "Operation performed.", - "data": { - "enable_new_by_default": True, - "schemas": { - "property1": { - "name_in_destination": "schema_name_in_destination_1", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - 
"enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", +# The sample is parameterized to test the sync and poll materialization method +def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination_1", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": table_name, + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": 
"...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, - }, - "property2": { - "name_in_destination": "schema_name_in_destination_2", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "schema_name_in_destination_2", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_1", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": 
"column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": 
"SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, }, + "schema_change_handling": "ALLOW_ALL", }, - "schema_change_handling": "ALLOW_ALL", - }, -} + } + + +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="table_name_in_destination_1" +) + +# We change the name of the original example to test the sync and poll materialization method +ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="another_table_name_in_destination_1" +) SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} @@ -501,3 +515,25 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="sync_and_poll") +def sync_and_poll_fixture(): + with patch("dagster_fivetran.resources.FivetranClient.sync_and_poll") as mocked_function: + # Fivetran output where all sync'd tables match the workspace data that was used to create the assets def + expected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + # Fivetran output where a table is missing and an unexpected table is sync'd, + # compared to the workspace data that was used to create the assets def + unexpected_fivetran_output = FivetranOutput( + 
connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + mocked_function.side_effect = [expected_fivetran_output, unexpected_fivetran_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 9c4fb9594253b..a371a44feec57 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,8 +1,14 @@ +import re +from unittest.mock import MagicMock + import pytest import responses -from dagster import Failure +from dagster import AssetExecutionContext, AssetKey, Failure +from dagster._config.field_utils import EnvVar +from dagster._core.definitions.materialize import materialize +from dagster._core.test_utils import environ from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranOutput, FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace, fivetran_assets from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( @@ -137,7 +143,7 @@ def test_basic_resource_request( "resync_long_success", ], ) -def test_sync_and_poll_methods(method, n_polls, succeed_at_end, connector_id): +def test_sync_and_poll_client_methods(method, n_polls, succeed_at_end, connector_id): resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) @@ -205,3 +211,75 @@ def _mock_interaction(): else: with pytest.raises(Failure, match="failed!"): _mock_interaction() + + +def test_fivetran_sync_and_poll_materialization_method( + connector_id: str, + 
fetch_workspace_data_api_mocks: responses.RequestsMock, + sync_and_poll: MagicMock, + capsys: pytest.CaptureFixture, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets(connector_id=connector_id, workspace=workspace, name=connector_id) + def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + # Mocked FivetranClient.sync_and_poll returns API response where all connector tables are expected + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys == materialized_asset_keys + + # Mocked FivetranClient.sync_and_poll returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys != materialized_asset_keys + assert ( + AssetKey(["schema_name_in_destination_1", "another_table_name_in_destination_1"]) + in 
materialized_asset_keys + ) + assert ( + AssetKey(["schema_name_in_destination_1", "table_name_in_destination_1"]) + not in materialized_asset_keys + ) + + captured = capsys.readouterr() + assert re.search( + r"dagster - WARNING - (?s:.)+ - An unexpected asset was materialized", captured.err + ) + assert re.search( + r"dagster - WARNING - (?s:.)+ - Assets were not materialized", captured.err + )