From 5ee51b782d9b3dedd01712bf207f5c5a2469154c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 26 Dec 2024 12:37:18 -0500 Subject: [PATCH] [14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace (#26559) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary & Motivation This PR implements `AirbyteCloudWorkspace.sync_and_poll`, the materialization method for Airbyte Cloud assets. This method: - calls `AirbyteCloudClient.sync_and_poll` - takes the AirbyteOutput returned by `AirbyteCloudClient.sync_and_poll` and generates the asset materializations - yields `MaterializeResult` for each expected asset and `AssetMaterialization` for each unexpected asset - a connection table that was not in the connection at definitions loading time can be in the AirbyteOutput. Eg. the table was added after definitions loading time and before sync. - logs a warning for each unmaterialized table - a connection table can be created at definitions loading time, but can be missing in the AirbyteOutput. Eg. the table was deleted after definitions loading time and before sync. 
Can be leveraged like: ```python from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets import dagster as dg airbyte_workspace = AirbyteCloudWorkspace( workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), ) @airbyte_assets( connection_id="airbyte_connection_id", name="airbyte_connection_name", group_name="airbyte_connection_name", workspace=airbyte_workspace, ) def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace): yield from airbyte.sync_and_poll(context=context) defs = dg.Definitions( assets=[airbyte_connection_assets], resources={"airbyte": airbyte_workspace}, ) ``` ## How I Tested These Changes Additional tests with BK ## Changelog [dagster-airbyte] Airbyte Cloud assets can now be materialized using the `AirbyteCloudWorkspace.sync_and_poll(…)` method in the definition of a `@airbyte_assets` decorator. --- .../dagster_airbyte/asset_decorator.py | 4 +- .../dagster_airbyte/asset_defs.py | 15 +- .../dagster_airbyte/managed/reconciliation.py | 7 +- .../dagster_airbyte/resources.py | 116 ++++++++++- .../dagster_airbyte/translator.py | 4 +- .../dagster-airbyte/dagster_airbyte/utils.py | 35 +++- .../experimental/conftest.py | 184 +++++++++++------- .../experimental/test_asset_specs.py | 4 +- .../experimental/test_resources.py | 92 ++++++++- .../experimental/test_translator.py | 6 +- 10 files changed, 357 insertions(+), 110 deletions(-) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py index 1baa7bf3c6035..9efe75ebed956 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py @@ -97,6 +97,8 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt 
resources={"airbyte": airbyte_workspace}, ) """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + return multi_asset( name=name, group_name=group_name, @@ -104,7 +106,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt specs=[ spec for spec in workspace.load_asset_specs( - dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator() + dagster_airbyte_translator=dagster_airbyte_translator ) if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id ], diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index 6d04b782ac063..6f2e2b9caa80a 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -1,7 +1,6 @@ import hashlib import inspect import os -import re from abc import abstractmethod from functools import partial from itertools import chain @@ -57,6 +56,7 @@ from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator from dagster_airbyte.types import AirbyteTableMetadata from dagster_airbyte.utils import ( + clean_name, generate_materializations, generate_table_schema, is_basic_normalization_operation, @@ -471,11 +471,6 @@ def _get_normalization_tables_for_schema( return out -def _clean_name(name: str) -> str: - """Cleans an input to be a valid Dagster asset name.""" - return re.sub(r"[^a-z0-9]+", "_", name.lower()) - - class AirbyteConnectionMetadata( NamedTuple( "_AirbyteConnectionMetadata", @@ -917,7 +912,7 @@ def load_assets_from_airbyte_instance( workspace_id: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, create_assets_for_normalization_tables: bool = True, - connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name, + connection_to_group_fn: 
Optional[Callable[[str], Optional[str]]] = clean_name, connection_meta_to_group_fn: Optional[ Callable[[AirbyteConnectionMetadata], Optional[str]] ] = None, @@ -1022,7 +1017,7 @@ def load_assets_from_airbyte_instance( check.invariant( not connection_meta_to_group_fn or not connection_to_group_fn - or connection_to_group_fn == _clean_name, + or connection_to_group_fn == clean_name, "Cannot specify both connection_meta_to_group_fn and connection_to_group_fn", ) @@ -1143,8 +1138,8 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: @airbyte_assets( connection_id=connection_id, workspace=workspace, - name=_clean_name(connection_name), - group_name=_clean_name(connection_name), + name=clean_name(connection_name), + group_name=clean_name(connection_name), dagster_airbyte_translator=dagster_airbyte_translator, ) def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py index 846bcb34bd9f0..73adebffb370c 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py @@ -36,7 +36,6 @@ from dagster_airbyte.asset_defs import ( AirbyteConnectionMetadata, AirbyteInstanceCacheableAssetsDefinition, - _clean_name, ) from dagster_airbyte.managed.types import ( MANAGED_ELEMENTS_DEPRECATION_MSG, @@ -50,7 +49,7 @@ InitializedAirbyteSource, ) from dagster_airbyte.resources import AirbyteResource -from dagster_airbyte.utils import is_basic_normalization_operation +from dagster_airbyte.utils import clean_name, is_basic_normalization_operation def gen_configured_stream_json( @@ -746,7 +745,7 @@ def load_assets_from_connections( connections: Iterable[AirbyteConnection], key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, 
create_assets_for_normalization_tables: bool = True, - connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name, + connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name, connection_meta_to_group_fn: Optional[ Callable[[AirbyteConnectionMetadata], Optional[str]] ] = None, @@ -821,7 +820,7 @@ def load_assets_from_connections( check.invariant( not connection_meta_to_group_fn or not connection_to_group_fn - or connection_to_group_fn == _clean_name, + or connection_to_group_fn == clean_name, "Cannot specify both connection_meta_to_group_fn and connection_to_group_fn", ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index d6ba0d0ea0308..3bd3e4e36de6e 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -11,10 +11,12 @@ import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, ConfigurableResource, Definitions, Failure, InitResourceContext, + MaterializeResult, _check as check, get_dagster_logger, resource, @@ -33,13 +35,20 @@ from dagster_airbyte.translator import ( AirbyteConnection, + AirbyteConnectionTableProps, AirbyteDestination, AirbyteJob, AirbyteJobStatusType, + AirbyteMetadataSet, AirbyteWorkspaceData, DagsterAirbyteTranslator, ) from dagster_airbyte.types import AirbyteOutput +from dagster_airbyte.utils import ( + DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY, + get_airbyte_connection_table_name, + get_translator_from_airbyte_assets, +) AIRBYTE_REST_API_BASE = "https://api.airbyte.com" AIRBYTE_REST_API_VERSION = "v1" @@ -1211,8 +1220,90 @@ def load_asset_specs( workspace=self, dagster_airbyte_translator=dagster_airbyte_translator ) + def _generate_materialization( + self, + airbyte_output: AirbyteOutput, + dagster_airbyte_translator: DagsterAirbyteTranslator, + ): + connection = 
AirbyteConnection.from_connection_details( + connection_details=airbyte_output.connection_details + ) + + for stream in connection.streams.values(): + if stream.selected: + connection_table_name = get_airbyte_connection_table_name( + stream_prefix=connection.stream_prefix, + stream_name=stream.name, + ) + stream_asset_spec = dagster_airbyte_translator.get_asset_spec( + props=AirbyteConnectionTableProps( + table_name=connection_table_name, + stream_prefix=connection.stream_prefix, + stream_name=stream.name, + json_schema=stream.json_schema, + connection_id=connection.id, + connection_name=connection.name, + destination_type=None, + database=None, + schema=None, + ) + ) + + yield AssetMaterialization( + asset_key=stream_asset_spec.key, + description=( + f"Table generated via Airbyte Cloud sync " + f"for connection {connection.name}: {connection_table_name}" + ), + metadata=stream_asset_spec.metadata, + ) + + @experimental def sync_and_poll(self, context: AssetExecutionContext): - raise NotImplementedError() + """Executes a sync and poll process to materialize Airbyte Cloud assets. + This method can only be used in the context of an asset execution. + + Args: + context (AssetExecutionContext): The execution context + from within `@airbyte_assets`. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. 
+ """ + assets_def = context.assets_def + dagster_airbyte_translator = get_translator_from_airbyte_assets(assets_def) + connection_id = next( + check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id) + for spec in assets_def.specs + ) + + client = self.get_client() + airbyte_output = client.sync_and_poll( + connection_id=connection_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + airbyte_output=airbyte_output, dagster_airbyte_translator=dagster_airbyte_translator + ): + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. + if materialization.asset_key in context.selected_asset_keys: + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}. " + f"Yielding a materialization event." 
+ ) + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") @experimental @@ -1250,16 +1341,23 @@ def load_airbyte_cloud_asset_specs( airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace) defs = dg.Definitions(assets=airbyte_cloud_specs) """ + dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator() + with workspace.process_config_and_initialize_cm() as initialized_workspace: - return check.is_list( - AirbyteCloudWorkspaceDefsLoader( - workspace=initialized_workspace, - translator=dagster_airbyte_translator or DagsterAirbyteTranslator(), + return [ + spec.merge_attributes( + metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator} ) - .build_defs() - .assets, - AssetSpec, - ) + for spec in check.is_list( + AirbyteCloudWorkspaceDefsLoader( + workspace=initialized_workspace, + translator=dagster_airbyte_translator, + ) + .build_defs() + .assets, + AssetSpec, + ) + ] @record diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py index 1347b1d24af4e..5a5110375d542 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py @@ -41,7 +41,7 @@ class AirbyteConnectionTableProps: json_schema: Mapping[str, Any] connection_id: str connection_name: str - destination_type: str + destination_type: Optional[str] database: Optional[str] schema: Optional[str] @@ -231,5 +231,5 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec: return AssetSpec( key=AssetKey(props.table_name), metadata=metadata, - kinds={"airbyte", props.destination_type}, + kinds={"airbyte", *({props.destination_type} if props.destination_type else 
set())}, ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py index 69597953f39bb..e5970f07d0a3b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py @@ -1,10 +1,26 @@ -from typing import Any, Iterator, Mapping, Optional, Sequence +import re +from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence -from dagster import AssetMaterialization, MetadataValue +from dagster import ( + AssetMaterialization, + AssetsDefinition, + DagsterInvariantViolationError, + MetadataValue, +) from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster_airbyte.types import AirbyteOutput +if TYPE_CHECKING: + from dagster_airbyte import DagsterAirbyteTranslator + +DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY = "dagster-airbyte/dagster_airbyte_translator" + + +def clean_name(name: str) -> str: + """Cleans an input to be a valid Dagster asset name.""" + return re.sub(r"[^a-z0-9]+", "_", name.lower()) + def get_airbyte_connection_table_name(stream_prefix: Optional[str], stream_name: str) -> str: return f"{stream_prefix if stream_prefix else ''}{stream_name}" @@ -78,3 +94,18 @@ def generate_materializations( all_stream_stats.get(stream_name, {}), asset_key_prefix=asset_key_prefix, ) + + +def get_translator_from_airbyte_assets( + airbyte_assets: AssetsDefinition, +) -> "DagsterAirbyteTranslator": + metadata_by_key = airbyte_assets.metadata_by_key or {} + first_asset_key = next(iter(metadata_by_key.keys())) + first_metadata = metadata_by_key.get(first_asset_key, {}) + dagster_airbyte_translator = first_metadata.get(DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY) + if dagster_airbyte_translator is None: + raise DagsterInvariantViolationError( + f"Expected to find airbyte translator metadata on asset {first_asset_key.to_user_string()}," + " but did 
not. Did you pass in assets that weren't generated by @airbyte_assets?" + ) + return dagster_airbyte_translator diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py index 3efd9ac841041..a9e8c93c31d2b 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/conftest.py @@ -1,4 +1,5 @@ -from typing import Any, Iterator, Mapping +from typing import Any, Iterator, List, Mapping +from unittest.mock import patch import pytest import responses @@ -9,6 +10,7 @@ AIRBYTE_REST_API_VERSION, ) from dagster_airbyte.translator import AirbyteConnectionTableProps, AirbyteJobStatusType +from dagster_airbyte.types import AirbyteOutput TEST_WORKSPACE_ID = "some_workspace_id" TEST_CLIENT_ID = "some_client_id" @@ -27,6 +29,8 @@ TEST_CONNECTION_NAME = "Postgres To Snowflake" TEST_STREAM_PREFIX = "test_prefix_" TEST_STREAM_NAME = "test_stream" +TEST_ANOTHER_STREAM_NAME = "test_another_stream" +TEST_UNEXPECTED_STREAM_NAME = "test_unexpected_stream" TEST_SELECTED = True TEST_JSON_SCHEMA = {} TEST_JOB_ID = 12345 @@ -72,81 +76,97 @@ } +def get_stream_details(name: str) -> Mapping[str, Any]: + return { + "stream": { + "name": name, + "jsonSchema": TEST_JSON_SCHEMA, + "supportedSyncModes": ["full_refresh"], + "sourceDefinedCursor": False, + "defaultCursorField": ["string"], + "sourceDefinedPrimaryKey": [["string"]], + "namespace": "string", + "isResumable": False, + }, + "config": { + "syncMode": "full_refresh", + "cursorField": ["string"], + "destinationSyncMode": "append", + "primaryKey": [["string"]], + "aliasName": "string", + "selected": TEST_SELECTED, + "suggested": False, + "fieldSelectionEnabled": False, + "selectedFields": [{"fieldPath": ["string"]}], + "hashedFields": [{"fieldPath": ["string"]}], + "mappers": [ + { + "id": 
"1938d12e-b540-4000-8ff0-46231e18f301", + "type": "hashing", + "mapperConfiguration": {}, + } + ], + "minimumGenerationId": 0, + "generationId": 0, + "syncId": 0, + }, + } + + # Taken from Airbyte Configuration API documentation # https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/connections/get # https://github.com/airbytehq/airbyte-platform/blob/v1.0.0/airbyte-api/server-api/src/main/openapi/config.yaml -SAMPLE_CONNECTION_DETAILS = { - "connectionId": TEST_CONNECTION_ID, - "name": TEST_CONNECTION_NAME, - "namespaceDefinition": "source", - "namespaceFormat": "${SOURCE_NAMESPACE}", - "prefix": TEST_STREAM_PREFIX, - "sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35", - "destinationId": TEST_DESTINATION_ID, - "operationIds": ["1938d12e-b540-4000-8c46-1be33f00ab01"], - "syncCatalog": { - "streams": [ - { - "stream": { - "name": TEST_STREAM_NAME, - "jsonSchema": TEST_JSON_SCHEMA, - "supportedSyncModes": ["full_refresh"], - "sourceDefinedCursor": False, - "defaultCursorField": ["string"], - "sourceDefinedPrimaryKey": [["string"]], - "namespace": "string", - "isResumable": False, - }, - "config": { - "syncMode": "full_refresh", - "cursorField": ["string"], - "destinationSyncMode": "append", - "primaryKey": [["string"]], - "aliasName": "string", - "selected": TEST_SELECTED, - "suggested": False, - "fieldSelectionEnabled": False, - "selectedFields": [{"fieldPath": ["string"]}], - "hashedFields": [{"fieldPath": ["string"]}], - "mappers": [ - { - "id": "1938d12e-b540-4000-8ff0-46231e18f301", - "type": "hashing", - "mapperConfiguration": {}, - } - ], - "minimumGenerationId": 0, - "generationId": 0, - "syncId": 0, - }, - } - ] - }, - "schedule": {"units": 0, "timeUnit": "minutes"}, - "scheduleType": "manual", - "scheduleData": { - "basicSchedule": {"timeUnit": "minutes", "units": 0}, - "cron": {"cronExpression": "string", "cronTimeZone": "string"}, - }, - "status": "active", - "resourceRequirements": { - "cpu_request": "string", - 
"cpu_limit": "string", - "memory_request": "string", - "memory_limit": "string", - "ephemeral_storage_request": "string", - "ephemeral_storage_limit": "string", - }, - "sourceCatalogId": "1938d12e-b540-4000-85a4-7ecc2445a901", - "geography": "auto", - "breakingChange": False, - "notifySchemaChanges": False, - "notifySchemaChangesByEmail": False, - "nonBreakingChangesPreference": "ignore", - "created_at": 0, - "backfillPreference": "enabled", - "workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035", -} +def get_connection_details_sample(streams: List[Mapping[str, Any]]) -> Mapping[str, Any]: + return { + "connectionId": TEST_CONNECTION_ID, + "name": TEST_CONNECTION_NAME, + "namespaceDefinition": "source", + "namespaceFormat": "${SOURCE_NAMESPACE}", + "prefix": TEST_STREAM_PREFIX, + "sourceId": "0c31738c-0b2d-4887-b506-e2cd1c39cc35", + "destinationId": TEST_DESTINATION_ID, + "operationIds": ["1938d12e-b540-4000-8c46-1be33f00ab01"], + "syncCatalog": {"streams": streams}, + "schedule": {"units": 0, "timeUnit": "minutes"}, + "scheduleType": "manual", + "scheduleData": { + "basicSchedule": {"timeUnit": "minutes", "units": 0}, + "cron": {"cronExpression": "string", "cronTimeZone": "string"}, + }, + "status": "active", + "resourceRequirements": { + "cpu_request": "string", + "cpu_limit": "string", + "memory_request": "string", + "memory_limit": "string", + "ephemeral_storage_request": "string", + "ephemeral_storage_limit": "string", + }, + "sourceCatalogId": "1938d12e-b540-4000-85a4-7ecc2445a901", + "geography": "auto", + "breakingChange": False, + "notifySchemaChanges": False, + "notifySchemaChangesByEmail": False, + "nonBreakingChangesPreference": "ignore", + "created_at": 0, + "backfillPreference": "enabled", + "workspaceId": "744cc0ed-7f05-4949-9e60-2a814f90c035", + } + + +SAMPLE_CONNECTION_DETAILS = get_connection_details_sample( + streams=[ + get_stream_details(name=TEST_STREAM_NAME), + get_stream_details(name=TEST_ANOTHER_STREAM_NAME), + ] +) + 
+UNEXPECTED_SAMPLE_CONNECTION_DETAILS = get_connection_details_sample( + streams=[ + get_stream_details(name=TEST_STREAM_NAME), + get_stream_details(name=TEST_UNEXPECTED_STREAM_NAME), + ] +) # Taken from Airbyte REST API documentation @@ -248,3 +268,21 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="airbyte_cloud_sync_and_poll") +def sync_and_poll_fixture(): + with patch("dagster_airbyte.resources.AirbyteCloudClient.sync_and_poll") as mocked_function: + # Airbyte output where all synced tables match the workspace data that was used to create the assets def + expected_airbyte_output = AirbyteOutput( + connection_details=SAMPLE_CONNECTION_DETAILS, + job_details=get_job_details_sample(status=AirbyteJobStatusType.SUCCEEDED), + ) + # Airbyte output where a table is missing and an unexpected table is synced, + # compared to the workspace data that was used to create the assets def + unexpected_airbyte_output = AirbyteOutput( + connection_details=UNEXPECTED_SAMPLE_CONNECTION_DETAILS, + job_details=get_job_details_sample(status=AirbyteJobStatusType.SUCCEEDED), + ) + mocked_function.side_effect = [expected_airbyte_output, unexpected_airbyte_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py index f0fc78a6b71cb..baa158a0973ac 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_asset_specs.py @@ -54,8 +54,8 @@ def test_translator_spec( all_assets_keys = [asset.key for asset in all_assets] # 1 table for the connection - assert len(all_assets) == 1 - assert len(all_assets_keys) == 1 + assert len(all_assets) == 2 + assert len(all_assets_keys) == 2 # Test the asset key for the connection table 
the_asset_key = next(iter(all_assets_keys)) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py index 30e88ec95be59..82fde4a0a2f0c 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_resources.py @@ -1,12 +1,15 @@ import json +import re from datetime import datetime from typing import Optional -from unittest import mock +from unittest.mock import MagicMock, patch import pytest import responses -from dagster import Failure -from dagster_airbyte import AirbyteCloudWorkspace +from dagster import AssetExecutionContext, AssetKey, EnvVar, Failure, materialize +from dagster._core.events import DagsterEventType +from dagster._core.test_utils import environ +from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets from dagster_airbyte.resources import ( AIRBYTE_CONFIGURATION_API_BASE, AIRBYTE_CONFIGURATION_API_VERSION, @@ -15,15 +18,19 @@ ) from dagster_airbyte.translator import AirbyteJobStatusType from dagster_airbyte.types import AirbyteOutput +from dagster_airbyte.utils import clean_name from dagster_airbyte_tests.experimental.conftest import ( SAMPLE_CONNECTION_DETAILS, TEST_ACCESS_TOKEN, + TEST_ANOTHER_STREAM_NAME, TEST_CLIENT_ID, TEST_CLIENT_SECRET, TEST_CONNECTION_ID, TEST_DESTINATION_ID, TEST_JOB_ID, + TEST_STREAM_PREFIX, + TEST_UNEXPECTED_STREAM_NAME, TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE, TEST_WORKSPACE_ID, get_job_details_sample, @@ -95,7 +102,7 @@ def test_refresh_access_token(base_api_mocks: responses.RequestsMock) -> None: test_time_first_call = datetime(2024, 1, 1, 0, 0, 0) test_time_before_expiration = datetime(2024, 1, 1, 0, 2, 0) test_time_after_expiration = datetime(2024, 1, 1, 0, 3, 0) - with mock.patch("dagster_airbyte.resources.datetime", 
wraps=datetime) as dt: + with patch("dagster_airbyte.resources.datetime", wraps=datetime) as dt: # Test first call, must get the access token before calling the jobs api dt.now.return_value = test_time_first_call client._make_request(method="GET", endpoint="test", base_url=client.rest_api_base_url) # noqa @@ -365,3 +372,80 @@ def test_airbyte_sync_and_poll_client_cancel_on_termination( assert_rest_api_call( call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=last_call_method ) + + +def test_airbyte_cloud_sync_and_poll_materialization_method( + fetch_workspace_data_api_mocks: responses.RequestsMock, + airbyte_cloud_sync_and_poll: MagicMock, + capsys: pytest.CaptureFixture, +) -> None: + with environ( + {"AIRBYTE_CLIENT_ID": TEST_CLIENT_ID, "AIRBYTE_CLIENT_SECRET": TEST_CLIENT_SECRET} ): + workspace = AirbyteCloudWorkspace( + workspace_id=TEST_WORKSPACE_ID, + client_id=EnvVar("AIRBYTE_CLIENT_ID"), + client_secret=EnvVar("AIRBYTE_CLIENT_SECRET"), + ) + + cleaned_connection_id = clean_name(TEST_CONNECTION_ID) + + @airbyte_assets( + connection_id=TEST_CONNECTION_ID, workspace=workspace, name=cleaned_connection_id + ) + def my_airbyte_assets(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace): + yield from airbyte.sync_and_poll(context=context) + + # Mocked AirbyteCloudClient.sync_and_poll returns API response where all connection tables are expected + result = materialize( + [my_airbyte_assets], + resources={"airbyte": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(cleaned_connection_id) + if event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION + ] + assert len(asset_materializations) == 2 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 2 + assert my_airbyte_assets.keys == materialized_asset_keys + + # Mocked AirbyteCloudClient.sync_and_poll
returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_airbyte_assets], + resources={"airbyte": workspace}, + ) + + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(cleaned_connection_id) + if event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION + ] + assert len(asset_materializations) == 2 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 2 + assert my_airbyte_assets.keys != materialized_asset_keys + assert ( + AssetKey([f"{TEST_STREAM_PREFIX}{TEST_UNEXPECTED_STREAM_NAME}"]) + in materialized_asset_keys + ) + assert ( + AssetKey([f"{TEST_STREAM_PREFIX}{TEST_ANOTHER_STREAM_NAME}"]) + not in materialized_asset_keys + ) + + captured = capsys.readouterr() + assert re.search( + r"dagster - WARNING - (?s:.)+ - An unexpected asset was materialized", captured.err + ) + assert re.search( + r"dagster - WARNING - (?s:.)+ - Assets were not materialized", captured.err + ) diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py index 70996ec48eb42..5d6313e3f2fcc 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/experimental/test_translator.py @@ -34,7 +34,7 @@ def test_airbyte_workspace_data_to_table_props( table_props_data = ( resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) assert first_table_props == TEST_AIRBYTE_CONNECTION_TABLE_PROPS @@ -51,7 +51,7 @@ def test_translator_asset_spec( table_props_data = ( 
resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) translator = DagsterAirbyteTranslator() @@ -93,7 +93,7 @@ def test_custom_translator( table_props_data = ( resource.fetch_airbyte_workspace_data().to_airbyte_connection_table_props_data() ) - assert len(table_props_data) == 1 + assert len(table_props_data) == 2 first_table_props = next(iter(table_props_data)) translator = MyCustomTranslator()