Skip to content

Commit

Permalink
Fully remove pendulum from dagster core (#22418)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Will wait to land this until after the 1.8 branch cut.

## How I Tested These Changes
  • Loading branch information
gibsondan authored Jul 29, 2024
1 parent 8e4dbcf commit 403776f
Show file tree
Hide file tree
Showing 31 changed files with 177 additions and 396 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,6 @@ def _unsupported_dagster_python_versions(tox_factor: Optional[str]) -> List[Avai
return [AvailablePythonVersion.V3_11, AvailablePythonVersion.V3_12]

if tox_factor in {
"definitions_tests_pendulum_1",
"definitions_tests_pendulum_2",
"scheduler_tests_pendulum_1",
"scheduler_tests_pendulum_2",
"type_signature_tests",
}:
return [AvailablePythonVersion.V3_12]
Expand Down Expand Up @@ -416,17 +412,13 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
"daemon_sensor_tests",
"daemon_tests",
"definitions_tests",
"definitions_tests_pendulum_1",
"definitions_tests_pendulum_2",
"general_tests",
"general_tests_old_protobuf",
"launcher_tests",
"logging_tests",
"model_tests_pydantic1",
"model_tests_pydantic2",
"scheduler_tests",
"scheduler_tests_pendulum_1",
"scheduler_tests_pendulum_2",
"storage_tests",
"storage_tests_sqlalchemy_1_3",
"storage_tests_sqlalchemy_1_4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import Optional, Sequence, Union, cast

import pendulum
from dagster import (
AssetDep,
AssetsDefinition,
Expand All @@ -17,6 +16,7 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
)
from dagster._time import parse_time_string

from .perf_scenario import ActivityHistory, PerfScenario

Expand Down Expand Up @@ -94,5 +94,5 @@ def build_run_request_for_all_partitions(asset_def: AssetsDefinition) -> RunRequ
[build_run_request_for_all_partitions(a) for a in assets]
+ [build_run_request_for_all_partitions(assets_by_key["d"])]
),
current_time=pendulum.parse("2023-09-06T00:05"),
current_time=parse_time_string("2023-09-06T00:05"),
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import random
from datetime import timedelta

import pendulum
from dagster import (
AssetExecutionContext,
MetadataValue,
Expand All @@ -17,6 +16,7 @@
observable_source_asset,
)
from dagster._core.definitions.asset_selection import KeysAssetSelection
from dagster._time import get_current_datetime


@observable_source_asset(group_name="freshness_checks")
Expand Down Expand Up @@ -45,7 +45,7 @@ def derived_asset(context: AssetExecutionContext) -> None:
.value,
"latest_updated_timestamp",
)
if latest_updated_timestamp < pendulum.now().subtract(minutes=4).timestamp():
if latest_updated_timestamp < (get_current_datetime() - timedelta(minutes=4)).timestamp():
raise Exception("source is stale, so I am going to fail :(")


Expand Down Expand Up @@ -95,7 +95,7 @@ def get_last_updated_timestamp_unreliable_source() -> TimestampMetadataValue:
"dagster/last_updated_timestamp"
] # type: ignore
else:
now = pendulum.now()
now = get_current_datetime()
rounded_minute = max(math.floor((now.minute - 1) / 3) * 3, 0)
return MetadataValue.timestamp(now.replace(minute=rounded_minute))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TimeWindowPartitionsDefinition,
get_time_partitions_def,
)
from dagster._time import get_current_datetime
from dagster._utils.cached_method import cached_method

if TYPE_CHECKING:
Expand Down Expand Up @@ -276,8 +277,6 @@ def for_test(
effective_dt: Optional[datetime] = None,
last_event_id: Optional[int] = None,
):
import pendulum

from dagster._core.definitions.data_version import CachingStaleStatusResolver
from dagster._core.instance import DagsterInstance

