Skip to content

Commit

Permalink
[components] Add cache to dagster-dg (#26517)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Initial implementation of a cache for `dagster-dg`. Currently just
caches the registered components for a code location. Cache key is (code
loc abs path, env hash, type of payload). The environment hash is
calculated over the file metadata of `uv.lock` and the content of
`pkg.lib` (i.e. where local component types can be defined).
 
## How I Tested These Changes

New unit tests + manual testing (running `dg list component-types`
multiple times in the same code loc, observing speedup).
  • Loading branch information
smackesey authored Dec 17, 2024
1 parent 550d473 commit 6edff74
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 81 deletions.
74 changes: 74 additions & 0 deletions python_modules/libraries/dagster-dg/dagster_dg/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import shutil
import sys
from pathlib import Path
from typing import Final, Literal, Optional, Tuple

from typing_extensions import Self, TypeAlias

from dagster_dg.config import DgConfig

_CACHE_CONTAINER_DIR_NAME: Final = "dg-cache"

CachableDataType: TypeAlias = Literal["component_registry_data"]


def get_default_cache_dir() -> Path:
if sys.platform == "win32":
return Path.home() / "AppData" / "dg" / "cache"
elif sys.platform == "darwin":
return Path.home() / "Library" / "Caches" / "dg"
else:
return Path.home() / ".cache" / "dg"


class DgCache:
@classmethod
def from_default(cls) -> Self:
return cls.from_parent_path(get_default_cache_dir())

@classmethod
def from_config(cls, config: DgConfig) -> Self:
return cls.from_parent_path(
parent_path=config.cache_dir,
logging_enabled=config.verbose,
)

# This is the preferred constructor to use when creating a cache. It ensures that all data is
# stored inside an additional container directory inside the user-specified cache directory.
# When we clear the cache, we only delete this container directory. This is to avoid accidents
# when the user mistakenly specifies a cache directory that contains other data.
@classmethod
def from_parent_path(cls, parent_path: Path, logging_enabled: bool = False) -> Self:
root_path = parent_path / _CACHE_CONTAINER_DIR_NAME
return cls(root_path, logging_enabled)

def __init__(self, root_path: Path, logging_enabled: bool):
self._root_path = root_path
self._root_path.mkdir(parents=True, exist_ok=True)
self._logging_enabled = logging_enabled

def clear(self) -> None:
shutil.rmtree(self._root_path)
self.log(f"CACHE [clear]: {self._root_path}")

def get(self, key: Tuple[str, ...]) -> Optional[str]:
path = self._get_path(key)
if path.exists():
self.log(f"CACHE [hit]: {path}")
return path.read_text()
else:
self.log(f"CACHE [miss]: {path}")
return None

def set(self, key: Tuple[str, ...], value: str) -> None:
path = self._get_path(key)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(value)
self.log(f"CACHE [write]: {path}")

def _get_path(self, key: Tuple[str, ...]) -> Path:
return Path(self._root_path, *key)

def log(self, message: str) -> None:
if self._logging_enabled:
print(message) # noqa: T201
59 changes: 52 additions & 7 deletions python_modules/libraries/dagster-dg/dagster_dg/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from pathlib import Path

import click

from dagster_dg.cache import DgCache
from dagster_dg.cli.generate import generate_cli
from dagster_dg.cli.info import info_cli
from dagster_dg.cli.list import list_cli
Expand All @@ -14,27 +17,69 @@ def create_dg_cli():
"list": list_cli,
}

