Skip to content

Commit

Permalink
cp for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
schrockn committed Mar 8, 2024
1 parent 01db921 commit bed20b2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 35 deletions.
18 changes: 13 additions & 5 deletions python_modules/dagster/dagster/_core/asset_graph_view/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,26 @@ def get_missed_ticks(cron_data: CronEvaluationData) -> Sequence[datetime.datetim
)


# class NewPartitionRequest(NamedTuple):
# missed_ticks: Sequence[datetime.datetime]
# asset_partition_keys: AbstractSet[AssetKeyPartitionKey]


def get_new_asset_partitions_to_request(
*,
cron_data: CronEvaluationData,
asset_key: AssetKey,
partitions_def: Optional[PartitionsDefinition],
dynamic_partitions_store: DynamicPartitionsStore,
all_partitions: bool,
missed_ticks: Sequence[datetime.datetime],
) -> AbstractSet[AssetKeyPartitionKey]:
missed_ticks = (
cron_ticks_in_time_range(cron_data)
if cron_data.start_timestamp
else last_tick_in_cron_schedule(cron_data)
)
# missed_ticks = get_missed_ticks(cron_data)
# # (
# cron_ticks_in_time_range(cron_data)
# if cron_data.start_timestamp
# else last_tick_in_cron_schedule(cron_data)
# )

if not missed_ticks:
return set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)

Expand Down Expand Up @@ -291,50 +290,38 @@ def decision_type(self) -> AutoMaterializeDecisionType:
def description(self) -> str:
return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"

def missed_cron_ticks(
self, context: AssetConditionEvaluationContext
) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
# if it's the first time evaluating this rule, then just count the latest tick as missed
start_dt = (
None
if (not context.previous_evaluation or not context.previous_evaluation_timestamp)
else datetime.datetime.fromtimestamp(context.previous_evaluation_timestamp)
)
return get_missed_ticks(
CronEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=start_dt,
end_dt=context.evaluation_time,
)
)

def evaluate_for_asset(
self, context: AssetConditionEvaluationContext
) -> "AssetConditionResult":
from .asset_condition.asset_condition import AssetConditionResult

# if it's the first time evaluating this rule, then just count the latest tick as missed
first_time = not context.previous_evaluation or not context.previous_evaluation_timestamp
missed_ticks = get_missed_ticks(
CronEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=None if first_time else context.evaluation_time,
end_dt=context.evaluation_time,
# by setting start_dt to None
first_tick = not context.previous_evaluation or not context.previous_evaluation_timestamp
start_dt = (
None
if first_tick
else datetime.datetime.fromtimestamp(
check.not_none(context.previous_evaluation_timestamp)
)
)
cron_data = CronEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=start_dt,
end_dt=context.evaluation_time,
)

missed_ticks = get_missed_ticks(cron_data)

new_asset_partitions = get_new_asset_partitions_to_request(
cron_data=CronEvaluationData(
cron_schedule=self.cron_schedule,
cron_timezone=self.timezone,
start_dt=datetime.datetime.fromtimestamp(context.previous_evaluation_timestamp)
if context.previous_evaluation_timestamp
else None,
start_dt=start_dt,
end_dt=context.evaluation_time,
),
missed_ticks=missed_ticks,
asset_key=context.asset_key,
partitions_def=context.partitions_def,
dynamic_partitions_store=context.instance_queryer,
Expand Down

0 comments on commit bed20b2

Please sign in to comment.