Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDF-23583] 👮‍♂️ CPU and Memory outside function limit #1309

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.cdf-tk.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Changes are grouped as follows:
with the flag `--offline`.
- [alpha feature] New subcommand `cdf modules pull` which pulls the configuration from the CDF project to the local
modules directory.
- Support for property `dataModelingType` in `LocationFilter` resources.

### Fixed

Expand All @@ -31,6 +32,10 @@ Changes are grouped as follows:
- Cognite Toolkit has improved resources that have server set default values that can lead to redeploy even when
unchanged. This includes `Sequences`, `Containers`, `DataSets`, `Views`, `Nodes`, `Edges`, `ExtractionPipelines`,
`CogniteFiles`, `HostedExtractorJobs`, `Relationships`, `RobotMaps`, and `WorkflowVersions`.
- On CDF deployed on Azure and AWS clouds, setting the `CPU` and `memory` of a Function to a value lower than
the required minimum no longer triggers a redeploy.
- `LocationFilters` now parses the `version` key of `View` and `DataModel` correctly as a string.
- `LocationFilters` now converts an empty string of `dataSetExternalId` to `0` instead of ignoring it.

## [0.3.23] - 2024-12-13

Expand Down
4 changes: 2 additions & 2 deletions cognite_toolkit/_cdf_tk/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from ._toolkit_client import ToolkitClient
from ._toolkit_client import ToolkitClient, ToolkitClientConfig

__all__ = ["ToolkitClient"]
__all__ = ["ToolkitClient", "ToolkitClientConfig"]
34 changes: 33 additions & 1 deletion cognite_toolkit/_cdf_tk/client/_toolkit_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import Literal, cast

from cognite.client import ClientConfig, CogniteClient

from .api.dml import DMLAPI
Expand All @@ -9,11 +11,41 @@
from .api.verify import VerifyAPI


class ToolkitClientConfig(ClientConfig):
    """Small extension of ``ClientConfig`` that can identify the cloud provider.

    The provider is inferred from the CDF cluster name, either by its
    conventional prefix (``az-``/``aws-``/``gc-``) or by membership in a
    short list of legacy cluster names that predate the convention.
    """

    # Legacy clusters that do not follow the prefix naming convention.
    _AZURE_CLUSTERS = frozenset({"azure-dev", "bluefield", "westeurope-1"})
    _AWS_CLUSTERS = frozenset({"orangefield"})
    _GCP_CLUSTERS = frozenset(
        {"greenfield", "asia-northeast1-1", "cognitedata-development", "cognitedata-production"}
    )

    @property
    def cloud_provider(self) -> Literal["azure", "aws", "gcp", "unknown"]:
        """Best-effort guess of the cloud hosting this CDF cluster."""
        cluster = self.cdf_cluster
        if cluster is None:
            # No cluster configured — nothing to infer from.
            return "unknown"
        if cluster.startswith("az-") or cluster in self._AZURE_CLUSTERS:
            return "azure"
        if cluster.startswith("aws-") or cluster in self._AWS_CLUSTERS:
            return "aws"
        if cluster.startswith("gc-") or cluster in self._GCP_CLUSTERS:
            return "gcp"
        return "unknown"


class ToolkitClient(CogniteClient):
    """CogniteClient subclass that adds the Toolkit-specific API groups.

    Compared to the plain ``CogniteClient`` it wires up the location-filter,
    robotics, DML, verify, and lookup APIs, and narrows ``config`` to the
    Toolkit's own config type.
    """

    # NOTE(review): the scraped diff showed both the old (``ClientConfig``)
    # and new (``ToolkitClientConfig``) signatures; this is the post-change
    # definition, which accepts the narrowed config type.
    def __init__(self, config: ToolkitClientConfig | None = None) -> None:
        super().__init__(config=config)
        self.location_filters = LocationFiltersAPI(self._config, self._API_VERSION, self)
        self.robotics = RoboticsAPI(self._config, self._API_VERSION, self)
        self.dml = DMLAPI(self._config, self._API_VERSION, self)
        self.verify = VerifyAPI(self._config, self._API_VERSION, self)
        self.lookup = LookUpGroup(self._config, self._API_VERSION, self)

    @property
    def config(self) -> ToolkitClientConfig:
        """Returns a config object containing the configuration for the current client.

        Overrides the base-class property only to narrow the return type hint.

        Returns:
            ToolkitClientConfig: The configuration object.
        """
        return cast(ToolkitClientConfig, self._config)
47 changes: 36 additions & 11 deletions cognite_toolkit/_cdf_tk/client/api/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ def resource_name(self) -> str:
return type(self).__name__.removesuffix("LookUpAPI")

