
Catalog to config #4323

Merged (88 commits) on Nov 29, 2024
ae5384a
Captured init arguments
ElenaKhaustova Nov 7, 2024
fcdf357
Implemented unresoloving credentials
ElenaKhaustova Nov 7, 2024
b147374
Added some comments
ElenaKhaustova Nov 7, 2024
0ed0c1e
Put type in first place for dataset config
ElenaKhaustova Nov 7, 2024
3c839a9
Handled version key
ElenaKhaustova Nov 7, 2024
29bb714
Added lazy dataset to_config
ElenaKhaustova Nov 8, 2024
49858b6
Removed data key from MemoryDataset
ElenaKhaustova Nov 8, 2024
0d0ba91
Added TODOs
ElenaKhaustova Nov 8, 2024
8413f58
Saved call args
ElenaKhaustova Nov 11, 2024
a89db7e
Saved only set credentials
ElenaKhaustova Nov 11, 2024
b081c65
Processed CachedDataset case
ElenaKhaustova Nov 11, 2024
18c0ad6
Updated TODOs
ElenaKhaustova Nov 12, 2024
2751ea8
Tested with PartitionedDataset
ElenaKhaustova Nov 12, 2024
fc576ff
Popped metadata
ElenaKhaustova Nov 12, 2024
1d6454c
Fixed versioning when load
ElenaKhaustova Nov 12, 2024
8c31237
Fixed linter
ElenaKhaustova Nov 12, 2024
e035881
Tested datasets factories
ElenaKhaustova Nov 13, 2024
edcdc38
Tested transcoding
ElenaKhaustova Nov 13, 2024
15a1e72
Removed TODOs
ElenaKhaustova Nov 13, 2024
d4e4534
Removed debug output
ElenaKhaustova Nov 13, 2024
e7e8af5
Removed debug output
ElenaKhaustova Nov 13, 2024
1b6be8e
Added logic to set VERSIONED_FLAG_KEY
ElenaKhaustova Nov 14, 2024
db77436
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
c6dc380
Updated version set up
ElenaKhaustova Nov 14, 2024
54b0793
Added TODO for versioning
ElenaKhaustova Nov 14, 2024
f183e60
Added tests for unresolve_config_credentials
ElenaKhaustova Nov 14, 2024
0d9d241
Implemented test_to_config
ElenaKhaustova Nov 14, 2024
763e635
Added test with MemoryDataset
ElenaKhaustova Nov 14, 2024
8795dd6
Extended test examples
ElenaKhaustova Nov 14, 2024
e3289b4
Materialized cached_ds
ElenaKhaustova Nov 14, 2024
5d93a41
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 14, 2024
59b603e
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 15, 2024
ae62886
Exclude parameters
ElenaKhaustova Nov 18, 2024
b2ebfe2
Fixed import
ElenaKhaustova Nov 18, 2024
a07107a
Added test with parameters
ElenaKhaustova Nov 18, 2024
5dc4abf
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 19, 2024
e5adb5d
Moved tests for CatalogConfigResolver to a separate file
ElenaKhaustova Nov 19, 2024
bdf45a3
Made unresolve_config_credentials staticmethod
ElenaKhaustova Nov 19, 2024
33d6791
Updated comment to clarify meaning
ElenaKhaustova Nov 19, 2024
33ff979
Moved to_config anfter from_config
ElenaKhaustova Nov 19, 2024
7546540
Returned is_parameter for catalog and added TODOs
ElenaKhaustova Nov 19, 2024
c37c04d
Renamed catalog config resolver methods
ElenaKhaustova Nov 20, 2024
591f4a0
Implemented _validate_versions method
ElenaKhaustova Nov 21, 2024
5aaebe6
Added _validate_versions calls
ElenaKhaustova Nov 21, 2024
bdb7cf6
Updated error descriptions
ElenaKhaustova Nov 21, 2024
e2ffeaa
Added validation to the old catalog
ElenaKhaustova Nov 22, 2024
6b1e802
Fixed linter
ElenaKhaustova Nov 22, 2024
06e343b
Implemented unit tests for KedroDataCatalog
ElenaKhaustova Nov 22, 2024
5492b9f
Removed odd comments
ElenaKhaustova Nov 22, 2024
c96546c
Implemented tests for DataCatalog
ElenaKhaustova Nov 22, 2024
46f2df6
Added docstrings
ElenaKhaustova Nov 22, 2024
56a067c
Added release notes
ElenaKhaustova Nov 22, 2024
cbd1d4a
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 22, 2024
e9027b9
Updated version logic
ElenaKhaustova Nov 22, 2024
11b148b
Added CachedDataset case
ElenaKhaustova Nov 22, 2024
163ca17
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
ca2ac6c
Updated release notes
ElenaKhaustova Nov 27, 2024
615d135
Added tests for CachedDataset use case
ElenaKhaustova Nov 27, 2024
8dd1084
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
8a01881
Updated unit test after version validation is applied
ElenaKhaustova Nov 27, 2024
eb44a30
Removed MemoryDatasets
ElenaKhaustova Nov 27, 2024
ba3d04e
Removed _is_parameter
ElenaKhaustova Nov 27, 2024
35953a9
Pop metadata from cached dataset configuration
ElenaKhaustova Nov 27, 2024
d56793b
Fixed lint
ElenaKhaustova Nov 27, 2024
ebf1483
Fixed unit test
ElenaKhaustova Nov 27, 2024
f5468c9
Added docstrings for AbstractDataset.to_config()
ElenaKhaustova Nov 27, 2024
edee597
Updated docstrings
ElenaKhaustova Nov 27, 2024
4454970
Fixed typos
ElenaKhaustova Nov 27, 2024
5d6bd3c
Updated TODOs
ElenaKhaustova Nov 27, 2024
f1ace7c
Merge branch 'main' into fix/4327-validate-datasets-versions
ElenaKhaustova Nov 27, 2024
95fc260
Merge branch 'fix/4327-validate-datasets-versions' into feature/3932-…
ElenaKhaustova Nov 27, 2024
86e25e9
Added docstring for KedroDataCatalog.to_config
ElenaKhaustova Nov 27, 2024
c8fd99e
Added docstrinbgs for unresolve_credentials
ElenaKhaustova Nov 27, 2024
5b2f21f
Updated release notes
ElenaKhaustova Nov 27, 2024
35dc102
Fixed indentation
ElenaKhaustova Nov 27, 2024
2853fda
Fixed to_config() example
ElenaKhaustova Nov 27, 2024
8f0fe4f
Fixed indentation
ElenaKhaustova Nov 27, 2024
0db9b46
Fixed indentation
ElenaKhaustova Nov 27, 2024
2f72e23
Added a note about to_config() constraints
ElenaKhaustova Nov 27, 2024
33f29fd
Merge branch 'main' into feature/3932-catalog-from-to-prototype
ElenaKhaustova Nov 28, 2024
a7689b9
Fixed typo
ElenaKhaustova Nov 28, 2024
3c3664e
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
b7183ab
Replace type string with the constant
ElenaKhaustova Nov 28, 2024
171e80f
Moved _is_memory_dataset
ElenaKhaustova Nov 28, 2024
b789018
Simplified nested decorator
ElenaKhaustova Nov 28, 2024
6ba6ee4
Fixed lint
ElenaKhaustova Nov 28, 2024
7008346
Removed _init_args class attribute
ElenaKhaustova Nov 29, 2024
7af15a5
Returned @wraps
ElenaKhaustova Nov 29, 2024
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,7 +1,11 @@
# Upcoming Release

## Major features and improvements
* Implemented `KedroDataCatalog.to_config()` method that converts the catalog instance into a configuration format suitable for serialisation.


## Bug fixes and other changes
* Added validation to ensure consistency of dataset versions across the catalog.

## Breaking changes to the API
## Documentation changes
## Community contributions
1 change: 1 addition & 0 deletions kedro/framework/cli/catalog.py
@@ -28,6 +28,7 @@ def _create_session(package_name: str, **kwargs: Any) -> KedroSession:


def is_parameter(dataset_name: str) -> bool:
# TODO: when breaking change move it to kedro/io/core.py
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"
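As a quick standalone illustration of the naming rule this helper encodes (the function body is copied out of the diff so it runs on its own):

```python
def is_parameter(dataset_name: str) -> bool:
    """Check if a catalog entry name refers to a parameter rather than a dataset."""
    return dataset_name.startswith("params:") or dataset_name == "parameters"

print(is_parameter("params:learning_rate"))  # True
print(is_parameter("parameters"))            # True
print(is_parameter("companies"))             # False
```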

53 changes: 48 additions & 5 deletions kedro/io/catalog_config_resolver.py
@@ -30,7 +30,7 @@ def __init__(
self._dataset_patterns, self._default_pattern = self._extract_patterns(
config, credentials
)
-        self._resolved_configs = self._resolve_config_credentials(config, credentials)
+        self._resolved_configs = self.resolve_credentials(config, credentials)

@property
def config(self) -> dict[str, dict[str, Any]]:
@@ -237,8 +237,9 @@ def _extract_patterns(

return sorted_patterns, user_default

-    def _resolve_config_credentials(
-        self,
+    @classmethod
+    def resolve_credentials(
+        cls,
config: dict[str, dict[str, Any]] | None,
credentials: dict[str, dict[str, Any]] | None,
) -> dict[str, dict[str, Any]]:
@@ -254,13 +255,55 @@ def _resolve_config_credentials(
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the key is preceded by an underscore."
)
-            if not self.is_pattern(ds_name):
-                resolved_configs[ds_name] = self._resolve_credentials(
+            if not cls.is_pattern(ds_name):
+                resolved_configs[ds_name] = cls._resolve_credentials(
ds_config, credentials
)

return resolved_configs

@staticmethod
def unresolve_credentials(
cred_name: str, ds_config: dict[str, dict[str, Any]] | None
) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
"""Extracts and replaces credentials in a dataset configuration with
references, ensuring separation of credentials from the dataset configuration.

Credentials are searched for recursively in the dataset configuration.
The first occurrence of the `CREDENTIALS_KEY` is replaced with a generated
reference key.

Args:
cred_name: A unique identifier for the credentials being unresolved.
This is used to generate a reference key for the credentials.
ds_config: The dataset configuration containing potential credentials
under the key `CREDENTIALS_KEY`.

Returns:
A tuple containing:
ds_config_copy : A deep copy of the original dataset
configuration with credentials replaced by reference keys.
credentials: A dictionary mapping generated reference keys to the original credentials.
"""
ds_config_copy = copy.deepcopy(ds_config) or {}
credentials: dict[str, Any] = {}
credentials_ref = f"{cred_name}_{CREDENTIALS_KEY}"

def unresolve(config: Any) -> None:
        # We don't expect the credentials key to appear more than once within the same
        # dataset config, so once we find it we unresolve it and stop iterating.
for key, val in config.items():
if key == CREDENTIALS_KEY and config[key]:
credentials[credentials_ref] = config[key]
config[key] = credentials_ref
return
if isinstance(val, dict):
unresolve(val)

unresolve(ds_config_copy)

return ds_config_copy, credentials

def resolve_pattern(self, ds_name: str) -> dict[str, Any]:
"""Resolve dataset patterns and return resolved configurations based on the existing patterns."""
matched_pattern = self.match_pattern(ds_name)
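To make the recursive replacement in `unresolve_credentials` concrete, here is a self-contained sketch of the same logic outside Kedro — the `CREDENTIALS_KEY` constant value and the example dataset config are assumptions for illustration only:

```python
from __future__ import annotations

import copy
from typing import Any

CREDENTIALS_KEY = "credentials"  # stand-in for the constant used in the resolver


def unresolve_credentials(
    cred_name: str, ds_config: dict[str, Any] | None
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Replace the first credentials mapping found with a reference key."""
    ds_config_copy = copy.deepcopy(ds_config) or {}
    credentials: dict[str, Any] = {}
    credentials_ref = f"{cred_name}_{CREDENTIALS_KEY}"

    def unresolve(config: Any) -> None:
        # Only the first occurrence is replaced; recursion stops after that.
        for key, val in config.items():
            if key == CREDENTIALS_KEY and config[key]:
                credentials[credentials_ref] = config[key]
                config[key] = credentials_ref
                return
            if isinstance(val, dict):
                unresolve(val)

    unresolve(ds_config_copy)
    return ds_config_copy, credentials


ds_config = {
    "type": "pandas.CSVDataset",
    "filepath": "s3://bucket/data.csv",
    "credentials": {"key": "SECRET"},
}
clean_config, creds = unresolve_credentials("my_dataset", ds_config)
print(clean_config["credentials"])  # my_dataset_credentials
print(creds)  # {'my_dataset_credentials': {'key': 'SECRET'}}
```

Because the input is deep-copied first, the original `ds_config` keeps its secrets untouched; only the returned copy holds the reference key.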
196 changes: 189 additions & 7 deletions kedro/io/core.py
@@ -15,6 +15,7 @@
from datetime import datetime, timezone
from functools import partial, wraps
from glob import iglob
from inspect import getcallargs
from operator import attrgetter
from pathlib import Path, PurePath, PurePosixPath
from typing import (
@@ -57,6 +58,7 @@
"s3a",
"s3n",
)
TYPE_KEY = "type"


class DatasetError(Exception):
@@ -71,16 +73,16 @@ class DatasetError(Exception):


class DatasetNotFoundError(DatasetError):
"""``DatasetNotFoundError`` raised by ``DataCatalog`` class in case of
trying to use a non-existing dataset.
"""``DatasetNotFoundError`` raised by ```DataCatalog`` and ``KedroDataCatalog``
classes in case of trying to use a non-existing dataset.
"""

pass


class DatasetAlreadyExistsError(DatasetError):
"""``DatasetAlreadyExistsError`` raised by ``DataCatalog`` class in case
of trying to add a dataset which already exists in the ``DataCatalog``.
"""``DatasetAlreadyExistsError`` raised by ```DataCatalog`` and ``KedroDataCatalog``
classes in case of trying to add a dataset which already exists in the ``DataCatalog``.
"""

pass
@@ -94,6 +96,15 @@ class VersionNotFoundError(DatasetError):
pass


class VersionAlreadyExistsError(DatasetError):
"""``VersionAlreadyExistsError`` raised by ``DataCatalog`` and ``KedroDataCatalog``
classes when attempting to add a dataset to a catalog with a save version
that conflicts with the save version already set for the catalog.
"""

pass


_DI = TypeVar("_DI")
_DO = TypeVar("_DO")

@@ -148,6 +159,23 @@ class AbstractDataset(abc.ABC, Generic[_DI, _DO]):
need to change the `_EPHEMERAL` attribute to 'True'.
"""
_EPHEMERAL = False
# Declares a class-level attribute that will store the initialization
# arguments of an instance. Initially, it is set to None, but it will
# hold a dictionary of arguments after initialization.
_init_args: dict[str, Any] | None = None

def __post_init__(self, call_args: dict[str, Any]) -> None:
"""Handles additional setup after the object is initialized.

Stores the initialization arguments (excluding `self`) in the `_init_args` attribute.

Args:
call_args: A dictionary of arguments passed to the `__init__` method, captured
using `inspect.getcallargs`.
"""

self._init_args = call_args
self._init_args.pop("self", None)

@classmethod
def from_config(
@@ -201,6 +229,58 @@ def from_config(
) from err
return dataset

def to_config(self) -> dict[str, Any]:
"""Converts the dataset instance into a dictionary-based configuration for
serialization. Ensures that any subclass-specific details are handled, with
additional logic for versioning and caching implemented for `CachedDataset`.

Adds a key for the dataset's type using its module and class name and
includes the initialization arguments.

For `CachedDataset` it extracts the underlying dataset's configuration,
handles the `versioned` flag and removes unnecessary metadata. It also
ensures the embedded dataset's configuration is appropriately flattened
or transformed.

If the dataset has a version key, it sets the `versioned` flag in the
configuration.

Removes the `metadata` key from the configuration if present.

Returns:
A dictionary containing the dataset's type and initialization arguments.
"""
return_config: dict[str, Any] = {
f"{TYPE_KEY}": f"{type(self).__module__}.{type(self).__name__}"
}

if self._init_args:
return_config.update(self._init_args)

if type(self).__name__ == "CachedDataset":
cached_ds = return_config.pop("dataset")
cached_ds_return_config: dict[str, Any] = {}
if isinstance(cached_ds, dict):
cached_ds_return_config = cached_ds
elif isinstance(cached_ds, AbstractDataset):
cached_ds_return_config = cached_ds.to_config()
if VERSIONED_FLAG_KEY in cached_ds_return_config:
return_config[VERSIONED_FLAG_KEY] = cached_ds_return_config.pop(
VERSIONED_FLAG_KEY
)
# Pop metadata from configuration
cached_ds_return_config.pop("metadata", None)
return_config["dataset"] = cached_ds_return_config

# Set `versioned` key if version present in the dataset
if return_config.pop(VERSION_KEY, None):
return_config[VERSIONED_FLAG_KEY] = True

# Pop metadata from configuration
return_config.pop("metadata", None)

return return_config

@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)
@@ -281,11 +361,49 @@ def save(self: Self, data: _DI) -> None:
return save

def __init_subclass__(cls, **kwargs: Any) -> None:
"""Decorate the `load` and `save` methods provided by the class.
"""Customizes the behavior of subclasses of AbstractDataset during
their creation. This method is automatically invoked when a subclass
of AbstractDataset is defined.


Decorates the `load` and `save` methods provided by the class.
If `_load` or `_save` are defined, alias them as a prerequisite.

"""

# Save the original __init__ method of the subclass
init_func: Callable = cls.__init__

def init_decorator(previous_init: Callable) -> Callable:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
"""A decorator that wraps the original __init__ of the subclass.

It ensures that after the original __init__ executes, the `__post_init__`
method of the instance is called with the arguments used to initialize
the object.
"""

def new_init(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def]
"""The decorated __init__ method.

Executes the original __init__, then calls __post_init__ with the arguments
used to initialize the instance.
"""

# Call the original __init__ method
previous_init(self, *args, **kwargs)
if type(self) is cls:
# Capture and process the arguments passed to the original __init__
call_args = getcallargs(init_func, self, *args, **kwargs)
# Call the custom post-initialization method to save captured arguments
self.__post_init__(call_args)

return new_init

# Replace the subclass's __init__ with the decorated version
# A hook for subclasses to capture initialization arguments and save them
# in the AbstractDataset._init_args field
cls.__init__ = init_decorator(cls.__init__) # type: ignore[method-assign]

super().__init_subclass__(**kwargs)

if hasattr(cls, "_load") and not cls._load.__qualname__.startswith("Abstract"):
@@ -484,14 +602,14 @@ def parse_dataset_definition(
config = copy.deepcopy(config)

# TODO: remove when removing old catalog as moved to KedroDataCatalog
if "type" not in config:
if TYPE_KEY not in config:
ElenaKhaustova marked this conversation as resolved.
Show resolved Hide resolved
raise DatasetError(
"'type' is missing from dataset catalog configuration."
"\nHint: If this catalog entry is intended for variable interpolation, "
"make sure that the top level key is preceded by an underscore."
)

dataset_type = config.pop("type")
dataset_type = config.pop(TYPE_KEY)
class_obj = None
if isinstance(dataset_type, str):
if len(dataset_type.strip(".")) != len(dataset_type):
@@ -955,3 +1073,67 @@ def confirm(self, name: str) -> None:
def shallow_copy(self, extra_dataset_patterns: Patterns | None = None) -> _C:
"""Returns a shallow copy of the current object."""
...


def _validate_versions(
datasets: dict[str, AbstractDataset] | None,
load_versions: dict[str, str],
save_version: str | None,
) -> tuple[dict[str, str], str | None]:
"""Validates and synchronises dataset versions for loading and saving.

Ensures consistency of dataset versions across a catalog, particularly
for versioned datasets. It updates load versions and validates that all
save versions are consistent.

Args:
datasets: A dictionary mapping dataset names to their instances.
If None, no validation occurs.
load_versions: A mapping between dataset names and versions
to load.
save_version: Version string to be used for ``save`` operations
by all datasets with versioning enabled.

Returns:
Updated ``load_versions`` with load versions specified in the ``datasets``
and resolved ``save_version``.

Raises:
VersionAlreadyExistsError: If a dataset's save version conflicts with
the catalog's save version.
"""
if not datasets:
return load_versions, save_version

cur_load_versions = load_versions.copy()
cur_save_version = save_version

for ds_name, ds in datasets.items():
# TODO: Move to kedro/io/kedro_data_catalog.py when removing DataCatalog
# TODO: Make it a protected static method for KedroDataCatalog
# TODO: Replace with isinstance(ds, CachedDataset) - current implementation avoids circular import
cur_ds = ds._dataset if ds.__class__.__name__ == "CachedDataset" else ds # type: ignore[attr-defined]

if isinstance(cur_ds, AbstractVersionedDataset) and cur_ds._version:
if cur_ds._version.load:
cur_load_versions[ds_name] = cur_ds._version.load
if cur_ds._version.save:
cur_save_version = cur_save_version or cur_ds._version.save
if cur_save_version != cur_ds._version.save:
raise VersionAlreadyExistsError(
f"Cannot add a dataset `{ds_name}` with `{cur_ds._version.save}` save version. "
f"Save version set for the catalog is `{cur_save_version}`"
f"All datasets in the catalog must have the same save version."
)

return cur_load_versions, cur_save_version


def _is_memory_dataset(ds_or_type: AbstractDataset | str) -> bool:
"""Check if dataset or str type provided is a MemoryDataset."""
if isinstance(ds_or_type, AbstractDataset):
return ds_or_type.__class__.__name__ == "MemoryDataset"
if isinstance(ds_or_type, str):
return ds_or_type in {"MemoryDataset", "kedro.io.memory_dataset.MemoryDataset"}

return False
Loading