diff --git a/examples/dagster_examples/schedules.py b/examples/dagster_examples/schedules.py
index 43d2fecf087e9..9a185ca9c5738 100644
--- a/examples/dagster_examples/schedules.py
+++ b/examples/dagster_examples/schedules.py
@@ -55,7 +55,9 @@ def backfill_should_execute(context, partition_set_def, schedule_name):
is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions))
if not is_remaining_partitions:
try:
- context.instance.stop_schedule(context.repository, schedule_name)
+ context.instance.stop_schedule_and_update_storage_state(
+ context.repository, schedule_name
+ )
except OSError:
pass
diff --git a/js_modules/dagit/src/schedules/ScheduleRow.tsx b/js_modules/dagit/src/schedules/ScheduleRow.tsx
index 4dafa48fc2183..0963a6958496b 100644
--- a/js_modules/dagit/src/schedules/ScheduleRow.tsx
+++ b/js_modules/dagit/src/schedules/ScheduleRow.tsx
@@ -53,25 +53,25 @@ const getNaturalLanguageCronString = (cronSchedule: string) => {
}
};
-const errorDisplay = (status: ScheduleStatus, runningJobCount: number) => {
- if (status === ScheduleStatus.STOPPED && runningJobCount == 0) {
+const errorDisplay = (status: ScheduleStatus, runningScheduleCount: number) => {
+ if (status === ScheduleStatus.STOPPED && runningScheduleCount == 0) {
return null;
- } else if (status === ScheduleStatus.RUNNING && runningJobCount == 1) {
+ } else if (status === ScheduleStatus.RUNNING && runningScheduleCount == 1) {
return null;
}
const errors = [];
- if (status === ScheduleStatus.RUNNING && runningJobCount === 0) {
+ if (status === ScheduleStatus.RUNNING && runningScheduleCount === 0) {
errors.push(
"Schedule is set to be running, but the scheduler is not running the schedule"
);
- } else if (status === ScheduleStatus.STOPPED && runningJobCount > 0) {
+ } else if (status === ScheduleStatus.STOPPED && runningScheduleCount > 0) {
errors.push(
"Schedule is set to be stopped, but the scheduler is still running the schedule"
);
}
- if (runningJobCount > 0) {
+ if (runningScheduleCount > 0) {
errors.push("Duplicate cron job for schedule found.");
}
@@ -106,7 +106,7 @@ export const ScheduleRow: React.FunctionComponent<{
const {
status,
scheduleDefinition,
- runningJobCount,
+ runningScheduleCount,
logsPath,
stats,
ticks,
@@ -194,7 +194,7 @@ export const ScheduleRow: React.FunctionComponent<{
}}
/>
- {errorDisplay(status, runningJobCount)}
+ {errorDisplay(status, runningScheduleCount)}
{displayName}
@@ -368,7 +368,7 @@ export const ScheduleRow: React.FunctionComponent<{
export const ScheduleRowFragment = gql`
fragment ScheduleFragment on RunningSchedule {
__typename
- runningJobCount
+ runningScheduleCount
scheduleDefinition {
name
cronSchedule
@@ -428,7 +428,7 @@ const START_SCHEDULE_MUTATION = gql`
... on RunningScheduleResult {
schedule {
__typename
- runningJobCount
+ runningScheduleCount
scheduleDefinition {
__typename
name
@@ -451,7 +451,7 @@ const STOP_SCHEDULE_MUTATION = gql`
... on RunningScheduleResult {
schedule {
__typename
- runningJobCount
+ runningScheduleCount
scheduleDefinition {
__typename
name
diff --git a/js_modules/dagit/src/schedules/types/ScheduleFragment.ts b/js_modules/dagit/src/schedules/types/ScheduleFragment.ts
index ea519a08ca637..c825f73fb8387 100644
--- a/js_modules/dagit/src/schedules/types/ScheduleFragment.ts
+++ b/js_modules/dagit/src/schedules/types/ScheduleFragment.ts
@@ -47,7 +47,7 @@ export interface ScheduleFragment_stats {
export interface ScheduleFragment {
__typename: "RunningSchedule";
- runningJobCount: number;
+ runningScheduleCount: number;
scheduleDefinition: ScheduleFragment_scheduleDefinition;
logsPath: string;
ticks: ScheduleFragment_ticks[];
diff --git a/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts b/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts
index 69fce8a2b397a..01ec028b73c84 100644
--- a/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts
+++ b/js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts
@@ -112,7 +112,7 @@ export interface ScheduleRootQuery_scheduleOrError_RunningSchedule_attemptList {
export interface ScheduleRootQuery_scheduleOrError_RunningSchedule {
__typename: "RunningSchedule";
- runningJobCount: number;
+ runningScheduleCount: number;
scheduleDefinition: ScheduleRootQuery_scheduleOrError_RunningSchedule_scheduleDefinition;
logsPath: string;
ticks: ScheduleRootQuery_scheduleOrError_RunningSchedule_ticks[];
diff --git a/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts b/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts
index a458ba6742bc8..a97fc05e9a51a 100644
--- a/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts
+++ b/js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts
@@ -52,7 +52,7 @@ export interface SchedulesRootQuery_scheduler_Scheduler_runningSchedules_stats {
export interface SchedulesRootQuery_scheduler_Scheduler_runningSchedules {
__typename: "RunningSchedule";
- runningJobCount: number;
+ runningScheduleCount: number;
scheduleDefinition: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_scheduleDefinition;
logsPath: string;
ticks: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_ticks[];
diff --git a/js_modules/dagit/src/schedules/types/StartSchedule.ts b/js_modules/dagit/src/schedules/types/StartSchedule.ts
index 43d2e1f682054..d92a259d227dd 100644
--- a/js_modules/dagit/src/schedules/types/StartSchedule.ts
+++ b/js_modules/dagit/src/schedules/types/StartSchedule.ts
@@ -16,7 +16,7 @@ export interface StartSchedule_startSchedule_RunningScheduleResult_schedule_sche
export interface StartSchedule_startSchedule_RunningScheduleResult_schedule {
__typename: "RunningSchedule";
- runningJobCount: number;
+ runningScheduleCount: number;
scheduleDefinition: StartSchedule_startSchedule_RunningScheduleResult_schedule_scheduleDefinition;
status: ScheduleStatus;
}
diff --git a/js_modules/dagit/src/schedules/types/StopSchedule.ts b/js_modules/dagit/src/schedules/types/StopSchedule.ts
index c34a7dc5714ed..ad2651076f64e 100644
--- a/js_modules/dagit/src/schedules/types/StopSchedule.ts
+++ b/js_modules/dagit/src/schedules/types/StopSchedule.ts
@@ -16,7 +16,7 @@ export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule
export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule {
__typename: "RunningSchedule";
- runningJobCount: number;
+ runningScheduleCount: number;
scheduleDefinition: StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule_scheduleDefinition;
status: ScheduleStatus;
}
diff --git a/js_modules/dagit/src/schema.graphql b/js_modules/dagit/src/schema.graphql
index 638ee5d338c61..acd815239f2f5 100644
--- a/js_modules/dagit/src/schema.graphql
+++ b/js_modules/dagit/src/schema.graphql
@@ -1003,7 +1003,7 @@ type RunningSchedule {
attempts(limit: Int): [ScheduleAttempt!]!
attemptsCount: Int!
logsPath: String!
- runningJobCount: Int!
+ runningScheduleCount: Int!
}
type RunningScheduleResult {
diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
index 4e211308e360b..dc1ef287cd6df 100644
--- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
+++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_schedules.py
@@ -16,7 +16,7 @@
def start_schedule(graphene_info, schedule_name):
repository = graphene_info.context.get_repository()
instance = graphene_info.context.instance
- schedule = instance.start_schedule(repository, schedule_name)
+ schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name)
return graphene_info.schema.type_named('RunningScheduleResult')(
schedule=graphene_info.schema.type_named('RunningSchedule')(
graphene_info, schedule=schedule
@@ -28,7 +28,7 @@ def start_schedule(graphene_info, schedule_name):
def stop_schedule(graphene_info, schedule_name):
repository = graphene_info.context.get_repository()
instance = graphene_info.context.instance
- schedule = instance.stop_schedule(repository, schedule_name)
+ schedule = instance.stop_schedule_and_update_storage_state(repository, schedule_name)
return graphene_info.schema.type_named('RunningScheduleResult')(
schedule=graphene_info.schema.type_named('RunningSchedule')(
graphene_info, schedule=schedule
diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py b/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py
index 9cb3c48f43a0b..14c17b3cba733 100644
--- a/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py
+++ b/python_modules/dagster-graphql/dagster_graphql/schema/schedules.py
@@ -20,8 +20,7 @@
from dagster.core.definitions import ScheduleDefinition, ScheduleExecutionContext
from dagster.core.definitions.partition import PartitionScheduleDefinition
from dagster.core.errors import ScheduleExecutionError, user_code_error_boundary
-from dagster.core.scheduler import Schedule, ScheduleTickStatus
-from dagster.core.scheduler.scheduler import ScheduleTickStatsSnapshot
+from dagster.core.scheduler import Schedule, ScheduleTickStatsSnapshot, ScheduleTickStatus
from dagster.core.storage.pipeline_run import PipelineRunsFilter
@@ -208,7 +207,7 @@ class Meta(object):
attempts = dauphin.Field(dauphin.non_null_list('ScheduleAttempt'), limit=dauphin.Int())
attempts_count = dauphin.NonNull(dauphin.Int)
logs_path = dauphin.NonNull(dauphin.String)
- running_job_count = dauphin.NonNull(dauphin.Int)
+ running_schedule_count = dauphin.NonNull(dauphin.Int)
def __init__(self, graphene_info, schedule):
self._schedule = check.inst_param(schedule, 'schedule', Schedule)
@@ -223,12 +222,12 @@ def __init__(self, graphene_info, schedule):
repository_path=schedule.repository_path,
)
- def resolve_running_job_count(self, graphene_info):
+ def resolve_running_schedule_count(self, graphene_info):
repository = graphene_info.context.get_repository()
- running_job_count = graphene_info.context.instance.running_job_count(
+ running_schedule_count = graphene_info.context.instance.running_schedule_count(
repository.name, self._schedule.name
)
- return running_job_count
+ return running_schedule_count
# TODO: Delete in 0.8.0 release
# https://github.com/dagster-io/dagster/issues/228
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py
index 2bf8049791854..3bfabd567002f 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_execute_schedule.py
@@ -329,3 +329,60 @@
],
'ticksCount': 1
}
+
+snapshots['test_tick_success 1'] = {
+ 'scheduleDefinition': {
+ 'name': 'no_config_pipeline_hourly_schedule'
+ },
+ 'stats': {
+ 'ticksFailed': 0,
+ 'ticksSkipped': 0,
+ 'ticksStarted': 0,
+ 'ticksSucceeded': 1
+ },
+ 'ticks': [
+ {
+ 'status': 'SUCCESS',
+ 'tickId': '1'
+ }
+ ],
+ 'ticksCount': 1
+}
+
+snapshots['test_should_execute_scheduler_error 1'] = {
+ 'scheduleDefinition': {
+ 'name': 'should_execute_error_schedule'
+ },
+ 'stats': {
+ 'ticksFailed': 1,
+ 'ticksSkipped': 0,
+ 'ticksStarted': 0,
+ 'ticksSucceeded': 0
+ },
+ 'ticks': [
+ {
+ 'status': 'FAILURE',
+ 'tickId': '1'
+ }
+ ],
+ 'ticksCount': 1
+}
+
+snapshots['test_invalid_config_schedule_error 1'] = {
+ 'scheduleDefinition': {
+ 'name': 'invalid_config_schedule'
+ },
+ 'stats': {
+ 'ticksFailed': 0,
+ 'ticksSkipped': 0,
+ 'ticksStarted': 0,
+ 'ticksSucceeded': 1
+ },
+ 'ticks': [
+ {
+ 'status': 'SUCCESS',
+ 'tickId': '1'
+ }
+ ],
+ 'ticksCount': 1
+}
diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py
index 4fdb35fc7bbed..ee547fccaec13 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_scheduler.py
@@ -75,7 +75,9 @@ def test_get_all_schedules():
)
# Start schedule
- schedule = instance.start_schedule(repository, "no_config_pipeline_hourly_schedule")
+ schedule = instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_hourly_schedule"
+ )
# Query Scheduler + all Schedules
scheduler_result = execute_dagster_graphql(context, GET_SCHEDULES_QUERY)
diff --git a/python_modules/dagster/dagster/cli/schedule.py b/python_modules/dagster/dagster/cli/schedule.py
index 0e86ec439b2f3..93b35976455de 100644
--- a/python_modules/dagster/dagster/cli/schedule.py
+++ b/python_modules/dagster/dagster/cli/schedule.py
@@ -281,14 +281,16 @@ def execute_start_command(schedule_name, all_flag, cli_args, print_fn):
if all_flag:
for schedule in instance.all_schedules(repository):
try:
- schedule = instance.start_schedule(repository, schedule.name)
+ schedule = instance.start_schedule_and_update_storage_state(
+ repository, schedule.name
+ )
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)
print_fn("Started all schedules for repository {name}".format(name=repository.name))
else:
try:
- schedule = instance.start_schedule(repository, schedule_name)
+ schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)
@@ -311,7 +313,7 @@ def execute_stop_command(schedule_name, cli_args, print_fn, instance=None):
repository = handle.build_repository_definition()
try:
- instance.stop_schedule(repository, schedule_name)
+ instance.stop_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)
@@ -368,8 +370,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)
for schedule in instance.all_schedules(repository):
if schedule.status == ScheduleStatus.RUNNING:
try:
- instance.stop_schedule(repository, schedule.name)
- instance.start_schedule(repository, schedule.name)
+ instance.stop_schedule_and_update_storage_state(repository, schedule.name)
+ instance.start_schedule_and_update_storage_state(repository, schedule.name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)
@@ -386,8 +388,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)
)
try:
- instance.stop_schedule(repository, schedule_name)
- instance.start_schedule(repository, schedule_name)
+ instance.stop_schedule_and_update_storage_state(repository, schedule_name)
+ instance.start_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)
diff --git a/python_modules/dagster/dagster/core/instance/__init__.py b/python_modules/dagster/dagster/core/instance/__init__.py
index 4536b0ca0a877..2eca2cba09ec6 100644
--- a/python_modules/dagster/dagster/core/instance/__init__.py
+++ b/python_modules/dagster/dagster/core/instance/__init__.py
@@ -807,17 +807,23 @@ def reconcile_scheduler_state(self, python_path, repository_path, repository):
self, repository, python_path, repository_path
)
- def start_schedule(self, repository, schedule_name):
- return self._scheduler.start_schedule(self, repository, schedule_name)
+ def start_schedule_and_update_storage_state(self, repository, schedule_name):
+ return self._scheduler.start_schedule_and_update_storage_state(
+ self, repository, schedule_name
+ )
- def stop_schedule(self, repository, schedule_name):
- return self._scheduler.stop_schedule(self, repository, schedule_name)
+ def stop_schedule_and_update_storage_state(self, repository, schedule_name):
+ return self._scheduler.stop_schedule_and_update_storage_state(
+ self, repository, schedule_name
+ )
- def end_schedule(self, repository, schedule_name):
- return self._scheduler.end_schedule(self, repository, schedule_name)
+ def stop_schedule_and_delete_from_storage(self, repository, schedule_name):
+ return self._scheduler.stop_schedule_and_delete_from_storage(
+ self, repository, schedule_name
+ )
- def running_job_count(self, repository_name, schedule_name):
- return self._scheduler.running_job_count(repository_name, schedule_name)
+ def running_schedule_count(self, repository_name, schedule_name):
+ return self._scheduler.running_schedule_count(repository_name, schedule_name)
def scheduler_debug_info(self):
from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus
@@ -827,14 +833,14 @@ def scheduler_debug_info(self):
schedule_info = self.all_schedules_info()
schedules = []
for repository_name, schedule in schedule_info:
- if schedule.status == ScheduleStatus.RUNNING and not self.running_job_count(
+ if schedule.status == ScheduleStatus.RUNNING and not self.running_schedule_count(
repository_name, schedule.name
):
errors.append(
"Schedule {schedule_name} is set to be running, but the scheduler is not "
"running the schedule.".format(schedule_name=schedule.name)
)
- elif schedule.status == ScheduleStatus.STOPPED and self.running_job_count(
+ elif schedule.status == ScheduleStatus.STOPPED and self.running_schedule_count(
repository_name, schedule.name
):
errors.append(
@@ -842,7 +848,7 @@ def scheduler_debug_info(self):
"the schedule.".format(schedule_name=schedule.name)
)
- if self.running_job_count(repository_name, schedule.name) > 1:
+ if self.running_schedule_count(repository_name, schedule.name) > 1:
errors.append(
"Duplicate jobs found: More than one job for schedule {schedule_name} are "
"running on the scheduler.".format(schedule_name=schedule.name)
diff --git a/python_modules/dagster/dagster/core/scheduler/__init__.py b/python_modules/dagster/dagster/core/scheduler/__init__.py
index a5e3e92fcb977..8105df9177273 100644
--- a/python_modules/dagster/dagster/core/scheduler/__init__.py
+++ b/python_modules/dagster/dagster/core/scheduler/__init__.py
@@ -1,8 +1,12 @@
from .scheduler import (
+ DagsterScheduleDoesNotExist,
+ DagsterScheduleReconciliationError,
+ DagsterSchedulerError,
Schedule,
ScheduleDefinitionData,
ScheduleStatus,
ScheduleTick,
+ ScheduleTickStatsSnapshot,
ScheduleTickStatus,
Scheduler,
SchedulerDebugInfo,
diff --git a/python_modules/dagster/dagster/core/scheduler/scheduler.py b/python_modules/dagster/dagster/core/scheduler/scheduler.py
index 2572d973e55c0..56d9932353f6c 100644
--- a/python_modules/dagster/dagster/core/scheduler/scheduler.py
+++ b/python_modules/dagster/dagster/core/scheduler/scheduler.py
@@ -5,11 +5,44 @@
import six
from dagster import check
+from dagster.core.definitions.repository import RepositoryDefinition
from dagster.core.definitions.schedule import ScheduleDefinition, ScheduleDefinitionData
+from dagster.core.errors import DagsterError
+from dagster.core.instance import DagsterInstance
from dagster.serdes import whitelist_for_serdes
from dagster.utils.error import SerializableErrorInfo
+class DagsterSchedulerError(DagsterError):
+ '''Base class for all Dagster Scheduler errors'''
+
+
+class DagsterScheduleReconciliationError(DagsterError):
+ '''Error raised during schedule state reconciliation. During reconciliation, exceptions that
+ are raised when trying to start or stop a schedule are collected and passed to this wrapper
+ exception. The individual exceptions can be accessed by the `errors` property. '''
+
+ def __init__(self, preamble, errors, *args, **kwargs):
+ self.errors = errors
+
+ error_msg = preamble
+ error_messages = []
+ for i_error, error in enumerate(self.errors):
+ error_messages.append(str(error))
+ error_msg += '\n Error {i_error}: {error_message}'.format(
+ i_error=i_error + 1, error_message=str(error)
+ )
+
+ self.message = error_msg
+ self.error_messages = error_messages
+
+ super(DagsterScheduleReconciliationError, self).__init__(error_msg, *args, **kwargs)
+
+
+class DagsterScheduleDoesNotExist(DagsterSchedulerError):
+ '''Error raised when a schedule looked up by name does not exist.'''
+
+
@whitelist_for_serdes
class ScheduleStatus(Enum):
RUNNING = 'RUNNING'
@@ -121,6 +154,17 @@ class Scheduler(six.with_metaclass(abc.ABCMeta)):
an external system such as cron to ensure scheduled repeated execution according.
'''
+    def _get_schedule_by_name(self, instance, repository, schedule_name):
+        '''Return the named schedule, or raise DagsterScheduleDoesNotExist if it is missing.'''
+        schedule = instance.get_schedule_by_name(repository, schedule_name)
+        if not schedule:
+            # Shared by the start, stop and delete paths, so the message names no operation.
+            raise DagsterScheduleDoesNotExist(
+                'Schedule {name} does not exist.'.format(name=schedule_name)
+            )
+
+        return schedule
+
def reconcile_scheduler_state(self, instance, repository, python_path, repository_path):
'''Reconcile the ScheduleDefinitions list from the repository and ScheduleStorage
on the instance to ensure there is a 1-1 correlation between ScheduleDefinitions and
@@ -170,17 +214,110 @@ def reconcile_scheduler_state(self, instance, repository, python_path, repositor
existing_schedule_names = set([s.name for s in instance.all_schedules(repository)])
schedule_names_to_delete = existing_schedule_names - schedule_def_names
+ schedule_reconciliation_errors = []
for schedule in schedules_to_restart:
# Restart is only needed if the schedule was previously running
if schedule.status == ScheduleStatus.RUNNING:
- self.stop_schedule(instance, repository, schedule.name)
- self.start_schedule(instance, repository, schedule.name)
+ try:
+ self.stop_schedule(instance, repository, schedule.name)
+ self.start_schedule(instance, repository, schedule.name)
+ except DagsterSchedulerError as e:
+ schedule_reconciliation_errors.append(e)
if schedule.status == ScheduleStatus.STOPPED:
- self.stop_schedule(instance, repository, schedule.name)
+ try:
+ self.stop_schedule(instance, repository, schedule.name)
+ except DagsterSchedulerError as e:
+ schedule_reconciliation_errors.append(e)
for schedule_name in schedule_names_to_delete:
- self.end_schedule(instance, repository, schedule_name)
+ try:
+ instance.stop_schedule_and_delete_from_storage(repository, schedule_name)
+ except DagsterSchedulerError as e:
+ schedule_reconciliation_errors.append(e)
+
+ if len(schedule_reconciliation_errors):
+ raise DagsterScheduleReconciliationError(
+ "One or more errors were encountered by the Scheduler while starting or stopping schedules. "
+ "Individual error messages follow:",
+ errors=schedule_reconciliation_errors,
+ )
+
+ def start_schedule_and_update_storage_state(self, instance, repository, schedule_name):
+ '''
+ Calls `start_schedule`, then updates the status of the given schedule to
+ `ScheduleStatus.RUNNING` in schedule storage.
+
+ This should not be overridden by subclasses.
+
+ Args:
+ instance (DagsterInstance): The current instance.
+ repository (RepositoryDefinition): The repository containing the schedule definition.
+ schedule_name (string): The name of the schedule to start running.
+ '''
+
+ check.inst_param(instance, 'instance', DagsterInstance)
+ check.inst_param(repository, 'repository', RepositoryDefinition)
+ check.str_param(schedule_name, 'schedule_name')
+
+ schedule = self._get_schedule_by_name(instance, repository, schedule_name)
+
+ if schedule.status == ScheduleStatus.RUNNING:
+ raise DagsterSchedulerError(
+ 'You have attempted to start schedule {name}, but it is already running'.format(
+ name=schedule_name
+ )
+ )
+
+ self.start_schedule(instance, repository, schedule.name)
+ started_schedule = schedule.with_status(ScheduleStatus.RUNNING)
+ instance.update_schedule(repository, started_schedule)
+ return started_schedule
+
+ def stop_schedule_and_update_storage_state(self, instance, repository, schedule_name):
+ '''
+ Calls `stop_schedule`, then updates the status of the given schedule to
+ `ScheduleStatus.STOPPED` in schedule storage.
+
+ This should not be overridden by subclasses.
+
+ Args:
+ instance (DagsterInstance): The current instance.
+ repository (RepositoryDefinition): The repository containing the schedule definition.
+ schedule_name (string): The name of the schedule to stop running.
+ '''
+
+ check.inst_param(instance, 'instance', DagsterInstance)
+ check.inst_param(repository, 'repository', RepositoryDefinition)
+ check.str_param(schedule_name, 'schedule_name')
+
+ schedule = self._get_schedule_by_name(instance, repository, schedule_name)
+
+ self.stop_schedule(instance, repository, schedule.name)
+ stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED)
+ instance.update_schedule(repository, stopped_schedule)
+ return stopped_schedule
+
+ def stop_schedule_and_delete_from_storage(self, instance, repository, schedule_name):
+ '''
+ Calls `stop_schedule`, then deletes the schedule from schedule storage.
+
+ This should not be overridden by subclasses.
+
+ Args:
+ instance (DagsterInstance): The current instance.
+ repository (RepositoryDefinition): The repository containing the schedule definition.
+ schedule_name (string): The name of the schedule to stop and delete.
+ '''
+
+ check.inst_param(instance, 'instance', DagsterInstance)
+ check.inst_param(repository, 'repository', RepositoryDefinition)
+ check.str_param(schedule_name, 'schedule_name')
+
+ schedule = self._get_schedule_by_name(instance, repository, schedule_name)
+ self.stop_schedule(instance, repository, schedule.name)
+ instance.delete_schedule(repository, schedule)
+ return schedule
@abc.abstractmethod
def debug_info(self):
@@ -189,35 +326,56 @@ def debug_info(self):
@abc.abstractmethod
def start_schedule(self, instance, repository, schedule_name):
- '''Resume a pipeline schedule.
+ '''Start running a schedule. This method is called by `start_schedule_and_update_storage_state`,
+ which calls this method first and then updates the status of the schedule in schedule storage
+ to `ScheduleStatus.RUNNING`.
+
+ For example, in the cron scheduler, this method writes a cron job to the cron tab
+ for the given schedule.
Args:
- schedule_name (string): The schedule to resume
+ instance (DagsterInstance): The current instance.
+ repository (RepositoryDefinition): The repository containing the schedule definition.
+ schedule_name (string): The name of the schedule to start running.
'''
@abc.abstractmethod
def stop_schedule(self, instance, repository, schedule_name):
- '''Stops an existing pipeline schedule
+ '''Stop running a schedule.
- Args:
- schedule_name (string): The schedule to stop
- '''
+ This method is called by
+ 1) `stop_schedule_and_update_storage_state`,
+ which calls this method first and then updates the status of the schedule in schedule storage
+ to `ScheduleStatus.STOPPED`.
+ 2) `stop_schedule_and_delete_from_storage`, which calls this method first and then deletes
+ the schedule from schedule storage.
- @abc.abstractmethod
- def end_schedule(self, instance, repository, schedule_name):
- '''Resume a pipeline schedule.
+ For example, in the cron scheduler, this method deletes the cron job for a given schedule
+ from the cron tab.
Args:
- schedule_name (string): The schedule to end and delete
+ instance (DagsterInstance): The current instance.
+ repository (RepositoryDefinition): The repository containing the schedule definition.
+ schedule_name (string): The schedule to stop running.
'''
@abc.abstractmethod
- def running_job_count(self, repository_name, schedule_name):
- '''Resume a pipeline schedule.
+ def running_schedule_count(self, repository_name, schedule_name):
+ '''Returns the number of jobs currently running for the given schedule. This method is used
+ for detecting when the scheduler is out of sync with schedule storage.
+
+ For example, when:
+ - There are duplicate jobs running for a single schedule
+ - There are no jobs running for a schedule that is set to be running
+ - There are still jobs running for a schedule that is set to be stopped
+
+ When the scheduler and schedule storage are in sync, this method should return:
+ - 1 when a schedule is set to be running
+ - 0 when a schedule is set to be stopped
Args:
- repository_name (string): The repository the schedule belongs to
- schedule_name (string): The name of the schedule to check
+ repository_name (string): The name of the repository containing the schedule definition.
+ schedule_name (string): The schedule to return the number of jobs for
'''
@abc.abstractmethod
diff --git a/python_modules/dagster/dagster/utils/test/__init__.py b/python_modules/dagster/dagster/utils/test/__init__.py
index 8fa25723f0b88..0cf8f5a898905 100644
--- a/python_modules/dagster/dagster/utils/test/__init__.py
+++ b/python_modules/dagster/dagster/utils/test/__init__.py
@@ -33,6 +33,7 @@
)
from dagster.core.instance import DagsterInstance
from dagster.core.scheduler import ScheduleStatus, Scheduler
+from dagster.core.scheduler.scheduler import DagsterScheduleDoesNotExist, DagsterSchedulerError
from dagster.core.storage.file_manager import LocalFileManager
from dagster.core.storage.intermediates_manager import InMemoryIntermediatesManager
from dagster.core.storage.pipeline_run import PipelineRun
@@ -387,6 +388,12 @@ def restore_directory(src):
class FilesytemTestScheduler(Scheduler):
+ '''This class is used in dagster core and dagster_graphql to test the scheduler's interactions
+ with schedule storage, which are implemented in the methods defined on the base Scheduler class.
+ Therefore, the following methods used to actually schedule jobs (e.g. create and remove cron jobs
+ on a cron tab) are left unimplemented.
+ '''
+
def __init__(self, artifacts_dir):
check.str_param(artifacts_dir, 'artifacts_dir')
self._artifacts_dir = artifacts_dir
@@ -395,53 +402,14 @@ def debug_info(self):
return ""
def start_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to start schedule {name}, but it does not exist.'.format(
- name=schedule_name
- )
- )
-
- if schedule.status == ScheduleStatus.RUNNING:
- raise DagsterInvariantViolationError(
- 'You have attempted to start schedule {name}, but it is already running'.format(
- name=schedule_name
- )
- )
-
- started_schedule = schedule.with_status(ScheduleStatus.RUNNING)
- instance.update_schedule(repository, started_schedule)
- return schedule
+ pass
def stop_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to stop schedule {name}, but was never initialized.'
- 'Use `schedule up` to initialize schedules'.format(name=schedule_name)
- )
-
- stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED)
- instance.update_schedule(repository, stopped_schedule)
- return stopped_schedule
+ pass
- def running_job_count(self, repository_name, schedule_name):
- # Not currently tested in dagster core
+ def running_schedule_count(self, repository_name, schedule_name):
return 0
- def end_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to end schedule {name}, but it is not running.'.format(
- name=schedule_name
- )
- )
-
- instance.storage.delete_schedule(repository, schedule)
- return schedule
-
def get_logs_directory(self, instance, repository, schedule_name):
check.inst_param(repository, 'repository', RepositoryDefinition)
check.str_param(schedule_name, 'schedule_name')
diff --git a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py
index 070fd8fd2a883..4a549aac2b8b5 100644
--- a/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py
+++ b/python_modules/libraries/dagster-cron/dagster_cron/cron_scheduler.py
@@ -6,9 +6,9 @@
import six
from crontab import CronTab
-from dagster import DagsterInstance, DagsterInvariantViolationError, check, seven, utils
+from dagster import DagsterInstance, check, seven, utils
from dagster.core.definitions import RepositoryDefinition
-from dagster.core.scheduler import ScheduleStatus, Scheduler
+from dagster.core.scheduler import DagsterSchedulerError, Scheduler
from dagster.serdes import ConfigurableClass
@@ -45,71 +45,58 @@ def debug_info(self):
)
def start_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to start schedule {name}, but it does not exist.'.format(
- name=schedule_name
- )
- )
+ check.inst_param(instance, 'instance', DagsterInstance)
+ check.inst_param(repository, 'repository', RepositoryDefinition)
+ check.str_param(schedule_name, 'schedule_name')
- if schedule.status == ScheduleStatus.RUNNING:
- raise DagsterInvariantViolationError(
- 'You have attempted to start schedule {name}, but it is already running'.format(
- name=schedule_name
- )
- )
+ schedule = self._get_schedule_by_name(instance, repository, schedule_name)
+
+ # If the cron job already exists, remove it. This prevents duplicate entries.
+ # Then, add a new cron job to the cron tab.
+ if self.running_schedule_count(repository.name, schedule.name) > 0:
+ self._end_cron_job(instance, repository, schedule)
- started_schedule = schedule.with_status(ScheduleStatus.RUNNING)
- self._start_cron_job(instance, repository, started_schedule)
+ self._start_cron_job(instance, repository, schedule)
- # Check that the schedule made it to the cron tab
- if not self.running_job_count(repository.name, schedule.name):
- raise DagsterInvariantViolationError(
- "Attempted to write cron job for schedule {schedule_name}, but failed".format(
- schedule_name=schedule.name
+ # Verify that the cron job is running
+ running_schedule_count = self.running_schedule_count(repository.name, schedule.name)
+ if running_schedule_count == 0:
+ raise DagsterSchedulerError(
+ "Attempted to write cron job for schedule "
+ "{schedule_name}, but failed. "
+ "The scheduler is not running {schedule_name}.".format(schedule_name=schedule.name)
+ )
+ elif running_schedule_count > 1:
+ raise DagsterSchedulerError(
+ "Attempted to write cron job for schedule "
+ "{schedule_name}, but duplicate cron jobs were found. "
+ "There are {running_schedule_count} jobs running for the schedule. "
+ "To resolve, run `dagster schedule up`, or edit the cron tab to "
+ "remove duplicate schedules".format(
+ schedule_name=schedule.name, running_schedule_count=running_schedule_count
)
)
- # If the cron job was successfully installed, then update scheduler state
- instance.update_schedule(repository, started_schedule)
- return started_schedule
-
def stop_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to stop schedule {name}, but was never initialized.'
- 'Use `schedule up` to initialize schedules'.format(name=schedule_name)
- )
-
- stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED)
- self._end_cron_job(instance, repository, stopped_schedule)
-
- if self.running_job_count(repository.name, schedule.name):
- raise DagsterInvariantViolationError(
- "Attempted to remove cron job for schedule {schedule_name}, but failed. The cron "
- "job for the schedule is still running".format(schedule_name=schedule.name)
- )
+ check.inst_param(instance, 'instance', DagsterInstance)
+ check.inst_param(repository, 'repository', RepositoryDefinition)
+ check.str_param(schedule_name, 'schedule_name')
- instance.update_schedule(repository, stopped_schedule)
+ schedule = self._get_schedule_by_name(instance, repository, schedule_name)
- return stopped_schedule
+ self._end_cron_job(instance, repository, schedule)
- def end_schedule(self, instance, repository, schedule_name):
- schedule = instance.get_schedule_by_name(repository, schedule_name)
- if not schedule:
- raise DagsterInvariantViolationError(
- 'You have attempted to end schedule {name}, but it is not running.'.format(
- name=schedule_name
+ # Verify that the cron job has been removed
+ running_schedule_count = self.running_schedule_count(repository.name, schedule.name)
+ if running_schedule_count > 0:
+ raise DagsterSchedulerError(
+ "Attempted to remove existing cron job for schedule "
+ "{schedule_name}, but failed. "
+ "There are still {running_schedule_count} jobs running for the schedule.".format(
+ schedule_name=schedule.name, running_schedule_count=running_schedule_count
)
)
- instance.delete_schedule(repository, schedule)
- self._end_cron_job(instance, repository, schedule)
-
- return schedule
-
def wipe(self, instance):
# Note: This method deletes schedules from ALL repositories
check.inst_param(instance, 'instance', DagsterInstance)
@@ -176,7 +163,7 @@ def _end_cron_job(self, instance, repository, schedule):
if os.path.isfile(script_file):
os.remove(script_file)
- def running_job_count(self, repository_name, schedule_name):
+ def running_schedule_count(self, repository_name, schedule_name):
cron_tab = CronTab(user=True)
matching_jobs = cron_tab.find_comment(
self._cron_tag_for_schedule(repository_name, schedule_name)
diff --git a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
index ad408b1dd9f21..87e42c7002969 100644
--- a/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
+++ b/python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
@@ -8,9 +8,13 @@
from dagster import ScheduleDefinition, check, file_relative_path
from dagster.core.definitions import RepositoryDefinition, lambda_solid, pipeline
-from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.instance import DagsterInstance, InstanceType
from dagster.core.scheduler import Schedule, ScheduleStatus
+from dagster.core.scheduler.scheduler import (
+ DagsterScheduleDoesNotExist,
+ DagsterScheduleReconciliationError,
+ DagsterSchedulerError,
+)
from dagster.core.storage.event_log import InMemoryEventLogStorage
from dagster.core.storage.local_compute_log_manager import NoOpComputeLogManager
from dagster.core.storage.root import LocalArtifactStorage
@@ -89,6 +93,12 @@ def define_repo():
)
+def define_smaller_repo():
+ return RepositoryDefinition(
+ name="test", pipeline_defs=[no_config_pipeline], schedule_defs=define_schedules()[:-1],
+ )
+
+
def define_scheduler_instance(tempdir):
return DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
@@ -135,7 +145,9 @@ def test_re_init(
)
# Start schedule
- schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ schedule = instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# Re-initialize scheduler
instance.reconcile_scheduler_state(
@@ -168,7 +180,9 @@ def test_start_and_stop_schedule(
schedule_def = repository.get_schedule_def("no_config_pipeline_every_min_schedule")
- schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ schedule = instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
check.inst_param(schedule, 'schedule', Schedule)
assert "/bin/python" in schedule.python_path
@@ -179,12 +193,25 @@ def test_start_and_stop_schedule(
os.path.join(tempdir, 'schedules', 'scripts')
)
- instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
assert "{}.{}.sh".format(repository.name, schedule_def.name) not in os.listdir(
os.path.join(tempdir, 'schedules', 'scripts')
)
+def test_start_non_existent_schedule(
+ restore_cron_tab, repository
+): # pylint:disable=unused-argument,redefined-outer-name
+ with TemporaryDirectory() as tempdir:
+ instance = define_scheduler_instance(tempdir)
+
+ with pytest.raises(DagsterScheduleDoesNotExist):
+ # Starting a schedule name that is not defined in the repository must raise
+ instance.start_schedule_and_update_storage_state(repository, "asdf")
+
+
def get_cron_jobs():
output = subprocess.check_output(['crontab', '-l'])
return list(filter(None, output.decode('utf-8').strip().split("\n")))
@@ -203,9 +230,15 @@ def test_start_schedule_cron_job(
repository=repository,
)
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
- instance.start_schedule(repository, "no_config_pipeline_daily_schedule")
- instance.start_schedule(repository, "default_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
+ instance.start_schedule_and_update_storage_state(
+ repository, "default_config_pipeline_every_min_schedule"
+ )
# Inspect the cron tab
cron_jobs = get_cron_jobs()
@@ -231,6 +264,54 @@ def test_start_schedule_cron_job(
assert log_file.endswith("scheduler.log")
+def test_remove_schedule_def(
+ restore_cron_tab, repository
+): # pylint:disable=unused-argument,redefined-outer-name
+ with TemporaryDirectory() as tempdir:
+ instance = define_scheduler_instance(tempdir)
+
+ # Initialize scheduler
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=repository,
+ )
+
+ assert len(instance.all_schedules(repository=repository)) == 3
+
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=define_smaller_repo(),
+ )
+
+ assert len(instance.all_schedules(repository=repository)) == 2
+
+
+def test_add_schedule_def(
+ restore_cron_tab, repository
+): # pylint:disable=unused-argument,redefined-outer-name
+ with TemporaryDirectory() as tempdir:
+ instance = define_scheduler_instance(tempdir)
+
+ # Initialize scheduler
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=define_smaller_repo(),
+ )
+
+ assert len(instance.all_schedules(repository=repository)) == 2
+
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=repository,
+ )
+
+ assert len(instance.all_schedules(repository=repository)) == 3
+
+
def test_start_and_stop_schedule_cron_tab(
restore_cron_tab, repository
): # pylint:disable=unused-argument,redefined-outer-name
@@ -245,33 +326,45 @@ def test_start_and_stop_schedule_cron_tab(
)
# Start schedule
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 1
# Try starting it again
- with pytest.raises(DagsterInvariantViolationError):
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ with pytest.raises(DagsterSchedulerError):
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 1
# Start another schedule
- instance.start_schedule(repository, "no_config_pipeline_daily_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 2
# Stop second schedule
- instance.stop_schedule(repository, "no_config_pipeline_daily_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 1
# Try stopping second schedule again
- instance.stop_schedule(repository, "no_config_pipeline_daily_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 1
# Start second schedule
- instance.start_schedule(repository, "no_config_pipeline_daily_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 2
@@ -284,7 +377,9 @@ def test_start_and_stop_schedule_cron_tab(
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 2
- instance.start_schedule(repository, "default_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "default_config_pipeline_every_min_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 3
@@ -298,9 +393,15 @@ def test_start_and_stop_schedule_cron_tab(
assert len(cron_jobs) == 3
# Stop all schedules
- instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule")
- instance.stop_schedule(repository, "no_config_pipeline_daily_schedule")
- instance.stop_schedule(repository, "default_config_pipeline_every_min_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_daily_schedule"
+ )
+ instance.stop_schedule_and_update_storage_state(
+ repository, "default_config_pipeline_every_min_schedule"
+ )
cron_jobs = get_cron_jobs()
assert len(cron_jobs) == 0
@@ -336,7 +437,9 @@ def raises(*args, **kwargs):
instance._scheduler._start_cron_job = raises # pylint: disable=protected-access
with pytest.raises(Exception, match='Patch'):
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
schedule = instance.get_schedule_by_name(repository, schedule_def.name)
@@ -364,10 +467,13 @@ def do_nothing(*_):
# End schedule
with pytest.raises(
- DagsterInvariantViolationError,
- match="Attempted to write cron job for schedule no_config_pipeline_every_min_schedule, but failed",
+ DagsterSchedulerError,
+ match="Attempted to write cron job for schedule no_config_pipeline_every_min_schedule, "
+ "but failed. The scheduler is not running no_config_pipeline_every_min_schedule.",
):
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
def test_start_schedule_manual_delete_debug(
@@ -384,7 +490,9 @@ def test_start_schedule_manual_delete_debug(
repository=repository,
)
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# Manually delete the schedule from the crontab
instance.scheduler._end_cron_job( # pylint: disable=protected-access
@@ -456,7 +564,9 @@ def test_start_schedule_manual_duplicate_schedules_add_debug(
repository=repository,
)
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# Manually add extra cron tabs
instance.scheduler._start_cron_job( # pylint: disable=protected-access
@@ -505,7 +615,9 @@ def raises(*args, **kwargs):
instance._scheduler._end_cron_job = raises # pylint: disable=protected-access
- schedule = instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ schedule = instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
check.inst_param(schedule, 'schedule', Schedule)
assert "/bin/python" in schedule.python_path
@@ -518,7 +630,9 @@ def raises(*args, **kwargs):
# End schedule
with pytest.raises(Exception, match='Patch'):
- instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
schedule = instance.get_schedule_by_name(repository, schedule_def.name)
@@ -544,14 +658,20 @@ def do_nothing(*_):
instance._scheduler._end_cron_job = do_nothing # pylint: disable=protected-access
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# End schedule
with pytest.raises(
- DagsterInvariantViolationError,
- match="Attempted to remove cron job for schedule no_config_pipeline_every_min_schedule, but failed.",
+ DagsterSchedulerError,
+ match="Attempted to remove existing cron job for schedule "
+ "no_config_pipeline_every_min_schedule, but failed. There are still 1 jobs running for "
+ "the schedule.",
):
- instance.stop_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.stop_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
def test_wipe(restore_cron_tab, repository): # pylint:disable=unused-argument,redefined-outer-name
@@ -567,7 +687,9 @@ def test_wipe(restore_cron_tab, repository): # pylint:disable=unused-argument,r
)
# Start schedule
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# Wipe scheduler
instance.wipe_all_schedules()
@@ -601,10 +723,79 @@ def test_log_directory(
)
# Start schedule
- instance.start_schedule(repository, "no_config_pipeline_every_min_schedule")
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
# Wipe scheduler
instance.wipe_all_schedules()
# Check schedules are wiped
assert instance.all_schedules(repository) == []
+
+
+def test_reconcile_failure(
+ restore_cron_tab, repository
+): # pylint:disable=unused-argument,redefined-outer-name
+ with TemporaryDirectory() as tempdir:
+ instance = define_scheduler_instance(tempdir)
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=repository,
+ )
+ instance.start_schedule_and_update_storage_state(
+ repository, "no_config_pipeline_every_min_schedule"
+ )
+
+ def failed_start_job(*_):
+ raise DagsterSchedulerError("Failed to start")
+
+ def failed_end_job(*_):
+ raise DagsterSchedulerError("Failed to stop")
+
+ instance._scheduler.start_schedule = failed_start_job # pylint: disable=protected-access
+ instance._scheduler.stop_schedule = failed_end_job # pylint: disable=protected-access
+
+ # Reconcile again with the scheduler's start/stop methods patched to raise
+ with pytest.raises(
+ DagsterScheduleReconciliationError,
+ match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to stop",
+ ):
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=repository,
+ )
+
+
+def test_reconcile_failure_when_deleting_schedule_def(
+ restore_cron_tab, repository
+): # pylint:disable=unused-argument,redefined-outer-name
+ with TemporaryDirectory() as tempdir:
+ instance = define_scheduler_instance(tempdir)
+
+ # Initialize scheduler
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=repository,
+ )
+
+ assert len(instance.all_schedules(repository=repository)) == 3
+
+ def failed_end_job(*_):
+ raise DagsterSchedulerError("Failed to stop")
+
+ instance._scheduler.stop_schedule_and_delete_from_storage = ( # pylint: disable=protected-access
+ failed_end_job
+ )
+
+ with pytest.raises(
+ DagsterScheduleReconciliationError, match="Error 1: Failed to stop",
+ ):
+ instance.reconcile_scheduler_state(
+ python_path=sys.executable,
+ repository_path=file_relative_path(__file__, '.../repository.yam'),
+ repository=define_smaller_repo(),
+ )