Skip to content

Commit

Permalink
[dagster-fivetran] Move DagsterFivetranTranslator metadata to Fivetra…
Browse files Browse the repository at this point in the history
…n specs loader (#26587)

## Summary & Motivation

Following [this
discussion](#26559 (comment))
for Airbyte Cloud, we add the translator as metadata as the spec loader
level

## How I Tested These Changes

Same tests with BK
  • Loading branch information
maximearmstrong authored Dec 20, 2024
1 parent 0113bcc commit ac0b0d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY


@experimental
Expand Down Expand Up @@ -109,9 +108,7 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
group_name=group_name,
can_subset=True,
specs=[
spec.merge_attributes(
metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator}
)
spec
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import (
DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY,
get_fivetran_connector_table_name,
get_fivetran_connector_url,
get_fivetran_logs_url,
Expand Down Expand Up @@ -1100,16 +1101,23 @@ def load_fivetran_asset_specs(
fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

with workspace.process_config_and_initialize_cm() as initialized_workspace:
return check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
return [
spec.merge_attributes(
metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator}
)
.build_defs()
.assets,
AssetSpec,
)
for spec in check.is_list(
FivetranWorkspaceDefsLoader(
workspace=initialized_workspace,
translator=dagster_fivetran_translator,
)
.build_defs()
.assets,
AssetSpec,
)
]


@record
Expand Down

0 comments on commit ac0b0d6

Please sign in to comment.