Skip to content

Commit

Permalink
[blueprints] Add CLI command to generate JSON schemas (#22479)
Browse files Browse the repository at this point in the history
## Summary

Implements a command which loads `YamlBlueprintsLoader` objects from a
provided module (or the current Dagster project, via `pyproject.toml`),
picks a specific loader, and outputs its JSON schema to stdout (optionally
pretty-printed).

## Test Plan

Unit test.
  • Loading branch information
benpankow authored Jun 11, 2024
1 parent 15b4525 commit 7bf3597
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 2 deletions.
120 changes: 120 additions & 0 deletions examples/experimental/dagster-blueprints/dagster_blueprints/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import sys
from importlib import import_module
from typing import List, Mapping, Optional

import click
from dagster import _check as check
from dagster._cli.workspace.cli_target import has_pyproject_dagster_block
from dagster._core.remote_representation.origin import ManagedGrpcPythonEnvCodeLocationOrigin
from dagster._core.workspace.load_target import PyProjectFileTarget
from dagster._utils.warnings import disable_dagster_warnings

from dagster_blueprints.load_from_yaml import YamlBlueprintsLoader

from .version import __version__


def get_python_modules_from_pyproject(pyproject_path: str) -> List[str]:
    """Return the module names declared by the code locations of a `pyproject.toml` file.

    Only origins that are `ManagedGrpcPythonEnvCodeLocationOrigin` instances are
    considered, and origins whose loadable target has no module name are skipped.
    """
    all_origins = PyProjectFileTarget(pyproject_path).create_origins()

    managed_origins = (
        candidate
        for candidate in all_origins
        if isinstance(candidate, ManagedGrpcPythonEnvCodeLocationOrigin)
    )
    module_names = (
        managed.loadable_target_origin.module_name for managed in managed_origins
    )
    # Drop empty / missing module names, keeping the original order.
    return [name for name in module_names if name]


@click.command(
    help="Generates JSON schema files for Blueprint types specified by YamlBlueprintsLoader objects."
)
@click.option(
    "--loader-module",
    type=click.STRING,
    help="Path of Python module that contains YamlBlueprintsLoader objects. Defaults to Dagster project module, if `pyproject.toml` exists.",
)
@click.option(
    "--loader-name",
    type=click.STRING,
    help="Name of the YamlBlueprintsLoader object to generate a schema for. Required if the specified module contains multiple loaders.",
)
@click.option(
    "--pretty",
    "-p",
    is_flag=True,
    help="Whether to pretty-print the generated schema.",
)
def generate_schema(
    loader_module: Optional[str] = None, loader_name: Optional[str] = None, pretty: bool = False
) -> None:
    """Print the JSON schema of a single YamlBlueprintsLoader to stdout.

    Args:
        loader_module: Module to search for loaders. When omitted, the modules are
            inferred from the `pyproject.toml` in the current directory.
        loader_name: Attribute name of the loader to use. May be omitted only when
            exactly one loader is found.
        pretty: When set, the schema is dumped with a two-space indent; otherwise it
            is emitted on a single line.

    The command fails a `check.invariant` when no loaders are found, when several
    are found and no name was given, or when the given name does not match any
    discovered loader.
    """
    loaders: Mapping[str, YamlBlueprintsLoader] = load_blueprints_loaders_from_module_path_or_infer(
        loader_module
    )

    check.invariant(
        len(loaders) > 0, "No YamlBlueprintsLoader objects found in the provided module."
    )
    check.invariant(
        loader_name or len(loaders) == 1,
        "Must provide a loader name since the specified module contains multiple loaders.",
    )

    check.invariant(
        loader_name is None or loader_name in loaders,
        f"Loader name {loader_name} not found in the provided module.",
    )

    # With no explicit name the invariants above guarantee exactly one loader exists.
    loader = loaders[loader_name] if loader_name else next(iter(loaders.values()))
    click.echo(json.dumps(loader.model_json_schema(), indent=2 if pretty else None))


def load_blueprints_loaders_from_module_path_or_infer(
    module_path: Optional[str],
) -> Mapping[str, YamlBlueprintsLoader]:
    """Collect YamlBlueprintsLoader objects from an explicit module or from the local project.

    A truthy `module_path` is searched directly. Otherwise the modules are read from the
    `pyproject.toml` in the current directory, which must exist and contain a `tool.dagster`
    block (an error is raised if it does not), and the loaders of every such module are merged
    into one mapping keyed by attribute name.
    """
    with disable_dagster_warnings():
        if not module_path:
            check.invariant(
                has_pyproject_dagster_block("pyproject.toml"),
                "No `pyproject.toml` found in the current directory, or no `tool.dagster` block found in `pyproject.toml`.",
            )
            merged = {}
            for inferred_module in get_python_modules_from_pyproject("pyproject.toml"):
                # Later modules win when two modules expose a loader under the same name.
                merged.update(load_blueprints_loaders_from_module_path(inferred_module))
            return merged

        return load_blueprints_loaders_from_module_path(module_path)


def load_blueprints_loaders_from_module_path(
    module_path: str,
) -> Mapping[str, YamlBlueprintsLoader]:
    """Import `module_path` and collect its module-level YamlBlueprintsLoader objects.

    Args:
        module_path: Dotted import path of the module to inspect.

    Returns:
        A mapping from attribute name to loader for every attribute of the imported
        module that is a YamlBlueprintsLoader instance. Empty if there are none.
    """
    # Allow modules in the current working directory to be imported. Guarded so that
    # repeated calls (one per module when inferring from `pyproject.toml`) do not keep
    # appending duplicate entries to `sys.path`.
    if "." not in sys.path:
        sys.path.append(".")

    module = import_module(module_path)

    loaders = {}
    for attr in dir(module):
        value = getattr(module, attr)
        if isinstance(value, YamlBlueprintsLoader):
            # Insert in place instead of rebuilding the whole dict for every match.
            loaders[attr] = value
    return loaders


def main():
    """Build the top-level click group for the CLI and invoke it."""
    cli_context = {"max_content_width": 120, "help_option_names": ["-h", "--help"]}

    @click.group(commands=[generate_schema], context_settings=cli_context)
    @click.version_option(__version__, "--version", "-v")
    def group():
        """CLI tools for working with Dagster Blueprints."""

    return group()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"name": "dagster-blueprint-schema",
"publisher": "dagster",
"displayName": "Dagster Blueprint Schema",
"description": "Provides schema validation for Dagster Blueprint YAML files",
"version": "0.0.1",
"engines": {
"vscode": "^1.89.0"
},
"extensionDependencies": ["redhat.vscode-yaml"],
"categories": ["Programming Languages", "Data Science"],
"contributes": {}
}
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.vscode
blueprint_project/blueprints/dagster.autogenerated.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path

