Skip to content

Commit

Permalink
RFC: Don't require an @repository to be defined when there is only on…
Browse files Browse the repository at this point in the history
…e repository - gather all code artifacts and package them together instead

Summary:
Inspired by https://threads.com/34418826592 from nick - if we are planning to enforce that each process has a single repository, and are generally planning to de-emphasize the repository as a core organization thing in Dagit, we have the option of not requiring you to write an @repository at all. Instead we could bring in each of the code artifacts in the file/module and package them up into one for you. This might be a bit more familiar to people coming from airflow's auto-geneated DAG approach too.

Whipped this up to explore that direction.

One big thing we would need to figure out is caching - the existing code loading paths rely on the fact that there's a single RepositoryDefinition in code to apply some caching, so creating a new one each time the code is loaded may be undesirable.
  • Loading branch information
gibsondan committed Feb 28, 2022
1 parent 779e27f commit 14b3890
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 99 deletions.
69 changes: 49 additions & 20 deletions python_modules/dagster/dagster/core/code_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ def describe(self):
@staticmethod
def from_module(module_name, definition, working_directory):
check.str_param(module_name, "module_name")
check.str_param(definition, "definition")
check.opt_str_param(definition, "definition")
check.opt_str_param(working_directory, "working_directory")
return ModuleCodePointer(module_name, definition, working_directory)

@staticmethod
def from_python_package(module_name, attribute, working_directory):
check.str_param(module_name, "module_name")
check.str_param(attribute, "attribute")
check.opt_str_param(attribute, "attribute")
check.opt_str_param(working_directory, "working_directory")
return PackageCodePointer(module_name, attribute, working_directory)

