Skip to content

Commit

Permalink
[0.7.15] Isolate updating scheduler state from updating cron tab
Browse files Browse the repository at this point in the history
Summary:
Previously, the scheduler had only two main methods to control both schedule state and cron jobs: `stop_schedule` and `start_schedule`.

In `reconcile_scheduler_state`, for every schedule that was already in the schedule storage, we would call `stop` then `start` schedule to refresh the cron job + sh files needed for the schedule to run.

However, this introduces a race condition when calling `reconcile_scheduler_state` multiple times in parallel. Since you can start/stop an already started/stopped schedule respectively, two calls to reconcile that are executing the `stop + start` block will cause each other to error.

In the previous diff, we moved `reconcile_schedule_state` to the `Scheduler` base class.

In this diff, we rename the previous `start_schedule` and `stop_schedule` methods to `start_schedule_and_update_storage_state` and `stop_schedule_and_update_storage_state` and move them to the base class. Then, the abstract methods called `start_schedule` and `end_schedule` are responsible for only starting and stopping jobs for the schedule.

Test Plan: unit

Reviewers: alangenfeld, max, prha

Reviewed By: max

Differential Revision: https://dagster.phacility.com/D3110
  • Loading branch information
helloworld committed May 29, 2020
1 parent a612c11 commit 651ce7b
Show file tree
Hide file tree
Showing 19 changed files with 568 additions and 192 deletions.
4 changes: 3 additions & 1 deletion examples/dagster_examples/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def backfill_should_execute(context, partition_set_def, schedule_name):
is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions))
if not is_remaining_partitions:
try:
context.instance.stop_schedule(context.repository, schedule_name)
context.instance.stop_schedule_and_update_storage_state(
context.repository, schedule_name
)
except OSError:
pass

Expand Down
22 changes: 11 additions & 11 deletions js_modules/dagit/src/schedules/ScheduleRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,25 @@ const getNaturalLanguageCronString = (cronSchedule: string) => {
}
};

