Release 1.6.10 (tracks d7dd42bfe62c031c0fa0c01a3e0163312ff0e4c1)
elementl-devtools committed Mar 14, 2024
1 parent 8693582 commit b6cf514
Showing 18 changed files with 267 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
@@ -1 +1 @@
__version__ = "1.6.9"
__version__ = "1.6.10"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/dagster_cloud_examples/version.py
@@ -1 +1 @@
__version__ = "1.6.9"
__version__ = "1.6.10"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/setup.py
@@ -19,7 +19,7 @@ def get_version() -> str:
name="dagster-cloud-examples",
version=ver,
packages=find_packages(exclude=["dagster_cloud_examples_tests*"]),
install_requires=["dagster_cloud==1.6.9"],
install_requires=["dagster_cloud==1.6.10"],
extras_require={"tests": ["mypy", "pylint", "pytest"]},
author="Elementl",
author_email="[email protected]",
6 changes: 3 additions & 3 deletions dagster-cloud/dagster_cloud/agent/dagster_cloud_agent.py
@@ -12,11 +12,11 @@
import dagster._check as check
import pendulum
from dagster import DagsterInstance
from dagster._core.host_representation import (
from dagster._core.launcher.base import LaunchRunContext
from dagster._core.remote_representation import (
CodeLocationOrigin,
)
from dagster._core.host_representation.origin import RegisteredCodeLocationOrigin
from dagster._core.launcher.base import LaunchRunContext
from dagster._core.remote_representation.origin import RegisteredCodeLocationOrigin
from dagster._core.utils import FuturesAwareThreadPoolExecutor
from dagster._grpc.client import DagsterGrpcClient
from dagster._grpc.types import CancelExecutionRequest
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/api/dagster_cloud_api.py
@@ -7,7 +7,7 @@
import pendulum
from dagster._core.code_pointer import CodePointer
from dagster._core.definitions.selector import JobSelector
from dagster._core.host_representation import (
from dagster._core.remote_representation import (
CodeLocationOrigin,
ExternalRepositoryData,
)
Empty file.
142 changes: 142 additions & 0 deletions dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py
@@ -0,0 +1,142 @@
import os
from typing import Optional, Sequence, Union, cast

from dagster import (
    _check as check,
    asset_check,
)
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.events import CoercibleToAssetKey
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import (
    DagsterError,
    DagsterInvariantViolationError,
)
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._core.instance import DagsterInstance
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

from dagster_cloud import DagsterCloudAgentInstance

from .mutation import ANOMALY_DETECTION_INFERENCE_MUTATION
from .types import (
    AnomalyDetectionModelParams,
    BetaFreshnessAnomalyDetectionParams,
)

DEFAULT_MODEL_PARAMS = BetaFreshnessAnomalyDetectionParams(sensitivity=0.1)


class DagsterCloudAnomalyDetectionFailed(DagsterError):
    """Raised when an anomaly detection check fails host-side."""


def _build_check_for_asset(
    asset: Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset],
    params: AnomalyDetectionModelParams,
) -> AssetChecksDefinition:
    @asset_check(
        asset=asset,
        description=f"Detects anomalies in the freshness of the asset using model {params.model_version.value.lower()}.",
        name="freshness_anomaly_detection",
    )
    def the_check(context: AssetExecutionContext) -> AssetCheckResult:
        if not _is_agent_instance(context.instance):
            raise DagsterInvariantViolationError(
                f"This anomaly detection check is not being launched from a dagster agent. "
                "Anomaly detection is only available for dagster cloud deployments."
                f"Instance type: {type(context.instance)}."
            )
        instance = cast(DagsterCloudAgentInstance, context.instance)
        transport = RequestsHTTPTransport(
            url=os.getenv("DAGSTER_METRICS_DAGIT_URL", f"{instance.dagit_url}graphql"),
            use_json=True,
            timeout=300,
            headers={"Dagster-Cloud-Api-Token": instance.dagster_cloud_agent_token},
        )
        client = Client(transport=transport, fetch_schema_from_transport=True)
        asset_key = context.asset_checks_def.asset_key
        asset = context.job_def.asset_layer.assets_defs_by_key.get(
            asset_key
        ) or context.job_def.asset_layer.source_assets_by_key.get(asset_key)
        if asset is None:
            raise Exception(f"Could not find targeted asset {asset_key.to_string()}.")
        result = client.execute(
            gql(ANOMALY_DETECTION_INFERENCE_MUTATION),
            {
                "modelVersion": params.model_version.value,
                "params": {
                    **dict(params),
                    "asset_key_user_string": asset_key.to_user_string(),
                },
            },
        )
        if result["anomalyDetectionInference"]["__typename"] != "AnomalyDetectionSuccess":
            raise DagsterCloudAnomalyDetectionFailed(
                f"Anomaly detection failed: {result['anomalyDetectionInference']['message']}"
            )
        response = result["anomalyDetectionInference"]["response"]
        overdue_seconds = check.float_param(response["overdue_seconds"], "overdue_seconds")
        expected_event_timestamp = response["overdue_deadline_timestamp"]
        model_training_range_start = response["model_training_range_start_timestamp"]
        model_training_range_end = response["model_training_range_end_timestamp"]
        metadata = {
            "model_params": {**params.as_metadata},
            "model_version": params.model_version.value,
            "model_training_range_start_timestamp": model_training_range_start,
            "model_training_range_end_timestamp": model_training_range_end,
            "overdue_deadline_timestamp": expected_event_timestamp,
        }
        if overdue_seconds > 0:
            metadata["overdue_minutes"] = overdue_seconds / 60
            return AssetCheckResult(
                passed=False,
                severity=AssetCheckSeverity.WARN,
                metadata=metadata,
            )
        else:
            return AssetCheckResult(passed=True, metadata=metadata)

    return the_check


def build_anomaly_detection_freshness_checks(
    *,
    assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
    params: Optional[AnomalyDetectionModelParams],
) -> Sequence[AssetChecksDefinition]:
    """Builds a list of asset checks which utilize anomaly detection algorithms to
    determine the freshness of data.

    Args:
        assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]): The assets to construct checks for. For each passed in
            asset, there will be a corresponding constructed `AssetChecksDefinition`.
        params (AnomalyDetectionModelParams): The parameters to use for the model. The parameterization corresponds to the model used.

    Returns:
        Sequence[AssetChecksDefinition]: A list of `AssetChecksDefinition` objects, each corresponding to an asset in the `assets` parameter.

    Examples:
        .. code-block:: python

            from dagster_cloud import build_anomaly_detection_freshness_checks, AnomalyDetectionModel, BetaFreshnessAnomalyDetectionParams

            checks = build_anomaly_detection_freshness_checks(
                assets=[AssetKey("foo_asset"), AssetKey("foo_asset")],
                params=BetaFreshnessAnomalyDetectionParams(sensitivity=0.1),
            )
    """
    params = check.opt_inst_param(
        params, "params", AnomalyDetectionModelParams, DEFAULT_MODEL_PARAMS
    )
    return [_build_check_for_asset(asset, params) for asset in assets]


def _is_agent_instance(instance: DagsterInstance) -> bool:
    if hasattr(instance, "dagster_cloud_agent_token") and hasattr(instance, "dagit_url"):
        return True
    return False
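A minimal usage sketch for the new module (not part of this commit): it assumes the deep import paths shown in this diff stay importable and uses a placeholder asset; the checks only yield results when executed by a Dagster Cloud agent, per `_is_agent_instance` above.

from dagster import Definitions, asset

from dagster_cloud.dagster_anomaly_detection.defs import (
    build_anomaly_detection_freshness_checks,
)
from dagster_cloud.dagster_anomaly_detection.types import (
    BetaFreshnessAnomalyDetectionParams,
)


@asset
def orders():  # placeholder asset for illustration
    ...


# One AssetChecksDefinition is returned per entry in `assets`; passing None for
# params falls back to DEFAULT_MODEL_PARAMS (sensitivity=0.1).
freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[orders],
    params=BetaFreshnessAnomalyDetectionParams(sensitivity=0.1),
)

defs = Definitions(assets=[orders], asset_checks=freshness_checks)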
17 changes: 17 additions & 0 deletions dagster-cloud/dagster_cloud/dagster_anomaly_detection/mutation.py
@@ -0,0 +1,17 @@
ANOMALY_DETECTION_INFERENCE_MUTATION = """
mutation AnomalyDetectionInferenceMutation($modelVersion: String!, $params: GenericScalar!) {
  anomalyDetectionInference(modelVersion: $modelVersion, params: $params) {
    __typename
    ... on AnomalyDetectionSuccess {
      response
    }
    ... on AnomalyDetectionFailure {
      message
    }
    ... on PythonError {
      message
      stack
    }
  }
}
"""
55 changes: 55 additions & 0 deletions dagster-cloud/dagster_cloud/dagster_anomaly_detection/types.py
@@ -0,0 +1,55 @@
from abc import abstractproperty
from enum import Enum
from typing import Dict

from dagster._core.definitions.metadata import FloatMetadataValue, MetadataValue
from pydantic import BaseModel


class AnomalyDetectionModelVersion(Enum):
    FRESHNESS_BETA = "FRESHNESS_BETA"


### INTERNAL MODEL PARAMETER SETS ###


class InternalModelParams(BaseModel):
    @abstractproperty
    def model_version(self) -> AnomalyDetectionModelVersion:
        raise NotImplementedError("Subclasses must implement this method")


class InternalBetaFreshnessAnomalyDetectionParams(InternalModelParams):
    sensitivity: float
    asset_key_user_string: str

    @property
    def model_version(self) -> AnomalyDetectionModelVersion:
        return AnomalyDetectionModelVersion.FRESHNESS_BETA


### USER FACING MODEL PARAMETER SETS ###


class AnomalyDetectionModelParams(BaseModel):
    @abstractproperty
    def model_version(self) -> AnomalyDetectionModelVersion:
        raise NotImplementedError("Subclasses must implement this method")

    @abstractproperty
    def as_metadata(self) -> Dict[str, MetadataValue]:
        raise NotImplementedError("Subclasses must implement this method")


class BetaFreshnessAnomalyDetectionParams(AnomalyDetectionModelParams):
    sensitivity: float

    @property
    def model_version(self) -> AnomalyDetectionModelVersion:
        return AnomalyDetectionModelVersion.FRESHNESS_BETA

    @property
    def as_metadata(self) -> Dict[str, MetadataValue]:
        return {
            "sensitivity": FloatMetadataValue(self.sensitivity),
        }
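A brief sketch of how these parameter classes are consumed by defs.py (values illustrative; the reprs in the comments are approximate):

from dagster_cloud.dagster_anomaly_detection.types import (
    BetaFreshnessAnomalyDetectionParams,
)

params = BetaFreshnessAnomalyDetectionParams(sensitivity=0.25)

# model_version selects the server-side model targeted by the mutation.
assert params.model_version.value == "FRESHNESS_BETA"

# dict(params) is what defs.py splats into the GraphQL `params` payload.
print(dict(params))        # {'sensitivity': 0.25}

# as_metadata is attached to the AssetCheckResult under "model_params".
print(params.as_metadata)  # {'sensitivity': FloatMetadataValue(value=0.25)} (approx.)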
8 changes: 8 additions & 0 deletions dagster-cloud/dagster_cloud/instance/__init__.py
@@ -296,6 +296,14 @@ def dagster_cloud_gen_logs_url_url(self):
    def dagster_cloud_gen_insights_url_url(self) -> str:
        return f"{self.dagster_cloud_url}/gen_insights_url"

    @property
    def dagster_cloud_gen_artifacts_post(self) -> str:
        return f"{self.dagster_cloud_url}/gen_artifacts_post"

    @property
    def dagster_cloud_gen_artifacts_get(self) -> str:
        return f"{self.dagster_cloud_url}/gen_artifacts_get"

    @property
    def dagster_cloud_upload_job_snap_url(self):
        return f"{self.dagster_cloud_url}/upload_job_snapshot"
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/storage/runs/queries.py
@@ -132,7 +132,7 @@
)

GET_RUN_TAGS_QUERY = """
query getRunTagsQuery($jsonTagKeys: JSONString, $valuePrefix: String, $limit: Int) {
query getRunTagsQuery($jsonTagKeys: JSONString!, $valuePrefix: String, $limit: Int) {
runs {
getRunTags(jsonTagKeys: $jsonTagKeys, valuePrefix: $valuePrefix, limit: $limit) {
key
10 changes: 3 additions & 7 deletions dagster-cloud/dagster_cloud/storage/runs/storage.py
@@ -22,7 +22,7 @@
)
from dagster._core.events import DagsterEvent
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.host_representation.origin import ExternalJobOrigin
from dagster._core.remote_representation.origin import ExternalJobOrigin
from dagster._core.snap import (
ExecutionPlanSnapshot,
JobSnapshot,
@@ -305,18 +305,14 @@ def get_run_records(

def get_run_tags(
self,
tag_keys: Optional[Sequence[str]] = None,
tag_keys: Sequence[str],
value_prefix: Optional[str] = None,
limit: Optional[int] = None,
) -> Sequence[Tuple[str, Set[str]]]:
res = self._execute_query(
GET_RUN_TAGS_QUERY,
variables={
"jsonTagKeys": (
json.dumps(check.list_param(tag_keys, "tag_keys", of_type=str))
if tag_keys
else None
),
"jsonTagKeys": (json.dumps(check.list_param(tag_keys, "tag_keys", of_type=str))),
"valuePrefix": check.opt_str_param(value_prefix, "value_prefix"),
"limit": check.opt_int_param(limit, "limit"),
},
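Since `tag_keys` is now required rather than optional, callers of this cloud run storage must pass the keys of interest explicitly; a hedged sketch of a call site (instance wiring elided, tag keys hypothetical):

# Assumes an already-configured DagsterInstance backed by the cloud run storage.
run_storage = instance.run_storage

tags = run_storage.get_run_tags(
    tag_keys=["dagster/schedule_name", "team"],  # hypothetical tag keys
    value_prefix=None,
    limit=10,
)
for key, values in tags:  # Sequence[Tuple[str, Set[str]]]
    print(key, sorted(values))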
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/version.py
@@ -1 +1 @@
__version__ = "1.6.9"
__version__ = "1.6.10"
9 changes: 9 additions & 0 deletions dagster-cloud/dagster_cloud/workspace/ecs/client.py
@@ -19,6 +19,12 @@

STOPPED_TASK_GRACE_PERIOD = 30


ECS_EXEC_LINUX_PARAMETERS = {
    "capabilities": {"add": ["SYS_PTRACE"]},
    "initProcessEnabled": True,
}

config = Config(retries={"max_attempts": 50, "mode": "standard"})


@@ -141,6 +147,7 @@ def register_task_definition(
runtime_platform=None,
mount_points=None,
volumes=None,
linux_parameters=None,
):
container_name = container_name or family

@@ -179,6 +186,7 @@ def register_task_definition(
runtime_platform=runtime_platform,
mount_points=mount_points,
volumes=volumes,
linux_parameters=linux_parameters,
)

try:
@@ -257,6 +265,7 @@ def create_service(
runtime_platform=runtime_platform,
mount_points=mount_points,
volumes=volumes,
linux_parameters=ECS_EXEC_LINUX_PARAMETERS if allow_ecs_exec else None,
)

service_registry_arn = None
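For context on `ECS_EXEC_LINUX_PARAMETERS`, a hedged boto3 sketch of how linuxParameters of this shape end up on a container definition when a task definition is registered; the family, image, and role ARN are placeholders, not values used by this client:

import boto3

ECS_EXEC_LINUX_PARAMETERS = {
    "capabilities": {"add": ["SYS_PTRACE"]},
    "initProcessEnabled": True,
}

ecs = boto3.client("ecs")
ecs.register_task_definition(
    family="example-code-server",  # placeholder family name
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",
    memory="512",
    executionRoleArn="arn:aws:iam::123456789012:role/example-execution-role",
    containerDefinitions=[
        {
            "name": "server",
            "image": "example/image:latest",
            # SYS_PTRACE plus an init process is what the ECS Exec tooling expects.
            "linuxParameters": ECS_EXEC_LINUX_PARAMETERS,
        }
    ],
)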
13 changes: 12 additions & 1 deletion dagster-cloud/dagster_cloud/workspace/ecs/launcher.py
@@ -29,7 +29,12 @@
GetCrashedPexServersArgs,
)
from dagster_cloud.workspace.config_schema import SHARED_ECS_CONFIG
from dagster_cloud.workspace.ecs.client import DEFAULT_ECS_GRACE_PERIOD, DEFAULT_ECS_TIMEOUT, Client
from dagster_cloud.workspace.ecs.client import (
DEFAULT_ECS_GRACE_PERIOD,
DEFAULT_ECS_TIMEOUT,
ECS_EXEC_LINUX_PARAMETERS,
Client,
)
from dagster_cloud.workspace.ecs.service import Service
from dagster_cloud.workspace.ecs.utils import get_ecs_human_readable_label, unique_ecs_resource_name
from dagster_cloud.workspace.user_code_launcher import (
@@ -642,6 +647,11 @@ def _run_launcher_kwargs(self) -> Dict[str, Any]:
**({"runtime_platform": self.runtime_platform} if self.runtime_platform else {}),
**({"mount_points": self.mount_points} if self.mount_points else {}),
**({"volumes": self.volumes} if self.volumes else {}),
**(
{"linux_parameters": ECS_EXEC_LINUX_PARAMETERS}
if self._get_enable_ecs_exec()
else {}
),
},
secrets=self.secrets,
secrets_tag=self.secrets_tag,
@@ -651,6 +661,7 @@ def _run_launcher_kwargs(self) -> Dict[str, Any]:
"cluster": self.cluster,
"networkConfiguration": self.client.network_configuration,
"launchType": self.launch_type,
**({"enableExecuteCommand": True} if self._get_enable_ecs_exec() else {}),
},
run_ecs_tags=self.run_ecs_tags,
container_name=CONTAINER_NAME,
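With `enableExecuteCommand` set on the run task and SYS_PTRACE plus an init process enabled in the container, an ECS Exec session can be opened against the running task; a hedged boto3 sketch (identifiers are placeholders, and the AWS Session Manager plugin is required for an interactive shell):

import boto3

ecs = boto3.client("ecs")
response = ecs.execute_command(
    cluster="example-cluster",  # placeholder cluster name
    task="arn:aws:ecs:us-east-1:123456789012:task/example-cluster/abc123",
    container="run",            # placeholder container name
    interactive=True,
    command="/bin/bash",
)
print(response["session"]["streamUrl"])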
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/workspace/ecs/utils.py
@@ -2,7 +2,7 @@
import re
from typing import Optional

from dagster._core.host_representation.origin import ExternalJobOrigin
from dagster._core.remote_representation.origin import ExternalJobOrigin
from dagster_aws.ecs.utils import sanitize_family

from ..user_code_launcher.utils import get_human_readable_label, unique_resource_name