Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor logic of MaterializeOnCronRule to generic module #20364

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions python_modules/dagster/dagster/_core/asset_graph_view/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import datetime
from typing import (
AbstractSet,
NamedTuple,
Optional,
Sequence,
)

from dagster import _check as check
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
get_time_partitions_def,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.schedules import (
cron_string_iterator,
reverse_cron_string_iterator,
)


class MissedTicksEvaluationData(NamedTuple):
    """Immutable parameters describing a window of cron ticks to evaluate.

    A ``start_dt`` of ``None`` signals that there is no previous evaluation
    time; callers treat that case as "consider only the latest tick".
    """

    cron_schedule: str
    cron_timezone: str
    start_dt: Optional[datetime.datetime]
    end_dt: datetime.datetime

    @property
    def start_timestamp(self) -> Optional[float]:
        # Epoch seconds for the window start, or None when no start was given.
        if self.start_dt is None:
            return None
        return self.start_dt.timestamp()

    @property
    def end_timestamp(self) -> float:
        # Epoch seconds for the window end.
        return self.end_dt.timestamp()


def last_tick_in_cron_schedule(
    missed_ticks_data: MissedTicksEvaluationData,
) -> Sequence[datetime.datetime]:
    """Return a single-element list holding the latest cron tick before end_dt.

    Used when there is no start time, so only the most recent tick (the first
    item of the backwards iterator) is considered "missed".
    """
    backwards = reverse_cron_string_iterator(
        end_timestamp=missed_ticks_data.end_timestamp,
        cron_string=missed_ticks_data.cron_schedule,
        execution_timezone=missed_ticks_data.cron_timezone,
    )
    return [next(backwards)]


def cron_ticks_in_time_range(
    missed_ticks_data: MissedTicksEvaluationData,
) -> Sequence[datetime.datetime]:
    """Collect every cron tick from start_dt up to and including end_dt.

    Raises a check error when start_timestamp is unset, since the forward
    iterator needs a starting point.
    """
    start_ts = check.not_none(missed_ticks_data.start_timestamp, "start_timestamp must be set")
    ticks = []
    for tick in cron_string_iterator(
        start_timestamp=start_ts,
        cron_string=missed_ticks_data.cron_schedule,
        execution_timezone=missed_ticks_data.cron_timezone,
    ):
        # The iterator is unbounded; stop at the first tick past the window end.
        if tick.timestamp() <= missed_ticks_data.end_timestamp:
            ticks.append(tick)
        else:
            break
    return ticks


def get_missed_ticks(missed_ticks_data: MissedTicksEvaluationData) -> Sequence[datetime.datetime]:
    """Return the cron ticks between start and end.

    If start is None (i.e. there was no previous evaluation), return only the
    single latest tick before end.
    """
    # Compare against None explicitly: a start of exactly the epoch (0.0) is a
    # valid timestamp and must not be treated the same as "no start".
    if missed_ticks_data.start_timestamp is not None:
        return cron_ticks_in_time_range(missed_ticks_data)
    return last_tick_in_cron_schedule(missed_ticks_data)


def get_new_asset_partitions_to_request(
    *,
    missed_ticks: Sequence[datetime.datetime],
    asset_key: AssetKey,
    partitions_def: Optional[PartitionsDefinition],
    dynamic_partitions_store: DynamicPartitionsStore,
    all_partitions: bool,
    end_dt: datetime.datetime,
) -> AbstractSet[AssetKeyPartitionKey]:
    """Map missed cron ticks to the set of asset partitions that should be requested.

    Assumes at least one tick was missed; the caller is responsible for
    short-circuiting when ``missed_ticks`` is empty.
    """
    # Unpartitioned asset: any missed tick maps to the bare asset key.
    if partitions_def is None:
        return {AssetKeyPartitionKey(asset_key)}

    # all_partitions requests every partition whenever any tick was missed.
    if all_partitions:
        every_key = partitions_def.get_partition_keys(
            current_time=end_dt,
            dynamic_partitions_store=dynamic_partitions_store,
        )
        return {AssetKeyPartitionKey(asset_key, key) for key in every_key}

    # Partitions without a time component: request only the last partition.
    time_partitions_def = get_time_partitions_def(partitions_def)
    if time_partitions_def is None:
        last_key = partitions_def.get_last_partition_key(
            dynamic_partitions_store=dynamic_partitions_store
        )
        return {AssetKeyPartitionKey(asset_key, last_key)}

    # One time-partition key per missed tick, skipping ticks for which no
    # partition exists yet (falsy keys are dropped, matching filter(None, ...)).
    candidate_keys = [
        time_partitions_def.get_last_partition_key(
            current_time=tick,
            dynamic_partitions_store=dynamic_partitions_store,
        )
        for tick in missed_ticks
    ]
    time_partition_keys = [key for key in candidate_keys if key]

    # Multi-partitions: fan each time key out across the other dimension(s),
    # requesting all member partitions for each missed cron tick.
    if isinstance(partitions_def, MultiPartitionsDefinition):
        return {
            AssetKeyPartitionKey(asset_key, multi_key)
            for time_key in time_partition_keys
            for multi_key in partitions_def.get_multipartition_keys_with_dimension_value(
                partitions_def.time_window_dimension.name,
                time_key,
                dynamic_partitions_store=dynamic_partitions_store,
            )
        }
    return {AssetKeyPartitionKey(asset_key, time_key) for time_key in time_partition_keys}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@
