Skip to content

Commit

Permalink
Release 1.9.12 (tracks 0a20a5c0c0bad6e3d3a3800c229563cd462c7db1)
Browse files Browse the repository at this point in the history
  • Loading branch information
elementl-devtools committed Feb 10, 2025
1 parent ea78f12 commit 5116b28
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 39 deletions.
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.9.11"
__version__ = "1.9.12"
2 changes: 1 addition & 1 deletion dagster-cloud-cli/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"dagster==1.9.11",
"dagster==1.9.12",
"packaging>=20.9",
"questionary",
"requests",
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud-examples/dagster_cloud_examples/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.9.11"
__version__ = "1.9.12"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ classifiers = [
"Operating System :: OS Independent",
]
dependencies = [
"dagster-cloud==1.9.11",
"dagster-cloud==1.9.12",
]

[project.license]
Expand Down
1 change: 0 additions & 1 deletion dagster-cloud/dagster_cloud/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ def dagster_cloud_run_worker_monitoring_interval_seconds(self) -> int:
def opentelemetry(self) -> OpenTelemetryController:
if not self._opentelemetry_controller:
self._opentelemetry_controller = OpenTelemetryController(
service_name=self.agent_display_name,
instance_id=self.instance_uuid,
version=__version__,
config=self._opentelemetry_config,
Expand Down
8 changes: 7 additions & 1 deletion dagster-cloud/dagster_cloud/opentelemetry/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dagster import BoolSource, Field, Shape
from dagster import BoolSource, Field, Shape, StringSource

from dagster_cloud.opentelemetry.config.exporter import exporter_config_schema
from dagster_cloud.opentelemetry.config.log_record_processor import log_record_processor_schema
Expand All @@ -15,6 +15,12 @@ def opentelemetry_config_schema():
is_required=False,
default_value=True,
),
"service_name": Field(
StringSource,
description="Name of the service to which to send telemetry",
is_required=False,
default_value="dagster-cloud-agent",
),
"logging": Field(
description="The logging configuration.",
is_required=False,
Expand Down
6 changes: 3 additions & 3 deletions dagster-cloud/dagster_cloud/opentelemetry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def main():
logger.info("OpenTelemetry is not started. This log will not be captured by OpenTelemetry.")
opentelemetry = OpenTelemetryController(
service_name="my_service",
instance_id="my_instance",
version="0.0.1",
config=otel_config,
Expand Down Expand Up @@ -95,15 +94,13 @@ def main():

def __init__(
self,
service_name: str,
instance_id: str,
version: str,
config: Optional[Mapping[str, Any]] = None,
logger: Optional[logging.Logger] = None,
):
"""Initialize the telemetry instance.
params:
service_name: str: The name of the service.
instance_id: str: The unique identifier of the service instance.
version: str: The version of the service.
config: dict: The configuration for the telemetry components.
Expand All @@ -122,6 +119,9 @@ def __init__(
self._meters: dict[str, Meter] = {}

self._config = config or {}

service_name = self._config["service_name"]

self._logger = logger or logging.getLogger()
self._resource_attributes = {
"service.name": service_name,
Expand Down
8 changes: 4 additions & 4 deletions dagster-cloud/dagster_cloud/pex/grpc/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import dagster._check as check
import grpc
from dagster._core.errors import DagsterUserCodeUnreachableError
from dagster._grpc.__generated__ import api_pb2
from dagster._grpc.__generated__.api_pb2_grpc import (
from dagster._grpc.__generated__ import dagster_api_pb2
from dagster._grpc.__generated__.dagster_api_pb2_grpc import (
DagsterApiServicer,
add_DagsterApiServicer_to_server,
)
Expand Down Expand Up @@ -186,7 +186,7 @@ def ListRepositories(self, request, context):
self._get_handle_from_metadata(context)
)
if isinstance(client_or_error, SerializableErrorInfo):
return api_pb2.ListRepositoriesReply(
return dagster_api_pb2.ListRepositoriesReply(
serialized_list_repositories_response_or_error=serialize_value(client_or_error)
)
return client_or_error._get_response("ListRepositories", request) # noqa: SLF001
Expand Down Expand Up @@ -337,7 +337,7 @@ def GetCurrentRuns(self, request, context):
f"Active server hit error:\n{e}",
)

return api_pb2.GetCurrentRunsReply(
return dagster_api_pb2.GetCurrentRunsReply(
serialized_current_runs=serialize_value(
GetCurrentRunsResult(current_runs=all_run_ids, serializable_error_info=None)
)
Expand Down
24 changes: 24 additions & 0 deletions dagster-cloud/dagster_cloud/storage/event_logs/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,30 @@
}
"""

GET_POOL_LIMITS_QUERY = """
query getPoolLimits {
eventLogs {
getPoolLimits {
name
limit
fromDefault
}
}
}
"""

GET_POOL_CONFIG_QUERY = """
query getPoolConfig {
eventLogs {
getPoolConfig {
poolGranularity
defaultPoolLimit
opGranularityRunBuffer
}
}
}
"""

GET_CONCURRENCY_INFO_QUERY = """
query getConcurrencyInfo($concurrencyKey: String!) {
eventLogs {
Expand Down
28 changes: 28 additions & 0 deletions dagster-cloud/dagster_cloud/storage/event_logs/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, RunStepMarker, StepEventStatus
from dagster._core.instance.config import PoolConfig, PoolGranularity
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionRecord,
AssetCheckExecutionRecordStatus,
Expand All @@ -47,6 +48,7 @@
EventLogConnection,
EventLogStorage,
PlannedMaterializationInfo,
PoolLimit,
)
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
from dagster._serdes import ConfigurableClass, ConfigurableClassData, serialize_value
Expand Down Expand Up @@ -106,6 +108,8 @@
GET_MATERIALIZATION_COUNT_BY_PARTITION,
GET_MATERIALIZED_PARTITIONS,
GET_MAXIMUM_RECORD_ID,
GET_POOL_CONFIG_QUERY,
GET_POOL_LIMITS_QUERY,
GET_RECORDS_FOR_RUN_QUERY,
GET_RUN_STATUS_CHANGE_EVENTS_QUERY,
GET_STATS_FOR_RUN_QUERY,
Expand Down Expand Up @@ -1332,3 +1336,27 @@ def get_asset_status_cache_values(
else None
for value in res["data"]["eventLogs"]["getAssetStatusCacheValues"]
]

def get_pool_config(self):
res = self._execute_query(GET_POOL_CONFIG_QUERY)
pool_config = res["data"]["eventLogs"]["getPoolConfig"]
granularity_str = pool_config.get("poolGranularity")

return PoolConfig(
pool_granularity=PoolGranularity(granularity_str) if granularity_str else None,
default_pool_limit=pool_config.get("defaultPoolLimit"),
op_granularity_run_buffer=pool_config.get("opGranularityRunBuffer"),
)

def get_pool_limits(self) -> Sequence[PoolLimit]:
"""Get the set of concurrency limited keys and limits."""
res = self._execute_query(GET_POOL_LIMITS_QUERY)
limits = res["data"]["eventLogs"]["getPoolLimits"]
return [
PoolLimit(
name=limit.get("name"),
limit=limit.get("limit"),
from_default=limit.get("from_default"),
)
for limit in limits
]
49 changes: 39 additions & 10 deletions dagster-cloud/dagster_cloud/storage/event_logs/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Optional
from typing import List, Optional

from dagster import DagsterEvent
from dagster._core.events import (
Expand All @@ -26,48 +26,77 @@ def _get_maximum_event_message_characters() -> int:

def _truncate_dagster_event_error(
error_info: Optional[SerializableErrorInfo],
truncations: List[str],
) -> Optional[SerializableErrorInfo]:
if not error_info:
return error_info

return truncate_serialized_error(error_info, _get_error_character_size_limit(), max_depth=5)
return truncate_serialized_error(
error_info,
_get_error_character_size_limit(),
max_depth=5,
truncations=truncations,
)


def _truncate_dagster_event(dagster_event: Optional[DagsterEvent]) -> Optional[DagsterEvent]:
def _truncate_dagster_event(
dagster_event: Optional[DagsterEvent],
truncations: List[str],
) -> Optional[DagsterEvent]:
if not dagster_event:
return dagster_event
event_specific_data = dagster_event.event_specific_data

if isinstance(event_specific_data, JobFailureData):
event_specific_data = event_specific_data._replace(
error=_truncate_dagster_event_error(event_specific_data.error),
error=_truncate_dagster_event_error(
event_specific_data.error,
truncations,
),
first_step_failure_event=_truncate_dagster_event(
event_specific_data.first_step_failure_event
event_specific_data.first_step_failure_event,
truncations,
),
)
elif isinstance(
event_specific_data,
(JobCanceledData, EngineEventData, HookErroredData, StepFailureData, StepRetryData),
):
event_specific_data = event_specific_data._replace(
error=_truncate_dagster_event_error(event_specific_data.error),
error=_truncate_dagster_event_error(
event_specific_data.error,
truncations,
),
)

return dagster_event._replace(event_specific_data=event_specific_data)


def truncate_event(event: EventLogEntry, maximum_length=None) -> EventLogEntry:
def truncate_event(
event: EventLogEntry,
maximum_length=None,
truncations: Optional[List[str]] = None,
) -> EventLogEntry:
truncations = [] if truncations is None else truncations

if event.dagster_event:
event = event._replace(dagster_event=_truncate_dagster_event(event.dagster_event))
event = event._replace(
dagster_event=_truncate_dagster_event(
event.dagster_event,
truncations,
)
)

maximum_length = (
maximum_length if maximum_length is not None else _get_maximum_event_message_characters()
)

if len(event.user_message) > maximum_length:
len_usr_msg = len(event.user_message)
if len_usr_msg > maximum_length:
truncations.append(f"user_message {len_usr_msg} to {maximum_length}")
return event._replace(
user_message=(
f"[TRUNCATED from {len(event.user_message)} characters to"
f"[TRUNCATED from {len_usr_msg} characters to"
f" {maximum_length}]"
f" {event.user_message[:maximum_length]} [TRUNCATED]"
),
Expand Down
27 changes: 22 additions & 5 deletions dagster-cloud/dagster_cloud/util/errors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Sequence
from typing import List, Optional, Sequence

from dagster._serdes import serialize_value
from dagster._utils.error import SerializableErrorInfo
Expand All @@ -14,10 +14,16 @@ def unwrap_user_code_error(error_info: SerializableErrorInfo) -> SerializableErr


def truncate_serialized_error(
error_info: SerializableErrorInfo, field_size_limit: int, max_depth: int
error_info: SerializableErrorInfo,
field_size_limit: int,
max_depth: int,
truncations: Optional[List[str]] = None,
):
truncations = [] if truncations is None else truncations

if error_info.cause:
if max_depth == 0:
truncations.append("cause")
new_cause = (
error_info.cause
if len(serialize_value(error_info.cause)) <= field_size_limit
Expand All @@ -29,12 +35,16 @@ def truncate_serialized_error(
)
else:
new_cause = truncate_serialized_error(
error_info.cause, field_size_limit, max_depth=max_depth - 1
error_info.cause,
field_size_limit,
max_depth=max_depth - 1,
truncations=truncations,
)
error_info = error_info._replace(cause=new_cause)

if error_info.context:
if max_depth == 0:
truncations.append("context")
new_context = (
error_info.context
if len(serialize_value(error_info.context)) <= field_size_limit
Expand All @@ -46,7 +56,10 @@ def truncate_serialized_error(
)
else:
new_context = truncate_serialized_error(
error_info.context, field_size_limit, max_depth=max_depth - 1
error_info.context,
field_size_limit,
max_depth=max_depth - 1,
truncations=truncations,
)
error_info = error_info._replace(context=new_context)

Expand All @@ -55,19 +68,23 @@ def truncate_serialized_error(
for stack_elem in error_info.stack:
stack_size_so_far += len(stack_elem)
if stack_size_so_far > field_size_limit:
truncations.append("stack")
truncated_stack.append("(TRUNCATED)")
break

truncated_stack.append(stack_elem)

error_info = error_info._replace(stack=truncated_stack)

if len(error_info.message) > field_size_limit:
msg_len = len(error_info.message)
if msg_len > field_size_limit:
truncations.append(f"message from {msg_len} to {field_size_limit}")
error_info = error_info._replace(
message=error_info.message[:field_size_limit] + " (TRUNCATED)"
)

if error_info.cls_name and len(error_info.cls_name) > ERROR_CLASS_NAME_SIZE_LIMIT:
truncations.append("cls_name")
error_info = error_info._replace(
cls_name=error_info.cls_name[:ERROR_CLASS_NAME_SIZE_LIMIT] + " (TRUNCATED)"
)
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.9.11"
__version__ = "1.9.12"
10 changes: 6 additions & 4 deletions dagster-cloud/dagster_cloud/workspace/config_schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ def process_workspace_config(workspace_config) -> Dict[str, Any]:
check.is_list(workspace_config.get("locations"))

validation = validate_config(WORKSPACE_CONFIG_SCHEMA, workspace_config)
check.invariant(
validation.success,
", ".join([error.message for error in cast(List[EvaluationError], validation.errors)]),
)
if not validation.success:
check.failed(
", ".join(
[error.message for error in cast(List[EvaluationError], validation.errors)]
),
)
return workspace_config


Expand Down
Loading

0 comments on commit 5116b28

Please sign in to comment.