diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index e926912504953..9c276d552b070 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -101,6 +101,8 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + return multi_asset( name=name, group_name=group_name, @@ -111,7 +113,6 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator - or DagsterFivetranTranslator() ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 9af774eb076b7..3ea581594a96f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,7 +4,7 @@ import time from datetime import datetime, timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union from urllib.parse import urljoin import requests @@ -14,9 +14,9 @@ Definitions, Failure, InitResourceContext, + MaterializeResult, MetadataValue, OpExecutionContext, - Output, __version__, _check as check, get_dagster_logger, @@ -27,7 +27,7 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._record import record +from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr @@ -977,15 +977,17 @@ def _generate_materialization( yield AssetMaterialization( asset_key=asset_key, - description=f"Table generated via Fivetran sync: {schema.name}.{table.name}", + description=( + f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}" + ), metadata={ **metadata_for_table( - table, + as_dict(table), get_fivetran_connector_url(fivetran_output.connector_details), include_column_info=True, database=None, - schema=schema.name, - table=table.name, + schema=schema.name_in_destination, + table=table.name_in_destination, ), "schema_source_name": schema_source_name, "table_source_name": table_source_name, @@ -993,9 +995,19 @@ def _generate_materialization( ) def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None - ): - # TODO: Add docstrings + self, context: Union[OpExecutionContext, AssetExecutionContext] + ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]: + """Executes a sync and poll process to materialize Fivetran assets. + + Args: + context (Union[OpExecutionContext, AssetExecutionContext]): The execution context + from within `@fivetran_assets`. If an AssetExecutionContext is passed, + its underlying OpExecutionContext will be used. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. + """ assets_def = context.assets_def dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) @@ -1013,13 +1025,11 @@ def sync_and_poll( for materialization in self._generate_materialization( fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator ): - # scan through all tables actually created, if it was expected then emit an Output. - # otherwise, emit a runtime AssetMaterialization + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata ) materialized_asset_keys.add(materialization.asset_key) else: diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 7fcdc69cc9cc2..2ae8e65742e8c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,5 @@ from typing import Any, Iterator, Mapping +from unittest.mock import patch import pytest import responses @@ -7,6 +8,7 @@ FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT, ) +from dagster_fivetran.types import FivetranOutput TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" @@ -232,168 +234,180 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[ # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config -SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { - "code": "Success", - "message": "Operation performed.", - "data": { - "enable_new_by_default": True, - "schemas": { - "property1": { - "name_in_destination": "schema_name_in_destination_1", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", +# The sample is parameterized to test the sync and poll materialization method +def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination_1", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": table_name, + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, - }, - "property2": { - "name_in_destination": "schema_name_in_destination_2", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "schema_name_in_destination_2", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_1", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, }, + "schema_change_handling": "ALLOW_ALL", }, - "schema_change_handling": "ALLOW_ALL", - }, -} + } + + +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="table_name_in_destination_1" +) + +# We change the name of the original example to test the sync and poll materialization method +ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="another_table_name_in_destination_1" +) SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} @@ -501,3 +515,25 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="sync_and_poll") +def poll_and_sync_fixture(): + with patch("dagster_fivetran.resources.FivetranClient.sync_and_poll") as mocked_function: + # Fivetran output where all sync'd tables match the workspace data that was used to create the assets def + expected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + # Fivetran output where a table is missing and an unexpected table is sync'd, + # compared to the workspace data that was used to create the assets def + unexpected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + mocked_function.side_effect = [expected_fivetran_output, unexpected_fivetran_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 9c4fb9594253b..b4e76ba9e892c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,8 +1,13 @@ +from unittest.mock import MagicMock + import pytest import responses -from dagster import Failure +from dagster import AssetExecutionContext, AssetKey, Failure +from dagster._config.field_utils import EnvVar +from dagster._core.definitions.materialize import materialize +from dagster._core.test_utils import environ from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranOutput, FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace, fivetran_assets from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( @@ -205,3 +210,65 @@ def _mock_interaction(): else: with pytest.raises(Failure, match="failed!"): _mock_interaction() + + +def test_fivetran_materialization( + connector_id: str, + fetch_workspace_data_api_mocks: responses.RequestsMock, + sync_and_poll: MagicMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets(connector_id=connector_id, workspace=workspace, name=connector_id) + def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + # Mocked FivetranClient.sync_and_poll returns API response where all connector tables are expected + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys == materialized_asset_keys + + # Mocked FivetranClient.sync_and_poll returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys != materialized_asset_keys + assert ( + AssetKey(["schema_name_in_destination_1", "another_table_name_in_destination_1"]) + in materialized_asset_keys + ) + assert ( + AssetKey(["schema_name_in_destination_1", "table_name_in_destination_1"]) + not in materialized_asset_keys + )