Refactor logic of MaterializeOnCronRule to generic module
schrockn committed Mar 11, 2024
1 parent e106b8f commit 60ee271
Showing 2 changed files with 175 additions and 93 deletions.
139 changes: 139 additions & 0 deletions python_modules/dagster/dagster/_core/asset_graph_view/cron.py
@@ -0,0 +1,139 @@
import datetime
from typing import (
AbstractSet,
NamedTuple,
Optional,
Sequence,
)

from dagster import _check as check
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionsDefinition,
)
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
get_time_partitions_def,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.schedules import (
cron_string_iterator,
reverse_cron_string_iterator,
)


class MissedTicksEvaluationData(NamedTuple):
cron_schedule: str
cron_timezone: str
start_dt: Optional[datetime.datetime]
end_dt: datetime.datetime

@property
def start_timestamp(self) -> Optional[float]:
return self.start_dt.timestamp() if self.start_dt else None

@property
def end_timestamp(self) -> float:
return self.end_dt.timestamp()


def last_tick_in_cron_schedule(
missed_ticks_data: MissedTicksEvaluationData,
) -> Sequence[datetime.datetime]:
last_tick_dt = next(
reverse_cron_string_iterator(
end_timestamp=missed_ticks_data.end_timestamp,
cron_string=missed_ticks_data.cron_schedule,
execution_timezone=missed_ticks_data.cron_timezone,
)
)
return [last_tick_dt]


def cron_ticks_in_time_range(
missed_ticks_data: MissedTicksEvaluationData,
) -> Sequence[datetime.datetime]:
start_ts = check.not_none(missed_ticks_data.start_timestamp, "start_timestamp must be set")
missed_ticks = []
for dt in cron_string_iterator(
start_timestamp=start_ts,
cron_string=missed_ticks_data.cron_schedule,
execution_timezone=missed_ticks_data.cron_timezone,
):
if dt.timestamp() > missed_ticks_data.end_timestamp:
break
missed_ticks.append(dt)
return missed_ticks


def get_missed_ticks(missed_ticks_data: MissedTicksEvaluationData) -> Sequence[datetime.datetime]:
"""Return the cron ticks between start and end. If end is None, return the last tick."""
return (
cron_ticks_in_time_range(missed_ticks_data)
if missed_ticks_data.start_timestamp
else last_tick_in_cron_schedule(missed_ticks_data)
)


def get_new_asset_partitions_to_request(
*,
missed_ticks: Sequence[datetime.datetime],
asset_key: AssetKey,
partitions_def: Optional[PartitionsDefinition],
dynamic_partitions_store: DynamicPartitionsStore,
all_partitions: bool,
end_dt: datetime.datetime,
) -> AbstractSet[AssetKeyPartitionKey]:
if partitions_def is None:
return {AssetKeyPartitionKey(asset_key)}

# if all_partitions is set, then just return all partitions if any ticks have been missed
if all_partitions:
return {
AssetKeyPartitionKey(asset_key, partition_key)
for partition_key in partitions_def.get_partition_keys(
current_time=end_dt,
dynamic_partitions_store=dynamic_partitions_store,
)
}

# for partitions_defs without a time component, just return the last partition if any ticks
# have been missed
time_partitions_def = get_time_partitions_def(partitions_def)
if time_partitions_def is None:
return {
AssetKeyPartitionKey(
asset_key,
partitions_def.get_last_partition_key(
dynamic_partitions_store=dynamic_partitions_store
),
)
}

missed_time_partition_keys = filter(
None,
[
time_partitions_def.get_last_partition_key(
current_time=missed_tick,
dynamic_partitions_store=dynamic_partitions_store,
)
for missed_tick in missed_ticks
],
)
# for multi partitions definitions, request to materialize all partitions for each missed
# cron schedule tick
if isinstance(partitions_def, MultiPartitionsDefinition):
return {
AssetKeyPartitionKey(asset_key, partition_key)
for time_partition_key in missed_time_partition_keys
for partition_key in partitions_def.get_multipartition_keys_with_dimension_value(
partitions_def.time_window_dimension.name,
time_partition_key,
dynamic_partitions_store=dynamic_partitions_store,
)
}
else:
return {
AssetKeyPartitionKey(asset_key, time_partition_key)
for time_partition_key in missed_time_partition_keys
}
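
For context, a minimal usage sketch of the new helpers above; the hourly schedule and timestamps are assumptions chosen for illustration, not taken from the commit:

import datetime

from dagster._core.asset_graph_view.cron import (
    MissedTicksEvaluationData,
    get_missed_ticks,
)

# Assumed hourly schedule evaluated over a three-hour window.
data = MissedTicksEvaluationData(
    cron_schedule="0 * * * *",
    cron_timezone="UTC",
    start_dt=datetime.datetime(2024, 3, 11, 0, 30, tzinfo=datetime.timezone.utc),
    end_dt=datetime.datetime(2024, 3, 11, 3, 30, tzinfo=datetime.timezone.utc),
)
# With a start time, every tick between start and end is returned: 01:00, 02:00, 03:00.
print(get_missed_ticks(data))

# With start_dt=None (e.g. on a first evaluation), only the last tick before end_dt is returned.
print(get_missed_ticks(data._replace(start_dt=None)))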
@@ -3,20 +3,23 @@
from collections import defaultdict
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)

import pytz

import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.cron import (
MissedTicksEvaluationData,
get_missed_ticks,
get_new_asset_partitions_to_request,
)
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeDecisionType,
@@ -28,19 +31,16 @@
from dagster._core.definitions.freshness_based_auto_materialize import (
freshness_evaluation_results_for_asset_key,
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
TimeWindowPartitionsDefinition,
get_time_partitions_def,
)
from dagster._core.storage.dagster_run import RunsFilter
from dagster._core.storage.tags import AUTO_MATERIALIZE_TAG
from dagster._serdes.serdes import (
whitelist_for_serdes,
)
from dagster._utils.schedules import (
cron_string_iterator,
is_valid_cron_string,
reverse_cron_string_iterator,
)
@@ -290,105 +290,48 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def missed_cron_ticks(
self, context: AssetConditionEvaluationContext
) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
# if it's the first time evaluating this rule, then just count the latest tick as missed
if not context.previous_evaluation or not context.previous_evaluation_timestamp:
previous_dt = next(
reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
)
)
return [previous_dt]
missed_ticks = []
for dt in cron_string_iterator(
start_timestamp=context.previous_evaluation_timestamp,
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
):
if dt > context.evaluation_time:
break
missed_ticks.append(dt)
return missed_ticks

def get_new_candidate_asset_partitions(
self, context: AssetConditionEvaluationContext, missed_ticks: Sequence[datetime.datetime]
) -> AbstractSet[AssetKeyPartitionKey]:
if not missed_ticks:
return set()

partitions_def = context.partitions_def
if partitions_def is None:
return {AssetKeyPartitionKey(context.asset_key)}

# if all_partitions is set, then just return all partitions if any ticks have been missed
if self.all_partitions:
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for partition_key in partitions_def.get_partition_keys(
current_time=context.evaluation_time,
dynamic_partitions_store=context.instance_queryer,
)
}

# for partitions_defs without a time component, just return the last partition if any ticks
# have been missed
time_partitions_def = get_time_partitions_def(partitions_def)
if time_partitions_def is None:
return {
AssetKeyPartitionKey(
context.asset_key,
partitions_def.get_last_partition_key(
dynamic_partitions_store=context.instance_queryer
),
)
}

missed_time_partition_keys = filter(
None,
[
time_partitions_def.get_last_partition_key(
current_time=missed_tick,
dynamic_partitions_store=context.instance_queryer,
)
for missed_tick in missed_ticks
],
)
# for multi partitions definitions, request to materialize all partitions for each missed
# cron schedule tick
if isinstance(partitions_def, MultiPartitionsDefinition):
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for time_partition_key in missed_time_partition_keys
for partition_key in partitions_def.get_multipartition_keys_with_dimension_value(
partitions_def.time_window_dimension.name,
time_partition_key,
dynamic_partitions_store=context.instance_queryer,
)
}
else:
return {
AssetKeyPartitionKey(context.asset_key, time_partition_key)
for time_partition_key in missed_time_partition_keys
}

def evaluate_for_asset(
self, context: AssetConditionEvaluationContext
) -> "AssetConditionResult":
from .asset_condition.asset_condition import AssetConditionResult

missed_ticks = self.missed_cron_ticks(context)
new_asset_partitions = self.get_new_candidate_asset_partitions(context, missed_ticks)
# if it's the first time evaluating this rule, then just count the latest tick as missed
# by setting start_dt to None
start_dt = (
datetime.datetime.fromtimestamp(context.previous_evaluation_timestamp)
if context.previous_evaluation and context.previous_evaluation_timestamp
# None if this is the first evaluation
else None
)

missed_ticks = get_missed_ticks(
MissedTicksEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=start_dt,
end_dt=context.evaluation_time,
)
)

new_asset_partitions = (
get_new_asset_partitions_to_request(
missed_ticks=missed_ticks,
asset_key=context.asset_key,
partitions_def=context.partitions_def,
dynamic_partitions_store=context.instance_queryer,
all_partitions=self.all_partitions,
end_dt=context.evaluation_time,
)
if missed_ticks
else set()
)

# if it's the first time evaluating this rule, must query for the actual subset that has
# been materialized since the previous cron tick, as materializations may have happened
# before the previous evaluation, which
# `context.materialized_requested_or_discarded_since_previous_tick_subset` would not capture
if context.previous_evaluation is None:
check.invariant(missed_ticks, "Assuming at least one missed tick on first evaluation")
new_asset_partitions -= context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=context.asset_key, after_time=missed_ticks[-1]
).asset_partitions
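
A sketch of calling the extracted helper directly, assuming a hypothetical daily-partitioned asset and an ephemeral DagsterInstance standing in for the DynamicPartitionsStore; the rule above passes context.instance_queryer and context.evaluation_time instead:

import datetime

from dagster import AssetKey, DagsterInstance, DailyPartitionsDefinition
from dagster._core.asset_graph_view.cron import get_new_asset_partitions_to_request

partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")
# Two assumed missed ticks of a daily cron schedule.
missed_ticks = [
    datetime.datetime(2024, 3, 10, tzinfo=datetime.timezone.utc),
    datetime.datetime(2024, 3, 11, tzinfo=datetime.timezone.utc),
]

requested = get_new_asset_partitions_to_request(
    missed_ticks=missed_ticks,
    asset_key=AssetKey("my_asset"),  # hypothetical asset key
    partitions_def=partitions_def,
    dynamic_partitions_store=DagsterInstance.ephemeral(),
    all_partitions=False,
    end_dt=datetime.datetime(2024, 3, 11, 1, 0, tzinfo=datetime.timezone.utc),
)
# Each missed tick maps to the latest complete daily partition as of that tick, so this
# should yield the "2024-03-09" and "2024-03-10" partitions of my_asset.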