From e9365bb8f8c9421787b1dc0ab8628cfb33ce0225 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Thu, 19 Dec 2024 09:09:44 -0800
Subject: [PATCH] [module-loaders] [rfc] Definitions from module loader (#26546)

## Summary & Motivation

This PR is a prototype of a top-level API that loads Dagster definitions from across a module into a single returned `Definitions` object. It is not intended to land yet; I just want to align on direction.

### Why add this API?

For any user who has bought into a module structure built around `load_assets_from_x`, we currently make life unnecessarily difficult by not also bringing in the other objects that are scoped at module-load time. Those users agree - we've received numerous requests for an API that does this. In the absence of a compelling replacement for our current project structure, this API is a good stopgap to improve module ergonomics.

### Why are resources, loggers, and executor provided as args?

It seems like the most straightforward way to support these objects without some sort of additional magic. Since we force you to provide a key in addition to the object itself, there is currently no module-scoped pattern that matches how these objects are defined on a `Definitions` argument, so it seems reasonable to accept them as parameters. The other approach would be to let users define resources as module-level variables and use the variable name as the key. The more conservative approach taken here leaves us room to do that in the future as well (I can imagine some sort of combination step).

```python
dbt = DbtCliResource(...)  # equivalent of {"dbt": DbtCliResource(...)}
```

For the executor, the fact that we only accept one per module is somewhat unique. I don't think a user would define an executor the way they define a sensor, schedule, or job, where the definition is inlined in the module, so it makes more sense to require the executor explicitly.

### Why does this return a Definitions object?

I don't think there is a reasonable alternative. The existing `load_assets_from_x` functions are nice because the returned list can be passed directly to the `assets` argument of a `Definitions` object. A flat list of heterogeneous Dagster objects could not be passed directly to `Definitions` unless we added some sort of `from_list` constructor, and returning a `Definitions` object is more straightforward. It also gives us the opportunity to paper over some details if we so choose - for example, we could automatically coerce source assets and `AssetSpec` objects into resolved `AssetsDefinition` objects.

### Should this thing take in other `Definitions` objects?

Right now, I think no. While `Definitions.merge` exists, it is obscure and undocumented, and I think most users treat a single `Definitions` object as synonymous with a code location.

### What's the intended design pattern for using this?

A single call at the top level of a code location's module, which automatically loads all of your Dagster definitions (a fuller sketch appears at the end of this description):

```python
defs = load_definitions_from_module(current_module, resources=..., loggers=...)
```

## How I Tested These Changes

I added a new test that runs this function over every existing test spec, plus tests covering the resources, loggers, and executor arguments.
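As a concrete illustration of the intended design pattern above, here is a minimal sketch of what a code location module might look like. The module name, asset/job/schedule names, and the `io_manager` resource key are all hypothetical, and it assumes the loader picks up module-scoped assets, jobs, and schedules as implemented in this prototype; the import path matches where the function currently lives.

```python
# my_location/definitions.py (hypothetical module)
import sys

import dagster as dg
from dagster._core.definitions.module_loaders.load_defs_from_module import (
    load_definitions_from_module,
)


@dg.asset
def orders(): ...


# Module-scoped job and schedule; the loader should pick these up too.
orders_job = dg.define_asset_job("orders_job", selection=[orders])
orders_schedule = dg.ScheduleDefinition(job=orders_job, cron_schedule="0 0 * * *")


# One call at the top of the code location's module collects everything above.
# Resources are still passed explicitly, keyed exactly as they would be on Definitions.
defs = load_definitions_from_module(
    sys.modules[__name__],
    resources={"io_manager": dg.FilesystemIOManager()},
)
```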
---
 .../module_loaders/load_defs_from_module.py   | 24 +++++++
 .../definitions/module_loaders/object_list.py | 44 +++++++++++-
 .../test_module_loaders.py                    | 69 ++++++++++++++++++-
 3 files changed, 135 insertions(+), 2 deletions(-)
 create mode 100644 python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py

diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py
new file mode 100644
index 0000000000000..53b8344bbd0f8
--- /dev/null
+++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/load_defs_from_module.py
@@ -0,0 +1,24 @@
+from types import ModuleType
+from typing import Optional
+
+from dagster._core.definitions.definitions_class import Definitions
+from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects
+from dagster._core.definitions.module_loaders.utils import (
+    ExecutorObject,
+    LoggerDefinitionKeyMapping,
+    ResourceDefinitionMapping,
+)
+
+
+def load_definitions_from_module(
+    module: ModuleType,
+    resources: Optional[ResourceDefinitionMapping] = None,
+    loggers: Optional[LoggerDefinitionKeyMapping] = None,
+    executor: Optional[ExecutorObject] = None,
+) -> Definitions:
+    return Definitions(
+        **ModuleScopedDagsterObjects.from_modules([module]).get_object_list().to_definitions_args(),
+        resources=resources,
+        loggers=loggers,
+        executor=executor,
+    )
diff --git a/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py
index 2252f0f8bb6d6..15f5adc340926 100644
--- a/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py
+++ b/python_modules/dagster/dagster/_core/definitions/module_loaders/object_list.py
@@ -1,7 +1,7 @@
 from collections import defaultdict
 from functools import cached_property
 from types import ModuleType
-from typing import Callable, Dict, Iterable, Mapping, Optional, Sequence, Union, cast
+from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Union, cast
 
 from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
 from dagster._core.definitions.asset_key import AssetKey, CoercibleToAssetKeyPrefix
@@ -15,6 +15,7 @@
 from dagster._core.definitions.freshness_policy import FreshnessPolicy
 from dagster._core.definitions.module_loaders.utils import (
     JobDefinitionObject,
+    LoadableAssetObject,
     LoadableDagsterObject,
     RuntimeAssetObjectTypes,
     RuntimeDagsterObjectTypes,
@@ -210,6 +211,38 @@ def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]:
             asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition)
         ]
 
+    @cached_property
+    def sensors(self) -> Sequence[SensorDefinition]:
+        return [
+            dagster_object
+            for dagster_object in self.loaded_objects
+            if isinstance(dagster_object, SensorDefinition)
+        ]
+
+    @cached_property
+    def schedules(self) -> Sequence[ScheduleDefinitionObject]:
+        return [
+            dagster_object
+            for dagster_object in self.loaded_objects
+            if isinstance(dagster_object, RuntimeScheduleObjectTypes)
+        ]
+
+    @cached_property
+    def jobs(self) -> Sequence[JobDefinitionObject]:
+        return [
+            dagster_object
+            for dagster_object in self.loaded_objects
+            if isinstance(dagster_object, RuntimeJobObjectTypes)
+        ]
+
+    @cached_property
+    def assets(self) -> Sequence[LoadableAssetObject]:
+        return [
+            *self.assets_defs_and_specs,
+            *self.source_assets,
+            *self.cacheable_assets,
+        ]
+
     def get_objects(
         self, filter_fn: Callable[[LoadableDagsterObject], bool]
     ) -> Sequence[LoadableDagsterObject]:
@@ -324,6 +357,15 @@ def with_attributes(
         )
         return DagsterObjectsList(return_list)
 
+    def to_definitions_args(self) -> Mapping[str, Any]:
+        return {
+            "assets": self.assets,
+            "asset_checks": self.checks_defs,
+            "sensors": self.sensors,
+            "schedules": self.schedules,
+            "jobs": self.jobs,
+        }
+
 
 def _spec_mapper_disallow_group_override(
     group_name: Optional[str], automation_condition: Optional[AutomationCondition]
diff --git a/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py
index ccdbd5492a678..3fee6021fab99 100644
--- a/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py
+++ b/python_modules/dagster/dagster_tests/definitions_tests/module_loader_tests/test_module_loaders.py
@@ -1,10 +1,16 @@
+import logging
 from contextlib import contextmanager
 from types import ModuleType
-from typing import Any, Mapping, Sequence, Type
+from typing import Any, Mapping, Sequence, Type, cast
 
 import dagster as dg
 import pytest
+from dagster._core.definitions.definitions_class import Definitions
+from dagster._core.definitions.module_loaders.load_defs_from_module import (
+    load_definitions_from_module,
+)
 from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects
+from dagster._core.definitions.module_loaders.utils import LoadableDagsterObject
 from dagster._record import record
 
 
@@ -42,6 +48,16 @@ def my_check() -> dg.AssetCheckResult:
     return my_check
 
 
+def all_loadable_objects_from_defs(defs: Definitions) -> Sequence[LoadableDagsterObject]:
+    return [
+        *(defs.assets or []),
+        *(defs.sensors or []),
+        *(defs.schedules or []),
+        *(defs.asset_checks or []),
+        *(defs.jobs or []),
+    ]
+
+
 @contextmanager
 def optional_pytest_raise(error_expected: bool, exception_cls: Type[Exception]):
     if error_expected:
@@ -156,3 +172,54 @@ def test_collision_detection(objects: Mapping[str, Any], error_expected: bool) -
     obj_list = ModuleScopedDagsterObjects.from_modules([module_fake]).get_object_list()
     obj_ids = {id(obj) for obj in objects.values()}
     assert len(obj_list.loaded_objects) == len(obj_ids)
+
+
+@pytest.mark.parametrize(**ModuleScopeTestSpec.as_parametrize_kwargs(MODULE_TEST_SPECS))
+def test_load_from_definitions(objects: Mapping[str, Any], error_expected: bool) -> None:
+    module_fake = build_module_fake("fake", objects)
+    with optional_pytest_raise(
+        error_expected=error_expected, exception_cls=dg.DagsterInvalidDefinitionError
+    ):
+        defs = load_definitions_from_module(module_fake)
+        obj_ids = {id(obj) for obj in all_loadable_objects_from_defs(defs)}
+        expected_obj_ids = {id(obj) for obj in objects.values()}
+        assert len(obj_ids) == len(expected_obj_ids)
+
+
+def test_load_with_resources() -> None:
+    @dg.resource
+    def my_resource(): ...
+
+    module_fake = build_module_fake("foo", {"my_resource": my_resource})
+    defs = load_definitions_from_module(module_fake)
+    assert len(all_loadable_objects_from_defs(defs)) == 0
+    assert len(defs.resources or {}) == 0
+    defs = load_definitions_from_module(module_fake, resources={"foo": my_resource})
+    assert len(defs.resources or {}) == 1
+
+
+def test_load_with_logger_defs() -> None:
+    @dg.logger(config_schema={})
+    def my_logger(init_context) -> logging.Logger: ...
+
+    module_fake = build_module_fake("foo", {"my_logger": my_logger})
+    defs = load_definitions_from_module(module_fake)
+    assert len(all_loadable_objects_from_defs(defs)) == 0
+    assert len(defs.loggers or {}) == 0
+    defs = load_definitions_from_module(module_fake, loggers={"foo": my_logger})
+    assert len(defs.loggers or {}) == 1
+
+
+def test_load_with_executor() -> None:
+    @dg.executor(name="my_executor")
+    def my_executor(init_context) -> dg.Executor: ...
+
+    module_fake = build_module_fake("foo", {"my_executor": my_executor})
+    defs = load_definitions_from_module(module_fake)
+    assert len(all_loadable_objects_from_defs(defs)) == 0
+    assert defs.executor is None
+    defs = load_definitions_from_module(module_fake, executor=my_executor)
+    assert (
+        defs.executor is not None
+        and cast(dg.ExecutorDefinition, defs.executor).name == "my_executor"
+    )