Skip to content

Commit

Permalink
Release 1.6.12 (tracks 490750e448f6159ed6393f58c36e2f1eddfb1c33)
Browse files Browse the repository at this point in the history
  • Loading branch information
elementl-devtools committed Mar 21, 2024
1 parent 545eb31 commit f763409
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def notify(
source = metrics.get_source()
if source == CliEventTags.source.github:
event = github_context.get_github_event(project_dir)
msg = "Your pull request is automatically being deployed to Dagster Cloud."
msg = f"Your pull request at commit `{event.github_sha}` is automatically being deployed to Dagster Cloud."
event.update_pr_comment(
msg + "\n\n" + report.markdown_report(location_states),
orig_author="github-actions[bot]",
Expand Down
14 changes: 10 additions & 4 deletions dagster-cloud-cli/dagster_cloud_cli/commands/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
@app.command()
@dagster_cloud_options()
def set_deployment(
ctx: Context, deployment: str = Argument(..., autocompletion=available_deployment_names)
ctx: Context,
deployment: str = Argument(..., autocompletion=available_deployment_names),
):
"""Set the default deployment for CLI commands."""
deployments = available_deployment_names(ctx=ctx)
Expand All @@ -42,7 +43,10 @@ def set_deployment(
@app.command()
def view(
show_token: bool = Option(
False, "--show-token", "-s", help="Whether to display the user token in plaintext."
False,
"--show-token",
"-s",
help="Whether to display the user token in plaintext.",
),
):
"""View the current CLI configuration."""
Expand Down Expand Up @@ -198,7 +202,7 @@ def _setup(organization: str, deployment: str, api_token: str):
deployment_name = deployment if deployment else "prod"
ui.print(
"\nTo create a new user token or find an existing token, visit"
f" https://dagster.cloud/{new_org}/{deployment_name}/cloud-settings/tokens"
f" https://dagster.cloud/{new_org}/{deployment_name}/org-settings/tokens"
)
new_api_token = (
ui.password_input("Dagster Cloud user token:", default=api_token or "") or None
Expand Down Expand Up @@ -232,7 +236,9 @@ def _setup(organization: str, deployment: str, api_token: str):

write_config(
DagsterCloudCliConfig(
organization=new_org, default_deployment=new_deployment, user_token=new_api_token
organization=new_org,
default_deployment=new_deployment,
user_token=new_api_token,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ def process_alert_policies_config(alert_policies_config):
}
)
),
"pagerduty": Field(
config=Shape(
fields={
"integration_key": Field(
config=str,
is_required=True,
description="The integration key for your PagerDuty app.",
)
}
)
),
}
),
is_required=True,
Expand Down
3 changes: 3 additions & 0 deletions dagster-cloud-cli/dagster_cloud_cli/gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ def get_deployment_settings(client: GqlShimClient) -> Dict[str, Any]:
... on MicrosoftTeamsAlertPolicyNotification {
webhookUrl
}
... on PagerdutyAlertPolicyNotification {
integrationKey
}
}
enabled
}
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6.11"
__version__ = "1.6.12"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/dagster_cloud_examples/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6.11"
__version__ = "1.6.12"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def get_version() -> str:
name="dagster-cloud-examples",
version=ver,
packages=find_packages(exclude=["dagster_cloud_examples_tests*"]),
install_requires=["dagster_cloud==1.6.11"],
install_requires=["dagster_cloud==1.6.12"],
extras_require={"tests": ["mypy", "pylint", "pytest"]},
author="Elementl",
author_email="[email protected]",
Expand Down
7 changes: 2 additions & 5 deletions dagster-cloud/dagster_cloud/dagster_anomaly_detection/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@ def the_check(context: AssetExecutionContext) -> AssetCheckResult:
headers={"Dagster-Cloud-Api-Token": instance.dagster_cloud_agent_token},
)
client = Client(transport=transport, fetch_schema_from_transport=True)
asset_key = context.asset_checks_def.asset_key
asset = context.job_def.asset_layer.assets_defs_by_key.get(
asset_key
) or context.job_def.asset_layer.source_assets_by_key.get(asset_key)
if asset is None:
asset_key = next(iter(context.assets_def.check_keys)).asset_key
if not context.job_def.asset_layer.has(asset_key):
raise Exception(f"Could not find targeted asset {asset_key.to_string()}.")
result = client.execute(
gql(ANOMALY_DETECTION_INFERENCE_MUTATION),
Expand Down
10 changes: 0 additions & 10 deletions dagster-cloud/dagster_cloud/serverless/io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ def has_output(self, context):
key = self._get_path(context)
return self._has_object(key)

def _rm_object(self, key):
check.str_param(key, "key")
check.param_invariant(len(key) > 0, "key")

# delete_object wont fail even if the item has been deleted.
self._s3.delete_object(Bucket=self._bucket, Key=key)

def _has_object(self, key):
check.str_param(key, "key")
check.param_invariant(len(key) > 0, "key")
Expand Down Expand Up @@ -101,9 +94,6 @@ def handle_output(self, context, obj):

key = self._get_path(context)

if self._has_object(key):
self._rm_object(key)

pickled_obj = pickle.dumps(obj, PICKLE_PROTOCOL)
pickled_obj_bytes = io.BytesIO(pickled_obj)
self._s3.upload_fileobj(
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.6.11"
__version__ = "1.6.12"
13 changes: 11 additions & 2 deletions dagster-cloud/dagster_cloud/workspace/docker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from dagster._serdes.config_class import ConfigurableClassData
from dagster._utils import find_free_port
from dagster._utils.merger import merge_dicts
from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata
from dagster_docker import DockerRunLauncher
from dagster_docker.container_context import DockerContainerContext
from dateutil.parser import parse
from docker.models.containers import Container
from typing_extensions import Self

Expand All @@ -38,6 +38,8 @@
GRPC_SERVER_LABEL = "dagster_grpc_server"
MULTIPEX_SERVER_LABEL = "dagster_multipex_server"
AGENT_LABEL = "dagster_agent_id"
SERVER_TIMESTAMP_LABEL = "dagster_server_timestamp"


IMAGE_PULL_LOG_INTERVAL = 15

Expand Down Expand Up @@ -261,8 +263,9 @@ def _start_new_server_spinup(
self,
deployment_name: str,
location_name: str,
metadata: CodeDeploymentMetadata,
desired_entry: UserCodeLauncherEntry,
) -> DagsterCloudGrpcServer:
metadata = desired_entry.code_deployment_metadata
container_name = unique_docker_resource_name(deployment_name, location_name)

container_context = DockerContainerContext(
Expand Down Expand Up @@ -294,6 +297,7 @@ def _start_new_server_spinup(
MULTIPEX_SERVER_LABEL: "",
deterministic_label_for_location(deployment_name, location_name): "",
AGENT_LABEL: self._instance.instance_uuid,
SERVER_TIMESTAMP_LABEL: str(desired_entry.update_timestamp),
}
else:
command = metadata.get_grpc_server_command(
Expand All @@ -306,6 +310,7 @@ def _start_new_server_spinup(
GRPC_SERVER_LABEL: "",
deterministic_label_for_location(deployment_name, location_name): "",
AGENT_LABEL: self._instance.instance_uuid,
SERVER_TIMESTAMP_LABEL: str(desired_entry.update_timestamp),
}

container, server_endpoint = self._launch_container(
Expand Down Expand Up @@ -351,6 +356,10 @@ def _remove_server_handle(self, server_handle: DagsterDockerContainer) -> None:
def get_agent_id_for_server(self, handle: DagsterDockerContainer) -> Optional[str]:
return handle.container.labels.get(AGENT_LABEL)

def get_server_create_timestamp(self, handle: DagsterDockerContainer) -> Optional[float]:
created_time_str = handle.container.attrs["Created"]
return parse(created_time_str).timestamp()

def _list_server_handles(self) -> List[DagsterDockerContainer]:
client = docker.client.from_env()
return [
Expand Down
18 changes: 15 additions & 3 deletions dagster-cloud/dagster_cloud/workspace/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dagster._utils.merger import merge_dicts
from dagster_aws.ecs.container_context import EcsContainerContext
from dagster_aws.secretsmanager import get_secrets_from_arns
from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata

from dagster_cloud.api.dagster_cloud_api import (
UserCodeDeploymentType,
Expand Down Expand Up @@ -342,8 +341,13 @@ def get_code_server_resource_limits(
}

def _start_new_server_spinup(
self, deployment_name: str, location_name: str, metadata: CodeDeploymentMetadata
self,
deployment_name: str,
location_name: str,
desired_entry: UserCodeLauncherEntry,
) -> DagsterCloudGrpcServer:
metadata = desired_entry.code_deployment_metadata

if metadata.pex_metadata:
command = metadata.get_multipex_server_command(
PORT, metrics_enabled=self._instance.user_code_launcher.code_server_metrics_enabled
Expand All @@ -352,6 +356,7 @@ def _start_new_server_spinup(
tags = {
"dagster/multipex_server": "1",
"dagster/agent_id": self._instance.instance_uuid,
"dagster/server_timestamp": str(desired_entry.update_timestamp),
}
else:
command = metadata.get_grpc_server_command(
Expand All @@ -360,7 +365,11 @@ def _start_new_server_spinup(
additional_env = metadata.get_grpc_server_env(
PORT, location_name, self._instance.ref_for_deployment(deployment_name)
)
tags = {"dagster/grpc_server": "1", "dagster/agent_id": self._instance.instance_uuid}
tags = {
"dagster/grpc_server": "1",
"dagster/agent_id": self._instance.instance_uuid,
"dagster/server_timestamp": str(desired_entry.update_timestamp),
}

container_context = EcsContainerContext(
secrets=self.secrets,
Expand Down Expand Up @@ -632,6 +641,9 @@ def get_agent_id_for_server(self, handle: EcsServerHandleType) -> Optional[str]:
# Need to get container for server handle, then get the agent tag from that.
return handle.tags.get("dagster/agent_id")

def get_server_create_timestamp(self, handle: EcsServerHandleType) -> Optional[float]:
return handle.create_timestamp

def _run_launcher_kwargs(self) -> Dict[str, Any]:
return dict(
task_definition={
Expand Down
19 changes: 8 additions & 11 deletions dagster-cloud/dagster_cloud/workspace/ecs/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional

from dagster._utils.cached_method import cached_method

Expand Down Expand Up @@ -76,19 +77,15 @@ def public_ip(self):

@property
@cached_method
def created_at(self):
task_arns = self.client.ecs.list_tasks(
cluster=self.client.cluster_name,
serviceName=self.name,
).get("taskArns")
def create_timestamp(self) -> Optional[float]:
response = self.client.ecs.describe_services(
cluster=self.client.cluster_name, services=[self.name]
)

# Assume there's only one task per service
task = self.client.ecs.describe_tasks(
cluster=self.client.cluster_name,
tasks=task_arns,
).get("tasks")[0]
service_description = response["services"][0]

return task.get("createdAt")
# Extract the creation timestamp
return service_description["createdAt"].timestamp()

@property
@cached_method
Expand Down
Loading

0 comments on commit f763409

Please sign in to comment.