const errorDisplay = (status: ScheduleStatus, runningJobCount: number) => {
if (status === ScheduleStatus.STOPPED && runningJobCount == 0) {
const errorDisplay = (status: ScheduleStatus, runningScheduleCount: number) => {
if (status === ScheduleStatus.STOPPED && runningScheduleCount == 0) {
return null;
} else if (status === ScheduleStatus.RUNNING && runningJobCount == 1) {
} else if (status === ScheduleStatus.RUNNING && runningScheduleCount == 1) {
return null;
}

const errors = [];
if (status === ScheduleStatus.RUNNING && runningJobCount === 0) {
if (status === ScheduleStatus.RUNNING && runningScheduleCount === 0) {
errors.push(
"Schedule is set to be running, but the scheduler is not running the schedule"
);
} else if (status === ScheduleStatus.STOPPED && runningJobCount > 0) {
} else if (status === ScheduleStatus.STOPPED && runningScheduleCount > 0) {
errors.push(
"Schedule is set to be stopped, but the scheduler is still running the schedule"
);
}

if (runningJobCount > 0) {
if (runningScheduleCount > 0) {
errors.push("Duplicate cron job for schedule found.");
}

Expand Down Expand Up @@ -106,7 +106,7 @@ export const ScheduleRow: React.FunctionComponent<{
const {
status,
scheduleDefinition,
runningJobCount,
runningScheduleCount,
logsPath,
stats,
ticks,
Expand Down Expand Up @@ -194,7 +194,7 @@ export const ScheduleRow: React.FunctionComponent<{
}}
/>

{errorDisplay(status, runningJobCount)}
{errorDisplay(status, runningScheduleCount)}
</RowColumn>
<RowColumn style={{ flex: 1.4 }}>{displayName}</RowColumn>
<RowColumn>
Expand Down Expand Up @@ -368,7 +368,7 @@ export const ScheduleRow: React.FunctionComponent<{
export const ScheduleRowFragment = gql`
fragment ScheduleFragment on RunningSchedule {
__typename
runningJobCount
runningScheduleCount
scheduleDefinition {
name
cronSchedule
Expand Down Expand Up @@ -428,7 +428,7 @@ const START_SCHEDULE_MUTATION = gql`
... on RunningScheduleResult {
schedule {
__typename
runningJobCount
runningScheduleCount
scheduleDefinition {
__typename
name
Expand All @@ -451,7 +451,7 @@ const STOP_SCHEDULE_MUTATION = gql`
... on RunningScheduleResult {
schedule {
__typename
runningJobCount
runningScheduleCount
scheduleDefinition {
__typename
name
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schedules/types/ScheduleFragment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export interface ScheduleFragment_stats {

export interface ScheduleFragment {
__typename: "RunningSchedule";
runningJobCount: number;
runningScheduleCount: number;
scheduleDefinition: ScheduleFragment_scheduleDefinition;
logsPath: string;
ticks: ScheduleFragment_ticks[];
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schedules/types/ScheduleRootQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export interface ScheduleRootQuery_scheduleOrError_RunningSchedule_attemptList {

export interface ScheduleRootQuery_scheduleOrError_RunningSchedule {
__typename: "RunningSchedule";
runningJobCount: number;
runningScheduleCount: number;
scheduleDefinition: ScheduleRootQuery_scheduleOrError_RunningSchedule_scheduleDefinition;
logsPath: string;
ticks: ScheduleRootQuery_scheduleOrError_RunningSchedule_ticks[];
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schedules/types/SchedulesRootQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export interface SchedulesRootQuery_scheduler_Scheduler_runningSchedules_stats {

export interface SchedulesRootQuery_scheduler_Scheduler_runningSchedules {
__typename: "RunningSchedule";
runningJobCount: number;
runningScheduleCount: number;
scheduleDefinition: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_scheduleDefinition;
logsPath: string;
ticks: SchedulesRootQuery_scheduler_Scheduler_runningSchedules_ticks[];
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schedules/types/StartSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface StartSchedule_startSchedule_RunningScheduleResult_schedule_sche

export interface StartSchedule_startSchedule_RunningScheduleResult_schedule {
__typename: "RunningSchedule";
runningJobCount: number;
runningScheduleCount: number;
scheduleDefinition: StartSchedule_startSchedule_RunningScheduleResult_schedule_scheduleDefinition;
status: ScheduleStatus;
}
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schedules/types/StopSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule

export interface StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule {
__typename: "RunningSchedule";
runningJobCount: number;
runningScheduleCount: number;
scheduleDefinition: StopSchedule_stopRunningSchedule_RunningScheduleResult_schedule_scheduleDefinition;
status: ScheduleStatus;
}
Expand Down
2 changes: 1 addition & 1 deletion js_modules/dagit/src/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,7 @@ type RunningSchedule {
attempts(limit: Int): [ScheduleAttempt!]!
attemptsCount: Int!
logsPath: String!
runningJobCount: Int!
runningScheduleCount: Int!
}

type RunningScheduleResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
def start_schedule(graphene_info, schedule_name):
repository = graphene_info.context.get_repository()
instance = graphene_info.context.instance
schedule = instance.start_schedule(repository, schedule_name)
schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name)
return graphene_info.schema.type_named('RunningScheduleResult')(
schedule=graphene_info.schema.type_named('RunningSchedule')(
graphene_info, schedule=schedule
Expand All @@ -28,7 +28,7 @@ def start_schedule(graphene_info, schedule_name):
def stop_schedule(graphene_info, schedule_name):
repository = graphene_info.context.get_repository()
instance = graphene_info.context.instance
schedule = instance.stop_schedule(repository, schedule_name)
schedule = instance.stop_schedule_and_update_storage_state(repository, schedule_name)
return graphene_info.schema.type_named('RunningScheduleResult')(
schedule=graphene_info.schema.type_named('RunningSchedule')(
graphene_info, schedule=schedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
from dagster.core.definitions import ScheduleDefinition, ScheduleExecutionContext
from dagster.core.definitions.partition import PartitionScheduleDefinition
from dagster.core.errors import ScheduleExecutionError, user_code_error_boundary
from dagster.core.scheduler import Schedule, ScheduleTickStatus
from dagster.core.scheduler.scheduler import ScheduleTickStatsSnapshot
from dagster.core.scheduler import Schedule, ScheduleTickStatsSnapshot, ScheduleTickStatus
from dagster.core.storage.pipeline_run import PipelineRunsFilter


Expand Down Expand Up @@ -208,7 +207,7 @@ class Meta(object):
attempts = dauphin.Field(dauphin.non_null_list('ScheduleAttempt'), limit=dauphin.Int())
attempts_count = dauphin.NonNull(dauphin.Int)
logs_path = dauphin.NonNull(dauphin.String)
running_job_count = dauphin.NonNull(dauphin.Int)
running_schedule_count = dauphin.NonNull(dauphin.Int)

def __init__(self, graphene_info, schedule):
self._schedule = check.inst_param(schedule, 'schedule', Schedule)
Expand All @@ -223,12 +222,12 @@ def __init__(self, graphene_info, schedule):
repository_path=schedule.repository_path,
)

def resolve_running_job_count(self, graphene_info):
def resolve_running_schedule_count(self, graphene_info):
repository = graphene_info.context.get_repository()
running_job_count = graphene_info.context.instance.running_job_count(
running_schedule_count = graphene_info.context.instance.running_schedule_count(
repository.name, self._schedule.name
)
return running_job_count
return running_schedule_count

# TODO: Delete in 0.8.0 release
# https://github.com/dagster-io/dagster/issues/228
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,60 @@
],
'ticksCount': 1
}

snapshots['test_tick_success 1'] = {
'scheduleDefinition': {
'name': 'no_config_pipeline_hourly_schedule'
},
'stats': {
'ticksFailed': 0,
'ticksSkipped': 0,
'ticksStarted': 0,
'ticksSucceeded': 1
},
'ticks': [
{
'status': 'SUCCESS',
'tickId': '1'
}
],
'ticksCount': 1
}

snapshots['test_should_execute_scheduler_error 1'] = {
'scheduleDefinition': {
'name': 'should_execute_error_schedule'
},
'stats': {
'ticksFailed': 1,
'ticksSkipped': 0,
'ticksStarted': 0,
'ticksSucceeded': 0
},
'ticks': [
{
'status': 'FAILURE',
'tickId': '1'
}
],
'ticksCount': 1
}

snapshots['test_invalid_config_schedule_error 1'] = {
'scheduleDefinition': {
'name': 'invalid_config_schedule'
},
'stats': {
'ticksFailed': 0,
'ticksSkipped': 0,
'ticksStarted': 0,
'ticksSucceeded': 1
},
'ticks': [
{
'status': 'SUCCESS',
'tickId': '1'
}
],
'ticksCount': 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ def test_get_all_schedules():
)

# Start schedule
schedule = instance.start_schedule(repository, "no_config_pipeline_hourly_schedule")
schedule = instance.start_schedule_and_update_storage_state(
repository, "no_config_pipeline_hourly_schedule"
)

# Query Scheduler + all Schedules
scheduler_result = execute_dagster_graphql(context, GET_SCHEDULES_QUERY)
Expand Down
16 changes: 9 additions & 7 deletions python_modules/dagster/dagster/cli/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,16 @@ def execute_start_command(schedule_name, all_flag, cli_args, print_fn):
if all_flag:
for schedule in instance.all_schedules(repository):
try:
schedule = instance.start_schedule(repository, schedule.name)
schedule = instance.start_schedule_and_update_storage_state(
repository, schedule.name
)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

print_fn("Started all schedules for repository {name}".format(name=repository.name))
else:
try:
schedule = instance.start_schedule(repository, schedule_name)
schedule = instance.start_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

Expand All @@ -311,7 +313,7 @@ def execute_stop_command(schedule_name, cli_args, print_fn, instance=None):
repository = handle.build_repository_definition()

try:
instance.stop_schedule(repository, schedule_name)
instance.stop_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

Expand Down Expand Up @@ -368,8 +370,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)
for schedule in instance.all_schedules(repository):
if schedule.status == ScheduleStatus.RUNNING:
try:
instance.stop_schedule(repository, schedule.name)
instance.start_schedule(repository, schedule.name)
instance.stop_schedule_and_update_storage_state(repository, schedule.name)
instance.start_schedule_and_update_storage_state(repository, schedule.name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

Expand All @@ -386,8 +388,8 @@ def execute_restart_command(schedule_name, all_running_flag, cli_args, print_fn)
)

try:
instance.stop_schedule(repository, schedule_name)
instance.start_schedule(repository, schedule_name)
instance.stop_schedule_and_update_storage_state(repository, schedule_name)
instance.start_schedule_and_update_storage_state(repository, schedule_name)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

Expand Down
28 changes: 17 additions & 11 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,17 +807,23 @@ def reconcile_scheduler_state(self, python_path, repository_path, repository):
self, repository, python_path, repository_path
)

def start_schedule(self, repository, schedule_name):
return self._scheduler.start_schedule(self, repository, schedule_name)
def start_schedule_and_update_storage_state(self, repository, schedule_name):
return self._scheduler.start_schedule_and_update_storage_state(
self, repository, schedule_name
)

def stop_schedule(self, repository, schedule_name):
return self._scheduler.stop_schedule(self, repository, schedule_name)
def stop_schedule_and_update_storage_state(self, repository, schedule_name):
return self._scheduler.stop_schedule_and_update_storage_state(
self, repository, schedule_name
)

def end_schedule(self, repository, schedule_name):
return self._scheduler.end_schedule(self, repository, schedule_name)
def stop_schedule_and_delete_from_storage(self, repository, schedule_name):
return self._scheduler.stop_schedule_and_delete_from_storage(
self, repository, schedule_name
)

def running_job_count(self, repository_name, schedule_name):
return self._scheduler.running_job_count(repository_name, schedule_name)
def running_schedule_count(self, repository_name, schedule_name):
return self._scheduler.running_schedule_count(repository_name, schedule_name)

def scheduler_debug_info(self):
from dagster.core.scheduler import SchedulerDebugInfo, ScheduleStatus
Expand All @@ -827,22 +833,22 @@ def scheduler_debug_info(self):
schedule_info = self.all_schedules_info()
schedules = []
for repository_name, schedule in schedule_info:
if schedule.status == ScheduleStatus.RUNNING and not self.running_job_count(
if schedule.status == ScheduleStatus.RUNNING and not self.running_schedule_count(
repository_name, schedule.name
):
errors.append(
"Schedule {schedule_name} is set to be running, but the scheduler is not "
"running the schedule.".format(schedule_name=schedule.name)
)
elif schedule.status == ScheduleStatus.STOPPED and self.running_job_count(
elif schedule.status == ScheduleStatus.STOPPED and self.running_schedule_count(
repository_name, schedule.name
):
errors.append(
"Schedule {schedule_name} is set to be stopped, but the scheduler is still running "
"the schedule.".format(schedule_name=schedule.name)
)

if self.running_job_count(repository_name, schedule.name) > 1:
if self.running_schedule_count(repository_name, schedule.name) > 1:
errors.append(
"Duplicate jobs found: More than one job for schedule {schedule_name} are "
"running on the scheduler.".format(schedule_name=schedule.name)
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/core/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from .scheduler import (
DagsterScheduleDoesNotExist,
DagsterScheduleReconciliationError,
DagsterSchedulerError,
Schedule,
ScheduleDefinitionData,
ScheduleStatus,
ScheduleTick,
ScheduleTickStatsSnapshot,
ScheduleTickStatus,
Scheduler,
SchedulerDebugInfo,
Expand Down
Loading

0 comments on commit 651ce7b

Please sign in to comment.