diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 82b36a45ad1bd..1e1b5c95b2332 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -21,12 +21,15 @@ _check as check, get_dagster_logger, resource, + Output ) from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._core.errors import DagsterStepOutputNotFoundError +from dagster._core.utils import imap from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser @@ -60,6 +63,9 @@ FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/" FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" +DEFAULT_MAX_THREADPOOL_WORKERS = 10 +DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables" + # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 @@ -1034,6 +1040,12 @@ def sync_and_poll( def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]): assets_def = context.assets_def dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + + # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory + infer_missing_tables = context.op.tags.get( + DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY + ) + connector_id = next( check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) for spec in assets_def.specs @@ -1063,8 +1075,19 @@ def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContex yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys - if unmaterialized_asset_keys: - context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") + if infer_missing_tables: + for asset_key in unmaterialized_asset_keys: + yield Output(value=None, output_name=asset_key.to_python_identifier()) + else: + if unmaterialized_asset_keys: + asset_key = next(iter(unmaterialized_asset_keys)) + output_name = "_".join(asset_key.path) + raise DagsterStepOutputNotFoundError( + f"Core compute for {context.op_def.name} did not return an output for" + f' non-optional output "{output_name}".', + step_key=context.get_step_execution_context().step.key, + output_name=output_name, + ) @experimental