Skip to content

Commit

Permalink
[dagster-fivetran] Support infer_missing_tables in FivetranWorkspace.…
Browse files Browse the repository at this point in the history
…sync_and_poll
  • Loading branch information
maximearmstrong committed Nov 27, 2024
1 parent de95534 commit 4ddd6ab
Showing 1 changed file with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.utils import imap
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -64,11 +65,13 @@
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
DEFAULT_MAX_THREADPOOL_WORKERS = 10
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

DEFAULT_MAX_THREADPOOL_WORKERS = 10

FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


Expand Down Expand Up @@ -1040,6 +1043,9 @@ def sync_and_poll(

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)
infer_missing_tables = context.op.tags.get(
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY
)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
Expand Down Expand Up @@ -1086,8 +1092,19 @@ def sync_and_poll(
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
if infer_missing_tables:
for asset_key in unmaterialized_asset_keys:
yield Output(value=None, output_name=asset_key.to_python_identifier())
else:
if unmaterialized_asset_keys:
asset_key = next(iter(unmaterialized_asset_keys))
output_name = "_".join(asset_key.path)
raise DagsterStepOutputNotFoundError(
f"Core compute for {context.op_def.name} did not return an output for"
f' non-optional output "{output_name}".',
step_key=context.get_step_execution_context().step.key,
output_name=output_name,
)

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
Expand Down

0 comments on commit 4ddd6ab

Please sign in to comment.