Skip to content

Commit

Permalink
Move reconcile_scheduler_state to Scheduler
Browse files Browse the repository at this point in the history
Test Plan: unit

Reviewers: alangenfeld, prha

Reviewed By: prha

Differential Revision: https://dagster.phacility.com/D3149
  • Loading branch information
helloworld committed May 28, 2020
1 parent b602418 commit a612c11
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 171 deletions.
15 changes: 9 additions & 6 deletions examples/dagster_examples_tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from dagster import seven
from dagster.cli.pipeline import execute_list_command, pipeline_list_command
from dagster.core.instance import DagsterInstance
from dagster.core.scheduler import reconcile_scheduler_state
from dagster.core.storage.schedules.sqlite.sqlite_schedule_storage import SqliteScheduleStorage
from dagster.utils import file_relative_path, script_relative_path
from dagster.utils.test import FilesytemTestScheduler


def no_print(_):
Expand Down Expand Up @@ -40,19 +41,21 @@ def test_schedules():
with seven.TemporaryDirectory() as temp_dir:
instance = DagsterInstance.local_temp(temp_dir)

# Patch scheduler and schedule storage.
instance._schedule_storage = SqliteScheduleStorage.from_local( # pylint: disable=protected-access
temp_dir
)
instance._scheduler = FilesytemTestScheduler(temp_dir) # pylint: disable=protected-access

context = define_context_for_repository_yaml(
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)

# We need to call up on the scheduler handle to persist
# state about the schedules to disk before running them.
# Note: This dependency will be removed soon.
repository = context.get_repository()
reconcile_scheduler_state(
instance.reconcile_scheduler_state(
python_path=sys.executable,
repository_path=file_relative_path(__file__, '../'),
repository=repository,
instance=instance,
)

for schedule_name in [
Expand Down
5 changes: 2 additions & 3 deletions python_modules/dagit/dagit/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from dagster import check, seven
from dagster.core.execution.compute_logs import warn_if_compute_logs_disabled
from dagster.core.instance import DagsterInstance
from dagster.core.scheduler import reconcile_scheduler_state
from dagster.core.snap import ActiveRepositoryData
from dagster.core.storage.compute_log_manager import ComputeIOType

Expand Down Expand Up @@ -237,8 +236,8 @@ def create_app_with_execution_handle(handle, instance, reloader=None):
handle = context.get_handle()
python_path = sys.executable
repository_path = handle.data.repository_yaml
reconcile_scheduler_state(
python_path, repository_path, repository=repository, instance=instance
instance.reconcile_scheduler_state(
repository=repository, python_path=python_path, repository_path=repository_path
)
else:
warnings.warn(MISSING_SCHEDULER_WARNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from dagster import seven
from dagster.core.instance import DagsterInstance, InstanceType
from dagster.core.scheduler import reconcile_scheduler_state
from dagster.core.scheduler.scheduler import ScheduleTickStatus
from dagster.core.storage.event_log import InMemoryEventLogStorage
from dagster.core.storage.local_compute_log_manager import NoOpComputeLogManager
Expand Down Expand Up @@ -358,7 +357,11 @@ def test_tick_success(snapshot):
)
repository = context.get_repository()

reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)
schedule_def = repository.get_schedule_def("no_config_pipeline_hourly_schedule")

start_time = time.time()
Expand Down Expand Up @@ -396,7 +399,11 @@ def test_tick_skip(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

execute_dagster_graphql(
context,
Expand Down Expand Up @@ -428,7 +435,11 @@ def test_should_execute_scheduler_error(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

execute_dagster_graphql(
context,
Expand Down Expand Up @@ -465,7 +476,11 @@ def test_tags_scheduler_error(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

result = execute_dagster_graphql(
context,
Expand Down Expand Up @@ -501,7 +516,11 @@ def test_enviornment_dict_scheduler_error(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

result = execute_dagster_graphql(
context,
Expand Down Expand Up @@ -538,7 +557,11 @@ def test_enviornment_dict_scheduler_error_serialize_cauze():
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

result = execute_dagster_graphql(
context,
Expand All @@ -565,7 +588,11 @@ def test_query_multiple_schedule_ticks(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

for scheduleName in [
'no_config_pipeline_hourly_schedule',
Expand Down Expand Up @@ -671,7 +698,11 @@ def test_invalid_config_schedule_error(snapshot):
path=file_relative_path(__file__, '../repository.yaml'), instance=instance
)
repository = context.get_repository()
reconcile_scheduler_state("", "", repository, instance)
instance.reconcile_scheduler_state(
repository=repository,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

result = execute_dagster_graphql(
context,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import os
import sys

import mock
from dagster_graphql.test.utils import define_context_for_repository_yaml, execute_dagster_graphql

from dagster import ScheduleDefinition, seven
from dagster.core.instance import DagsterInstance, InstanceType
from dagster.core.scheduler import (
Schedule,
ScheduleStatus,
get_schedule_change_set,
reconcile_scheduler_state,
)
from dagster.core.scheduler import Schedule, ScheduleStatus, get_schedule_change_set
from dagster.core.storage.event_log import InMemoryEventLogStorage
from dagster.core.storage.local_compute_log_manager import NoOpComputeLogManager
from dagster.core.storage.root import LocalArtifactStorage
Expand Down Expand Up @@ -74,11 +68,10 @@ def test_get_all_schedules():

# Initialize scheduler
repository = context.get_repository()
reconcile_scheduler_state(
python_path=sys.executable,
repository_path="",
instance.reconcile_scheduler_state(
repository=repository,
instance=instance,
python_path='/path/to/python',
repository_path='/path/to/repository',
)

# Start schedule
Expand Down
10 changes: 4 additions & 6 deletions python_modules/dagster/dagster/cli/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
from dagster import DagsterInvariantViolationError, check
from dagster.cli.load_handle import handle_for_repo_cli_args
from dagster.core.instance import DagsterInstance
from dagster.core.scheduler import (
ScheduleStatus,
get_schedule_change_set,
reconcile_scheduler_state,
)
from dagster.core.scheduler import ScheduleStatus, get_schedule_change_set
from dagster.utils import DEFAULT_REPOSITORY_YAML_FILENAME


Expand Down Expand Up @@ -183,7 +179,9 @@ def execute_up_command(preview, cli_args, print_fn):
return

try:
reconcile_scheduler_state(python_path, repository_path, repository, instance=instance)
instance.reconcile_scheduler_state(
repository=repository, python_path=python_path, repository_path=repository_path
)
except DagsterInvariantViolationError as ex:
raise click.UsageError(ex)

Expand Down
5 changes: 5 additions & 0 deletions python_modules/dagster/dagster/core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,11 @@ def launch_run(self, run_id):

# Scheduler

def reconcile_scheduler_state(self, python_path, repository_path, repository):
return self._scheduler.reconcile_scheduler_state(
self, repository, python_path, repository_path
)

def start_schedule(self, repository, schedule_name):
return self._scheduler.start_schedule(self, repository, schedule_name)

Expand Down
1 change: 0 additions & 1 deletion python_modules/dagster/dagster/core/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@
SchedulerDebugInfo,
SchedulerHandle,
get_schedule_change_set,
reconcile_scheduler_state,
)
Loading

0 comments on commit a612c11

Please sign in to comment.