From 651ce7bef1bf13f93d4a64a1d91d3869ccb53d69 Mon Sep 17 00:00:00 2001 From: Sashank Thupukari Date: Tue, 26 May 2020 17:37:01 -0400 Subject: [PATCH] [0.7.15] Isolate updating scheduler state from updating cron tab Summary: Previously, the scheduler had only two main methods to control both schedule state and cron jobs: `stop_schedule` and `start_schedule`. In `reconcile_scheduler_state`, for every schedule that was already in the schedule storage, we would call `stop` then `start` schedule to refresh the cron job + sh files needed for the schedule to run. However, this introduced a race condition when calling `reconcile_scheduler_state` multiple times in parallel. Since you cannot start/stop an already started/stopped schedule respectively, two calls to reconcile that are executing the `stop + start` block will cause each other to error. In the previous diff, we moved `reconcile_scheduler_state` to the `Scheduler` base class. In this diff, we rename the previous `start_schedule` and `stop_schedule` methods to `start_schedule_and_update_storage_state` and `stop_schedule_and_update_storage_state` and move them to the base class. Then, the abstract methods called `start_schedule` and `stop_schedule` are responsible for only starting and stopping jobs for the schedule. 
Test Plan: unit Reviewers: alangenfeld, max, prha Reviewed By: max Differential Revision: https://dagster.phacility.com/D3110 --- examples/dagster_examples/schedules.py | 4 +- .../dagit/src/schedules/ScheduleRow.tsx | 22 +- .../src/schedules/types/ScheduleFragment.ts | 2 +- .../src/schedules/types/ScheduleRootQuery.ts | 2 +- .../src/schedules/types/SchedulesRootQuery.ts | 2 +- .../src/schedules/types/StartSchedule.ts | 2 +- .../dagit/src/schedules/types/StopSchedule.ts | 2 +- js_modules/dagit/src/schema.graphql | 2 +- .../implementation/fetch_schedules.py | 4 +- .../dagster_graphql/schema/schedules.py | 11 +- .../snapshots/snap_test_execute_schedule.py | 57 ++++ .../graphql/test_scheduler.py | 4 +- .../dagster/dagster/cli/schedule.py | 16 +- .../dagster/dagster/core/instance/__init__.py | 28 +- .../dagster/core/scheduler/__init__.py | 4 + .../dagster/core/scheduler/scheduler.py | 194 +++++++++++-- .../dagster/dagster/utils/test/__init__.py | 52 +--- .../dagster_cron/cron_scheduler.py | 97 +++---- .../dagster_cron_tests/test_cron_scheduler.py | 255 +++++++++++++++--- 19 files changed, 568 insertions(+), 192 deletions(-) diff --git a/examples/dagster_examples/schedules.py b/examples/dagster_examples/schedules.py index 43d2fecf087e9..9a185ca9c5738 100644 --- a/examples/dagster_examples/schedules.py +++ b/examples/dagster_examples/schedules.py @@ -55,7 +55,9 @@ def backfill_should_execute(context, partition_set_def, schedule_name): is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions)) if not is_remaining_partitions: try: - context.instance.stop_schedule(context.repository, schedule_name) + context.instance.stop_schedule_and_update_storage_state( + context.repository, schedule_name + ) except OSError: pass diff --git a/js_modules/dagit/src/schedules/ScheduleRow.tsx b/js_modules/dagit/src/schedules/ScheduleRow.tsx index 4dafa48fc2183..0963a6958496b 100644 --- a/js_modules/dagit/src/schedules/ScheduleRow.tsx +++ 
b/js_modules/dagit/src/schedules/ScheduleRow.tsx @@ -53,25 +53,25 @@ const getNaturalLanguageCronString = (cronSchedule: string) => { } }; -const errorDisplay = (status: ScheduleStatus, runningJobCount: number) => { - if (status === ScheduleStatus.STOPPED && runningJobCount == 0) { +const errorDisplay = (status: ScheduleStatus, runningScheduleCount: number) => { + if (status === ScheduleStatus.STOPPED && runningScheduleCount == 0) { return null; - } else if (status === ScheduleStatus.RUNNING && runningJobCount == 1) { + } else if (status === ScheduleStatus.RUNNING && runningScheduleCount == 1) { return null; } const errors = []; - if (status === ScheduleStatus.RUNNING && runningJobCount === 0) { + if (status === ScheduleStatus.RUNNING && runningScheduleCount === 0) { errors.push( "Schedule is set to be running, but the scheduler is not running the schedule" ); - } else if (status === ScheduleStatus.STOPPED && runningJobCount > 0) { + } else if (status === ScheduleStatus.STOPPED && runningScheduleCount > 0) { errors.push( "Schedule is set to be stopped, but the scheduler is still running the schedule" ); } - if (runningJobCount > 0) { + if (runningScheduleCount > 0) { errors.push("Duplicate cron job for schedule found."); } @@ -106,7 +106,7 @@ export const ScheduleRow: React.FunctionComponent<{ const { status, scheduleDefinition, - runningJobCount, + runningScheduleCount, logsPath, stats, ticks, @@ -194,7 +194,7 @@ export const ScheduleRow: React.FunctionComponent<{ }} /> - {errorDisplay(status, runningJobCount)} + {errorDisplay(status, runningScheduleCount)} {displayName} @@ -368,7 +368,7 @@ export const ScheduleRow: React.FunctionComponent<{ export const ScheduleRowFragment = gql` fragment ScheduleFragment on RunningSchedule { __typename - runningJobCount + runningScheduleCount scheduleDefinition { name cronSchedule @@ -428,7 +428,7 @@ const START_SCHEDULE_MUTATION = gql` ... 
on RunningScheduleResult { schedule { __typename - runningJobCount + runningScheduleCount scheduleDefinition { __typename name @@ -451,7 +451,7 @@ const STOP_SCHEDULE_MUTATION = gql` ... on RunningScheduleResult { schedule { __typename - runningJobCount + runningScheduleCount scheduleDefinition { __typename name diff --git a/js_modules/dagit/src/schedules/types/ScheduleFragment.ts b/js_modules/dagit/src/schedules/types/ScheduleFragment.ts index ea519a08ca637..c825f73fb8387 100644 --- a/js_modules/dagit/src/schedules/types/ScheduleFragment.ts +++ b/js_modules/dagit/src/schedules/types/ScheduleFragment.ts @@ -47,7 +47,7 @@ export interface ScheduleFragment_stats { export interface ScheduleFragment { __typename: "RunningSchedule"; - runningJobCount: number; + runningScheduleCount: number; scheduleDefinition: ScheduleFragment_scheduleDefinition; logsPath: string; ticks: ScheduleFragment_ticks[]; diff --git a/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts b/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts index 69fce8a2b397a..01ec028b73c84 100644 --- a/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts +++ b/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts @@ -112,7 +112,7 @@ export interface ScheduleRootQuery_scheduleOrError_RunningSchedule_attemptList { export interface ScheduleRootQuery_scheduleOrError_RunningSchedule { __typename: "RunningSchedule"; - runningJobCount: number; + runningScheduleCount: number; scheduleDefinition: ScheduleRootQuery_scheduleOrError_RunningSchedule_scheduleDefinition; logsPath: string; ticks: ScheduleRootQuery_scheduleOrError_RunningSchedule_ticks[]; diff --git a/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts b/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts index a458ba6742bc8..a97fc05e9a51a 100644 --- a/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts +++ b/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts @@ -52,7 +52,7 @@ export interface 
SchedulesRootQuery_scheduler_Scheduler_runningSchedules_stats { export interface SchedulesRootQuery_scheduler_Scheduler_runningSchedules { __typename: "RunningSchedule"; - runningJobCount: number; + runningScheduleCount: number; scheduleDefinition: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_scheduleDefinition; logsPath: string; ticks: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_ticks[]; diff --git a/js_modules/dagit/src/schedules/types/StartSchedule.ts b/js_modules/dagit/src/schedules/types/StartSchedule.ts index 43d2e1f682054..d92a259d227dd 100644 --- a/js_modules/dagit/src/schedules/types/StartSchedule.ts +++ b/js_modules/dagit/src/schedules/types/StartSchedule.ts @@ -16,7 +16,7 @@ export interface StartSchedule_startSchedule_RunningScheduleResult_schedule_sche export interface StartSchedule_startSchedule_RunningScheduleResult_schedule { __typename: "RunningSchedule"; - runningJobCount: number; + runningScheduleCount: number; scheduleDefinition: StartSchedule_startSchedule_RunningScheduleResult_schedule_scheduleDefinition; status: ScheduleStatus; } diff --git a/js_modules/dagit/src/schedules/types/StopSchedule.ts b/js_modules/dagit/src/schedules/types/StopSchedule.ts index c34a7dc5714ed..ad2651076f64e 100644 --- a/js_modules/dagit/src/schedules/types/StopSchedule.ts +++ b/js_modules/dagit/src/schedules/types/StopSchedule.ts @@ -16,7 +16,7 @@ export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule { __typename: "RunningSchedule"; - runningJobCount: number; + runningScheduleCount: number; scheduleDefinition: StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule_scheduleDefinition; status: ScheduleStatus; } diff --git a/js_modules/dagit/src/schema.graphql b/js_modules/dagit/src/schema.graphql index 638ee5d338c61..acd815239f2f5 100644 --- a/js_modules/dagit/src/schema.graphql +++ b/js_modules/dagit/src/schema.graphql @@ -1003,7 
+1003,7 @@ type RunningSchedule { attempts(limit: Int): [ScheduleAttempt!]! attemptsCount: Int! logsPath: String! - runningJobCount: Int! + runningScheduleCount: Int! } type RunningScheduleResult { diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py index 4e211308e360b..dc1ef287cd6df 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py @@ -16,7 +16,7 @@ def start_schedule(graphene_info, schedule_name): repository = graphene_info.context.get_repository() instance = graphene_info.context.instance - schedule = instance.start_schedule(repository, schedule_name) + schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name) return graphene_info.schema.type_named('RunningScheduleResult')( schedule=graphene_info.schema.type_named('RunningSchedule')( graphene_info, schedule=schedule @@ -28,7 +28,7 @@ def start_schedule(graphene_info, schedule_name): def stop_schedule(graphene_info, schedule_name): repository = graphene_info.context.get_repository() instance = graphene_info.context.instance - schedule = instance.stop_schedule(repository, schedule_name) + schedule = instance.stop_schedule_and_update_storage_state(repository, schedule_name) return graphene_info.schema.type_named('RunningScheduleResult')( schedule=graphene_info.schema.type_named('RunningSchedule')( graphene_info, schedule=schedule diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py b/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py index 9cb3c48f43a0b..14c17b3cba733 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py @@ -20,8 +20,7 @@ from dagster.core.definitions import ScheduleDefinition, 
ScheduleExecutionContext from dagster.core.definitions.partition import PartitionScheduleDefinition from dagster.core.errors import ScheduleExecutionError, user_code_error_boundary -from dagster.core.scheduler import Schedule, ScheduleTickStatus -from dagster.core.scheduler.scheduler import ScheduleTickStatsSnapshot +from dagster.core.scheduler import Schedule, ScheduleTickStatsSnapshot, ScheduleTickStatus from dagster.core.storage.pipeline_run import PipelineRunsFilter @@ -208,7 +207,7 @@ class Meta(object): attempts = dauphin.Field(dauphin.non_null_list('ScheduleAttempt'), limit=dauphin.Int()) attempts_count = dauphin.NonNull(dauphin.Int) logs_path = dauphin.NonNull(dauphin.String) - running_job_count = dauphin.NonNull(dauphin.Int) + running_schedule_count = dauphin.NonNull(dauphin.Int) def __init__(self, graphene_info, schedule): self._schedule = check.inst_param(schedule, 'schedule', Schedule) @@ -223,12 +222,12 @@ def __init__(self, graphene_info, schedule): repository_path=schedule.repository_path, ) - def resolve_running_job_count(self, graphene_info): + def resolve_running_schedule_count(self, graphene_info): repository = graphene_info.context.get_repository() - running_job_count = graphene_info.context.instance.running_job_count( + running_schedule_count = graphene_info.context.instance.running_schedule_count( repository.name, self._schedule.name ) - return running_job_count + return running_schedule_count # TODO: Delete in 0.8.0 release # https://github.com/dagster-io/dagster/issues/228 diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py index 2bf8049791854..3bfabd567002f 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py @@ 
-329,3 +329,60 @@ ], 'ticksCount': 1 } + +snapshots['test_tick_success 1'] = { + 'scheduleDefinition': { + 'name': 'no_config_pipeline_hourly_schedule' + }, + 'stats': { + 'ticksFailed': 0, + 'ticksSkipped': 0, + 'ticksStarted': 0, + 'ticksSucceeded': 1 + }, + 'ticks': [ + { + 'status': 'SUCCESS', + 'tickId': '1' + } + ], + 'ticksCount': 1 +} + +snapshots['test_should_execute_scheduler_error 1'] = { + 'scheduleDefinition': { + 'name': 'should_execute_error_schedule' + }, + 'stats': { + 'ticksFailed': 1, + 'ticksSkipped': 0, + 'ticksStarted': 0, + 'ticksSucceeded': 0 + }, + 'ticks': [ + { + 'status': 'FAILURE', + 'tickId': '1' + } + ], + 'ticksCount': 1 +} + +snapshots['test_invalid_config_schedule_error 1'] = { + 'scheduleDefinition': { + 'name': 'invalid_config_schedule' + }, + 'stats': { + 'ticksFailed': 0, + 'ticksSkipped': 0, + 'ticksStarted': 0, + 'ticksSucceeded': 1 + }, + 'ticks': [ + { + 'status': 'SUCCESS', + 'tickId': '1' + } + ], + 'ticksCount': 1 +} diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py index 4fdb35fc7bbed..ee547fccaec13 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py @@ -75,7 +75,9 @@ def test_get_all_schedules(): ) # Start schedule - schedule = instance.start_schedule(repository, "no_config_pipeline_hourly_schedule") + schedule = instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_hourly_schedule" + ) # Query Scheduler + all Schedules scheduler_result = execute_dagster_graphql(context, GET_SCHEDULES_QUERY) diff --git a/python_modules/dagster/dagster/cli/schedule.py b/python_modules/dagster/dagster/cli/schedule.py index 0e86ec439b2f3..93b35976455de 100644 --- a/python_modules/dagster/dagster/cli/schedule.py +++ b/python_modules/dagster/dagster/cli/schedule.py @@ 
-281,14 +281,16 @@ def execute_start_command(schedule_name, all_flag, cli_args, print_fn): if all_flag: for schedule in instance.all_schedules(repository): try: - schedule = instance.start_schedule(repository, schedule.name) + schedule = instance.start_schedule_and_update_storage_state( + repository, schedule.name + ) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) print_fn("Started all schedules for repository {name}".format(name=repository.name)) else: try: - schedule = instance.start_schedule(repository, schedule_name) + schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) @@ -311,7 +313,7 @@ def execute_stop_command(schedule_name, cli_args, print_fn, instance=None): repository = handle.build_repository_definition() try: - instance.stop_schedule(repository, schedule_name) + instance.stop_schedule_and_update_storage_state(repository, schedule_name) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) @@ -368,8 +370,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn) for schedule in instance.all_schedules(repository): if schedule.status == ScheduleStatus.RUNNING: try: - instance.stop_schedule(repository, schedule.name) - instance.start_schedule(repository, schedule.name) + instance.stop_schedule_and_update_storage_state(repository, schedule.name) + instance.start_schedule_and_update_storage_state(repository, schedule.name) except DagsterInvariantViolationError as ex: raise click.UsageError(ex) @@ -386,8 +388,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn) ) try: - instance.stop_schedule(repository, schedule_name) - instance.start_schedule(repository, schedule_name) + instance.stop_schedule_and_update_storage_state(repository, schedule_name) + instance.start_schedule_and_update_storage_state(repository, schedule_name) except 
DagsterInvariantViolationError as ex: raise click.UsageError(ex) diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py index 4536b0ca0a877..2eca2cba09ec6 100644 --- a/python_modules/dagster/dagster/core/instance/__init__.py +++ b/python_modules/dagster/dagster/core/instance/__init__.py @@ -807,17 +807,23 @@ def reconcile_scheduler_state(self, python_path, repository_path, repository): self, repository, python_path, repository_path ) - def start_schedule(self, repository, schedule_name): - return self._scheduler.start_schedule(self, repository, schedule_name) + def start_schedule_and_update_storage_state(self, repository, schedule_name): + return self._scheduler.start_schedule_and_update_storage_state( + self, repository, schedule_name + ) - def stop_schedule(self, repository, schedule_name): - return self._scheduler.stop_schedule(self, repository, schedule_name) + def stop_schedule_and_update_storage_state(self, repository, schedule_name): + return self._scheduler.stop_schedule_and_update_storage_state( + self, repository, schedule_name + ) - def end_schedule(self, repository, schedule_name): - return self._scheduler.end_schedule(self, repository, schedule_name) + def stop_schedule_and_delete_from_storage(self, repository, schedule_name): + return self._scheduler.stop_schedule_and_delete_from_storage( + self, repository, schedule_name + ) - def running_job_count(self, repository_name, schedule_name): - return self._scheduler.running_job_count(repository_name, schedule_name) + def running_schedule_count(self, repository_name, schedule_name): + return self._scheduler.running_schedule_count(repository_name, schedule_name) def scheduler_debug_info(self): from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus @@ -827,14 +833,14 @@ def scheduler_debug_info(self): schedule_info = self.all_schedules_info() schedules = [] for repository_name, schedule in schedule_info: - if 
schedule.status == ScheduleStatus.RUNNING and not self.running_job_count( + if schedule.status == ScheduleStatus.RUNNING and not self.running_schedule_count( repository_name, schedule.name ): errors.append( "Schedule {schedule_name} is set to be running, but the scheduler is not " "running the schedule.".format(schedule_name=schedule.name) ) - elif schedule.status == ScheduleStatus.STOPPED and self.running_job_count( + elif schedule.status == ScheduleStatus.STOPPED and self.running_schedule_count( repository_name, schedule.name ): errors.append( @@ -842,7 +848,7 @@ def scheduler_debug_info(self): "the schedule.".format(schedule_name=schedule.name) ) - if self.running_job_count(repository_name, schedule.name) > 1: + if self.running_schedule_count(repository_name, schedule.name) > 1: errors.append( "Duplicate jobs found: More than one job for schedule {schedule_name} are " "running on the scheduler.".format(schedule_name=schedule.name) diff --git a/python_modules/dagster/dagster/core/scheduler/__init__.py b/python_modules/dagster/dagster/core/scheduler/__init__.py index a5e3e92fcb977..8105df9177273 100644 --- a/python_modules/dagster/dagster/core/scheduler/__init__.py +++ b/python_modules/dagster/dagster/core/scheduler/__init__.py @@ -1,8 +1,12 @@ from .scheduler import ( + DagsterScheduleDoesNotExist, + DagsterScheduleReconciliationError, + DagsterSchedulerError, Schedule, ScheduleDefinitionData, ScheduleStatus, ScheduleTick, + ScheduleTickStatsSnapshot, ScheduleTickStatus, Scheduler, SchedulerDebugInfo, diff --git a/python_modules/dagster/dagster/core/scheduler/scheduler.py b/python_modules/dagster/dagster/core/scheduler/scheduler.py index 2572d973e55c0..56d9932353f6c 100644 --- a/python_modules/dagster/dagster/core/scheduler/scheduler.py +++ b/python_modules/dagster/dagster/core/scheduler/scheduler.py @@ -5,11 +5,44 @@ import six from dagster import check +from dagster.core.definitions.repository import RepositoryDefinition from dagster.core.definitions.schedule 
import ScheduleDefinition, ScheduleDefinitionData +from dagster.core.errors import DagsterError +from dagster.core.instance import DagsterInstance from dagster.serdes import whitelist_for_serdes from dagster.utils.error import SerializableErrorInfo +class DagsterSchedulerError(DagsterError): + '''Base class for all Dagster Scheduler errors''' + + +class DagsterScheduleReconciliationError(DagsterError): + '''Error raised during schedule state reconcilation. During reconcilation, exceptions that are + raised when trying to start or stop a schedule are collected and passed to this wrapper exception. + The individual exceptions can be accessed by the `errors` property. ''' + + def __init__(self, preamble, errors, *args, **kwargs): + self.errors = errors + + error_msg = preamble + error_messages = [] + for i_error, error in enumerate(self.errors): + error_messages.append(str(error)) + error_msg += '\n Error {i_error}: {error_message}'.format( + i_error=i_error + 1, error_message=str(error) + ) + + self.message = error_msg + self.error_messages = error_messages + + super(DagsterScheduleReconciliationError, self).__init__(error_msg, *args, **kwargs) + + +class DagsterScheduleDoesNotExist(DagsterSchedulerError): + '''Errors raised when ending a job for a schedule.''' + + @whitelist_for_serdes class ScheduleStatus(Enum): RUNNING = 'RUNNING' @@ -121,6 +154,17 @@ class Scheduler(six.with_metaclass(abc.ABCMeta)): an external system such as cron to ensure scheduled repeated execution according. 
''' + def _get_schedule_by_name(self, instance, repository, schedule_name): + schedule = instance.get_schedule_by_name(repository, schedule_name) + if not schedule: + raise DagsterScheduleDoesNotExist( + 'You have attempted to start the job for schedule {name}, but it does not exist.'.format( + name=schedule_name + ) + ) + + return schedule + def reconcile_scheduler_state(self, instance, repository, python_path, repository_path): '''Reconcile the ScheduleDefinitions list from the repository and ScheduleStorage on the instance to ensure there is a 1-1 correlation between ScheduleDefinitions and @@ -170,17 +214,110 @@ def reconcile_scheduler_state(self, instance, repository, python_path, repositor existing_schedule_names = set([s.name for s in instance.all_schedules(repository)]) schedule_names_to_delete = existing_schedule_names - schedule_def_names + schedule_reconciliation_errors = [] for schedule in schedules_to_restart: # Restart is only needed if the schedule was previously running if schedule.status == ScheduleStatus.RUNNING: - self.stop_schedule(instance, repository, schedule.name) - self.start_schedule(instance, repository, schedule.name) + try: + self.stop_schedule(instance, repository, schedule.name) + self.start_schedule(instance, repository, schedule.name) + except DagsterSchedulerError as e: + schedule_reconciliation_errors.append(e) if schedule.status == ScheduleStatus.STOPPED: - self.stop_schedule(instance, repository, schedule.name) + try: + self.stop_schedule(instance, repository, schedule.name) + except DagsterSchedulerError as e: + schedule_reconciliation_errors.append(e) for schedule_name in schedule_names_to_delete: - self.end_schedule(instance, repository, schedule_name) + try: + instance.stop_schedule_and_delete_from_storage(repository, schedule_name) + except DagsterSchedulerError as e: + schedule_reconciliation_errors.append(e) + + if len(schedule_reconciliation_errors): + raise DagsterScheduleReconciliationError( + "One or more errors were 
encountered by the Scheduler while starting or stopping schedules. " + "Individual error messages follow:", + errors=schedule_reconciliation_errors, + ) + + def start_schedule_and_update_storage_state(self, instance, repository, schedule_name): + ''' + Updates the status of the given schedule to `ScheduleStatus.RUNNING` in schedule storage, + then calls `start_schedule`. + + This should not be overridden by subclasses. + + Args: + instance (DagsterInstance): The current instance. + repository (RepositoryDefinition): The repository containing the schedule definition. + schedule_name (string): The name of the schedule to start running. + ''' + + check.inst_param(instance, 'instance', DagsterInstance) + check.inst_param(repository, 'repository', RepositoryDefinition) + check.str_param(schedule_name, 'schedule_name') + + schedule = self._get_schedule_by_name(instance, repository, schedule_name) + + if schedule.status == ScheduleStatus.RUNNING: + raise DagsterSchedulerError( + 'You have attempted to start schedule {name}, but it is already running'.format( + name=schedule_name + ) + ) + + self.start_schedule(instance, repository, schedule.name) + started_schedule = schedule.with_status(ScheduleStatus.RUNNING) + instance.update_schedule(repository, started_schedule) + return started_schedule + + def stop_schedule_and_update_storage_state(self, instance, repository, schedule_name): + ''' + Updates the status of the given schedule to `ScheduleStatus.STOPPED` in schedule storage, + then calls `stop_schedule`. + + This should not be overridden by subclasses. + + Args: + instance (DagsterInstance): The current instance. + repository (RepositoryDefinition): The repository containing the schedule definition. + schedule_name (string): The name of the schedule to start running. 
+ ''' + + check.inst_param(instance, 'instance', DagsterInstance) + check.inst_param(repository, 'repository', RepositoryDefinition) + check.str_param(schedule_name, 'schedule_name') + + schedule = self._get_schedule_by_name(instance, repository, schedule_name) + + self.stop_schedule(instance, repository, schedule.name) + stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) + instance.update_schedule(repository, stopped_schedule) + return stopped_schedule + + def stop_schedule_and_delete_from_storage(self, instance, repository, schedule_name): + ''' + Deletes a schedule from schedule storage, then calls `stop_schedule`. + + This should not be overridden by subclasses. + + Args: + instance (DagsterInstance): The current instance. + repository (RepositoryDefinition): The repository containing the schedule definition. + schedule_name (string): The name of the schedule to start running. + ''' + + check.inst_param(instance, 'instance', DagsterInstance) + check.inst_param(repository, 'repository', RepositoryDefinition) + check.str_param(schedule_name, 'schedule_name') + + schedule = self._get_schedule_by_name(instance, repository, schedule_name) + self.stop_schedule(instance, repository, schedule.name) + instance.delete_schedule(repository, schedule) + return schedule @abc.abstractmethod def debug_info(self): @@ -189,35 +326,56 @@ def debug_info(self): @abc.abstractmethod def start_schedule(self, instance, repository, schedule_name): - '''Resume a pipeline schedule. + '''Start running a schedule. This method is called by `start_schedule_and_update_storage_state`, + which first updates the status of the schedule in schedule storage to `ScheduleStatus.RUNNING`, + then calls this method. + + For example, in the cron scheduler, this method writes a cron job to the cron tab + for the given schedule. Args: - schedule_name (string): The schedule to resume + instance (DagsterInstance): The current instance. 
+ repository (RepositoryDefinition): The repository containing the schedule definition. + schedule_name (string): The name of the schedule to start running. ''' @abc.abstractmethod def stop_schedule(self, instance, repository, schedule_name): - '''Stops an existing pipeline schedule + '''Stop running a schedule. - Args: - schedule_name (string): The schedule to stop - ''' + This method is called by + 1) `stop_schedule_and_update_storage_state`, + which first updates the status of the schedule in schedule storage to `ScheduleStatus.STOPPED`, + then calls this method. + 2) `stop_schedule_and_delete_from_storage`, which deletes the schedule from schedule storage + then calls this method. - @abc.abstractmethod - def end_schedule(self, instance, repository, schedule_name): - '''Resume a pipeline schedule. + For example, in the cron scheduler, this method deletes the cron job for a given scheduler + from the cron tab. Args: - schedule_name (string): The schedule to end and delete + instance (DagsterInstance): The current instance. + repository (RepositoryDefinition): The repository containing the schedule definition. + schedule_name (string): The schedule to stop running. ''' @abc.abstractmethod - def running_job_count(self, repository_name, schedule_name): - '''Resume a pipeline schedule. + def running_schedule_count(self, repository_name, schedule_name): + '''Returns the number of jobs currently running for the given schedule. This method is used + for detecting when the scheduler is out of sync with schedule storage. 
+ + For example, when: + - There are duplicate jobs runnning for a single schedule + - There are no jobs runnning for a schedule that is set to be running + - There are still jobs running for a schedule that is set to be stopped + + When the scheduler and schedule storage are in sync, this method should return: + - 1 when a schedule is set to be running + - 0 wen a schedule is set to be stopped Args: - repository_name (string): The repository the schedule belongs to - schedule_name (string): The name of the schedule to check + repository_name (string): The name of the repository containing the schedule definition. + schedule_name (string): The schedule to return the number of jobs for ''' @abc.abstractmethod diff --git a/python_modules/dagster/dagster/utils/test/__init__.py b/python_modules/dagster/dagster/utils/test/__init__.py index 8fa25723f0b88..0cf8f5a898905 100644 --- a/python_modules/dagster/dagster/utils/test/__init__.py +++ b/python_modules/dagster/dagster/utils/test/__init__.py @@ -33,6 +33,7 @@ ) from dagster.core.instance import DagsterInstance from dagster.core.scheduler import ScheduleStatus, Scheduler +from dagster.core.scheduler.scheduler import DagsterScheduleDoesNotExist, DagsterSchedulerError from dagster.core.storage.file_manager import LocalFileManager from dagster.core.storage.intermediates_manager import InMemoryIntermediatesManager from dagster.core.storage.pipeline_run import PipelineRun @@ -387,6 +388,12 @@ def restore_directory(src): class FilesytemTestScheduler(Scheduler): + '''This class is used in dagster core and dagster_graphql to test the scheduler's interactions + with schedule storage, which are implemented in the methods defined on the base Scheduler class. + Therefore, the following methods used to actually schedule jobs (e.g. create and remove cron jobs + on a cron tab) are left unimimplemented. 
+ ''' + def __init__(self, artifacts_dir): check.str_param(artifacts_dir, 'artifacts_dir') self._artifacts_dir = artifacts_dir @@ -395,53 +402,14 @@ def debug_info(self): return "" def start_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You have attempted to start schedule {name}, but it does not exist.'.format( - name=schedule_name - ) - ) - - if schedule.status == ScheduleStatus.RUNNING: - raise DagsterInvariantViolationError( - 'You have attempted to start schedule {name}, but it is already running'.format( - name=schedule_name - ) - ) - - started_schedule = schedule.with_status(ScheduleStatus.RUNNING) - instance.update_schedule(repository, started_schedule) - return schedule + pass def stop_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You have attempted to stop schedule {name}, but was never initialized.' 
- 'Use `schedule up` to initialize schedules'.format(name=schedule_name) - ) - - stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) - instance.update_schedule(repository, stopped_schedule) - return stopped_schedule + pass - def running_job_count(self, repository_name, schedule_name): - # Not currently tested in dagster core + def running_schedule_count(self, repository_name, schedule_name): return 0 - def end_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You have attempted to end schedule {name}, but it is not running.'.format( - name=schedule_name - ) - ) - - instance.storage.delete_schedule(repository, schedule) - return schedule - def get_logs_directory(self, instance, repository, schedule_name): check.inst_param(repository, 'repository', RepositoryDefinition) check.str_param(schedule_name, 'schedule_name') diff --git a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py index 070fd8fd2a883..4a549aac2b8b5 100644 --- a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py +++ b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py @@ -6,9 +6,9 @@ import six from crontab import CronTab -from dagster import DagsterInstance, DagsterInvariantViolationError, check, seven, utils +from dagster import DagsterInstance, check, seven, utils from dagster.core.definitions import RepositoryDefinition -from dagster.core.scheduler import ScheduleStatus, Scheduler +from dagster.core.scheduler import DagsterSchedulerError, Scheduler from dagster.serdes import ConfigurableClass @@ -45,71 +45,58 @@ def debug_info(self): ) def start_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You 
have attempted to start schedule {name}, but it does not exist.'.format( - name=schedule_name - ) - ) + check.inst_param(instance, 'instance', DagsterInstance) + check.inst_param(repository, 'repository', RepositoryDefinition) + check.str_param(schedule_name, 'schedule_name') - if schedule.status == ScheduleStatus.RUNNING: - raise DagsterInvariantViolationError( - 'You have attempted to start schedule {name}, but it is already running'.format( - name=schedule_name - ) - ) + schedule = self._get_schedule_by_name(instance, repository, schedule_name) + + # If the cron job already exists, remove it. This prevents duplicate entries. + # Then, add a new cron job to the cron tab. + if self.running_schedule_count(repository.name, schedule.name) > 0: + self._end_cron_job(instance, repository, schedule) - started_schedule = schedule.with_status(ScheduleStatus.RUNNING) - self._start_cron_job(instance, repository, started_schedule) + self._start_cron_job(instance, repository, schedule) - # Check that the schedule made it to the cron tab - if not self.running_job_count(repository.name, schedule.name): - raise DagsterInvariantViolationError( - "Attempted to write cron job for schedule {schedule_name}, but failed".format( - schedule_name=schedule.name + # Verify that the cron job is running + running_schedule_count = self.running_schedule_count(repository.name, schedule.name) + if running_schedule_count == 0: + raise DagsterSchedulerError( + "Attempted to write cron job for schedule " + "{schedule_name}, but failed. " + "The scheduler is not running {schedule_name}.".format(schedule_name=schedule.name) + ) + elif running_schedule_count > 1: + raise DagsterSchedulerError( + "Attempted to write cron job for schedule " + "{schedule_name}, but duplicate cron jobs were found. " + "There are {running_schedule_count} jobs running for the schedule." 
+ "To resolve, run `dagster schedule up`, or edit the cron tab to " + "remove duplicate schedules".format( + schedule_name=schedule.name, running_schedule_count=running_schedule_count ) ) - # If the cron job was successfully installed, then update scheduler state - instance.update_schedule(repository, started_schedule) - return started_schedule - def stop_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You have attempted to stop schedule {name}, but was never initialized.' - 'Use `schedule up` to initialize schedules'.format(name=schedule_name) - ) - - stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) - self._end_cron_job(instance, repository, stopped_schedule) - - if self.running_job_count(repository.name, schedule.name): - raise DagsterInvariantViolationError( - "Attempted to remove cron job for schedule {schedule_name}, but failed. The cron " - "job for the schedule is still running".format(schedule_name=schedule.name) - ) + check.inst_param(instance, 'instance', DagsterInstance) + check.inst_param(repository, 'repository', RepositoryDefinition) + check.str_param(schedule_name, 'schedule_name') - instance.update_schedule(repository, stopped_schedule) + schedule = self._get_schedule_by_name(instance, repository, schedule_name) - return stopped_schedule + self._end_cron_job(instance, repository, schedule) - def end_schedule(self, instance, repository, schedule_name): - schedule = instance.get_schedule_by_name(repository, schedule_name) - if not schedule: - raise DagsterInvariantViolationError( - 'You have attempted to end schedule {name}, but it is not running.'.format( - name=schedule_name + # Verify that the cron job has been removed + running_schedule_count = self.running_schedule_count(repository.name, schedule.name) + if running_schedule_count > 0: + raise DagsterSchedulerError( + "Attempted to remove existing 
cron job for schedule " + "{schedule_name}, but failed. " + "There are still {running_schedule_count} jobs running for the schedule.".format( + schedule_name=schedule.name, running_schedule_count=running_schedule_count ) ) - instance.delete_schedule(repository, schedule) - self._end_cron_job(instance, repository, schedule) - - return schedule - def wipe(self, instance): # Note: This method deletes schedules from ALL repositories check.inst_param(instance, 'instance', DagsterInstance) @@ -176,7 +163,7 @@ def _end_cron_job(self, instance, repository, schedule): if os.path.isfile(script_file): os.remove(script_file) - def running_job_count(self, repository_name, schedule_name): + def running_schedule_count(self, repository_name, schedule_name): cron_tab = CronTab(user=True) matching_jobs = cron_tab.find_comment( self._cron_tag_for_schedule(repository_name, schedule_name) diff --git a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py index ad408b1dd9f21..87e42c7002969 100644 --- a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py +++ b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py @@ -8,9 +8,13 @@ from dagster import ScheduleDefinition, check, file_relative_path from dagster.core.definitions import RepositoryDefinition, lambda_solid, pipeline -from dagster.core.errors import DagsterInvariantViolationError from dagster.core.instance import DagsterInstance, InstanceType from dagster.core.scheduler import Schedule, ScheduleStatus +from dagster.core.scheduler.scheduler import ( + DagsterScheduleDoesNotExist, + DagsterScheduleReconciliationError, + DagsterSchedulerError, +) from dagster.core.storage.event_log import InMemoryEventLogStorage from dagster.core.storage.local_compute_log_manager import NoOpComputeLogManager from dagster.core.storage.root import LocalArtifactStorage @@ -89,6 +93,12 @@ def 
define_repo(): ) +def define_smaller_repo(): + return RepositoryDefinition( + name="test", pipeline_defs=[no_config_pipeline], schedule_defs=define_schedules()[:-1], + ) + + def define_scheduler_instance(tempdir): return DagsterInstance( instance_type=InstanceType.EPHEMERAL, @@ -135,7 +145,9 @@ def test_re_init( ) # Start schedule - schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + schedule = instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # Re-initialize scheduler instance.reconcile_scheduler_state( @@ -168,7 +180,9 @@ def test_start_and_stop_schedule( schedule_def = repository.get_schedule_def("no_config_pipeline_every_min_schedule") - schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + schedule = instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) check.inst_param(schedule, 'schedule', Schedule) assert "/bin/python" in schedule.python_path @@ -179,12 +193,25 @@ def test_start_and_stop_schedule( os.path.join(tempdir, 'schedules', 'scripts') ) - instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) assert "{}.{}.sh".format(repository.name, schedule_def.name) not in os.listdir( os.path.join(tempdir, 'schedules', 'scripts') ) +def test_start_non_existent_schedule( + restore_cron_tab, repository +): # pylint:disable=unused-argument,redefined-outer-name + with TemporaryDirectory() as tempdir: + instance = define_scheduler_instance(tempdir) + + with pytest.raises(DagsterScheduleDoesNotExist): + # Initialize scheduler + instance.start_schedule_and_update_storage_state(repository, "asdf") + + def get_cron_jobs(): output = subprocess.check_output(['crontab', '-l']) return list(filter(None, output.decode('utf-8').strip().split("\n"))) @@ -203,9 +230,15 
@@ def test_start_schedule_cron_job( repository=repository, ) - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") - instance.start_schedule(repository, "no_config_pipeline_daily_schedule") - instance.start_schedule(repository, "default_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_daily_schedule" + ) + instance.start_schedule_and_update_storage_state( + repository, "default_config_pipeline_every_min_schedule" + ) # Inspect the cron tab cron_jobs = get_cron_jobs() @@ -231,6 +264,54 @@ def test_start_schedule_cron_job( assert log_file.endswith("scheduler.log") +def test_remove_schedule_def( + restore_cron_tab, repository +): # pylint:disable=unused-argument,redefined-outer-name + with TemporaryDirectory() as tempdir: + instance = define_scheduler_instance(tempdir) + + # Initialize scheduler + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=repository, + ) + + assert len(instance.all_schedules(repository=repository)) == 3 + + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=define_smaller_repo(), + ) + + assert len(instance.all_schedules(repository=repository)) == 2 + + +def test_add_schedule_def( + restore_cron_tab, repository +): # pylint:disable=unused-argument,redefined-outer-name + with TemporaryDirectory() as tempdir: + instance = define_scheduler_instance(tempdir) + + # Initialize scheduler + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=define_smaller_repo(), + ) + + assert len(instance.all_schedules(repository=repository)) == 2 + + 
instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=repository, + ) + + assert len(instance.all_schedules(repository=repository)) == 3 + + def test_start_and_stop_schedule_cron_tab( restore_cron_tab, repository ): # pylint:disable=unused-argument,redefined-outer-name @@ -245,33 +326,45 @@ def test_start_and_stop_schedule_cron_tab( ) # Start schedule - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 1 # Try starting it again - with pytest.raises(DagsterInvariantViolationError): - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + with pytest.raises(DagsterSchedulerError): + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 1 # Start another schedule - instance.start_schedule(repository, "no_config_pipeline_daily_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_daily_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 2 # Stop second schedule - instance.stop_schedule(repository, "no_config_pipeline_daily_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_daily_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 1 # Try stopping second schedule again - instance.stop_schedule(repository, "no_config_pipeline_daily_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_daily_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 1 # Start second schedule - instance.start_schedule(repository, "no_config_pipeline_daily_schedule") + instance.start_schedule_and_update_storage_state( 
+ repository, "no_config_pipeline_daily_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 2 @@ -284,7 +377,9 @@ def test_start_and_stop_schedule_cron_tab( cron_jobs = get_cron_jobs() assert len(cron_jobs) == 2 - instance.start_schedule(repository, "default_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "default_config_pipeline_every_min_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 3 @@ -298,9 +393,15 @@ def test_start_and_stop_schedule_cron_tab( assert len(cron_jobs) == 3 # Stop all schedules - instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule") - instance.stop_schedule(repository, "no_config_pipeline_daily_schedule") - instance.stop_schedule(repository, "default_config_pipeline_every_min_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_daily_schedule" + ) + instance.stop_schedule_and_update_storage_state( + repository, "default_config_pipeline_every_min_schedule" + ) cron_jobs = get_cron_jobs() assert len(cron_jobs) == 0 @@ -336,7 +437,9 @@ def raises(*args, **kwargs): instance._scheduler._start_cron_job = raises # pylint: disable=protected-access with pytest.raises(Exception, match='Patch'): - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) schedule = instance.get_schedule_by_name(repository, schedule_def.name) @@ -364,10 +467,13 @@ def do_nothing(*_): # End schedule with pytest.raises( - DagsterInvariantViolationError, - match="Attempted to write cron job for schedule no_config_pipeline_every_min_schedule, but failed", + DagsterSchedulerError, + match="Attempted to write cron job for schedule no_config_pipeline_every_min_schedule, " + "but failed. 
The scheduler is not running no_config_pipeline_every_min_schedule.", ): - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) def test_start_schedule_manual_delete_debug( @@ -384,7 +490,9 @@ def test_start_schedule_manual_delete_debug( repository=repository, ) - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # Manually delete the schedule from the crontab instance.scheduler._end_cron_job( # pylint: disable=protected-access @@ -456,7 +564,9 @@ def test_start_schedule_manual_duplicate_schedules_add_debug( repository=repository, ) - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # Manually add extra cron tabs instance.scheduler._start_cron_job( # pylint: disable=protected-access @@ -505,7 +615,9 @@ def raises(*args, **kwargs): instance._scheduler._end_cron_job = raises # pylint: disable=protected-access - schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + schedule = instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) check.inst_param(schedule, 'schedule', Schedule) assert "/bin/python" in schedule.python_path @@ -518,7 +630,9 @@ def raises(*args, **kwargs): # End schedule with pytest.raises(Exception, match='Patch'): - instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) schedule = instance.get_schedule_by_name(repository, schedule_def.name) @@ -544,14 +658,20 @@ def do_nothing(*_): instance._scheduler._end_cron_job = do_nothing # 
pylint: disable=protected-access - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # End schedule with pytest.raises( - DagsterInvariantViolationError, - match="Attempted to remove cron job for schedule no_config_pipeline_every_min_schedule, but failed.", + DagsterSchedulerError, + match="Attempted to remove existing cron job for schedule " + "no_config_pipeline_every_min_schedule, but failed. There are still 1 jobs running for " + "the schedule.", ): - instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.stop_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) def test_wipe(restore_cron_tab, repository): # pylint:disable=unused-argument,redefined-outer-name @@ -567,7 +687,9 @@ def test_wipe(restore_cron_tab, repository): # pylint:disable=unused-argument,r ) # Start schedule - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # Wipe scheduler instance.wipe_all_schedules() @@ -601,10 +723,79 @@ def test_log_directory( ) # Start schedule - instance.start_schedule(repository, "no_config_pipeline_every_min_schedule") + instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) # Wipe scheduler instance.wipe_all_schedules() # Check schedules are wiped assert instance.all_schedules(repository) == [] + + +def test_reconcile_failure( + restore_cron_tab, repository +): # pylint:disable=unused-argument,redefined-outer-name + with TemporaryDirectory() as tempdir: + instance = define_scheduler_instance(tempdir) + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=repository, + ) + 
instance.start_schedule_and_update_storage_state( + repository, "no_config_pipeline_every_min_schedule" + ) + + def failed_start_job(*_): + raise DagsterSchedulerError("Failed to start") + + def failed_end_job(*_): + raise DagsterSchedulerError("Failed to stop") + + instance._scheduler.start_schedule = failed_start_job # pylint: disable=protected-access + instance._scheduler.stop_schedule = failed_end_job # pylint: disable=protected-access + + # Initialize scheduler + with pytest.raises( + DagsterScheduleReconciliationError, + match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to stop", + ): + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=repository, + ) + + +def test_reconcile_failure_when_deleting_schedule_def( + restore_cron_tab, repository +): # pylint:disable=unused-argument,redefined-outer-name + with TemporaryDirectory() as tempdir: + instance = define_scheduler_instance(tempdir) + + # Initialize scheduler + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=repository, + ) + + assert len(instance.all_schedules(repository=repository)) == 3 + + def failed_end_job(*_): + raise DagsterSchedulerError("Failed to stop") + + instance._scheduler.stop_schedule_and_delete_from_storage = ( # pylint: disable=protected-access + failed_end_job + ) + + with pytest.raises( + DagsterScheduleReconciliationError, match="Error 1: Failed to stop", + ): + instance.reconcile_scheduler_state( + python_path=sys.executable, + repository_path=file_relative_path(__file__, '.../repository.yam'), + repository=define_smaller_repo(), + )