diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py index 47ce6cc..109d62e 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py @@ -292,7 +292,7 @@ def notify( source = metrics.get_source() if source == CliEventTags.source.github: event = github_context.get_github_event(project_dir) - msg = "Your pull request is automatically being deployed to Dagster Cloud." + msg = f"Your pull request at commit `{event.github_sha}` is automatically being deployed to Dagster Cloud." event.update_pr_comment( msg + "\n\n" + report.markdown_report(location_states), orig_author="github-actions[bot]", diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/config.py b/dagster-cloud-cli/dagster_cloud_cli/commands/config.py index 87fe57f..1e44cd1 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/config.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/config.py @@ -25,7 +25,8 @@ @app.command() @dagster_cloud_options() def set_deployment( - ctx: Context, deployment: str = Argument(..., autocompletion=available_deployment_names) + ctx: Context, + deployment: str = Argument(..., autocompletion=available_deployment_names), ): """Set the default deployment for CLI commands.""" deployments = available_deployment_names(ctx=ctx) @@ -42,7 +43,10 @@ def set_deployment( @app.command() def view( show_token: bool = Option( - False, "--show-token", "-s", help="Whether to display the user token in plaintext." 
+ False, + "--show-token", + "-s", + help="Whether to display the user token in plaintext.", ), ): """View the current CLI configuration.""" @@ -198,7 +202,7 @@ def _setup(organization: str, deployment: str, api_token: str): deployment_name = deployment if deployment else "prod" ui.print( "\nTo create a new user token or find an existing token, visit" - f" https://dagster.cloud/{new_org}/{deployment_name}/cloud-settings/tokens" + f" https://dagster.cloud/{new_org}/{deployment_name}/org-settings/tokens" ) new_api_token = ( ui.password_input("Dagster Cloud user token:", default=api_token or "") or None @@ -232,7 +236,9 @@ def _setup(organization: str, deployment: str, api_token: str): write_config( DagsterCloudCliConfig( - organization=new_org, default_deployment=new_deployment, user_token=new_api_token + organization=new_org, + default_deployment=new_deployment, + user_token=new_api_token, ) ) diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py b/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py index eef9f63..0ba6e87 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/deployment/alert_policies/config_schema.py @@ -180,6 +180,17 @@ def process_alert_policies_config(alert_policies_config): } ) ), + "pagerduty": Field( + config=Shape( + fields={ + "integration_key": Field( + config=str, + is_required=True, + description="The integration key for your PagerDuty app.", + ) + } + ) + ), } ), is_required=True, diff --git a/dagster-cloud-cli/dagster_cloud_cli/gql.py b/dagster-cloud-cli/dagster_cloud_cli/gql.py index d72fd95..cd6c297 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/gql.py +++ b/dagster-cloud-cli/dagster_cloud_cli/gql.py @@ -353,6 +353,9 @@ def get_deployment_settings(client: GqlShimClient) -> Dict[str, Any]: ... on MicrosoftTeamsAlertPolicyNotification { webhookUrl } + ... 
on PagerdutyAlertPolicyNotification { + integrationKey + } } enabled } diff --git a/dagster-cloud-cli/dagster_cloud_cli/version.py b/dagster-cloud-cli/dagster_cloud_cli/version.py index b370bd0..44bc99c 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/version.py +++ b/dagster-cloud-cli/dagster_cloud_cli/version.py @@ -1 +1 @@ -__version__ = "1.6.11" +__version__ = "1.6.12" diff --git a/dagster-cloud-examples/dagster_cloud_examples/version.py b/dagster-cloud-examples/dagster_cloud_examples/version.py index b370bd0..44bc99c 100644 --- a/dagster-cloud-examples/dagster_cloud_examples/version.py +++ b/dagster-cloud-examples/dagster_cloud_examples/version.py @@ -1 +1 @@ -__version__ = "1.6.11" +__version__ = "1.6.12" diff --git a/dagster-cloud-examples/setup.py b/dagster-cloud-examples/setup.py index 1c992e7..dd16677 100644 --- a/dagster-cloud-examples/setup.py +++ b/dagster-cloud-examples/setup.py @@ -19,7 +19,7 @@ def get_version() -> str: name="dagster-cloud-examples", version=ver, packages=find_packages(exclude=["dagster_cloud_examples_tests*"]), - install_requires=["dagster_cloud==1.6.11"], + install_requires=["dagster_cloud==1.6.12"], extras_require={"tests": ["mypy", "pylint", "pytest"]}, author="Elementl", author_email="hello@elementl.com", diff --git a/dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py b/dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py index ed9c69a..e6cb0b8 100644 --- a/dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py +++ b/dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py @@ -59,11 +59,8 @@ def the_check(context: AssetExecutionContext) -> AssetCheckResult: headers={"Dagster-Cloud-Api-Token": instance.dagster_cloud_agent_token}, ) client = Client(transport=transport, fetch_schema_from_transport=True) - asset_key = context.asset_checks_def.asset_key - asset = context.job_def.asset_layer.assets_defs_by_key.get( - asset_key - ) or context.job_def.asset_layer.source_assets_by_key.get(asset_key) - if 
asset is None: + asset_key = next(iter(context.assets_def.check_keys)).asset_key + if not context.job_def.asset_layer.has(asset_key): raise Exception(f"Could not find targeted asset {asset_key.to_string()}.") result = client.execute( gql(ANOMALY_DETECTION_INFERENCE_MUTATION), diff --git a/dagster-cloud/dagster_cloud/serverless/io_manager.py b/dagster-cloud/dagster_cloud/serverless/io_manager.py index db1335d..8b2923d 100644 --- a/dagster-cloud/dagster_cloud/serverless/io_manager.py +++ b/dagster-cloud/dagster_cloud/serverless/io_manager.py @@ -59,13 +59,6 @@ def has_output(self, context): key = self._get_path(context) return self._has_object(key) - def _rm_object(self, key): - check.str_param(key, "key") - check.param_invariant(len(key) > 0, "key") - - # delete_object wont fail even if the item has been deleted. - self._s3.delete_object(Bucket=self._bucket, Key=key) - def _has_object(self, key): check.str_param(key, "key") check.param_invariant(len(key) > 0, "key") @@ -101,9 +94,6 @@ def handle_output(self, context, obj): key = self._get_path(context) - if self._has_object(key): - self._rm_object(key) - pickled_obj = pickle.dumps(obj, PICKLE_PROTOCOL) pickled_obj_bytes = io.BytesIO(pickled_obj) self._s3.upload_fileobj( diff --git a/dagster-cloud/dagster_cloud/version.py b/dagster-cloud/dagster_cloud/version.py index b370bd0..44bc99c 100644 --- a/dagster-cloud/dagster_cloud/version.py +++ b/dagster-cloud/dagster_cloud/version.py @@ -1 +1 @@ -__version__ = "1.6.11" +__version__ = "1.6.12" diff --git a/dagster-cloud/dagster_cloud/workspace/docker/__init__.py b/dagster-cloud/dagster_cloud/workspace/docker/__init__.py index 0bb0a1b..fd6bbec 100644 --- a/dagster-cloud/dagster_cloud/workspace/docker/__init__.py +++ b/dagster-cloud/dagster_cloud/workspace/docker/__init__.py @@ -14,9 +14,9 @@ from dagster._serdes.config_class import ConfigurableClassData from dagster._utils import find_free_port from dagster._utils.merger import merge_dicts -from 
dagster_cloud_cli.core.workspace import CodeDeploymentMetadata from dagster_docker import DockerRunLauncher from dagster_docker.container_context import DockerContainerContext +from dateutil.parser import parse from docker.models.containers import Container from typing_extensions import Self @@ -38,6 +38,8 @@ GRPC_SERVER_LABEL = "dagster_grpc_server" MULTIPEX_SERVER_LABEL = "dagster_multipex_server" AGENT_LABEL = "dagster_agent_id" +SERVER_TIMESTAMP_LABEL = "dagster_server_timestamp" + IMAGE_PULL_LOG_INTERVAL = 15 @@ -261,8 +263,9 @@ def _start_new_server_spinup( self, deployment_name: str, location_name: str, - metadata: CodeDeploymentMetadata, + desired_entry: UserCodeLauncherEntry, ) -> DagsterCloudGrpcServer: + metadata = desired_entry.code_deployment_metadata container_name = unique_docker_resource_name(deployment_name, location_name) container_context = DockerContainerContext( @@ -294,6 +297,7 @@ def _start_new_server_spinup( MULTIPEX_SERVER_LABEL: "", deterministic_label_for_location(deployment_name, location_name): "", AGENT_LABEL: self._instance.instance_uuid, + SERVER_TIMESTAMP_LABEL: str(desired_entry.update_timestamp), } else: command = metadata.get_grpc_server_command( @@ -306,6 +310,7 @@ def _start_new_server_spinup( GRPC_SERVER_LABEL: "", deterministic_label_for_location(deployment_name, location_name): "", AGENT_LABEL: self._instance.instance_uuid, + SERVER_TIMESTAMP_LABEL: str(desired_entry.update_timestamp), } container, server_endpoint = self._launch_container( @@ -351,6 +356,10 @@ def _remove_server_handle(self, server_handle: DagsterDockerContainer) -> None: def get_agent_id_for_server(self, handle: DagsterDockerContainer) -> Optional[str]: return handle.container.labels.get(AGENT_LABEL) + def get_server_create_timestamp(self, handle: DagsterDockerContainer) -> Optional[float]: + created_time_str = handle.container.attrs["Created"] + return parse(created_time_str).timestamp() + def _list_server_handles(self) -> List[DagsterDockerContainer]: 
client = docker.client.from_env() return [ diff --git a/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py b/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py index 67836cc..c7f31d2 100644 --- a/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/ecs/launcher.py @@ -19,7 +19,6 @@ from dagster._utils.merger import merge_dicts from dagster_aws.ecs.container_context import EcsContainerContext from dagster_aws.secretsmanager import get_secrets_from_arns -from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata from dagster_cloud.api.dagster_cloud_api import ( UserCodeDeploymentType, @@ -342,8 +341,13 @@ def get_code_server_resource_limits( } def _start_new_server_spinup( - self, deployment_name: str, location_name: str, metadata: CodeDeploymentMetadata + self, + deployment_name: str, + location_name: str, + desired_entry: UserCodeLauncherEntry, ) -> DagsterCloudGrpcServer: + metadata = desired_entry.code_deployment_metadata + if metadata.pex_metadata: command = metadata.get_multipex_server_command( PORT, metrics_enabled=self._instance.user_code_launcher.code_server_metrics_enabled @@ -352,6 +356,7 @@ def _start_new_server_spinup( tags = { "dagster/multipex_server": "1", "dagster/agent_id": self._instance.instance_uuid, + "dagster/server_timestamp": str(desired_entry.update_timestamp), } else: command = metadata.get_grpc_server_command( @@ -360,7 +365,11 @@ def _start_new_server_spinup( additional_env = metadata.get_grpc_server_env( PORT, location_name, self._instance.ref_for_deployment(deployment_name) ) - tags = {"dagster/grpc_server": "1", "dagster/agent_id": self._instance.instance_uuid} + tags = { + "dagster/grpc_server": "1", + "dagster/agent_id": self._instance.instance_uuid, + "dagster/server_timestamp": str(desired_entry.update_timestamp), + } container_context = EcsContainerContext( secrets=self.secrets, @@ -632,6 +641,9 @@ def get_agent_id_for_server(self, handle: EcsServerHandleType) -> 
Optional[str]: # Need to get container for server handle, then get the agent tag from that. return handle.tags.get("dagster/agent_id") + def get_server_create_timestamp(self, handle: EcsServerHandleType) -> Optional[float]: + return handle.create_timestamp + def _run_launcher_kwargs(self) -> Dict[str, Any]: return dict( task_definition={ diff --git a/dagster-cloud/dagster_cloud/workspace/ecs/service.py b/dagster-cloud/dagster_cloud/workspace/ecs/service.py index f1b9080..a030bd9 100644 --- a/dagster-cloud/dagster_cloud/workspace/ecs/service.py +++ b/dagster-cloud/dagster_cloud/workspace/ecs/service.py @@ -1,4 +1,5 @@ import logging +from typing import Optional from dagster._utils.cached_method import cached_method @@ -76,19 +77,15 @@ def public_ip(self): @property @cached_method - def created_at(self): - task_arns = self.client.ecs.list_tasks( - cluster=self.client.cluster_name, - serviceName=self.name, - ).get("taskArns") + def create_timestamp(self) -> Optional[float]: + response = self.client.ecs.describe_services( + cluster=self.client.cluster_name, services=[self.name] + ) - # Assume there's only one task per service - task = self.client.ecs.describe_tasks( - cluster=self.client.cluster_name, - tasks=task_arns, - ).get("tasks")[0] + service_description = response["services"][0] - return task.get("createdAt") + # Extract the creation timestamp + return service_description["createdAt"].timestamp() @property @cached_method diff --git a/dagster-cloud/dagster_cloud/workspace/kubernetes/launcher.py b/dagster-cloud/dagster_cloud/workspace/kubernetes/launcher.py index 3bd5340..189d1a3 100644 --- a/dagster-cloud/dagster_cloud/workspace/kubernetes/launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/kubernetes/launcher.py @@ -2,7 +2,7 @@ from collections import defaultdict from contextlib import contextmanager from pathlib import Path -from typing import Any, Collection, Dict, List, NamedTuple, Optional, Set, Tuple, cast +from typing import Any, Collection, Dict, 
List, Mapping, NamedTuple, Optional, Set, Tuple import kubernetes import kubernetes.client as client @@ -24,7 +24,6 @@ from dagster_k8s.container_context import K8sContainerContext from dagster_k8s.job import UserDefinedDagsterK8sConfig from dagster_k8s.models import k8s_snake_case_dict -from kubernetes.client.models.v1_deployment_list import V1DeploymentList from kubernetes.client.rest import ApiException from typing_extensions import Self @@ -62,6 +61,8 @@ class K8sHandle(NamedTuple): namespace: str name: str + labels: Mapping[str, str] + creation_timestamp: Optional[float] def __str__(self): return f"{self.namespace}/{self.name}" @@ -396,8 +397,13 @@ def get_code_server_resource_limits( } def _start_new_server_spinup( - self, deployment_name: str, location_name: str, metadata: CodeDeploymentMetadata + self, + deployment_name: str, + location_name: str, + desired_entry: UserCodeLauncherEntry, ) -> DagsterCloudGrpcServer: + metadata = desired_entry.code_deployment_metadata + args = metadata.get_grpc_server_command( metrics_enabled=self._instance.user_code_launcher.code_server_metrics_enabled ) @@ -406,9 +412,11 @@ def _start_new_server_spinup( container_context = self._resolve_container_context(metadata) + deployment_reponse = None + try: with self._get_apps_api_instance() as api_instance: - api_response = api_instance.create_namespaced_deployment( + deployment_reponse = api_instance.create_namespaced_deployment( container_context.namespace, body=construct_code_location_deployment( self._instance, @@ -418,11 +426,12 @@ def _start_new_server_spinup( metadata, container_context, args=args, + server_timestamp=desired_entry.update_timestamp, ), ) self._logger.info( "Created deployment {} in namespace {}".format( - api_response.metadata.name, + deployment_reponse.metadata.name, container_context.namespace, ) ) @@ -437,7 +446,7 @@ def _start_new_server_spinup( self._used_namespaces[(deployment_name, location_name)].add(namespace) try: - api_response = 
self._get_core_api_client().create_namespaced_service( + service_response = self._get_core_api_client().create_namespaced_service( namespace, construct_code_location_service( deployment_name, @@ -445,10 +454,11 @@ def _start_new_server_spinup( resource_name, container_context, self._instance, + server_timestamp=desired_entry.update_timestamp, ), ) self._logger.info( - f"Created service {api_response.metadata.name} in namespace {namespace}" + f"Created service {service_response.metadata.name} in namespace {namespace}" ) except ApiException as e: self._logger.error( @@ -466,7 +476,16 @@ def _start_new_server_spinup( ) return DagsterCloudGrpcServer( - K8sHandle(namespace=namespace, name=resource_name), endpoint, metadata + K8sHandle( + namespace=namespace, + name=resource_name, + labels=deployment_reponse.metadata.labels, + creation_timestamp=deployment_reponse.metadata.creation_timestamp.timestamp() + if deployment_reponse.metadata.creation_timestamp + else None, + ), + endpoint, + metadata, ) def _get_timeout_debug_info( @@ -540,7 +559,16 @@ def _get_standalone_dagster_server_handles_for_location( ), ).items for deployment in deployments: - handles.append(K8sHandle(namespace=namespace, name=deployment.metadata.name)) + handles.append( + K8sHandle( + namespace=namespace, + name=deployment.metadata.name, + labels=deployment.metadata.labels, + creation_timestamp=deployment.metadata.creation_timestamp.timestamp() + if deployment.metadata.creation_timestamp + else None, + ) + ) return handles @@ -559,27 +587,24 @@ def _list_server_handles(self) -> List[K8sHandle]: label_selector="managed_by=K8sUserCodeLauncher", ).items for deployment in deployments: - handles.append(K8sHandle(namespace, deployment.metadata.name)) + handles.append( + K8sHandle( + namespace, + deployment.metadata.name, + deployment.metadata.labels, + deployment.metadata.creation_timestamp.timestamp() + if deployment.metadata.creation_timestamp + else None, + ) + ) self._logger.info(f"Listing server 
handles: {handles}") return handles def get_agent_id_for_server(self, handle: K8sHandle) -> Optional[str]: - with self._get_apps_api_instance() as api_instance: - deployments = cast( - V1DeploymentList, - api_instance.list_namespaced_deployment( - handle.namespace, - label_selector="managed_by=K8sUserCodeLauncher", - field_selector=f"metadata.name={handle.name}", - ), - ).items - if not deployments: - self._logger.warning( - f"Attempted to retrieve agent_id for server with handle {handle}; but no" - " deployments found." - ) - return None - return deployments.pop().metadata.labels.get("agent_id") + return handle.labels.get("agent_id") + + def get_server_create_timestamp(self, handle: K8sHandle) -> Optional[float]: + return handle.creation_timestamp def _remove_server_handle(self, server_handle: K8sHandle) -> None: # Since we track which servers to delete by listing the k8s deployments, diff --git a/dagster-cloud/dagster_cloud/workspace/kubernetes/utils.py b/dagster-cloud/dagster_cloud/workspace/kubernetes/utils.py index c5ffd9a..1f5168c 100644 --- a/dagster-cloud/dagster_cloud/workspace/kubernetes/utils.py +++ b/dagster-cloud/dagster_cloud/workspace/kubernetes/utils.py @@ -22,7 +22,10 @@ def _get_dagster_k8s_labels( - deployment_name: str, location_name: str, instance: DagsterCloudAgentInstance + deployment_name: str, + location_name: str, + instance: DagsterCloudAgentInstance, + server_timestamp: float, ) -> Mapping[str, str]: return { **MANAGED_RESOURCES_LABEL, @@ -30,6 +33,7 @@ def _get_dagster_k8s_labels( "location_name": get_k8s_human_readable_label(location_name), "deployment_name": get_k8s_human_readable_label(deployment_name), "agent_id": instance.instance_uuid, + "server_timestamp": str(server_timestamp), } @@ -73,7 +77,12 @@ def get_k8s_human_readable_label(name): def construct_code_location_service( - deployment_name, location_name, service_name, container_context, instance + deployment_name, + location_name, + service_name, + container_context, + 
instance, + server_timestamp: float, ): labels = container_context.labels @@ -82,7 +91,9 @@ def construct_code_location_service( name=service_name, labels={ **labels, - **_get_dagster_k8s_labels(deployment_name, location_name, instance), + **_get_dagster_k8s_labels( + deployment_name, location_name, instance, server_timestamp + ), }, ), spec=client.V1ServiceSpec( @@ -100,6 +111,7 @@ def construct_code_location_deployment( metadata, container_context, args, + server_timestamp: float, ): pull_policy = container_context.image_pull_policy env_config_maps = container_context.env_config_maps @@ -180,7 +192,9 @@ def construct_code_location_deployment( "name": k8s_deployment_name, "labels": { **container_context.labels, - **_get_dagster_k8s_labels(deployment_name, location_name, instance), + **_get_dagster_k8s_labels( + deployment_name, location_name, instance, server_timestamp + ), }, }, "spec": { # DeploymentSpec @@ -192,7 +206,9 @@ def construct_code_location_deployment( "user-deployment": k8s_deployment_name, **container_context.labels, **user_defined_pod_template_labels, - **_get_dagster_k8s_labels(deployment_name, location_name, instance), + **_get_dagster_k8s_labels( + deployment_name, location_name, instance, server_timestamp + ), }, }, "spec": pod_spec_config, diff --git a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/process.py b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/process.py index 0678986..e769b32 100644 --- a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/process.py +++ b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/process.py @@ -19,7 +19,6 @@ from dagster._serdes.ipc import open_ipc_subprocess from dagster._utils import find_free_port, safe_tempfile_path_unmanaged from dagster._utils.merger import merge_dicts -from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata from typing_extensions import Self from dagster_cloud.api.dagster_cloud_api import ( @@ -223,8 +222,10 @@ def 
_start_new_server_spinup( self, deployment_name: str, location_name: str, - metadata: CodeDeploymentMetadata, + desired_entry: UserCodeLauncherEntry, ) -> DagsterCloudGrpcServer: + metadata = desired_entry.code_deployment_metadata + key = (deployment_name, location_name) client: Union[MultiPexGrpcClient, DagsterGrpcClient] @@ -391,6 +392,9 @@ def _list_server_handles(self) -> List[int]: def get_agent_id_for_server(self, handle: int) -> Optional[str]: return self._instance.instance_uuid + def get_server_create_timestamp(self, handle: int) -> Optional[float]: + return None + def __exit__(self, exception_type, exception_value, traceback): super().__exit__(exception_value, exception_value, traceback) diff --git a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py index 10a4796..ffaa55f 100644 --- a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py @@ -31,6 +31,7 @@ ) import dagster._check as check +import pendulum from dagster import BoolSource, Field, IntSource from dagster._api.list_repositories import sync_list_repositories_grpc from dagster._core.definitions.selector import JobSelector @@ -94,6 +95,8 @@ # How often to sync actual_entries with pex server liveness MULTIPEX_ACTUAL_ENTRIES_REFRESH_INTERVAL = 30 +CLEANUP_SERVER_GRACE_PERIOD_SECONDS = 3600 + ServerHandle = TypeVar("ServerHandle") DEPLOYMENT_INFO_QUERY = """ @@ -814,7 +817,7 @@ def _start_new_server_spinup( self, deployment_name: str, location_name: str, - metadata: CodeDeploymentMetadata, + desired_entry: UserCodeLauncherEntry, ) -> DagsterCloudGrpcServer: """Create a new server for the given location using the given metadata as configuration and return a ServerHandle indicating where it can be found. 
Any waiting for the server @@ -925,6 +928,10 @@ def _list_server_handles(self) -> List[ServerHandle]: def get_agent_id_for_server(self, handle: ServerHandle) -> Optional[str]: """Returns the agent_id that created a particular GRPC server.""" + @abstractmethod + def get_server_create_timestamp(self, handle: ServerHandle) -> Optional[float]: + """Returns the creation timestamp of the given code server, if available.""" + def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) -> bool: """Returns true if we can clean up the server identified by the handle without issues (server was started by this agent, or agent is no longer active).""" agent_id_for_server = self.get_agent_id_for_server(handle) @@ -933,11 +940,31 @@ def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) f" {self._instance.instance_uuid}." ) self._logger.info(f"All active agent ids: {active_agent_ids}") - return ( - not agent_id_for_server - or self._instance.instance_uuid == agent_id_for_server - or agent_id_for_server not in cast(Set[str], active_agent_ids) - ) + + # If this server was created by the current agent, it can always be cleaned up + # (or if it's a legacy server that never set an agent ID) + if not agent_id_for_server or self._instance.instance_uuid == agent_id_for_server: + return True + + try: + create_timestamp_for_server = self.get_server_create_timestamp(handle) + except Exception: + self._logger.exception(f"Failure fetching service creation timestamp for {handle}") + return False + + # Clean up servers that were created more than CLEANUP_SERVER_GRACE_PERIOD_SECONDS + # seconds ago (to avoid race conditions) and were created by some agent that is now + # inactive, to ensure that servers are eventually cleaned up by the next agent + # when an agent crashes + if ( + create_timestamp_for_server + and create_timestamp_for_server + >= pendulum.now("UTC").timestamp() - CLEANUP_SERVER_GRACE_PERIOD_SECONDS + ): + self._logger.info("Not cleaning up server since 
it was recently created") + return False + + return agent_id_for_server not in cast(Set[str], active_agent_ids) def _graceful_cleanup_servers(self): # ServerHandles active_agent_ids = self.get_active_agent_ids() @@ -1301,10 +1328,10 @@ def _reconcile( self._logger.info( f"Creating new multipex server for {multipex_server_repr}" ) - # confirm it's a valid image since _create_multipex_server will launch a container + # confirm it's a valid image since _start_new_server_spinup will launch a container self._check_for_image(desired_entry.code_deployment_metadata) - multipex_server = self._create_multipex_server( - deployment_name, location_name, desired_entry.code_deployment_metadata + multipex_server = self._start_new_server_spinup( + deployment_name, location_name, desired_entry ) self._multipex_servers[to_update_key] = multipex_server assert self._get_multipex_server( @@ -1641,14 +1668,6 @@ def _get_multipex_server( return None - def _create_multipex_server(self, deployment_name, location_name, code_deployment_metadata): - multipex_server = self._start_new_server_spinup( - deployment_name, - location_name, - code_deployment_metadata, - ) - return multipex_server - def _create_pex_server( self, deployment_name: str, @@ -1699,9 +1718,7 @@ def _start_new_dagster_server( desired_entry.code_deployment_metadata, ) else: - return self._start_new_server_spinup( - deployment_name, location_name, desired_entry.code_deployment_metadata - ) + return self._start_new_server_spinup(deployment_name, location_name, desired_entry) def get_grpc_endpoint( self, diff --git a/dagster-cloud/setup.py b/dagster-cloud/setup.py index 222aeb7..4d17c9f 100644 --- a/dagster-cloud/setup.py +++ b/dagster-cloud/setup.py @@ -40,8 +40,8 @@ def get_description() -> str: packages=find_packages(exclude=["dagster_cloud_tests*"]), include_package_data=True, install_requires=[ - "dagster==1.6.11", - "dagster-cloud-cli==1.6.11", + "dagster==1.6.12", + "dagster-cloud-cli==1.6.12", "pex>=2.1.132,<3", 
"questionary", "requests", @@ -66,13 +66,13 @@ def get_description() -> str: "dbt-snowflake", "dbt-postgres", "dbt-duckdb", - "dagster-dbt==0.22.11", - "dagster_k8s==0.22.11", + "dagster-dbt==0.22.12", + "dagster_k8s==0.22.12", ], "insights": ["pyarrow"], - "docker": ["docker", "dagster_docker==0.22.11"], - "kubernetes": ["kubernetes", "dagster_k8s==0.22.11"], - "ecs": ["dagster_aws==0.22.11", "boto3"], + "docker": ["docker", "dagster_docker==0.22.12"], + "kubernetes": ["kubernetes", "dagster_k8s==0.22.12"], + "ecs": ["dagster_aws==0.22.12", "boto3"], "sandbox": ["supervisor"], "pex": ["boto3"], "serverless": ["boto3"],