@overload
def id(self, external_id: str, is_dry_run: bool = False) -> int: ...
def id(self, external_id: str, is_dry_run: bool = False, allow_empty: bool = False) -> int: ...

@overload
def id(self, external_id: SequenceNotStr[str], is_dry_run: bool = False) -> list[int]: ...
def id(
self, external_id: SequenceNotStr[str], is_dry_run: bool = False, allow_empty: bool = False
) -> list[int]: ...

def id(self, external_id: str | SequenceNotStr[str], is_dry_run: bool = False) -> int | list[int]:
def id(
self, external_id: str | SequenceNotStr[str], is_dry_run: bool = False, allow_empty: bool = False
) -> int | list[int]:
ids = [external_id] if isinstance(external_id, str) else external_id
missing = [id for id in ids if id not in self._cache]
if allow_empty and "" in missing:
# Note we do not want to put empty string in the cache. It is a special case that
# as of 01/02/2025 only applies to LocationFilters
missing.remove("")
if missing:
try:
lookup = self._id(missing)
Expand All @@ -63,14 +71,19 @@ def id(self, external_id: str | SequenceNotStr[str], is_dry_run: bool = False) -
raise ResourceRetrievalError(
f"Failed to retrieve {self.resource_name} with external_id {missing}." "Have you created it?"
)
if is_dry_run:
return (
self._cache.get(external_id, self.dry_run_id)
if isinstance(external_id, str)
else [self._cache.get(id, self.dry_run_id) for id in ids]
)
return (
self._get_id_from_cache(external_id, is_dry_run, allow_empty)
if isinstance(external_id, str)
else [self._get_id_from_cache(id, is_dry_run, allow_empty) for id in ids]
)

return self._cache[external_id] if isinstance(external_id, str) else [self._cache[id] for id in ids]
def _get_id_from_cache(self, external_id: str, is_dry_run: bool = False, allow_empty: bool = False) -> int:
if allow_empty and external_id == "":
return 0
elif is_dry_run:
return self._cache.get(external_id, self.dry_run_id)
else:
return self._cache[external_id]

