From 82e6c350962ef785c93460d0a64f612ce71f4c65 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 17 Dec 2024 19:47:49 -0500 Subject: [PATCH] [14/n][dagster-airbyte] Implement AirbyteCloudWorkspace.sync_and_poll --- .../dagster_airbyte/asset_decorator.py | 11 +- .../dagster_airbyte/asset_defs.py | 15 +- .../dagster_airbyte/managed/reconciliation.py | 7 +- .../dagster_airbyte/resources.py | 93 ++++++++- .../dagster_airbyte/translator.py | 4 +- .../dagster-airbyte/dagster_airbyte/utils.py | 35 +++- .../experimental/conftest.py | 184 +++++++++++------- .../experimental/test_asset_specs.py | 4 +- .../experimental/test_resources.py | 91 ++++++++- .../experimental/test_translator.py | 6 +- 10 files changed, 345 insertions(+), 105 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index d03ef85395124..d0b576572268f 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -5,6 +5,7 @@ from dagster_airbyte.resources import AirbyteCloudWorkspace from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator +from dagster_airbyte.utils import DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY @experimental @@ -101,14 +102,18 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt resources={"airbyte": airbyte_workspace}, ) """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + return multi_asset( name=name, group_name=group_name, - can_subset=False, + can_subset=True, specs=[ - spec + spec.merge_attributes( + metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator} + ) for spec in workspace.load_asset_specs( - dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator() + dagster_airbyte_translator=dagster_airbyte_translator ) if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id ], diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index fe55c2c4f3193..a977dde5a56dd 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -1,7 +1,6 @@ import hashlib import inspect import os -import re from abc import abstractmethod from functools import partial from itertools import chain @@ -57,6 +56,7 @@ from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator from dagster_airbyte.types import AirbyteTableMetadata from dagster_airbyte.utils import ( + clean_name, generate_materializations, generate_table_schema, is_basic_normalization_operation, @@ -471,11 +471,6 @@ def _get_normalization_tables_for_schema( return out -def _clean_name(name: str) -> str: - """Cleans an input to be a valid Dagster asset name.""" - return re.sub(r"[^a-z0-9]+", "_", name.lower()) - - class AirbyteConnectionMetadata( NamedTuple( "_AirbyteConnectionMetadata", @@ -917,7 +912,7 @@ def load_assets_from_airbyte_instance( workspace_id: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, create_assets_for_normalization_tables: bool = True, - connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name, + connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name, connection_meta_to_group_fn: Optional[ Callable[[AirbyteConnectionMetadata], Optional[str]] ] = None, @@ -1022,7 +1017,7 @@ def load_assets_from_airbyte_instance( check.invariant( not connection_meta_to_group_fn or not connection_to_group_fn - or connection_to_group_fn == _clean_name, + or connection_to_group_fn == clean_name, "Cannot specify both connection_meta_to_group_fn and connection_to_group_fn", ) @@ -1140,8 +1135,8 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: @airbyte_assets( connection_id=connection_id, workspace=workspace, - name=_clean_name(connection_id), - group_name=_clean_name(connection_id), + name=clean_name(connection_id), + group_name=clean_name(connection_id), dagster_airbyte_translator=dagster_airbyte_translator, ) def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py index 846bcb34bd9f0..e8f26c16c916b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py @@ -36,7 +36,6 @@ from dagster_airbyte.asset_defs import ( AirbyteConnectionMetadata, AirbyteInstanceCacheableAssetsDefinition, - _clean_name, ) from dagster_airbyte.managed.types import ( MANAGED_ELEMENTS_DEPRECATION_MSG, @@ -50,7 +49,7 @@ InitializedAirbyteSource, ) from dagster_airbyte.resources import AirbyteResource -from dagster_airbyte.utils import is_basic_normalization_operation +from dagster_airbyte.utils import is_basic_normalization_operation, clean_name def gen_configured_stream_json( @@ -746,7 +745,7 @@ def load_assets_from_connections( connections: Iterable[AirbyteConnection], key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, create_assets_for_normalization_tables: bool = True, - connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name, + connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name, connection_meta_to_group_fn: Optional[ Callable[[AirbyteConnectionMetadata], Optional[str]] ] = None, @@ -821,7 +820,7 @@ def load_assets_from_connections( check.invariant( not connection_meta_to_group_fn or not connection_to_group_fn - or connection_to_group_fn == _clean_name, + or connection_to_group_fn == clean_name, "Cannot specify both connection_meta_to_group_fn and connection_to_group_fn", ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index ea6730cbe046e..ffbd1d1f4fd26 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -11,10 +11,12 @@ import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, ConfigurableResource, Definitions, Failure, InitResourceContext, + MaterializeResult, OpExecutionContext, _check as check, get_dagster_logger, @@ -34,13 +36,19 @@ from dagster_airbyte.translator import ( AirbyteConnection, + AirbyteConnectionTableProps, AirbyteDestination, AirbyteJob, AirbyteJobStatusType, + AirbyteMetadataSet, AirbyteWorkspaceData, DagsterAirbyteTranslator, ) from dagster_airbyte.types import AirbyteOutput +from dagster_airbyte.utils import ( + get_airbyte_connection_table_name, + get_translator_from_airbyte_assets, +) AIRBYTE_REST_API_BASE = "https://api.airbyte.com" AIRBYTE_REST_API_VERSION = "v1" @@ -1212,10 +1220,91 @@ def load_asset_specs( workspace=self, dagster_airbyte_translator=dagster_airbyte_translator ) + def _generate_materialization( + self, + airbyte_output: AirbyteOutput, + dagster_airbyte_translator: DagsterAirbyteTranslator, + ): + connection = AirbyteConnection.from_connection_details( + connection_details=airbyte_output.connection_details + ) + + for stream in connection.streams.values(): + if stream.selected: + connection_table_name = get_airbyte_connection_table_name( + stream_prefix=connection.stream_prefix, + stream_name=stream.name, + ) + stream_asset_spec = dagster_airbyte_translator.get_asset_spec( + props=AirbyteConnectionTableProps( + table_name=connection_table_name, + stream_prefix=connection.stream_prefix, + stream_name=stream.name, + json_schema=stream.json_schema, + connection_id=connection.id, + connection_name=connection.name, + destination_type=None, + database=None, + schema=None, + ) + ) + + yield AssetMaterialization( + asset_key=stream_asset_spec.key, + description=( + f"Table generated via Airbyte Cloud sync " + f"for connection {connection.name}: {connection_table_name}" + ), + metadata=stream_asset_spec.metadata, + ) + def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + self, context: Union[OpExecutionContext, AssetExecutionContext] ): - raise NotImplementedError() + """Executes a sync and poll process to materialize Airbyte Cloud assets. + + Args: + context (Union[OpExecutionContext, AssetExecutionContext]): The execution context + from within `@airbyte_assets`. If an AssetExecutionContext is passed, + its underlying OpExecutionContext will be used. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. + """ + assets_def = context.assets_def + dagster_airbyte_translator = get_translator_from_airbyte_assets(assets_def) + connection_id = next( + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id) + for spec in assets_def.specs + ) + + client = self.get_client() + airbyte_output = client.sync_and_poll( + connection_id=connection_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + airbyte_output=airbyte_output, dagster_airbyte_translator=dagster_airbyte_translator + ): + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. + if materialization.asset_key in context.selected_asset_keys: + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}. " + f"Yielding a materialization event." + ) + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") @experimental diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py index 1347b1d24af4e..5a5110375d542 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py @@ -41,7 +41,7 @@ class AirbyteConnectionTableProps: json_schema: Mapping[str, Any] connection_id: str connection_name: str - destination_type: str + destination_type: Optional[str] database: Optional[str] schema: Optional[str] @@ -231,5 +231,5 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec: return AssetSpec( key=AssetKey(props.table_name), metadata=metadata, - kinds={"airbyte", props.destination_type}, + kinds={"airbyte", *({props.destination_type} if props.destination_type else set())}, ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py index 69597953f39bb..e5970f07d0a3b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py @@ -1,10 +1,26 @@ -from typing import Any, Iterator, Mapping, Optional, Sequence +import re +from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence -from dagster import AssetMaterialization, MetadataValue +from dagster import ( + AssetMaterialization, + AssetsDefinition, + DagsterInvariantViolationError, + MetadataValue, +) from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster_airbyte.types import AirbyteOutput +if TYPE_CHECKING: + from dagster_airbyte import DagsterAirbyteTranslator + +DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY = "dagster-airbyte/dagster_airbyte_translator" + + +def clean_name(name: str) -> str: + """Cleans an input to be a valid Dagster asset name.""" + return re.sub(r"[^a-z0-9]+", "_", name.lower()) + def get_airbyte_connection_table_name(stream_prefix: Optional[str], stream_name: str) -> str: return f"{stream_prefix if stream_prefix else ''}{stream_name}" @@ -78,3 +94,18 @@ def generate_materializations( all_stream_stats.get(stream_name, {}), asset_key_prefix=asset_key_prefix, ) + + +def get_translator_from_airbyte_assets( + airbyte_assets: AssetsDefinition, +) -> "DagsterAirbyteTranslator": + metadata_by_key = airbyte_assets.metadata_by_key or {} + first_asset_key = next(iter(airbyte_assets.metadata_by_key.keys())) + first_metadata = metadata_by_key.get(first_asset_key, {}) + dagster_airbyte_translator = first_metadata.get(DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY) + if dagster_airbyte_translator is None: + raise DagsterInvariantViolationError( + f"Expected to find airbyte translator metadata on asset {first_asset_key.to_user_string()}," + " but did not. Did you pass in assets that weren't generated by @airbyte_assets?" + ) + return dagster_airbyte_translator diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 3efd9ac841041..b70a2736e6be3 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -1,4 +1,5 @@ -from typing import Any, Iterator, Mapping +from typing import Any, Iterator, List, Mapping +from unittest.mock import patch import pytest import responses @@ -9,6 +10,7 @@ AIRBYTE_REST_API_VERSION, ) from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType +from dagster_airbyte.types import AirbyteOutput TEST_WORKSPACE_ID = "some_workspace_id" TEST_CLIENT_ID = "some_client_id" @@ -27,6 +29,8 @@ TEST_CONNECTION_NAME = "Postgres To Snowflake" TEST_STREAM_PREFIX = "test_prefix_" TEST_STREAM_NAME = "test_stream" +TEST_ANOTHER_STREAM_NAME = "test_another_stream" +TEST_UNEXPECTED_STREAM_NAME = "test_unexpected_stream" TEST_SELECTED = True TEST_JSON_SCHEMA = {} TEST_JOB_ID = 12345 @@ -72,81 +76,97 @@ } +def get_stream_details(name: str) -> Mapping[str, Any]: + return { + "stream": { + "name": name, + "jsonSchema": TEST_JSON_SCHEMA, + "supportedSyncModes": ["full_refresh"], + "sourceDefinedCursor": False, + "defaultCursorField": ["string"], + "sourceDefinedPrimaryKey": [["string"]], + "namespace": "string", + "isResumable": False, + }, + "config": { + "syncMode": "full_refresh", + "cursorField": ["string"], + "destinationSyncMode": "append", + "primaryKey": [["string"]], + "aliasName": "string", + "selected": TEST_SELECTED, + "suggested": False, + "fieldSelectionEnabled": False, + "selectedFields": [{"fieldPath": ["string"]}], + "hashedFields": [{"fieldPath": ["string"]}], + "mappers": [ + { + "id": "1938d12e-b540-4000-8ff0-46231e18f301", + "type": "hashing", + "mapperConfiguration": {}, + } + ], + "minimumGenerationId": 0, + "generationId": 0, + "syncId": 0, + }, + } + + # Taken from Airbyte Configuration API documentation # https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/connections/get # https://github.com/airbytehq/airbyte-platform/blob/v1.0.0/airbyte-api/server-api/src/main/openapi/config.yaml -SAMPLE_CONNECTION_DETAILS = { - "connectionId": TEST_CONNECTION_ID, - "name": TEST_CONNECTION_NAME, - "namespaceDefinition": "source", - "namespaceFormat": "${SOURCE_NAMESPACE}", - "prefix": TEST_STREAM_PREFIX, - "sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35", - "destinationId": TEST_DESTINATION_ID, - "operationIds": ["1938d12e-b540-4000-8c46-1be33f00ab01"], - "syncCatalog": { - "streams": [ - { - "stream": { - "name": TEST_STREAM_NAME, - "jsonSchema": TEST_JSON_SCHEMA, - "supportedSyncModes": ["full_refresh"], - "sourceDefinedCursor": False, - "defaultCursorField": ["string"], - "sourceDefinedPrimaryKey": [["string"]], - "namespace": "string", - "isResumable": False, - }, - "config": { - "syncMode": "full_refresh", - "cursorField": ["string"], - "destinationSyncMode": "append", - "primaryKey": [["string"]], - "aliasName": "string", - "selected": TEST_SELECTED, - "suggested": False, - "fieldSelectionEnabled": False, - "selectedFields": [{"fieldPath": ["string"]}], - "hashedFields": [{"fieldPath": ["string"]}], - "mappers": [ - { - "id": "1938d12e-b540-4000-8ff0-46231e18f301", - "type": "hashing", - "mapperConfiguration": {}, - } - ], - "minimumGenerationId": 0, - "generationId": 0, - "syncId": 0, - }, - } - ] - }, - "schedule": {"units": 0, "timeUnit": "minutes"}, - "scheduleType": "manual", - "scheduleData": { - "basicSchedule": {"timeUnit": "minutes", "units": 0}, - "cron": {"cronExpression": "string", "cronTimeZone": "string"}, - }, - "status": "active", - "resourceRequirements": { - "cpu_request": "string", - "cpu_limit": "string", - "memory_request": "string", - "memory_limit": "string", - "ephemeral_storage_request": "string", - "ephemeral_storage_limit": "string", - }, - "sourceCatalogId": "1938d12e-b540-4000-85a4-7ecc2445a901", - "geography": "auto", - "breakingChange": False, - "notifySchemaChanges": False, - "notifySchemaChangesByEmail": False, - "nonBreakingChangesPreference": "ignore", - "created_at": 0, - "backfillPreference": "enabled", - "workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035", -} +def get_connection_details_sample(streams: List[Mapping[str, Any]]) -> Mapping[str, Any]: + return { + "connectionId": TEST_CONNECTION_ID, + "name": TEST_CONNECTION_NAME, + "namespaceDefinition": "source", + "namespaceFormat": "${SOURCE_NAMESPACE}", + "prefix": TEST_STREAM_PREFIX, + "sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35", + "destinationId": TEST_DESTINATION_ID, + "operationIds": ["1938d12e-b540-4000-8c46-1be33f00ab01"], + "syncCatalog": {"streams": streams}, + "schedule": {"units": 0, "timeUnit": "minutes"}, + "scheduleType": "manual", + "scheduleData": { + "basicSchedule": {"timeUnit": "minutes", "units": 0}, + "cron": {"cronExpression": "string", "cronTimeZone": "string"}, + }, + "status": "active", + "resourceRequirements": { + "cpu_request": "string", + "cpu_limit": "string", + "memory_request": "string", + "memory_limit": "string", + "ephemeral_storage_request": "string", + "ephemeral_storage_limit": "string", + }, + "sourceCatalogId": "1938d12e-b540-4000-85a4-7ecc2445a901", + "geography": "auto", + "breakingChange": False, + "notifySchemaChanges": False, + "notifySchemaChangesByEmail": False, + "nonBreakingChangesPreference": "ignore", + "created_at": 0, + "backfillPreference": "enabled", + "workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035", + } + + +SAMPLE_CONNECTION_DETAILS = get_connection_details_sample( + streams=[ + get_stream_details(name=TEST_STREAM_NAME), + get_stream_details(name=TEST_ANOTHER_STREAM_NAME), + ] +) + +UNEXPECTED_SAMPLE_CONNECTION_DETAILS = get_connection_details_sample( + streams=[ + get_stream_details(name=TEST_STREAM_NAME), + get_stream_details(name=TEST_UNEXPECTED_STREAM_NAME), + ] +) # Taken from Airbyte REST API documentation @@ -248,3 +268,21 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="airbyte_cloud_sync_and_poll") +def sync_and_poll_fixture(): + with patch("dagster_airbyte.resources.AirbyteCloudClient.sync_and_poll") as mocked_function: + # Airbyte output where all sync'd tables match the workspace data that was used to create the assets def + expected_airbyte_output = AirbyteOutput( + connection_details=SAMPLE_CONNECTION_DETAILS, + job_details=get_job_details_sample(status=AirbyteJobStatusType.SUCCEEDED), + ) + # Airbyte output where a table is missing and an unexpected table is sync'd, + # compared to the workspace data that was used to create the assets def + unexpected_airbyte_output = AirbyteOutput( + connection_details=UNEXPECTED_SAMPLE_CONNECTION_DETAILS, + job_details=get_job_details_sample(status=AirbyteJobStatusType.SUCCEEDED), + ) + mocked_function.side_effect = [expected_airbyte_output, unexpected_airbyte_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py index dc29adc0eb2ac..0c6f32ee5a858 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py @@ -54,8 +54,8 @@ def test_translator_spec( all_assets_keys = [asset.key for asset in all_assets] # 1 table for the connection - assert len(all_assets) == 1 - assert len(all_assets_keys) == 1 + assert len(all_assets) == 2 + assert len(all_assets_keys) == 2 # Test the asset key for the connection table the_asset_key = next(iter(all_assets_keys)) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 30e88ec95be59..d3378be34e07c 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -1,12 +1,14 @@ import json +import re from datetime import datetime from typing import Optional -from unittest import mock +from unittest.mock import MagicMock, patch import pytest import responses -from dagster import Failure -from dagster_airbyte import AirbyteCloudWorkspace +from dagster import AssetExecutionContext, AssetKey, EnvVar, Failure, materialize +from dagster._core.test_utils import environ +from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets from dagster_airbyte.resources import ( AIRBYTE_CONFIGURATION_API_BASE, AIRBYTE_CONFIGURATION_API_VERSION, @@ -15,15 +17,19 @@ ) from dagster_airbyte.translator import AirbyteJobStatusType from dagster_airbyte.types import AirbyteOutput +from dagster_airbyte.utils import clean_name from dagster_airbyte_tests.experimental.conftest import ( SAMPLE_CONNECTION_DETAILS, TEST_ACCESS_TOKEN, + TEST_ANOTHER_STREAM_NAME, TEST_CLIENT_ID, TEST_CLIENT_SECRET, TEST_CONNECTION_ID, TEST_DESTINATION_ID, TEST_JOB_ID, + TEST_STREAM_PREFIX, + TEST_UNEXPECTED_STREAM_NAME, TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, TEST_WORKSPACE_ID, get_job_details_sample, @@ -95,7 +101,7 @@ def test_refresh_access_token(base_api_mocks: responses.RequestsMock) -> None: test_time_first_call = datetime(2024, 1, 1, 0, 0, 0) test_time_before_expiration = datetime(2024, 1, 1, 0, 2, 0) test_time_after_expiration = datetime(2024, 1, 1, 0, 3, 0) - with mock.patch("dagster_airbyte.resources.datetime", wraps=datetime) as dt: + with patch("dagster_airbyte.resources.datetime", wraps=datetime) as dt: # Test first call, must get the access token before calling the jobs api dt.now.return_value = test_time_first_call client._make_request(method="GET", endpoint="test", base_url=client.rest_api_base_url) # noqa @@ -365,3 +371,80 @@ def test_airbyte_sync_and_poll_client_cancel_on_termination( assert_rest_api_call( call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method ) + + +def test_fivetran_airbyte_cloud_sync_and_poll_materialization_method( + fetch_workspace_data_api_mocks: responses.RequestsMock, + airbyte_cloud_sync_and_poll: MagicMock, + capsys: pytest.CaptureFixture, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} + ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + cleaned_connection_id = clean_name(TEST_CONNECTION_ID) + + @airbyte_assets( + connection_id=TEST_CONNECTION_ID, workspace=workspace, name=cleaned_connection_id + ) + def my_airbyte_assets(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): + yield from airbyte.sync_and_poll(context=context) + + # Mocked AirbyteCloudClient.sync_and_poll returns API response where all connection tables are expected + result = materialize( + [my_airbyte_assets], + resources={"airbyte": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(cleaned_connection_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 2 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 2 + assert my_airbyte_assets.keys == materialized_asset_keys + + # Mocked FivetranClient.sync_and_poll returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_airbyte_assets], + resources={"airbyte": workspace}, + ) + + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(cleaned_connection_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 2 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 2 + assert my_airbyte_assets.keys != materialized_asset_keys + assert ( + AssetKey([f"{TEST_STREAM_PREFIX}{TEST_UNEXPECTED_STREAM_NAME}"]) + in materialized_asset_keys + ) + assert ( + AssetKey([f"{TEST_STREAM_PREFIX}{TEST_ANOTHER_STREAM_NAME}"]) + not in materialized_asset_keys + ) + + captured = capsys.readouterr() + assert re.search( + r"dagster - WARNING - (?s:.)+ - An unexpected asset was materialized", captured.err + ) + assert re.search( + r"dagster - WARNING - (?s:.)+ - Assets were not materialized", captured.err + ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py index 70996ec48eb42..5d6313e3f2fcc 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py @@ -34,7 +34,7 @@ def test_airbyte_workspace_data_to_table_props( table_props_data = ( resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) assert first_table_props == TEST_AIRBYTE_CONNECTION_TABLE_PROPS @@ -51,7 +51,7 @@ def test_translator_asset_spec( table_props_data = ( resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) translator = DagsterAirbyteTranslator() @@ -93,7 +93,7 @@ def test_custom_translator( table_props_data = ( resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) translator = MyCustomTranslator()