Skip to content

Commit

Permalink
Change default backfill policy to BackfillPolicy.multi_run(1) for materializable assets
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Jun 12, 2024
1 parent b478c5f commit 0c0504d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 64 deletions.
18 changes: 4 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
AssetExecutionType,
)
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.graph_definition import SubselectedGraphDefinition
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
Expand Down Expand Up @@ -195,18 +195,6 @@ def __init__(
backfill_policy, "backfill_policy", BackfillPolicy
)

if self._partitions_def is None:
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
check.param_invariant(
(
backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
if backfill_policy
else True
),
"backfill_policy",
"Non partitioned asset can only have single run backfill policy",
)

self._is_subset = check.bool_param(is_subset, "is_subset")

if specs is not None:
Expand Down Expand Up @@ -952,7 +940,9 @@ def _get_external_asset_metadata_value(self, metadata_key: str) -> object:

@property
def backfill_policy(self) -> Optional[BackfillPolicy]:
return self._backfill_policy
return self._backfill_policy or (
DEFAULT_BACKFILL_POLICY if self.is_materializable else None
)

@public
@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,16 @@ def __str__(self):
)


with disable_dagster_warnings():
DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1)


# In situations where multiple backfill policies are specified, call this to resolve a canonical
# policy, which is the policy with the minimum max_partitions_per_run.
def resolve_backfill_policy(
backfill_policies: Iterable[Optional[BackfillPolicy]],
backfill_policies: Iterable[BackfillPolicy],
) -> BackfillPolicy:
policy = next(iter(sorted(backfill_policies, key=_backfill_policy_sort_key)), None)
with disable_dagster_warnings():
return policy or BackfillPolicy.multi_run(1)


def _backfill_policy_sort_key(bp: Optional[BackfillPolicy]) -> float:
if bp is None: # equivalent to max_partitions_per_run=1
return 1
elif bp.max_partitions_per_run is None:
return float("inf")
else:
return bp.max_partitions_per_run
return next(
iter(sorted(backfill_policies, key=lambda bp: bp.max_partitions_per_run or float("inf"))),
DEFAULT_BACKFILL_POLICY,
)
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ def resolve(
# Require that all assets in the job have the same backfill policy
executable_nodes = {job_asset_graph.get(k) for k in job_asset_graph.executable_asset_keys}
nodes_by_backfill_policy = dict(
groupby((n for n in executable_nodes if n.is_partitioned), lambda n: n.backfill_policy)
groupby(
(n for n in executable_nodes if n.is_partitioned),
lambda n: check.not_none(
n.backfill_policy, f"Unexpected null backfill policy for {n.key}"
),
)
)
backfill_policy = resolve_backfill_policy(nodes_by_backfill_policy.keys())

Expand Down
41 changes: 6 additions & 35 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import pendulum

import dagster._check as check
from dagster._core.definitions.asset_daemon_context import (
build_run_requests,
build_run_requests_with_backfill_policies,
)
from dagster._core.definitions.asset_daemon_context import build_run_requests_with_backfill_policies
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.asset_selection import KeysAssetSelection
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
Expand Down Expand Up @@ -1310,38 +1307,12 @@ def execute_asset_backfill_iteration_inner(
f"The following assets were considered for materialization but not requested: {not_requested_str}"
)

# check if all assets have backfill policies if any of them do, otherwise, raise error
asset_backfill_policies = [
asset_graph.get(asset_key).backfill_policy
for asset_key in {
asset_partition.asset_key for asset_partition in asset_partitions_to_request
}
]
all_assets_have_backfill_policies = all(
backfill_policy is not None for backfill_policy in asset_backfill_policies
run_requests = build_run_requests_with_backfill_policies(
asset_partitions=asset_partitions_to_request,
asset_graph=asset_graph,
run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
dynamic_partitions_store=instance_queryer,
)
if all_assets_have_backfill_policies:
run_requests = build_run_requests_with_backfill_policies(
asset_partitions=asset_partitions_to_request,
asset_graph=asset_graph,
run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
dynamic_partitions_store=instance_queryer,
)
else:
if not all(backfill_policy is None for backfill_policy in asset_backfill_policies):
# if some assets have backfill policies, but not all of them, raise error
raise DagsterBackfillFailedError(
"Either all assets must have backfill policies or none of them must have backfill"
" policies. To backfill these assets together, either add backfill policies to all"
" assets, or remove backfill policies from all assets."
)
# When any of the assets do not have backfill policies, we fall back to the default behavior of
# backfilling them partition by partition.
run_requests = build_run_requests(
asset_partitions=asset_partitions_to_request,
asset_graph=asset_graph,
run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
)

if request_roots:
check.invariant(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from dagster._core.definitions.auto_materialize_sensor_definition import (
AutoMaterializeSensorDefinition,
)
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
from dagster._core.definitions.definition_config_schema import ConfiguredDefinitionConfigSchema
from dagster._core.definitions.dependency import (
GraphNode,
Expand Down Expand Up @@ -1419,6 +1419,10 @@ def __new__(
# job, and no source assets could be part of any job
is_source = len(job_names or []) == 0

# backcompat logic to assign default BackfillPolicy for materializable assets
if backfill_policy is None and execution_type == AssetExecutionType.MATERIALIZATION:
backfill_policy = DEFAULT_BACKFILL_POLICY

return super(ExternalAssetNode, cls).__new__(
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
Expand Down

0 comments on commit 0c0504d

Please sign in to comment.