from dagster import asset
from dagster_blueprints.blueprint import Blueprint, BlueprintDefinitions
from dagster_blueprints.load_from_yaml import YamlBlueprintsLoader


# Minimal Blueprint subclass for the sample project: a single string field that is
# used as the key of one generated asset.
# NOTE(review): documented with comments rather than a docstring on purpose. The
# generated schema's "description" appears to come from a class docstring, so adding
# one here may change the schema output; confirm before converting.
class SimpleAssetBlueprint(Blueprint):
    # Key given to the asset produced by `build_defs`.
    key: str

    def build_defs(self) -> BlueprintDefinitions:
        # Define one asset keyed by `self.key`; its body is intentionally empty (`...`).
        @asset(key=self.key)
        def blueprint_asset(): ...

        return BlueprintDefinitions(assets=[blueprint_asset])


# Two module-level loaders over sibling directories, both parsing each file as a
# SimpleAssetBlueprint.
loader = YamlBlueprintsLoader(
    path=Path(__file__).parent.joinpath("blueprints"),
    per_file_blueprint_type=SimpleAssetBlueprint,
)
other_loader = YamlBlueprintsLoader(
    path=Path(__file__).parent.joinpath("other_blueprints"),
    per_file_blueprint_type=SimpleAssetBlueprint,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
key: foo
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"title": "ParsingModel[SimpleAssetBlueprint]",
"$ref": "#/definitions/SimpleAssetBlueprint",
"definitions": {
"SimpleAssetBlueprint": {
"title": "SimpleAssetBlueprint",
"description": "A blob of user-provided, structured metadata that specifies a set of Dagster definitions,\nlike assets, jobs, schedules, sensors, resources, or asset checks.\n\nBase class for user-provided types. Users override and provide:\n- The set of fields\n- A build_defs implementation that generates Dagster Definitions from field values",
"type": "object",
"properties": {
"key": {
"title": "Key",
"type": "string"
}
},
"required": [
"key"
],
"additionalProperties": false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
key: bar
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "blueprint_project"
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
from pathlib import Path

import pytest
from click.testing import CliRunner


def test_locate_from_pyproject_yaml() -> None:
    """The sample project's `pyproject.toml` resolves to its single declared module."""
    from dagster_blueprints.cli import get_python_modules_from_pyproject

    pyproject_file = Path(__file__).parent.joinpath("sample_blueprint_project", "pyproject.toml")
    discovered_modules = get_python_modules_from_pyproject(os.fspath(pyproject_file))

    assert discovered_modules == ["blueprint_project"]


@pytest.mark.parametrize("explicit_loader_module", [True, False])
def test_generate_single_schema(explicit_loader_module: bool) -> None:
    """`generate_schema` needs `--loader-name` when the module exposes several loaders.

    Runs once with an explicit `--loader-module` and once relying on the sample
    project's `pyproject.toml` in the working directory.
    """
    from dagster_blueprints.cli import generate_schema

    sample_project_path = Path(__file__).parent / "sample_blueprint_project"

    # Remove any autogenerated schema files left behind in the sample project.
    expected_schema_filepaths = [
        (sample_project_path / "blueprint_project" / folder / "dagster.autogenerated.schema.json")
        for folder in ("blueprints", "other_blueprints")
    ]
    for expected_schema_filepath in expected_schema_filepaths:
        if expected_schema_filepath.exists():
            os.remove(expected_schema_filepath)

    # The command is run from inside the sample project. Restore the previous working
    # directory afterwards so the change does not leak into other tests.
    original_cwd = os.getcwd()
    os.chdir(sample_project_path)
    try:
        args = []
        if explicit_loader_module:
            args.extend(["--loader-module", "blueprint_project"])
        out = CliRunner().invoke(generate_schema, args)

        # file has two loaders, must explicitly specify
        assert out.exit_code == 1

        args.extend(["--loader-name", "loader"])
        out = CliRunner().invoke(generate_schema, args)
        # Check the command actually succeeded, not just that the output looks right.
        assert out.exit_code == 0, out.output
        assert '"SimpleAssetBlueprint"' in out.output
    finally:
        os.chdir(original_cwd)
5 changes: 5 additions & 0 deletions examples/experimental/dagster-blueprints/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ def get_version() -> str:
f"dagster-databricks{pin}",
],
zip_safe=False,
entry_points={
"console_scripts": [
"dagster-blueprints = dagster_blueprints.cli:main",
]
},
)
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_cli/workspace/cli_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def are_all_keys_empty(kwargs: ClickArgMapping, keys: Iterable[str]) -> bool:
)


def _has_pyproject_dagster_block(path: str) -> bool:
def has_pyproject_dagster_block(path: str) -> bool:
if not os.path.exists(path):
return False
with open(path, "rb") as f:
Expand All @@ -116,7 +116,7 @@ def get_workspace_load_target(kwargs: ClickArgMapping) -> WorkspaceLoadTarget:
if are_all_keys_empty(kwargs, WORKSPACE_CLI_ARGS):
if kwargs.get("empty_workspace"):
return EmptyWorkspaceTarget()
if _has_pyproject_dagster_block("pyproject.toml"):
if has_pyproject_dagster_block("pyproject.toml"):
return PyProjectFileTarget("pyproject.toml")

if os.path.exists("workspace.yaml"):
Expand Down

0 comments on commit 7bf3597

Please sign in to comment.