Skip to content

[AQUA] Integrate aqua to use model group #1214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 30 additions & 294 deletions ads/aqua/model/model.py

Large diffs are not rendered by default.

403 changes: 371 additions & 32 deletions ads/aqua/modeldeployment/deployment.py

Large diffs are not rendered by default.

37 changes: 32 additions & 5 deletions ads/aqua/modeldeployment/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ads.aqua import logger
from ads.aqua.common.entities import AquaMultiModelRef
from ads.aqua.common.enums import Tags
from ads.aqua.common.errors import AquaValueError
from ads.aqua.config.utils.serializer import Serializable
from ads.aqua.constants import UNKNOWN_DICT
from ads.aqua.data import AquaResourceIdentifier
Expand All @@ -21,6 +22,7 @@
from ads.common.serializer import DataClassSerializable
from ads.common.utils import UNKNOWN, get_console_link
from ads.model.datascience_model import DataScienceModel
from ads.model.deployment.model_deployment import ModelDeploymentType
from ads.model.model_metadata import ModelCustomMetadataItem


Expand Down Expand Up @@ -147,13 +149,39 @@ def from_oci_model_deployment(
AquaDeployment:
The instance of the Aqua model deployment.
"""
instance_configuration = oci_model_deployment.model_deployment_configuration_details.model_configuration_details.instance_configuration
model_deployment_configuration_details = (
oci_model_deployment.model_deployment_configuration_details
)
if (
model_deployment_configuration_details.deployment_type
== ModelDeploymentType.SINGLE_MODEL
):
instance_configuration = model_deployment_configuration_details.model_configuration_details.instance_configuration
instance_count = model_deployment_configuration_details.model_configuration_details.scaling_policy.instance_count
model_id = model_deployment_configuration_details.model_configuration_details.model_id
elif (
model_deployment_configuration_details.deployment_type
== ModelDeploymentType.MODEL_GROUP
):
instance_configuration = model_deployment_configuration_details.infrastructure_configuration_details.instance_configuration
instance_count = model_deployment_configuration_details.infrastructure_configuration_details.scaling_policy.instance_count
model_id = model_deployment_configuration_details.model_group_configuration_details.model_group_id
else:
allowed_deployment_types = ", ".join(
[key for key in dir(ModelDeploymentType) if not key.startswith("__")]
)
raise AquaValueError(
f"Invalid AQUA deployment with type {model_deployment_configuration_details.deployment_type}."
f"Only {allowed_deployment_types} are supported at this moment. Specify a different AQUA model deployment."
)

instance_shape_config_details = (
instance_configuration.model_deployment_instance_shape_config_details
)
instance_count = oci_model_deployment.model_deployment_configuration_details.model_configuration_details.scaling_policy.instance_count
environment_variables = oci_model_deployment.model_deployment_configuration_details.environment_configuration_details.environment_variables
cmd = oci_model_deployment.model_deployment_configuration_details.environment_configuration_details.cmd
environment_variables = model_deployment_configuration_details.environment_configuration_details.environment_variables
cmd = (
model_deployment_configuration_details.environment_configuration_details.cmd
)
shape_info = ShapeInfo(
instance_shape=instance_configuration.instance_shape_name,
instance_count=instance_count,
Expand All @@ -168,7 +196,6 @@ def from_oci_model_deployment(
else None
),
)
model_id = oci_model_deployment._model_deployment_configuration_details.model_configuration_details.model_id
tags = {}
tags.update(oci_model_deployment.freeform_tags or UNKNOWN_DICT)
tags.update(oci_model_deployment.defined_tags or UNKNOWN_DICT)
Expand Down
27 changes: 14 additions & 13 deletions ads/aqua/modeldeployment/model_group_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from typing import List, Optional, Tuple, Union

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field
from typing_extensions import Self

from ads.aqua import logger
Expand Down Expand Up @@ -61,18 +61,19 @@ class BaseModelSpec(BaseModel):
description="Optional list of fine-tuned model variants associated with this base model.",
)

@field_validator("model_path")
@classmethod
def clean_model_path(cls, artifact_path_prefix: str) -> str:
"""Validates and cleans the file path for model_path parameter."""
if ObjectStorageDetails.is_oci_path(artifact_path_prefix):
os_path = ObjectStorageDetails.from_path(artifact_path_prefix)
artifact_path_prefix = os_path.filepath.rstrip("/")
return artifact_path_prefix

raise AquaValueError(
"The base model path is not available in the model artifact."
)
def build_model_path(cls, model_id: str, artifact_path_prefix: str) -> str:
"""Cleans and builds the file path for model_path parameter
to format: <model_id>/<artifact_path_prefix>
"""
if not ObjectStorageDetails.is_oci_path(artifact_path_prefix):
raise AquaValueError(
"The base model path is not available in the model artifact."
)

os_path = ObjectStorageDetails.from_path(artifact_path_prefix)
artifact_path_prefix = os_path.filepath.rstrip("/")
return model_id + "/" + artifact_path_prefix.lstrip("/")

@classmethod
def dedup_lora_modules(cls, fine_tune_weights: List[LoraModuleSpec]):
Expand All @@ -99,7 +100,7 @@ def from_aqua_multi_model_ref(

return cls(
model_id=model.model_id,
model_path=model.artifact_location,
model_path=cls.build_model_path(model.model_id, model.artifact_location),
params=model_params,
model_task=model.model_task,
fine_tune_weights=cls.dedup_lora_modules(model.fine_tune_weights),
Expand Down
118 changes: 76 additions & 42 deletions ads/model/deployment/model_deployment.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2021, 2023 Oracle and/or its affiliates.
# Copyright (c) 2021, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


import collections
import copy
import datetime
import oci
import warnings
import time
from typing import Dict, List, Union, Any
import warnings
from typing import Any, Dict, List, Union

import oci
import oci.loggingsearch
from ads.common import auth as authutil
import pandas as pd
from ads.model.serde.model_input import JsonModelInputSERDE
from oci.data_science.models import (
CreateModelDeploymentDetails,
LogDetails,
UpdateModelDeploymentDetails,
)

from ads.common import auth as authutil
from ads.common import utils as ads_utils
from ads.common.oci_logging import (
LOG_INTERVAL,
LOG_RECORDS_LIMIT,
Expand All @@ -30,10 +35,10 @@
from ads.model.deployment.common.utils import send_request
from ads.model.deployment.model_deployment_infrastructure import (
DEFAULT_BANDWIDTH_MBPS,
DEFAULT_MEMORY_IN_GBS,
DEFAULT_OCPUS,
DEFAULT_REPLICA,
DEFAULT_SHAPE_NAME,
DEFAULT_OCPUS,
DEFAULT_MEMORY_IN_GBS,
MODEL_DEPLOYMENT_INFRASTRUCTURE_TYPE,
ModelDeploymentInfrastructure,
)
Expand All @@ -45,18 +50,14 @@
ModelDeploymentRuntimeType,
OCIModelDeploymentRuntimeType,
)
from ads.model.serde.model_input import JsonModelInputSERDE
from ads.model.service.oci_datascience_model_deployment import (
OCIDataScienceModelDeployment,
)
from ads.common import utils as ads_utils

from .common import utils
from .common.utils import State
from .model_deployment_properties import ModelDeploymentProperties
from oci.data_science.models import (
LogDetails,
CreateModelDeploymentDetails,
UpdateModelDeploymentDetails,
)

DEFAULT_WAIT_TIME = 1200
DEFAULT_POLL_INTERVAL = 10
Expand All @@ -80,6 +81,11 @@ class ModelDeploymentLogType:
ACCESS = "access"


class ModelDeploymentType:
SINGLE_MODEL = "SINGLE_MODEL"
MODEL_GROUP = "MODEL_GROUP"


class LogNotConfiguredError(Exception): # pragma: no cover
pass

Expand Down Expand Up @@ -964,7 +970,9 @@ def predict(
except oci.exceptions.ServiceError as ex:
# When bandwidth exceeds the allocated value, TooManyRequests error (429) will be raised by oci backend.
if ex.status == 429:
bandwidth_mbps = self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
bandwidth_mbps = (
self.infrastructure.bandwidth_mbps or DEFAULT_BANDWIDTH_MBPS
)
utils.get_logger().warning(
f"Load balancer bandwidth exceeds the allocated {bandwidth_mbps} Mbps."
"To estimate the actual bandwidth, use formula: (payload size in KB) * (estimated requests per second) * 8 / 1024."
Expand Down Expand Up @@ -1644,36 +1652,36 @@ def _build_model_deployment_configuration_details(self) -> Dict:
}

if infrastructure.subnet_id:
instance_configuration[
infrastructure.CONST_SUBNET_ID
] = infrastructure.subnet_id
instance_configuration[infrastructure.CONST_SUBNET_ID] = (
infrastructure.subnet_id
)

if infrastructure.private_endpoint_id:
if not hasattr(
oci.data_science.models.InstanceConfiguration, "private_endpoint_id"
):
# TODO: add oci version with private endpoint support.
raise EnvironmentError(
raise OSError(
"Private endpoint is not supported in the current OCI SDK installed."
)

instance_configuration[
infrastructure.CONST_PRIVATE_ENDPOINT_ID
] = infrastructure.private_endpoint_id
instance_configuration[infrastructure.CONST_PRIVATE_ENDPOINT_ID] = (
infrastructure.private_endpoint_id
)

scaling_policy = {
infrastructure.CONST_POLICY_TYPE: "FIXED_SIZE",
infrastructure.CONST_INSTANCE_COUNT: infrastructure.replica
or DEFAULT_REPLICA,
}

if not runtime.model_uri:
if not (runtime.model_uri or runtime.model_group_id):
raise ValueError(
"Missing parameter model uri. Try reruning it after model uri is configured."
"Missing parameter model uri and model group id. Try reruning it after model or model group is configured."
)

model_id = runtime.model_uri
if not model_id.startswith("ocid"):
if model_id and not model_id.startswith("ocid"):
from ads.model.datascience_model import DataScienceModel

dsc_model = DataScienceModel(
Expand Down Expand Up @@ -1704,7 +1712,7 @@ def _build_model_deployment_configuration_details(self) -> Dict:
oci.data_science.models,
"ModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
raise OSError(
"Environment variable hasn't been supported in the current OCI SDK installed."
)

Expand All @@ -1720,9 +1728,9 @@ def _build_model_deployment_configuration_details(self) -> Dict:
and runtime.inference_server.upper()
== MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
):
environment_variables[
"CONTAINER_TYPE"
] = MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
environment_variables["CONTAINER_TYPE"] = (
MODEL_DEPLOYMENT_INFERENCE_SERVER_TRITON
)
runtime.set_spec(runtime.CONST_ENV, environment_variables)
environment_configuration_details = {
runtime.CONST_ENVIRONMENT_CONFIG_TYPE: runtime.environment_config_type,
Expand All @@ -1734,27 +1742,45 @@ def _build_model_deployment_configuration_details(self) -> Dict:
oci.data_science.models,
"OcirModelDeploymentEnvironmentConfigurationDetails",
):
raise EnvironmentError(
raise OSError(
"Container runtime hasn't been supported in the current OCI SDK installed."
)
environment_configuration_details["image"] = runtime.image
environment_configuration_details["imageDigest"] = runtime.image_digest
environment_configuration_details["cmd"] = runtime.cmd
environment_configuration_details["entrypoint"] = runtime.entrypoint
environment_configuration_details["serverPort"] = runtime.server_port
environment_configuration_details[
"healthCheckPort"
] = runtime.health_check_port
environment_configuration_details["healthCheckPort"] = (
runtime.health_check_port
)

model_deployment_configuration_details = {
infrastructure.CONST_DEPLOYMENT_TYPE: "SINGLE_MODEL",
infrastructure.CONST_DEPLOYMENT_TYPE: ModelDeploymentType.SINGLE_MODEL,
infrastructure.CONST_MODEL_CONFIG_DETAILS: model_configuration_details,
runtime.CONST_ENVIRONMENT_CONFIG_DETAILS: environment_configuration_details,
}

if runtime.model_group_id:
model_deployment_configuration_details[
infrastructure.CONST_DEPLOYMENT_TYPE
] = ModelDeploymentType.MODEL_GROUP
model_deployment_configuration_details["modelGroupConfigurationDetails"] = {
runtime.CONST_MODEL_GROUP_ID: runtime.model_group_id
}
model_deployment_configuration_details[
"infrastructureConfigurationDetails"
] = {
"infrastructureType": "INSTANCE_POOL",
infrastructure.CONST_BANDWIDTH_MBPS: infrastructure.bandwidth_mbps
or DEFAULT_BANDWIDTH_MBPS,
infrastructure.CONST_INSTANCE_CONFIG: instance_configuration,
infrastructure.CONST_SCALING_POLICY: scaling_policy,
}
model_configuration_details.pop(runtime.CONST_MODEL_ID)

if runtime.deployment_mode == ModelDeploymentMode.STREAM:
if not hasattr(oci.data_science.models, "StreamConfigurationDetails"):
raise EnvironmentError(
raise OSError(
"Model deployment mode hasn't been supported in the current OCI SDK installed."
)
model_deployment_configuration_details[
Expand Down Expand Up @@ -1786,9 +1812,13 @@ def _build_category_log_details(self) -> Dict:

logs = {}
if (
self.infrastructure.access_log and
self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.access_log.get(self.infrastructure.CONST_LOG_ID, None)
self.infrastructure.access_log
and self.infrastructure.access_log.get(
self.infrastructure.CONST_LOG_GROUP_ID, None
)
and self.infrastructure.access_log.get(
self.infrastructure.CONST_LOG_ID, None
)
):
logs[self.infrastructure.CONST_ACCESS] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.access_log.get(
Expand All @@ -1799,9 +1829,13 @@ def _build_category_log_details(self) -> Dict:
),
}
if (
self.infrastructure.predict_log and
self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_GROUP_ID, None)
and self.infrastructure.predict_log.get(self.infrastructure.CONST_LOG_ID, None)
self.infrastructure.predict_log
and self.infrastructure.predict_log.get(
self.infrastructure.CONST_LOG_GROUP_ID, None
)
and self.infrastructure.predict_log.get(
self.infrastructure.CONST_LOG_ID, None
)
):
logs[self.infrastructure.CONST_PREDICT] = {
self.infrastructure.CONST_LOG_GROUP_ID: self.infrastructure.predict_log.get(
Expand Down
Loading