Skip to content

Commit

Permalink
Update callsites of AssetSpec._replace to use replace_attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Nov 15, 2024
1 parent e1756ba commit 73e87b9
Show file tree
Hide file tree
Showing 15 changed files with 40 additions and 36 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
6 changes: 4 additions & 2 deletions docs/content/integrations/looker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):
asset_spec = super().get_asset_spec(looker_structure)

# Add a team owner tag for all Looker assets
asset_spec = asset_spec._replace(owners=["team:my_team"])
asset_spec = asset_spec.replace_attributes(owners=["team:my_team"])

# For only Looker dashboard, prefix the asset key with "looker" for organizational purposes
if looker_structure.structure_type == LookerStructureType.DASHBOARD:
asset_spec = asset_spec._replace(key=asset_spec.key.with_prefix("looker"))
asset_spec = asset_spec.replace_attributes(
key=asset_spec.key.with_prefix("looker")
)

return asset_spec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ def get_asset_spec(self, looker_structure: LookerStructureData) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(looker_structure)

# Add a team owner tag for all Looker assets
asset_spec = asset_spec._replace(owners=["team:my_team"])
asset_spec = asset_spec.replace_attributes(owners=["team:my_team"])

# For only Looker dashboard, prefix the asset key with "looker" for organizational purposes
if looker_structure.structure_type == LookerStructureType.DASHBOARD:
asset_spec = asset_spec._replace(key=asset_spec.key.with_prefix("looker"))
asset_spec = asset_spec.replace_attributes(
key=asset_spec.key.with_prefix("looker")
)

return asset_spec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ def enrich_spec_with_airflow_task_metadata(
tasks: AbstractSet[TaskHandle],
serialized_data: SerializedAirflowDefinitionsData,
) -> AssetSpec:
tags = {**spec.tags, **airlift_mapped_kind_dict()}
return spec._replace(
metadata={**spec.metadata, **metadata_for_mapped_tasks(tasks, serialized_data)},
tags=tags,
return spec.merge_attributes(
metadata=metadata_for_mapped_tasks(tasks, serialized_data),
tags=airlift_mapped_kind_dict(),
)


Expand All @@ -65,10 +64,9 @@ def enrich_spec_with_airflow_dag_metadata(
dags: AbstractSet[DagHandle],
serialized_data: SerializedAirflowDefinitionsData,
) -> AssetSpec:
tags = {**spec.tags, **airlift_mapped_kind_dict()}
return spec._replace(
metadata={**spec.metadata, **metadata_for_mapped_dags(dags, serialized_data)},
tags=tags,
return spec.merge_attributes(
metadata=metadata_for_mapped_dags(dags, serialized_data),
tags=airlift_mapped_kind_dict(),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _attach_code_references_to_definitions(blueprint: Blueprint, defs: Definitio

new_assets_defs.append(
assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=new_metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=new_metadata_by_key[spec.key])
)
)
return copy(defs, assets=new_assets_defs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def filter_specs_by_tag(specs: Sequence[AssetSpec], tag: str) -> Dict[AssetKey,


def add_dep_to_spec(spec: AssetSpec, dep: AssetKey) -> AssetSpec:
return spec._replace(deps=[*spec.deps, AssetDep(dep)])
return spec.replace_attributes(deps=[*spec.deps, AssetDep(dep)])


def key_for_uid(specs: Sequence[AssetSpec], uid: str) -> AssetKey:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def jaffle_shop_assets(context: AssetExecutionContext, dbt: DbtCliResource):


jaffle_shop_external_assets = [
spec._replace(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
spec.replace_attributes(code_version=None, skippable=False) for spec in jaffle_shop_assets.specs
]

jaffle_shop_with_upstream = eager_asset(with_deps(DBT_SOURCE_TO_DAG, jaffle_shop_assets))
Expand Down
10 changes: 7 additions & 3 deletions examples/starlift-demo/dbt_example/dagster_defs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ def apply_eager_automation(defs: Definitions) -> Definitions:
continue
assets.append(
asset.map_asset_specs(
lambda spec: spec._replace(automation_condition=AutomationCondition.eager())
lambda spec: spec.replace_attributes(
automation_condition=AutomationCondition.eager()
)
if spec.automation_condition is None
else spec
)
Expand All @@ -40,7 +42,7 @@ def apply_eager_automation(defs: Definitions) -> Definitions:


def with_group(assets_def: AssetsDefinition, group_name: str) -> AssetsDefinition:
return assets_def.map_asset_specs(lambda spec: spec._replace(group_name=group_name))
return assets_def.map_asset_specs(lambda spec: spec.replace_attributes(group_name=group_name))


def with_deps(
Expand Down Expand Up @@ -93,4 +95,6 @@ def with_deps(


def eager(specs: Sequence[AssetSpec]) -> Sequence[AssetSpec]:
return [spec._replace(automation_condition=AutomationCondition.eager()) for spec in specs]
return [
spec.replace_attributes(automation_condition=AutomationCondition.eager()) for spec in specs
]
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ def update_replace_dict_and_conflicts(

replace_dict["deps"] = new_deps

replaced_specs.append(spec._replace(**replace_dict))
replaced_specs.append(spec.replace_attributes(**replace_dict))

for attr_name, conflicting_asset_keys in conflicts_by_attr_name.items():
raise DagsterInvalidDefinitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ def specs(self) -> Sequence[AssetSpec]:
" AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.",
)

return [spec._replace(group_name=self.group_name) for spec in specs]
return [spec.replace_attributes(group_name=self.group_name) for spec in specs]

def _synthesize_specs(self) -> Sequence[AssetSpec]:
resolved_specs = []
Expand Down
24 changes: 11 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,17 @@ def create_unexecutable_external_asset_from_assets_def(
for key in assets_def.keys:
orig_spec = assets_def.get_asset_spec(key)
specs.append(
orig_spec._replace(
metadata={
**(orig_spec.metadata or {}),
**(
{
SYSTEM_METADATA_KEY_IO_MANAGER_KEY: assets_def.get_io_manager_key_for_asset_key(
key
)
}
if assets_def.has_output_for_asset_key(key)
else {}
),
},
orig_spec.merge_attributes(
metadata=(
{
SYSTEM_METADATA_KEY_IO_MANAGER_KEY: assets_def.get_io_manager_key_for_asset_key(
key
)
}
if assets_def.has_output_for_asset_key(key)
else {}
),
).replace_attributes(
automation_condition=None,
freshness_policy=None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _with_code_source_single_definition(
}

return assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key])
)


Expand Down Expand Up @@ -279,7 +279,7 @@ def _convert_local_path_to_git_path_single_definition(
}

return assets_def.map_asset_specs(
lambda spec: spec._replace(metadata=metadata_by_key[spec.key])
lambda spec: spec.replace_attributes(metadata=metadata_by_key[spec.key])
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def get_asset_key(self, data: SigmaDataset) -> AssetKey:
def get_asset_spec(self, data: SigmaDataset) -> AssetSpec:
spec = super().get_asset_spec(data)
if isinstance(data, SigmaDataset):
return spec._replace(description="Custom description")
return spec.replace_attributes(description="Custom description")
return spec

sample_dataset = SigmaDataset(
Expand Down

0 comments on commit 73e87b9

Please sign in to comment.