@overload
def external_id(self, id: int) -> str: ...
Expand All @@ -84,6 +97,8 @@ def external_id(
) -> str | list[str]:
ids = [id] if isinstance(id, int) else id
missing = [id_ for id_ in ids if id_ not in self._reverse_cache]
if 0 in missing:
missing.remove(0)
if missing:
try:
lookup = self._external_id(missing)
Expand All @@ -103,7 +118,17 @@ def external_id(
raise ResourceRetrievalError(
f"Failed to retrieve {self.resource_name} with id {missing}." "Have you created it?"
)
return self._reverse_cache[id] if isinstance(id, int) else [self._reverse_cache[id] for id in ids]
return (
self._get_external_id_from_cache(id)
if isinstance(id, int)
else [self._get_external_id_from_cache(id) for id in ids]
)

def _get_external_id_from_cache(self, id: int) -> str:
if id == 0:
# Reverse of looking up an empty string.
return ""
return self._reverse_cache[id]

@abstractmethod
def _id(self, external_id: SequenceNotStr[str]) -> dict[str, int]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def __init__(
scene: LocationFilterScene | None = None,
asset_centric: AssetCentricFilter | None = None,
views: list[LocationFilterView] | None = None,
data_modeling_type: Literal["HYBRID", "DATA_MODELING_ONLY"] | None = None,
) -> None:
self.external_id = external_id
self.name = name
Expand All @@ -158,6 +159,7 @@ def __init__(
self.scene = scene
self.asset_centric = asset_centric
self.views = views
self.data_modeling_type = data_modeling_type

def as_write(self) -> LocationFilterWrite:
return LocationFilterWrite(
Expand All @@ -170,6 +172,7 @@ def as_write(self) -> LocationFilterWrite:
scene=self.scene,
asset_centric=self.asset_centric,
views=self.views,
data_modeling_type=self.data_modeling_type,
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
Expand Down Expand Up @@ -208,6 +211,7 @@ def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None =
scene=scene,
asset_centric=asset_centric,
views=views,
data_modeling_type=resource.get("dataModelingType"),
)


Expand Down Expand Up @@ -242,9 +246,19 @@ def __init__(
scene: LocationFilterScene | None = None,
asset_centric: AssetCentricFilter | None = None,
views: list[LocationFilterView] | None = None,
data_modeling_type: Literal["HYBRID", "DATA_MODELING_ONLY"] | None = None,
) -> None:
super().__init__(
external_id, name, parent_id, description, data_models, instance_spaces, scene, asset_centric, views
external_id,
name,
parent_id,
description,
data_models,
instance_spaces,
scene,
asset_centric,
views,
data_modeling_type,
)
self.id = id
self.created_time = created_time
Expand All @@ -267,6 +281,7 @@ def _load(cls, resource: dict[str, Any], cognite_client: CogniteClient | None =
views=[LocationFilterView._load(view) for view in resource["views"]] if "views" in resource else None,
created_time=resource["createdTime"],
updated_time=resource["lastUpdatedTime"],
data_modeling_type=resource.get("dataModelingType"),
)


Expand Down
7 changes: 6 additions & 1 deletion cognite_toolkit/_cdf_tk/commands/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
DataModelLoader,
ExtractionPipelineConfigLoader,
FileLoader,
LocationFilterLoader,
NodeLoader,
RawDatabaseLoader,
RawTableLoader,
Expand Down Expand Up @@ -455,7 +456,11 @@ def _replace_variables(
source_files.append(BuildSourceFile(source, content, None))
continue

if resource_name in {TransformationLoader.folder_name, DataModelLoader.folder_name}:
if resource_name in {
TransformationLoader.folder_name,
DataModelLoader.folder_name,
LocationFilterLoader.folder_name,
}:
# Ensure that all keys that are version gets read as strings.
# This is required by DataModels, Views, and Transformations that reference DataModels and Views.
content = quote_int_value_by_key_in_yaml(content, key="version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ def dump_resource(self, resource: Function, local: dict[str, Any]) -> dict[str,
if key not in local:
# Server set default values
dumped.pop(key, None)
elif isinstance(local.get(key), float) and local[key] < dumped[key]:
# On Azure and AWS, the server sets the CPU and Memory to the default values if the user
# pass in lower values. We set this to match the local to avoid triggering a redeploy.
# Note the user will get a warning about this when the function is created.
if self.client.config.cloud_provider in ("azure", "aws"):
dumped[key] = local[key]
elif self.client.config.cloud_provider == "gcp" and key == "cpu" and local[key] < 1.0:
# GCP does not allow CPU to be set to below 1.0
dumped[key] = local[key]
elif self.client.config.cloud_provider == "gcp" and key == "memory" and local[key] < 1.5:
# GCP does not allow Memory to be set to below 1.5
dumped[key] = local[key]

for key in ["indexUrl", "extraIndexUrls"]:
# Only in write (request) format of the function
if key in local:
Expand Down Expand Up @@ -224,9 +237,38 @@ def create(self, items: FunctionWriteList) -> FunctionList:
else:
raise RuntimeError("Could not retrieve file from files API")
item.file_id = file_id
created.append(self.client.functions.create(item))
created_item = self.client.functions.create(item)
self._warn_if_cpu_or_memory_changed(created_item, item)
created.append(created_item)
return created

@staticmethod
def _warn_if_cpu_or_memory_changed(created_item: Function, item: FunctionWrite) -> None:
    """Warn when the server raised the requested CPU and/or memory.

    On Azure and AWS the server replaces a too-low CPU/memory request with
    its own default, so the deployed function differs from the local config.
    Surface that to the user with a low-severity warning.
    """

    def increased(requested: object, actual: object) -> bool:
        # Only compare when both sides are real floats (either may be unset).
        return isinstance(requested, float) and isinstance(actual, float) and requested < actual

    cpu_up = increased(item.cpu, created_item.cpu)
    mem_up = increased(item.memory, created_item.memory)
    if not cpu_up and not mem_up:
        # Nothing was bumped by the server; stay silent.
        return
    if cpu_up and mem_up:
        prefix = "CPU and Memory"
        suffix = f"CPU {item.cpu} -> {created_item.cpu}, Memory {item.memory} -> {created_item.memory}"
    elif cpu_up:
        prefix = "CPU"
        suffix = f"{item.cpu} -> {created_item.cpu}"
    else:
        prefix = "Memory"
        suffix = f"{item.memory} -> {created_item.memory}"
    LowSeverityWarning(
        f"Function {prefix} is not configurable. Function {item.external_id!r} set {suffix}"
    ).print_warning()
Comment on lines +268 to +270
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, give the user a warning that whatever they set has no effect.


def retrieve(self, ids: SequenceNotStr[str]) -> FunctionList:
if not self._is_activated("retrieve"):
return FunctionList([])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Hashable, Iterable, Sequence
from functools import lru_cache
from pathlib import Path
from typing import Any, final

from cognite.client.data_classes.capabilities import Capability, LocationFiltersAcl
Expand All @@ -16,7 +17,7 @@
LocationFilterWriteList,
)
from cognite_toolkit._cdf_tk.loaders._base_loaders import ResourceLoader
from cognite_toolkit._cdf_tk.utils import in_dict
from cognite_toolkit._cdf_tk.utils import in_dict, quote_int_value_by_key_in_yaml, safe_read
from cognite_toolkit._cdf_tk.utils.diff_list import diff_list_hashable, diff_list_identifiable, dm_identifier

from .classic_loaders import AssetLoader, SequenceLoader
Expand Down Expand Up @@ -90,19 +91,31 @@ def get_id(cls, item: LocationFilter | LocationFilterWrite | dict) -> str:
def dump_id(cls, id: str) -> dict[str, Any]:
return {"externalId": id}

def safe_read(self, filepath: Path | str) -> str:
    """Read the file, quoting any integer-looking ``version`` value.

    YAML parses an unquoted version such as ``3_0_2`` as the int ``302``.
    Strictly that is a user mistake (the value should be quoted in the YAML
    file), but rather than demand that users know this YAML subtlety, we
    repair it here before parsing.
    """
    raw = safe_read(filepath)
    return quote_int_value_by_key_in_yaml(raw, key="version")

def load_resource(self, resource: dict[str, Any], is_dry_run: bool = False) -> LocationFilterWrite:
    """Replace external-id references in *resource* with internal ids, then load.

    ``parentExternalId`` becomes ``parentId``, and any ``dataSetExternalIds``
    (both at the top of ``assetCentric`` and inside each sub-filter) become
    ``dataSetIds``. Empty data-set external ids are permitted and resolve to 0.
    """
    parent_xid = resource.pop("parentExternalId", None)
    if parent_xid:
        resource["parentId"] = self.client.lookup.location_filters.id(parent_xid, is_dry_run)
    if "assetCentric" not in resource:
        # Nothing else to translate.
        return LocationFilterWrite._load(resource)

    asset_centric = resource["assetCentric"]
    top_level_xids = asset_centric.pop("dataSetExternalIds", None)
    if top_level_xids:
        asset_centric["dataSetIds"] = self.client.lookup.data_sets.id(
            top_level_xids, is_dry_run, allow_empty=True
        )
    for name in self.subfilter_names:
        # Pop from the sub-filter if present; the {} default makes the pop a no-op.
        sub_xids = asset_centric.get(name, {}).pop("dataSetExternalIds", [])
        if sub_xids:
            asset_centric[name]["dataSetIds"] = self.client.lookup.data_sets.id(
                sub_xids,
                is_dry_run,
                allow_empty=True,
            )

    return LocationFilterWrite._load(resource)
Expand All @@ -111,6 +124,9 @@ def dump_resource(self, resource: LocationFilter, local: dict[str, Any]) -> dict
dumped = resource.as_write().dump()
if parent_id := dumped.pop("parentId", None):
dumped["parentExternalId"] = self.client.lookup.location_filters.external_id(parent_id)
if "dataModelingType" in dumped and "dataModelingType" not in local:
# Default set on server side
dumped.pop("dataModelingType")
if "assetCentric" not in dumped:
return dumped
asset_centric = dumped["assetCentric"]
Expand Down
7 changes: 3 additions & 4 deletions cognite_toolkit/_cdf_tk/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import questionary
import typer
from cognite.client import ClientConfig
from cognite.client.config import global_config
from cognite.client.credentials import (
CredentialProvider,
Expand All @@ -38,7 +37,7 @@
from rich import print
from rich.prompt import Prompt

from cognite_toolkit._cdf_tk.client import ToolkitClient
from cognite_toolkit._cdf_tk.client import ToolkitClient, ToolkitClientConfig
from cognite_toolkit._cdf_tk.constants import (
_RUNNING_IN_BROWSER,
TOOLKIT_CLIENT_ENTRA_ID,
Expand Down Expand Up @@ -659,7 +658,7 @@ def initialize_from_auth_variables(self, auth: AuthVariables, clear_cache: bool
raise AuthenticationError(f"Login flow {auth.login_flow} is not supported.")
self._token_url = auth.token_url
self._toolkit_client = ToolkitClient(
ClientConfig(
ToolkitClientConfig(
client_name=self._client_name,
base_url=self._cdf_url,
project=self._project,
Expand Down Expand Up @@ -790,7 +789,7 @@ def create_client(self, credentials: ClientCredentials) -> ToolkitClient:
)

return ToolkitClient(
config=ClientConfig(
config=ToolkitClientConfig(
client_name=self._client_name,
project=self._project,
base_url=self._cdf_url,
Expand Down
Loading
Loading