@staticmethod
def from_python_file(python_file, definition, working_directory):
check.str_param(python_file, "python_file")
check.str_param(definition, "definition")
check.opt_str_param(definition, "definition")
check.opt_str_param(working_directory, "working_directory")
return FileCodePointer(
python_file=python_file, fn_name=definition, working_directory=working_directory
Expand Down Expand Up @@ -149,20 +149,28 @@ def load_python_module(module_name, working_directory, remove_from_path_fn=None)
class FileCodePointer(
NamedTuple(
"_FileCodePointer",
[("python_file", str), ("fn_name", str), ("working_directory", Optional[str])],
[("python_file", str), ("fn_name", Optional[str]), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, python_file: str, fn_name: str, working_directory: Optional[str] = None):
def __new__(
cls, python_file: str, fn_name: Optional[str], working_directory: Optional[str] = None
):
return super(FileCodePointer, cls).__new__(
cls,
check.str_param(python_file, "python_file"),
check.str_param(fn_name, "fn_name"),
check.opt_str_param(fn_name, "fn_name"),
check.opt_str_param(working_directory, "working_directory"),
)

def load_target(self):
module = load_python_file(self.python_file, self.working_directory)

from .workspace.autodiscovery import create_ephemeral_repository

if not self.fn_name:
return create_ephemeral_repository(module)

if not hasattr(module, self.fn_name):
raise DagsterInvariantViolationError(
"{name} not found at module scope in file {file}.".format(
Expand All @@ -173,33 +181,38 @@ def load_target(self):
return getattr(module, self.fn_name)

def describe(self):
if self.working_directory:
return "{self.python_file}::{self.fn_name} -- [dir {self.working_directory}]".format(
self=self
)
else:
return "{self.python_file}::{self.fn_name}".format(self=self)

file_str = f"{self.python_file}::{self.fn_name}" if self.fn_name else self.python_file

return (
f"{file_str} -- [dir {self.working_directory}]" if self.working_directory else file_str
)


@whitelist_for_serdes
class ModuleCodePointer(
NamedTuple(
"_ModuleCodePointer",
[("module", str), ("fn_name", str), ("working_directory", Optional[str])],
[("module", str), ("fn_name", Optional[str]), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, module: str, fn_name: str, working_directory: Optional[str] = None):
def __new__(cls, module: str, fn_name: Optional[str], working_directory: Optional[str] = None):
return super(ModuleCodePointer, cls).__new__(
cls,
check.str_param(module, "module"),
check.str_param(fn_name, "fn_name"),
check.opt_str_param(fn_name, "fn_name"),
check.opt_str_param(working_directory, "working_directory"),
)

def load_target(self):
module = load_python_module(self.module, self.working_directory)

from .workspace.autodiscovery import create_ephemeral_repository

if not self.fn_name:
return create_ephemeral_repository(module)

if not hasattr(module, self.fn_name):
raise DagsterInvariantViolationError(
"{name} not found in module {module}. dir: {dir}".format(
Expand All @@ -209,28 +222,40 @@ def load_target(self):
return getattr(module, self.fn_name)

def describe(self):
return "from {self.module} import {self.fn_name}".format(self=self)
return (
"from {self.module} import {self.fn_name}".format(self=self)
if self.fn_name
else "import {self.module}"
)


@whitelist_for_serdes
class PackageCodePointer(
NamedTuple(
"_PackageCodePointer",
[("module", str), ("attribute", str), ("working_directory", Optional[str])],
[("module", str), ("attribute", Optional[str]), ("working_directory", Optional[str])],
),
CodePointer,
):
def __new__(cls, module: str, attribute: str, working_directory: Optional[str] = None):
def __new__(
cls, module: str, attribute: Optional[str], working_directory: Optional[str] = None
):
return super(PackageCodePointer, cls).__new__(
cls,
check.str_param(module, "module"),
check.str_param(attribute, "attribute"),
check.opt_str_param(attribute, "attribute"),
check.opt_str_param(working_directory, "working_directory"),
)

def load_target(self):
module = load_python_module(self.module, self.working_directory)

from .workspace.autodiscovery import create_ephemeral_repository

if not self.attribute:
# Get all the members from the module
return create_ephemeral_repository(module)

if not hasattr(module, self.attribute):
raise DagsterInvariantViolationError(
"{name} not found in module {module}. dir: {dir}".format(
Expand All @@ -240,7 +265,11 @@ def load_target(self):
return getattr(module, self.attribute)

def describe(self):
return "from {self.module} import {self.attribute}".format(self=self)
return (
"from {self.module} import {self.attribute}".format(self=self)
if self.attribute
else "import {self.module}"
)


def get_python_file_from_target(target):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,6 @@ def repository_def_from_target_def(target):

# special case - we can wrap a single pipeline in a repository
if isinstance(target, (PipelineDefinition, GraphDefinition)):
# consider including pipeline name in generated repo name
return RepositoryDefinition(
name=get_ephemeral_repository_name(target.name),
repository_data=CachingRepositoryData.from_list([target]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ def from_list(cls, repository_definitions):
schedules = {}
sensors = {}
source_assets = {}
asset_group = None
for definition in repository_definitions:
if isinstance(definition, PipelineDefinition):
if (
Expand Down Expand Up @@ -626,6 +627,11 @@ def from_list(cls, repository_definitions):
pipelines_or_jobs[coerced.name] = coerced

elif isinstance(definition, AssetGroup):
if asset_group:
raise DagsterInvalidDefinitionError(
"Repository can only include one AssetGroup"
)

asset_group = definition
pipelines_or_jobs[asset_group.all_assets_job_name] = build_assets_job(
asset_group.all_assets_job_name,
Expand Down
95 changes: 49 additions & 46 deletions python_modules/dagster/dagster/core/workspace/autodiscovery.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
import inspect
from collections import namedtuple
from typing import NamedTuple

from dagster import (
DagsterInvariantViolationError,
GraphDefinition,
JobDefinition,
PartitionSetDefinition,
PipelineDefinition,
RepositoryDefinition,
ScheduleDefinition,
SensorDefinition,
check,
)
from dagster.core.asset_defs import AssetGroup
from dagster.core.code_pointer import load_python_file, load_python_module

LoadableTarget = namedtuple("LoadableTarget", "attribute target_definition")


class EphemeralRepositoryTarget(NamedTuple("EphemeralRepositoryTarget", [])):
pass


def loadable_targets_from_python_file(python_file, working_directory=None):
loaded_module = load_python_file(python_file, working_directory)
return loadable_targets_from_loaded_module(loaded_module)
return _loadable_targets_from_loaded_module(loaded_module)


def loadable_targets_from_python_module(module_name, working_directory, remove_from_path_fn=None):
Expand All @@ -25,79 +34,59 @@ def loadable_targets_from_python_module(module_name, working_directory, remove_f
working_directory=working_directory,
remove_from_path_fn=remove_from_path_fn,
)
return loadable_targets_from_loaded_module(module)
return _loadable_targets_from_loaded_module(module)


def loadable_targets_from_python_package(package_name, working_directory, remove_from_path_fn=None):
module = load_python_module(
package_name, working_directory, remove_from_path_fn=remove_from_path_fn
)
return loadable_targets_from_loaded_module(module)
return _loadable_targets_from_loaded_module(module)


def loadable_targets_from_loaded_module(module):
def _loadable_targets_from_loaded_module(module):
loadable_repos = _loadable_targets_of_type(module, RepositoryDefinition)
if loadable_repos:
return loadable_repos

# Back-compat for ephemeral single-pipeline case
loadable_pipelines = _loadable_targets_of_type(module, PipelineDefinition)
loadable_jobs = _loadable_targets_of_type(module, JobDefinition)

if len(loadable_pipelines) == 1:
return loadable_pipelines

elif len(loadable_pipelines) > 1:
target_type = "job" if len(loadable_jobs) > 1 else "pipeline"
raise DagsterInvariantViolationError(
(
'No repository and more than one {target_type} found in "{module_name}". If you load '
"a file or module directly it must have only one {target_type} "
"in scope. Found {target_type}s defined in variables or decorated "
"functions: {pipeline_symbols}."
).format(
module_name=module.__name__,
pipeline_symbols=repr([p.attribute for p in loadable_pipelines]),
target_type=target_type,
)
)

# Back-compat for ephemeral single-graph case
loadable_graphs = _loadable_targets_of_type(module, GraphDefinition)

if len(loadable_graphs) == 1:
return loadable_graphs

elif len(loadable_graphs) > 1:
raise DagsterInvariantViolationError(
(
'No repository, job, or pipeline, and more than one graph found in "{module_name}". '
"If you load a file or module directly it must either have one repository, one "
"job, one pipeline, or one graph in scope. Found graphs defined in variables or "
"decorated functions: {graph_symbols}."
).format(
module_name=module.__name__,
graph_symbols=repr([g.attribute for g in loadable_graphs]),
)
)

# Back-compat for ephemeral single-asset-group case
loadable_asset_groups = _loadable_targets_of_type(module, AssetGroup)
if len(loadable_asset_groups) == 1:
return loadable_asset_groups

elif len(loadable_asset_groups) > 1:
var_names = repr([a.attribute for a in loadable_asset_groups])
loadable_targets = _get_ephemeral_repository_loadable_targets(module)

if len(loadable_targets) == 0:
raise DagsterInvariantViolationError(
(
f'More than one asset collection found in "{module.__name__}". '
"If you load a file or module directly it must either have one repository, one "
"job, one pipeline, one graph, or one asset collection scope. Found asset "
f"collections defined in variables: {var_names}."
'No jobs, pipelines, asset collections, or repositories found in "{}".'.format(
module.__name__
)
)

raise DagsterInvariantViolationError(
'No jobs, pipelines, graphs, asset collections, or repositories found in "{}".'.format(
module.__name__
)
return [EphemeralRepositoryTarget()]


def _get_ephemeral_repository_loadable_targets(module):
return _loadable_targets_of_type(
module,
(
PipelineDefinition,
JobDefinition,
PartitionSetDefinition,
ScheduleDefinition,
SensorDefinition,
AssetGroup,
),
)


Expand All @@ -108,3 +97,17 @@ def _loadable_targets_of_type(module, klass):
loadable_targets.append(LoadableTarget(name, value))

return loadable_targets


def create_ephemeral_repository(module):
from dagster.core.definitions.repository_definition import CachingRepositoryData

# What about caching?????
targets = _get_ephemeral_repository_loadable_targets(module)
check.invariant(len(targets) > 0, "must have at least one code artifact in a repository")
return RepositoryDefinition(
name="__repository__",
repository_data=CachingRepositoryData.from_list(
[target.target_definition for target in targets]
),
)
Loading

0 comments on commit 14b3890

Please sign in to comment.