# Defaults are defined on the DgConfig object.
@click.group(
commands=commands,
context_settings={"max_content_width": 120, "help_option_names": ["-h", "--help"]},
invoke_without_command=True,
)
@click.version_option(__version__, "--version", "-v")
@click.option(
"--builtin-component-lib",
type=str,
default=DgConfig.builtin_component_lib,
help="Specify a builitin component library to use.",
)
@click.option(
"--verbose",
is_flag=True,
default=DgConfig.verbose,
help="Enable verbose output for debugging.",
)
@click.option(
"--disable-cache",
is_flag=True,
default=DgConfig.disable_cache,
help="Disable caching of component registry data.",
)
@click.option(
"--clear-cache",
is_flag=True,
help="Clear the cache before running the command.",
default=False,
)
@click.option(
"--cache-dir",
type=Path,
default=DgConfig.cache_dir,
help="Specify a directory to use for the cache.",
)
@click.version_option(__version__, "--version", "-v")
@click.pass_context
def group(context: click.Context, builtin_component_lib: str):
def group(
context: click.Context,
builtin_component_lib: str,
verbose: bool,
disable_cache: bool,
cache_dir: Path,
clear_cache: bool,
):
"""CLI tools for working with Dagster components."""
context.ensure_object(dict)
set_config_on_cli_context(
context,
DgConfig(
builtin_component_lib=builtin_component_lib,
),
config = DgConfig(
builtin_component_lib=builtin_component_lib,
verbose=verbose,
disable_cache=disable_cache,
cache_dir=cache_dir,
)
if clear_cache:
DgCache.from_config(config).clear()
if context.invoked_subcommand is None:
context.exit(0)
elif context.invoked_subcommand is None:
click.echo(context.get_help())
context.exit(0)

set_config_on_cli_context(context, config)

return group

Expand Down
21 changes: 21 additions & 0 deletions python_modules/libraries/dagster-dg/dagster_dg/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import sys
from dataclasses import dataclass
from pathlib import Path

import click
from typing_extensions import Self
Expand All @@ -8,14 +10,33 @@
DEFAULT_BUILTIN_COMPONENT_LIB = "dagster_components"


def _get_default_cache_dir() -> Path:
if sys.platform == "win32":
return Path.home() / "AppData" / "dg" / "cache"
elif sys.platform == "darwin":
return Path.home() / "Library" / "Caches" / "dg"
else:
return Path.home() / ".cache" / "dg"


DEFAULT_CACHE_DIR = _get_default_cache_dir()


@dataclass
class DgConfig:
"""Global configuration for Dg.
Attributes:
disable_cache (bool): If True, disable caching. Defaults to False.
cache_dir (Optional[str]): The directory to use for caching. If None, the default cache will
be used.
verbose (bool): If True, log debug information.
builitin_component_lib (str): The name of the builtin component library to load.
"""

disable_cache: bool = False
cache_dir: Path = DEFAULT_CACHE_DIR
verbose: bool = False
builtin_component_lib: str = DEFAULT_BUILTIN_COMPONENT_LIB

@classmethod
Expand Down
58 changes: 49 additions & 9 deletions python_modules/libraries/dagster-dg/dagster_dg/context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
import json
import os
from dataclasses import dataclass
Expand All @@ -8,10 +9,15 @@
import tomli
from typing_extensions import Self

from dagster_dg.cache import CachableDataType, DgCache
from dagster_dg.component import RemoteComponentRegistry, RemoteComponentType
from dagster_dg.config import DgConfig
from dagster_dg.error import DgError
from dagster_dg.utils import execute_code_location_command
from dagster_dg.utils import (
execute_code_location_command,
hash_directory_metadata,
hash_file_metadata,
)


def is_inside_deployment_directory(path: Path) -> bool:
Expand Down Expand Up @@ -64,21 +70,23 @@ def _is_code_location_root_directory(path: Path) -> bool:
_DEPLOYMENT_CODE_LOCATIONS_DIR: Final = "code_locations"

# Code location
_CODE_LOCATION_CUSTOM_COMPONENTS_DIR: Final = "lib"
_CODE_LOCATION_COMPONENTS_LIB_DIR: Final = "lib"
_CODE_LOCATION_COMPONENT_INSTANCES_DIR: Final = "components"


@dataclass
class DgContext:
config: DgConfig
cache: Optional[DgCache] = None

@classmethod
def from_cli_context(cls, cli_context: click.Context) -> Self:
return cls.from_config(config=DgConfig.from_cli_context(cli_context))

@classmethod
def from_config(cls, config: DgConfig) -> Self:
return cls(config=config)
cache = None if config.disable_cache else DgCache.from_config(config)
return cls(config=config, cache=cache)

@classmethod
def default(cls) -> Self:
Expand All @@ -105,6 +113,27 @@ def get_code_location_names(self) -> Iterable[str]:
return [loc.name for loc in sorted((self.root_path / "code_locations").iterdir())]


def get_code_location_env_hash(code_location_root_path: Path) -> str:
uv_lock_path = code_location_root_path / "uv.lock"
if not uv_lock_path.exists():
raise DgError(f"uv.lock file not found in {code_location_root_path}")
local_components_path = (
code_location_root_path / code_location_root_path.name / _CODE_LOCATION_COMPONENTS_LIB_DIR
)
if not local_components_path.exists():
raise DgError(f"Local components directory not found in {code_location_root_path}")
hasher = hashlib.md5()
hash_file_metadata(hasher, uv_lock_path)
hash_directory_metadata(hasher, local_components_path)
return hasher.hexdigest()


def make_cache_key(code_location_path: Path, data_type: CachableDataType) -> Tuple[str, str, str]:
path_parts = [str(part) for part in code_location_path.parts if part != "/"]
env_hash = get_code_location_env_hash(code_location_path)
return ("_".join(path_parts), env_hash, data_type)


@dataclass
class CodeLocationDirectoryContext:
"""Class encapsulating contextual information about a components code location directory.
Expand All @@ -127,10 +156,21 @@ class CodeLocationDirectoryContext:
@classmethod
def from_path(cls, path: Path, dg_context: DgContext) -> Self:
root_path = _resolve_code_location_root_directory(path)
component_registry_data = execute_code_location_command(
root_path, ["list", "component-types"], dg_context
)
component_registry = RemoteComponentRegistry.from_dict(json.loads(component_registry_data))

cache = dg_context.cache
if cache:
cache_key = make_cache_key(root_path, "component_registry_data")

raw_registry_data = cache.get(cache_key) if cache else None
if not raw_registry_data:
raw_registry_data = execute_code_location_command(
root_path, ["list", "component-types"], dg_context
)
if cache:
cache.set(cache_key, raw_registry_data)

registry_data = json.loads(raw_registry_data)
component_registry = RemoteComponentRegistry.from_dict(registry_data)

return cls(
root_path=root_path,
Expand All @@ -148,11 +188,11 @@ def config(self) -> DgConfig:

@property
def local_component_types_root_path(self) -> str:
return os.path.join(self.root_path, self.name, _CODE_LOCATION_CUSTOM_COMPONENTS_DIR)
return os.path.join(self.root_path, self.name, _CODE_LOCATION_COMPONENTS_LIB_DIR)

@property
def local_component_types_root_module_name(self) -> str:
return f"{self.name}.{_CODE_LOCATION_CUSTOM_COMPONENTS_DIR}"
return f"{self.name}.{_CODE_LOCATION_COMPONENTS_LIB_DIR}"

def iter_component_types(self) -> Iterable[Tuple[str, RemoteComponentType]]:
for key in sorted(self.component_registry.keys()):
Expand Down
23 changes: 23 additions & 0 deletions python_modules/libraries/dagster-dg/dagster_dg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
import re
import subprocess
import sys
from fnmatch import fnmatch
from pathlib import Path
from typing import TYPE_CHECKING, Any, Final, Iterator, List, Mapping, Optional, Sequence, Union

import click
import jinja2
from typing_extensions import TypeAlias

from dagster_dg.version import __version__ as dagster_version

# There is some weirdness concerning the availabilty of hashlib.HASH between different Python
# versions, so for nowe we avoid trying to import it and just alias the type to Any.
Hash: TypeAlias = Any

if TYPE_CHECKING:
from dagster_dg.context import DgContext

Expand Down Expand Up @@ -84,6 +90,7 @@ def snakecase(string: str) -> str:
"__pycache__",
".pytest_cache",
"*.egg-info",
"*.cpython-*",
".DS_Store",
".ruff_cache",
"tox.ini",
Expand Down Expand Up @@ -188,3 +195,19 @@ def ensure_dagster_dg_tests_import() -> None:
dagster_dg_package_root / "dagster_dg_tests"
).exists(), "Could not find dagster_dg_tests where expected"
sys.path.append(dagster_dg_package_root.as_posix())


def hash_directory_metadata(hasher: Hash, path: Union[str, Path]) -> None:
for root, dirs, files in os.walk(path):
for name in dirs + files:
if any(fnmatch(name, pattern) for pattern in _DEFAULT_EXCLUDES):
continue
filepath = os.path.join(root, name)
hash_file_metadata(hasher, filepath)


def hash_file_metadata(hasher: Hash, path: Union[str, Path]) -> None:
stat = os.stat(path=path)
hasher.update(str(path).encode())
hasher.update(str(stat.st_mtime).encode()) # Last modified time
hasher.update(str(stat.st_size).encode()) # File size
Loading

0 comments on commit 6edff74

Please sign in to comment.