diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index b341f33a9b9f9..6701c6a73169f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -30,6 +30,7 @@ from dagster._core.definitions.metadata.metadata_set import TableMetadataSet from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._core.errors import DagsterStepOutputNotFoundError from dagster._core.utils import imap from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method @@ -64,11 +65,13 @@ FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata" -DEFAULT_MAX_THREADPOOL_WORKERS = 10 +DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables" # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 +DEFAULT_MAX_THREADPOOL_WORKERS = 10 + FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" @@ -1038,6 +1041,9 @@ def sync_and_poll( # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY) + infer_missing_tables = context.op.tags.get( + DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY + ) connector_id = next( check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) @@ -1085,8 +1091,19 @@ def sync_and_poll( yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys - if unmaterialized_asset_keys: - context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") + if infer_missing_tables: + for asset_key in unmaterialized_asset_keys: + yield Output(value=None, output_name=asset_key.to_python_identifier()) + else: + if unmaterialized_asset_keys: + asset_key = next(iter(unmaterialized_asset_keys)) + output_name = "_".join(asset_key.path) + raise DagsterStepOutputNotFoundError( + f"Core compute for {context.op_def.name} did not return an output for" + f' non-optional output "{output_name}".', + step_key=context.get_step_execution_context().step.key, + output_name=output_name, + ) def _fetch_and_attach_col_metadata( self, connector_id: str, materialization: AssetMaterialization