from collections import defaultdict
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)

import pytz

import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.cron import (
MissedTicksEvaluationData,
get_missed_ticks,
get_new_asset_partitions_to_request,
)
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeDecisionType,
Expand All @@ -28,19 +31,16 @@
from dagster._core.definitions.freshness_based_auto_materialize import (
freshness_evaluation_results_for_asset_key,
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
get_time_partitions_def,
)
from dagster._core.storage.dagster_run import RunsFilter
from dagster._core.storage.tags import AUTO_MATERIALIZE_TAG
from dagster._serdes.serdes import (
whitelist_for_serdes,
)
from dagster._utils.schedules import (
cron_string_iterator,
is_valid_cron_string,
reverse_cron_string_iterator,
)
Expand Down Expand Up @@ -290,105 +290,48 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def missed_cron_ticks(
self, context: AssetConditionEvaluationContext
) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
# if it's the first time evaluating this rule, then just count the latest tick as missed
if not context.previous_evaluation or not context.previous_evaluation_timestamp:
# Reverse iteration from the evaluation time: the first yielded value
# is the most recent tick at or before now.
previous_dt = next(
reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
)
)
return [previous_dt]
# Otherwise, walk forward from the previous evaluation and collect every
# tick up to (and including) the current evaluation time.
missed_ticks = []
for dt in cron_string_iterator(
start_timestamp=context.previous_evaluation_timestamp,
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
):
# The iterator is unbounded; stop at the first tick past now.
if dt > context.evaluation_time:
break
missed_ticks.append(dt)
return missed_ticks

def get_new_candidate_asset_partitions(
self, context: AssetConditionEvaluationContext, missed_ticks: Sequence[datetime.datetime]
) -> AbstractSet[AssetKeyPartitionKey]:
"""Map the missed cron ticks to the set of asset partitions to request."""
# No ticks missed: nothing to request.
if not missed_ticks:
return set()

partitions_def = context.partitions_def
# Unpartitioned asset: any missed tick maps to the bare asset key.
if partitions_def is None:
return {AssetKeyPartitionKey(context.asset_key)}

# if all_partitions is set, then just return all partitions if any ticks have been missed
if self.all_partitions:
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for partition_key in partitions_def.get_partition_keys(
current_time=context.evaluation_time,
dynamic_partitions_store=context.instance_queryer,
)
}

# for partitions_defs without a time component, just return the last partition if any ticks
# have been missed
time_partitions_def = get_time_partitions_def(partitions_def)
if time_partitions_def is None:
return {
AssetKeyPartitionKey(
context.asset_key,
partitions_def.get_last_partition_key(
dynamic_partitions_store=context.instance_queryer
),
)
}

# One time-partition key per missed tick; filter(None, ...) drops ticks
# for which no partition key exists yet.
missed_time_partition_keys = filter(
None,
[
time_partitions_def.get_last_partition_key(
current_time=missed_tick,
dynamic_partitions_store=context.instance_queryer,
)
for missed_tick in missed_ticks
],
)
# for multi partitions definitions, request to materialize all partitions for each missed
# cron schedule tick
if isinstance(partitions_def, MultiPartitionsDefinition):
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for time_partition_key in missed_time_partition_keys
for partition_key in partitions_def.get_multipartition_keys_with_dimension_value(
partitions_def.time_window_dimension.name,
time_partition_key,
dynamic_partitions_store=context.instance_queryer,
)
}
else:
return {
AssetKeyPartitionKey(context.asset_key, time_partition_key)
for time_partition_key in missed_time_partition_keys
}

def evaluate_for_asset(
self, context: AssetConditionEvaluationContext
) -> "AssetConditionResult":
from .asset_condition.asset_condition import AssetConditionResult

missed_ticks = self.missed_cron_ticks(context)
new_asset_partitions = self.get_new_candidate_asset_partitions(context, missed_ticks)
# if it's the first time evaluating this rule, then just count the latest tick as missed
# by setting start_dt to None
start_dt = (
datetime.datetime.fromtimestamp(context.previous_evaluation_timestamp)
if context.previous_evaluation and context.previous_evaluation_timestamp
# None if it is the first tick
else None
)

missed_ticks = get_missed_ticks(
MissedTicksEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=start_dt,
end_dt=context.evaluation_time,
)
)

new_asset_partitions = (
get_new_asset_partitions_to_request(
missed_ticks=missed_ticks,
asset_key=context.asset_key,
partitions_def=context.partitions_def,
dynamic_partitions_store=context.instance_queryer,
all_partitions=self.all_partitions,
end_dt=context.evaluation_time,
)
if missed_ticks
else set()
)

# if it's the first time evaluating this rule, must query for the actual subset that has
# been materialized since the previous cron tick, as materializations may have happened
# before the previous evaluation, which
# `context.materialized_requested_or_discarded_since_previous_tick_subset` would not capture
if context.previous_evaluation is None:
check.invariant(missed_ticks, "Assuming at least one missed tick on first evaluation")
new_asset_partitions -= context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=context.asset_key, after_time=missed_ticks[-1]
).asset_partitions
Expand Down