
Commit caf4b19

Change default backfill policy to BackfillPolicy.multi_run(1) for materializable assets
smackesey committed Jun 12, 2024
1 parent b5a6878 commit caf4b19
Showing 7 changed files with 43 additions and 131 deletions.
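
In user-facing terms: a materializable asset that does not set an explicit backfill_policy now reports BackfillPolicy.multi_run(1) (one partition per run, matching the previous implicit behavior) instead of None. A minimal sketch of the new default, assuming the post-commit public API:

from dagster import BackfillPolicy, DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def daily_asset():
    ...

# No backfill_policy was passed above, so the AssetsDefinition falls back to
# the new default instead of returning None.
assert daily_asset.backfill_policy == BackfillPolicy.multi_run(1)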
16 changes: 2 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/assets.py
@@ -32,7 +32,7 @@
     AssetExecutionType,
 )
 from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
-from dagster._core.definitions.backfill_policy import BackfillPolicy, BackfillPolicyType
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.freshness_policy import FreshnessPolicy
 from dagster._core.definitions.graph_definition import SubselectedGraphDefinition
 from dagster._core.definitions.metadata import ArbitraryMetadataMapping
@@ -195,18 +195,6 @@ def __init__(
             backfill_policy, "backfill_policy", BackfillPolicy
         )
 
-        if self._partitions_def is None:
-            # check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
-            check.param_invariant(
-                (
-                    backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN
-                    if backfill_policy
-                    else True
-                ),
-                "backfill_policy",
-                "Non partitioned asset can only have single run backfill policy",
-            )
-
         self._is_subset = check.bool_param(is_subset, "is_subset")
 
         if specs is not None:
@@ -952,7 +940,7 @@ def _get_external_asset_metadata_value(self, metadata_key: str) -> object:
 
     @property
     def backfill_policy(self) -> Optional[BackfillPolicy]:
-        return self._backfill_policy
+        return self._backfill_policy or (DEFAULT_BACKFILL_POLICY if self.is_executable else None)
 
     @public
     @property
22 changes: 9 additions & 13 deletions python_modules/dagster/dagster/_core/definitions/backfill_policy.py
@@ -90,20 +90,16 @@ def __str__(self):
         )
 
 
+with disable_dagster_warnings():
+    DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1)
+
+
 # In situations where multiple backfill policies are specified, call this to resolve a canonical
 # policy, which is the policy with the minimum max_partitions_per_run.
 def resolve_backfill_policy(
-    backfill_policies: Iterable[Optional[BackfillPolicy]],
+    backfill_policies: Iterable[BackfillPolicy],
 ) -> BackfillPolicy:
-    policy = next(iter(sorted(backfill_policies, key=_backfill_policy_sort_key)), None)
-    with disable_dagster_warnings():
-        return policy or BackfillPolicy.multi_run(1)
-
-
-def _backfill_policy_sort_key(bp: Optional[BackfillPolicy]) -> float:
-    if bp is None:  # equivalent to max_partitions_per_run=1
-        return 1
-    elif bp.max_partitions_per_run is None:
-        return float("inf")
-    else:
-        return bp.max_partitions_per_run
+    return next(
+        iter(sorted(backfill_policies, key=lambda bp: bp.max_partitions_per_run or float("inf"))),
+        DEFAULT_BACKFILL_POLICY,
+    )
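
For reference, a hedged usage sketch of the rewritten resolver (these are internal dagster._core imports as shown in the diff above, not public API): the policy with the smallest max_partitions_per_run wins, single_run (max_partitions_per_run=None) sorts as infinity, and an empty iterable falls back to the new default.

from dagster._core.definitions.backfill_policy import (
    DEFAULT_BACKFILL_POLICY,
    BackfillPolicy,
    resolve_backfill_policy,
)

# The bounded multi_run policy sorts below single_run (None -> float("inf")).
assert resolve_backfill_policy(
    [BackfillPolicy.single_run(), BackfillPolicy.multi_run(5)]
) == BackfillPolicy.multi_run(5)

# With no policies to consider, the resolver yields the default, multi_run(1).
assert resolve_backfill_policy([]) == DEFAULT_BACKFILL_POLICY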
@@ -186,7 +186,12 @@ def resolve(
         # Require that all assets in the job have the same backfill policy
         executable_nodes = {job_asset_graph.get(k) for k in job_asset_graph.executable_asset_keys}
         nodes_by_backfill_policy = dict(
-            groupby((n for n in executable_nodes if n.is_partitioned), lambda n: n.backfill_policy)
+            groupby(
+                (n for n in executable_nodes if n.is_partitioned),
+                lambda n: check.not_none(
+                    n.backfill_policy, f"Unexpected null backfill policy for {n.key}"
+                ),
+            )
         )
         backfill_policy = resolve_backfill_policy(nodes_by_backfill_policy.keys())
 
41 changes: 6 additions & 35 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -24,10 +24,7 @@
 import pendulum
 
 import dagster._check as check
-from dagster._core.definitions.asset_daemon_context import (
-    build_run_requests,
-    build_run_requests_with_backfill_policies,
-)
+from dagster._core.definitions.asset_daemon_context import build_run_requests_with_backfill_policies
 from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
 from dagster._core.definitions.asset_selection import KeysAssetSelection
 from dagster._core.definitions.base_asset_graph import BaseAssetGraph
@@ -1310,38 +1307,12 @@ def execute_asset_backfill_iteration_inner(
             f"The following assets were considered for materialization but not requested: {not_requested_str}"
         )
 
-    # check if all assets have backfill policies if any of them do, otherwise, raise error
-    asset_backfill_policies = [
-        asset_graph.get(asset_key).backfill_policy
-        for asset_key in {
-            asset_partition.asset_key for asset_partition in asset_partitions_to_request
-        }
-    ]
-    all_assets_have_backfill_policies = all(
-        backfill_policy is not None for backfill_policy in asset_backfill_policies
+    run_requests = build_run_requests_with_backfill_policies(
+        asset_partitions=asset_partitions_to_request,
+        asset_graph=asset_graph,
+        run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
+        dynamic_partitions_store=instance_queryer,
     )
-    if all_assets_have_backfill_policies:
-        run_requests = build_run_requests_with_backfill_policies(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-            dynamic_partitions_store=instance_queryer,
-        )
-    else:
-        if not all(backfill_policy is None for backfill_policy in asset_backfill_policies):
-            # if some assets have backfill policies, but not all of them, raise error
-            raise DagsterBackfillFailedError(
-                "Either all assets must have backfill policies or none of them must have backfill"
-                " policies. To backfill these assets together, either add backfill policies to all"
-                " assets, or remove backfill policies from all assets."
-            )
-        # When any of the assets do not have backfill policies, we fall back to the default behavior of
-        # backfilling them partition by partition.
-        run_requests = build_run_requests(
-            asset_partitions=asset_partitions_to_request,
-            asset_graph=asset_graph,
-            run_tags={**run_tags, BACKFILL_ID_TAG: backfill_id},
-        )
 
     if request_roots:
         check.invariant(
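
With every materializable asset now guaranteed a policy, the all-or-nothing validation and the partition-by-partition fallback above become dead code: all requests flow through build_run_requests_with_backfill_policies, and the multi_run(1) default reproduces the old one-run-per-partition behavior. An illustrative-only sketch (plain Python, not the real builder) of how max_partitions_per_run chunks requested partitions into runs:

from typing import List, Optional

def chunk_partitions(
    partition_keys: List[str], max_partitions_per_run: Optional[int]
) -> List[List[str]]:
    # None models single_run: all requested partitions go into one run.
    if max_partitions_per_run is None:
        return [partition_keys]
    return [
        partition_keys[i : i + max_partitions_per_run]
        for i in range(0, len(partition_keys), max_partitions_per_run)
    ]

# multi_run(1), the new default, still yields one run per partition.
assert chunk_partitions(["2023-01-01", "2023-01-02"], 1) == [["2023-01-01"], ["2023-01-02"]]
# single_run collapses the whole selection into a single run.
assert chunk_partitions(["2023-01-01", "2023-01-02"], None) == [["2023-01-01", "2023-01-02"]]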
34 changes: 15 additions & 19 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -1363,21 +1363,17 @@ def _log_materialization_planned_event_for_asset(
         from dagster._core.definitions.partition_key_range import PartitionKeyRange
         from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent
 
+        is_partitioned = check.not_none(output.properties).is_asset_partitioned
         partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
         partition_range_start, partition_range_end = (
             dagster_run.tags.get(ASSET_PARTITION_RANGE_START_TAG),
             dagster_run.tags.get(ASSET_PARTITION_RANGE_END_TAG),
         )
 
-        if partition_tag and (partition_range_start or partition_range_end):
-            raise DagsterInvariantViolationError(
-                f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
-                f" {ASSET_PARTITION_RANGE_END_TAG} set along with"
-                f" {PARTITION_NAME_TAG}"
-            )
-
-        partitions_subset = None
-        if partition_range_start or partition_range_end:
+        if is_partitioned and partition_tag:
+            partition = partition_tag
+            partitions_subset = None
+        elif is_partitioned and (partition_range_start or partition_range_end):
             if not partition_range_start or not partition_range_end:
                 raise DagsterInvariantViolationError(
                     f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
@@ -1398,17 +1394,17 @@ def _log_materialization_planned_event_for_asset(
                     "Creating a run targeting a partition range is not supported for jobs partitioned with function-based dynamic partitions"
                 )
 
-            if check.not_none(output.properties).is_asset_partitioned:
-                partitions_subset = job_partitions_def.subset_with_partition_keys(
-                    job_partitions_def.get_partition_keys_in_range(
-                        PartitionKeyRange(partition_range_start, partition_range_end),
-                        dynamic_partitions_store=self,
-                    )
-                ).to_serializable_subset()
+            partition = None
+            partitions_subset = job_partitions_def.subset_with_partition_keys(
+                job_partitions_def.get_partition_keys_in_range(
+                    PartitionKeyRange(partition_range_start, partition_range_end),
+                    dynamic_partitions_store=self,
+                )
+            ).to_serializable_subset()
+        else:
+            partition = None
+            partitions_subset = None
 
-        partition = (
-            partition_tag if check.not_none(output.properties).is_asset_partitioned else None
-        )
         materialization_planned = DagsterEvent.build_asset_materialization_planned_event(
             job_name,
             step.key,
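
Restated as a standalone function (illustrative only; a plain tuple stands in for the real serializable PartitionsSubset), the new branching guarantees that at most one of partition / partitions_subset is set, and only for partitioned assets:

from typing import Optional, Tuple

def resolve_planned_partition(
    is_partitioned: bool,
    partition_tag: Optional[str],
    range_start: Optional[str],
    range_end: Optional[str],
) -> Tuple[Optional[str], Optional[Tuple[str, str]]]:
    if is_partitioned and partition_tag:
        return partition_tag, None
    elif is_partitioned and (range_start or range_end):
        if not range_start or not range_end:
            raise ValueError("a partition range needs both start and end tags")
        return None, (range_start, range_end)
    else:
        return None, None

assert resolve_planned_partition(True, "2023-01-01", None, None) == ("2023-01-01", None)
assert resolve_planned_partition(True, None, "2023-01-01", "2023-01-07") == (
    None,
    ("2023-01-01", "2023-01-07"),
)
assert resolve_planned_partition(False, None, None, None) == (None, None)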
@@ -58,7 +58,7 @@
 from dagster._core.definitions.auto_materialize_sensor_definition import (
     AutoMaterializeSensorDefinition,
 )
-from dagster._core.definitions.backfill_policy import BackfillPolicy
+from dagster._core.definitions.backfill_policy import DEFAULT_BACKFILL_POLICY, BackfillPolicy
 from dagster._core.definitions.definition_config_schema import ConfiguredDefinitionConfigSchema
 from dagster._core.definitions.dependency import (
     GraphNode,
@@ -1419,6 +1419,10 @@ def __new__(
         # job, and no source assets could be part of any job
         is_source = len(job_names or []) == 0
 
+        # backcompat logic to assign default BackfillPolicy for materializable assets
+        if backfill_policy is None and execution_type == AssetExecutionType.MATERIALIZATION:
+            backfill_policy = DEFAULT_BACKFILL_POLICY
+
         return super(ExternalAssetNode, cls).__new__(
             cls,
             asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
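
The backcompat branch matters when the host process deserializes an ExternalAssetNode snapshot produced by an older user-code server that still sent backfill_policy=None for materializable assets. A hedged sketch of the rule (resolve_snapshot_backfill_policy is a hypothetical helper, not dagster API):

from typing import Optional

from dagster import BackfillPolicy

DEFAULT_BACKFILL_POLICY = BackfillPolicy.multi_run(1)

def resolve_snapshot_backfill_policy(
    backfill_policy: Optional[BackfillPolicy], is_materializable: bool
) -> Optional[BackfillPolicy]:
    # Hypothetical helper mirroring the __new__ logic above: only
    # materializable nodes pick up the default policy.
    if backfill_policy is None and is_materializable:
        return DEFAULT_BACKFILL_POLICY
    return backfill_policy

assert resolve_snapshot_backfill_policy(None, True) == DEFAULT_BACKFILL_POLICY
assert resolve_snapshot_backfill_policy(None, False) is None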
@@ -15,7 +15,6 @@
     asset,
 )
 from dagster._core.definitions.partition import StaticPartitionsDefinition
-from dagster._core.errors import DagsterBackfillFailedError
 from dagster._core.execution.asset_backfill import AssetBackfillData, AssetBackfillStatus
 from dagster._core.instance_for_test import instance_for_test
 from dagster._core.storage.tags import (
@@ -37,53 +36,6 @@
 )
 
 
-def test_asset_backfill_not_all_asset_have_backfill_policy():
-    @asset(backfill_policy=None)
-    def unpartitioned_upstream_of_partitioned():
-        return 1
-
-    @asset(
-        partitions_def=DailyPartitionsDefinition("2023-01-01"),
-        backfill_policy=BackfillPolicy.single_run(),
-    )
-    def upstream_daily_partitioned_asset():
-        return 1
-
-    assets_by_repo_name = {
-        "repo": [
-            unpartitioned_upstream_of_partitioned,
-            upstream_daily_partitioned_asset,
-        ]
-    }
-    asset_graph = get_asset_graph(assets_by_repo_name)
-
-    backfill_data = AssetBackfillData.from_asset_partitions(
-        partition_names=None,
-        asset_graph=asset_graph,
-        asset_selection=[
-            unpartitioned_upstream_of_partitioned.key,
-            upstream_daily_partitioned_asset.key,
-        ],
-        dynamic_partitions_store=MagicMock(),
-        all_partitions=True,
-        backfill_start_timestamp=get_current_timestamp(),
-    )
-
-    with pytest.raises(
-        DagsterBackfillFailedError,
-        match=(
-            "Either all assets must have backfill policies or none of them must have backfill"
-            " policies"
-        ),
-    ):
-        execute_asset_backfill_iteration_consume_generator(
-            backfill_id="test_backfill_id",
-            asset_backfill_data=backfill_data,
-            asset_graph=asset_graph,
-            instance=DagsterInstance.ephemeral(),
-        )
-
-
 def test_asset_backfill_parent_and_children_have_different_backfill_policy():
     time_now = get_current_datetime()
     daily_partitions_def: DailyPartitionsDefinition = DailyPartitionsDefinition("2023-01-01")
