Skip to content

Commit

Permalink
[dagster-fivetran] Support infer_missing_tables in FivetranWorkspace.…
Browse files Browse the repository at this point in the history
…sync_and_poll
  • Loading branch information
maximearmstrong committed Dec 12, 2024
1 parent 4156025 commit 0e58732
Showing 1 changed file with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
_check as check,
get_dagster_logger,
resource,
Output
)
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.utils import imap
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
Expand Down Expand Up @@ -60,6 +63,9 @@
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DEFAULT_MAX_THREADPOOL_WORKERS = 10
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

Expand Down Expand Up @@ -1034,6 +1040,12 @@ def sync_and_poll(
def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]):
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
infer_missing_tables = context.op.tags.get(
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY
)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
Expand Down Expand Up @@ -1063,8 +1075,19 @@ def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContex
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
if infer_missing_tables:
for asset_key in unmaterialized_asset_keys:
yield Output(value=None, output_name=asset_key.to_python_identifier())
else:
if unmaterialized_asset_keys:
asset_key = next(iter(unmaterialized_asset_keys))
output_name = "_".join(asset_key.path)
raise DagsterStepOutputNotFoundError(
f"Core compute for {context.op_def.name} did not return an output for"
f' non-optional output "{output_name}".',
step_key=context.get_step_execution_context().step.key,
output_name=output_name,
)


@experimental
Expand Down

0 comments on commit 0e58732

Please sign in to comment.