Skip to content

Commit

Permalink
[module-loaders] [rfc] Definitions from module loader (#26546)
Browse files Browse the repository at this point in the history
## Summary & Motivation
This PR is a prototype of a top-level API to load dagster definitions
from across a module into a single returned `Definitions` object. It is
not intended for landing yet. Just want to align on direction.

### Why add this API?
For any user who is bought into a module structure making use of
`load_assets_from_x`, we are currently making their life unnecessarily
more difficult by not bringing in other objects that are scoped at
module-load time. Those users agree - we've received requests for an API
to do that numerous times.

In absence of a compelling replacement for our current project
structure, I think the existence of this API is a good stopgap to
improve module ergonomics.

### Why are resources, loggers, and executor provided as args?
It seems like the most straightforward way to support these objects
without some sort of additional magic. Since we force you to provide a
key in addition to the class itself, there's not currently a
module-scoped pattern that matches how these objects are defined on a
`Definitions` argument. So it seems reasonable to accept these as
parameters.

The other approach would be to allow users to configure resources as
variables and use the key as a variable. But I figure the more
conservative approach here leaves us room to also do that in the future
(can imagine some sort of combination step).
```python
dbt = DbtCliResource(...)
# equivalent of
{"dbt": DbtCliResource(...)}
```

For executor, the fact that we only accept one per module is kind of
unique. I don't think a user would define an executor in the same way
that they define a sensor, schedule, or job, for example, where the
definition is inlined to the module. I think it makes more sense for the
user to need to provide this explicitly.

### Why does this return a Definitions object
I don't think there is any reasonable alternative. The existing
`load_assets_from_x` is nice because that whole list can be provided
directly to a `Definitions` object - this is not the case for if we
provided a flat list of heterogeneous types of Dagster objects - this
could not be provided directly to a `Definitions` object. Unless we
added some sort of `from_list` constructor or something. But I think
this is more straightforward.

It also gives us the opportunity to potentially paper over some stuff;
if we so choose - we can, for example, automatically coerce source
assets and AssetSpec objects into resolved AssetsDefinitions.

### Should this thing take in other `Definitions` objects?
Right now, I think no. While `Definitions.merge` exists, it's obscured
and not documented, I think most users think of a single `Definitions`
object as being synonymous with a code location.

### What's the intended design pattern for using this?
I think the intended use case for this would be to provide one call at
the top level of the module of a given code location; and automatically
load in all of your dagster defs.

```
defs = load_definitions_from_module(current_module, resources=..., loggers=...)
```

## How I Tested These Changes
I added a new test which operates on all test specs and calls this fxn,
and also one for handling the resource and logger cases.
  • Loading branch information
dpeng817 committed Dec 19, 2024
1 parent 620c297 commit f160acc
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from types import ModuleType
from typing import Any, Mapping, Optional, Union

from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects
from dagster._core.executor.base import Executor


def load_definitions_from_module(
module: ModuleType,
resources: Optional[Mapping[str, Any]] = None,
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
executor: Optional[Union[Executor, ExecutorDefinition]] = None,
) -> Definitions:
return Definitions(
**ModuleScopedDagsterObjects.from_modules([module]).get_object_list().to_definitions_args(),
resources=resources,
loggers=loggers,
executor=executor,
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import defaultdict
from functools import cached_property
from types import ModuleType
from typing import Callable, Dict, Iterable, Mapping, Optional, Sequence, Union, cast
from typing import Any, Callable, Dict, Iterable, Mapping, Optional, Sequence, Union, cast

from dagster._core.definitions.asset_checks import AssetChecksDefinition, has_only_asset_checks
from dagster._core.definitions.asset_key import AssetKey, CoercibleToAssetKeyPrefix
Expand All @@ -15,6 +15,7 @@
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.module_loaders.utils import (
JobDefinitionObject,
LoadableAssetObject,
LoadableDagsterObject,
RuntimeAssetObjectTypes,
RuntimeDagsterObjectTypes,
Expand Down Expand Up @@ -210,6 +211,38 @@ def cacheable_assets(self) -> Sequence[CacheableAssetsDefinition]:
asset for asset in self.loaded_objects if isinstance(asset, CacheableAssetsDefinition)
]

@cached_property
def sensors(self) -> Sequence[SensorDefinition]:
return [
dagster_object
for dagster_object in self.loaded_objects
if isinstance(dagster_object, SensorDefinition)
]

@cached_property
def schedules(self) -> Sequence[ScheduleDefinitionObject]:
return [
dagster_object
for dagster_object in self.loaded_objects
if isinstance(dagster_object, RuntimeScheduleObjectTypes)
]

@cached_property
def jobs(self) -> Sequence[JobDefinitionObject]:
return [
dagster_object
for dagster_object in self.loaded_objects
if isinstance(dagster_object, RuntimeJobObjectTypes)
]

@cached_property
def assets(self) -> Sequence[LoadableAssetObject]:
return [
*self.assets_defs_and_specs,
*self.source_assets,
*self.cacheable_assets,
]

def get_objects(
self, filter_fn: Callable[[LoadableDagsterObject], bool]
) -> Sequence[LoadableDagsterObject]:
Expand Down Expand Up @@ -324,6 +357,15 @@ def with_attributes(
)
return DagsterObjectsList(return_list)

def to_definitions_args(self) -> Mapping[str, Any]:
return {
"assets": self.assets,
"asset_checks": self.checks_defs,
"sensors": self.sensors,
"schedules": self.schedules,
"jobs": self.jobs,
}


def _spec_mapper_disallow_group_override(
group_name: Optional[str], automation_condition: Optional[AutomationCondition]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
GIT_ROOT_PATH = os.path.normpath(os.path.join(DAGSTER_PACKAGE_PATH, "../../"))

# path of the current file relative to the `dagster` package root
PATH_IN_PACKAGE = "/dagster_tests/asset_defs_tests/"
PATH_IN_PACKAGE = "/dagster_tests/definitions_tests/module_loader_tests/"

# {path to module}:{path to file relative to module root}:{line number}
EXPECTED_ORIGINS = {
Expand All @@ -42,8 +42,8 @@


def test_asset_code_origins() -> None:
from dagster_tests.asset_defs_tests import asset_package
from dagster_tests.asset_defs_tests.asset_package import module_with_assets
from dagster_tests.definitions_tests.module_loader_tests import asset_package
from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets

collection = load_assets_from_modules([asset_package, module_with_assets])

Expand Down Expand Up @@ -105,8 +105,8 @@ def test_asset_code_origins() -> None:


def test_asset_code_origins_source_control() -> None:
from dagster_tests.asset_defs_tests import asset_package
from dagster_tests.asset_defs_tests.asset_package import module_with_assets
from dagster_tests.definitions_tests.module_loader_tests import asset_package
from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets

collection = load_assets_from_modules([asset_package, module_with_assets])

Expand Down Expand Up @@ -161,8 +161,8 @@ def test_asset_code_origins_source_control() -> None:
def test_asset_code_origins_source_control_custom_mapping() -> None:
# test custom source_control_file_path_mapping fn

from dagster_tests.asset_defs_tests import asset_package
from dagster_tests.asset_defs_tests.asset_package import module_with_assets
from dagster_tests.definitions_tests.module_loader_tests import asset_package
from dagster_tests.definitions_tests.module_loader_tests.asset_package import module_with_assets

collection = load_assets_from_modules([asset_package, module_with_assets])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def test_load_assets_from_modules_with_group_name():
def test_respect_existing_groups():
assets = load_assets_from_current_module()
assets_def = next(iter(a for a in assets if isinstance(a, AssetsDefinition)))
assert assets_def.group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group" # pyright: ignore[reportAttributeAccessIssue]
assert assets_def.group_names_by_key.get(AssetKey("asset_in_current_module")) == "my_group"

with pytest.raises(DagsterInvalidDefinitionError):
load_assets_from_current_module(group_name="yay")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import logging
from contextlib import contextmanager
from types import ModuleType
from typing import Any, Mapping, Sequence, Type
from typing import Any, Mapping, Sequence, Type, cast

import dagster as dg
import pytest
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.module_loaders.load_defs_from_module import (
load_definitions_from_module,
)
from dagster._core.definitions.module_loaders.object_list import ModuleScopedDagsterObjects
from dagster._core.definitions.module_loaders.utils import LoadableDagsterObject
from dagster._record import record


Expand Down Expand Up @@ -42,6 +48,16 @@ def my_check() -> dg.AssetCheckResult:
return my_check


def all_loadable_objects_from_defs(defs: Definitions) -> Sequence[LoadableDagsterObject]:
return [
*(defs.assets or []),
*(defs.sensors or []),
*(defs.schedules or []),
*(defs.asset_checks or []),
*(defs.jobs or []),
]


@contextmanager
def optional_pytest_raise(error_expected: bool, exception_cls: Type[Exception]):
if error_expected:
Expand Down Expand Up @@ -156,3 +172,54 @@ def test_collision_detection(objects: Mapping[str, Any], error_expected: bool) -
obj_list = ModuleScopedDagsterObjects.from_modules([module_fake]).get_object_list()
obj_ids = {id(obj) for obj in objects.values()}
assert len(obj_list.loaded_objects) == len(obj_ids)


@pytest.mark.parametrize(**ModuleScopeTestSpec.as_parametrize_kwargs(MODULE_TEST_SPECS))
def test_load_from_definitions(objects: Mapping[str, Any], error_expected: bool) -> None:
module_fake = build_module_fake("fake", objects)
with optional_pytest_raise(
error_expected=error_expected, exception_cls=dg.DagsterInvalidDefinitionError
):
defs = load_definitions_from_module(module_fake)
obj_ids = {id(obj) for obj in all_loadable_objects_from_defs(defs)}
expected_obj_ids = {id(obj) for obj in objects.values()}
assert len(obj_ids) == len(expected_obj_ids)


def test_load_with_resources() -> None:
@dg.resource
def my_resource(): ...

module_fake = build_module_fake("foo", {"my_resource": my_resource})
defs = load_definitions_from_module(module_fake)
assert len(all_loadable_objects_from_defs(defs)) == 0
assert len(defs.resources or {}) == 0
defs = load_definitions_from_module(module_fake, resources={"foo": my_resource})
assert len(defs.resources or {}) == 1


def test_load_with_logger_defs() -> None:
@dg.logger(config_schema={})
def my_logger(init_context) -> logging.Logger: ...

module_fake = build_module_fake("foo", {"my_logger": my_logger})
defs = load_definitions_from_module(module_fake)
assert len(all_loadable_objects_from_defs(defs)) == 0
assert len(defs.resources or {}) == 0
defs = load_definitions_from_module(module_fake, resources={"foo": my_logger})
assert len(defs.resources or {}) == 1


def test_load_with_executor() -> None:
@dg.executor(name="my_executor")
def my_executor(init_context) -> dg.Executor: ...

module_fake = build_module_fake("foo", {"my_executor": my_executor})
defs = load_definitions_from_module(module_fake)
assert len(all_loadable_objects_from_defs(defs)) == 0
assert defs.executor is None
defs = load_definitions_from_module(module_fake, executor=my_executor)
assert (
defs.executor is not None
and cast(dg.ExecutorDefinition, defs.executor).name == "my_executor"
)

0 comments on commit f160acc

Please sign in to comment.