Expand All @@ -290,7 +289,7 @@ def for_test(
return AssetGraphView(
stale_resolver=stale_resolver,
temporal_context=TemporalContext(
effective_dt=effective_dt or pendulum.now(),
effective_dt=effective_dt or get_current_datetime(),
last_event_id=last_event_id or instance.event_log_storage.get_maximum_record_id(),
),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import datetime
from typing import Any, Dict, Iterable, Optional, Sequence, Union, cast

import pendulum

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
Expand All @@ -16,6 +14,7 @@
TimestampMetadataValue,
)
from dagster._core.execution.context.compute import AssetCheckExecutionContext
from dagster._time import get_current_timestamp, get_timezone
from dagster._utils.schedules import (
get_latest_completed_cron_tick,
get_next_cron_tick,
Expand Down Expand Up @@ -168,9 +167,11 @@ def _build_freshness_multi_check(
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
for check_key in context.selected_asset_check_keys:
asset_key = check_key.asset_key
current_timestamp = pendulum.now("UTC").timestamp()
current_timestamp = get_current_timestamp()

current_time_in_freshness_tz = pendulum.from_timestamp(current_timestamp, tz=timezone)
current_time_in_freshness_tz = datetime.datetime.fromtimestamp(
current_timestamp, tz=get_timezone(timezone)
)
latest_completed_cron_tick = (
get_latest_completed_cron_tick(
deadline_cron, current_time_in_freshness_tz, timezone
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import datetime
from typing import Iterator, Optional, Sequence, Tuple, Union, cast

import pendulum

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus
from dagster._time import get_current_datetime, get_current_timestamp

from ...asset_check_spec import AssetCheckKey
from ...asset_checks import AssetChecksDefinition
Expand All @@ -21,8 +20,8 @@
This sensor launches execution of freshness checks for the provided assets. The sensor will
only launch a new execution of a freshness check if the check previously passed, but enough
time has passed that the check could be overdue again. Once a check has failed, the sensor
will not launch a new execution until the asset has been updated (which should automatically
execute the check).
will not launch a new execution until the asset has been updated (which should automatically
execute the check).
"""


Expand Down Expand Up @@ -84,7 +83,7 @@ def the_sensor(context: SensorEvaluationContext) -> Optional[Union[RunRequest, S
left_off_asset_check_key = (
AssetCheckKey.from_user_string(context.cursor) if context.cursor else None
)
start_time = pendulum.now("UTC")
start_time = get_current_datetime()
checks_to_evaluate = []
checks_iter = freshness_checks_get_evaluations_iter(
context=context,
Expand All @@ -96,8 +95,7 @@ def the_sensor(context: SensorEvaluationContext) -> Optional[Union[RunRequest, S
# iteration; this allows us to pause the sensor if it runs into the maximum runtime.
check_key, should_evaluate = next(checks_iter, (None, False))
while (
pendulum.now("UTC").timestamp() - start_time.timestamp() < MAXIMUM_RUNTIME_SECONDS
and check_key
get_current_timestamp() - start_time.timestamp() < MAXIMUM_RUNTIME_SECONDS and check_key
):
if should_evaluate:
checks_to_evaluate.append(check_key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Any, Dict, Iterable, Sequence, Union

import pendulum

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_result import AssetCheckResult
Expand All @@ -16,6 +14,7 @@
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition
from dagster._core.execution.context.compute import AssetCheckExecutionContext
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.schedules import (
get_latest_completed_cron_tick,
get_next_cron_tick,
Expand Down Expand Up @@ -143,17 +142,17 @@ def _build_freshness_multi_check(
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
for check_key in context.selected_asset_check_keys:
asset_key = check_key.asset_key
current_timestamp = pendulum.now("UTC").timestamp()
current_timestamp = get_current_timestamp()

partitions_def = check.inst(
context.job_def.asset_layer.asset_graph.get(asset_key).partitions_def,
TimeWindowPartitionsDefinition,
)
current_time_in_freshness_tz = pendulum.from_timestamp(current_timestamp, tz=timezone)
current_time_in_freshness_tz = datetime_from_timestamp(current_timestamp, tz=timezone)
deadline = get_latest_completed_cron_tick(
deadline_cron, current_time_in_freshness_tz, timezone
)
deadline_in_partitions_def_tz = pendulum.from_timestamp(
deadline_in_partitions_def_tz = datetime_from_timestamp(
deadline.timestamp(), tz=partitions_def.timezone
)
last_completed_time_window = check.not_none(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Any, Mapping, Optional, cast

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.asset_graph_view.asset_graph_view import AssetGraphView, TemporalContext
Expand All @@ -12,6 +10,7 @@
AutomationConditionEvaluator,
)
from dagster._core.definitions.run_request import SensorResult
from dagster._time import get_current_datetime
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer

from .asset_selection import AssetSelection
Expand All @@ -36,7 +35,7 @@ def evaluate_automation_conditions(context: SensorEvaluationContext):
instance_queryer = CachingInstanceQueryer(
context.instance,
asset_graph,
evaluation_time=pendulum.now(),
evaluation_time=get_current_datetime(),
logger=context.log,
)

Expand Down
6 changes: 2 additions & 4 deletions python_modules/dagster/dagster/_core/definitions/data_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import datetime
from typing import AbstractSet, Dict, Mapping, Optional, Sequence, Tuple, cast

import pendulum

import dagster._check as check
from dagster._core.definitions.asset_selection import KeysAssetSelection
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
Expand All @@ -34,7 +32,7 @@
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.event_api import EventLogRecord
from dagster._core.storage.dagster_run import FINISHED_STATUSES, DagsterRunStatus, RunsFilter
from dagster._time import datetime_from_timestamp
from dagster._time import datetime_from_timestamp, get_current_datetime
from dagster._utils import make_hashable
from dagster._utils.cached_method import cached_method
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
Expand Down Expand Up @@ -490,7 +488,7 @@ def get_data_time_by_key_for_record(
record_id=record.storage_id,
record_timestamp=record.event_log_entry.timestamp,
record_tags=make_hashable(event.tags or {}),
current_time=current_time or pendulum.now("UTC"),
current_time=current_time or get_current_datetime(),
)

def get_current_data_time(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
import logging
from typing import TYPE_CHECKING, AbstractSet, Any, Mapping, NamedTuple, Optional

import pendulum

import dagster._check as check
from dagster._core.asset_graph_view.asset_graph_view import (
AssetGraphView,
Expand All @@ -24,6 +22,7 @@
)
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._time import get_current_datetime

from .legacy.legacy_context import LegacyRuleEvaluationContext

Expand Down Expand Up @@ -122,7 +121,7 @@ def create(
),
asset_graph_view=asset_graph_view,
parent_context=None,
create_time=pendulum.now("UTC"),
create_time=get_current_datetime(),
logger=logger,
cursor=condition_cursor,
current_tick_results_by_key=current_tick_results_by_key,
Expand All @@ -144,7 +143,7 @@ def for_child_condition(
),
asset_graph_view=self.asset_graph_view,
parent_context=self,
create_time=pendulum.now("UTC"),
create_time=get_current_datetime(),
logger=self.logger,
cursor=self.cursor,
current_tick_results_by_key=self.current_tick_results_by_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster._core.definitions.asset_subset import AssetSubset, ValidAssetSubset
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._seven.compat.pendulum import PendulumInterval
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._utils.schedules import cron_string_iterator

if TYPE_CHECKING:
Expand All @@ -27,7 +27,7 @@ def get_execution_period_for_policy(
freshness_policy: FreshnessPolicy,
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> PendulumInterval:
) -> TimeWindow:
if freshness_policy.cron_schedule:
tick_iterator = cron_string_iterator(
start_timestamp=current_time.timestamp(),
Expand All @@ -41,18 +41,18 @@ def get_execution_period_for_policy(
tick = next(tick_iterator)
required_data_time = tick - freshness_policy.maximum_lag_delta
if effective_data_time is None or effective_data_time < required_data_time:
return PendulumInterval(start=required_data_time, end=tick)
return TimeWindow(start=required_data_time, end=tick)

else:
# occurs when asset is missing
if effective_data_time is None:
return PendulumInterval(
return TimeWindow(
# require data from at most maximum_lag_delta ago
start=current_time - freshness_policy.maximum_lag_delta,
# this data should be available as soon as possible
end=current_time,
)
return PendulumInterval(
return TimeWindow(
# we don't want to execute this too frequently
start=effective_data_time + 0.9 * freshness_policy.maximum_lag_delta,
end=max(effective_data_time + freshness_policy.maximum_lag_delta, current_time),
Expand All @@ -64,7 +64,7 @@ def get_execution_period_and_evaluation_data_for_policies(
policies: AbstractSet[FreshnessPolicy],
effective_data_time: Optional[datetime.datetime],
current_time: datetime.datetime,
) -> Tuple[Optional[PendulumInterval], Optional["TextRuleEvaluationData"]]:
) -> Tuple[Optional[TimeWindow], Optional["TextRuleEvaluationData"]]:
"""Determines a range of times for which you can kick off an execution of this asset to solve
the most pressing constraint, alongside a maximum number of additional constraints.
"""
Expand All @@ -84,7 +84,7 @@ def get_execution_period_and_evaluation_data_for_policies(
if merged_period is None:
merged_period = period
elif period.start <= merged_period.end:
merged_period = PendulumInterval(
merged_period = TimeWindow(
start=max(period.start, merged_period.start),
end=period.end,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import Callable, Dict, Mapping, NamedTuple, Optional, Set, cast

import pendulum

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.asset_selection import AssetSelection
Expand All @@ -21,6 +19,7 @@
from dagster._serdes.errors import DeserializationError
from dagster._serdes.serdes import deserialize_value
from dagster._seven import JSONDecodeError
from dagster._time import get_current_datetime

from .sensor_definition import (
DefaultSensorStatus,
Expand Down Expand Up @@ -240,7 +239,7 @@ def _wrapped_fn(context: SensorEvaluationContext):
yield SkipReason(f"Initializing {name}.")
return

evaluation_time = pendulum.now("UTC")
evaluation_time = get_current_datetime()
asset_graph = context.repository_def.asset_graph
instance_queryer = CachingInstanceQueryer(
context.instance, asset_graph, evaluation_time
Expand Down
Loading

0 comments on commit 403776f

Please sign in to comment.