Skip to content

Commit

Permalink
Release 1.7.11 (tracks 0eeb994d54421a99c0a95e2f12e3461c0de0f5d6)
Browse files Browse the repository at this point in the history
  • Loading branch information
elementl-devtools committed Jun 27, 2024
1 parent b4859a5 commit c491a0b
Show file tree
Hide file tree
Showing 26 changed files with 211 additions and 173 deletions.
25 changes: 19 additions & 6 deletions dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def branch_deployment(
dagster_env: Optional[str] = DAGSTER_ENV_OPTION,
mark_closed: bool = False,
read_only: bool = False,
base_deployment_name: Optional[str] = None,
):
try:
if organization:
Expand All @@ -89,21 +90,31 @@ def branch_deployment(
print(get_branch_deployment_name_from_context(url, project_dir))
return

print(create_or_update_deployment_from_context(url, project_dir, mark_closed))
print(
create_or_update_deployment_from_context(
url, project_dir, mark_closed, base_deployment_name=base_deployment_name
)
)

except ValueError as err:
logging.error(
f"cannot determine branch deployment: {err}",
)
sys.exit(1)


def create_or_update_deployment_from_context(url, project_dir: str, mark_closed=False) -> str:
def create_or_update_deployment_from_context(
url,
project_dir: str,
mark_closed=False,
base_deployment_name: Optional[str] = None,
) -> str:
source = metrics.get_source()
if source == CliEventTags.source.github:
event = github_context.get_github_event(project_dir)
api_token = os.environ[TOKEN_ENV_VAR_NAME]
deployment_name = code_location.create_or_update_branch_deployment_from_github_context(
url, api_token, event, mark_closed
url, api_token, event, mark_closed, base_deployment_name=base_deployment_name
)
if not deployment_name:
raise ValueError(
Expand All @@ -129,6 +140,7 @@ def create_or_update_deployment_from_context(url, project_dir: str, mark_closed=
author_name=event.git_metadata.name,
author_email=event.git_metadata.email,
commit_message=event.git_metadata.message,
base_deployment_name=base_deployment_name,
)
logging.info(
"Got branch deployment %r for branch %r",
Expand Down Expand Up @@ -258,11 +270,12 @@ def init(
# available (eg. if not in a PR) then we fallback to the --deployment flag.

try:
branch_deployment = create_or_update_deployment_from_context(url, project_dir)
branch_deployment = create_or_update_deployment_from_context(
url, project_dir, base_deployment_name=deployment
)
if deployment:
ui.print(
f"Deploying to branch deployment {branch_deployment}, ignoring"
f" --deployment={deployment}"
f"Deploying to branch deployment {branch_deployment}, and using base deployment {deployment}"
)
deployment = branch_deployment
is_branch_deployment = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
get_location_document,
)
from dagster_cloud_cli.core.graphql_client import DagsterCloudGraphQLClient
from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata
from dagster_cloud_cli.core.workspace import CodeLocationDeployData
from dagster_cloud_cli.utils import add_options

DEFAULT_LOCATIONS_YAML_FILENAME = "locations.yaml"
Expand Down Expand Up @@ -263,7 +263,7 @@ def execute_list_command(client):

for location in list_res:
metadata = deserialize_value(
location["serializedDeploymentMetadata"], CodeDeploymentMetadata
location["serializedDeploymentMetadata"], CodeLocationDeployData
)

location_desc = [location["locationName"]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def create_or_update_branch_deployment_from_github_context(
dagster_cloud_api_token: str,
github_event: github_context.GithubEvent,
mark_closed: bool,
base_deployment_name: Optional[str],
) -> Optional[str]:
"""Return the branch deployment associated with the github PR."""
event = github_event
Expand All @@ -35,6 +36,7 @@ def create_or_update_branch_deployment_from_github_context(
author_email=event.author_email,
commit_message=event.commit_msg,
author_avatar_url=github_event.get_github_avatar_url(),
base_deployment_name=base_deployment_name,
)
logging.info(
"Got branch deployment %r for branch %r",
Expand Down
13 changes: 7 additions & 6 deletions dagster-cloud-cli/dagster_cloud_cli/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PexMetadata(
# the pex files to execute
("pex_tag", str),
# python_version determines which pex base docker image to use
# only one of PexMetadata.python_version or CodeDeploymentMetadata.image should be specified
# only one of PexMetadata.python_version or CodeLocationDeployData.image should be specified
("python_version", Optional[str]),
],
)
Expand Down Expand Up @@ -85,10 +85,11 @@ def get_instance_ref_for_user_code(instance_ref: InstanceRef) -> InstanceRef:
return instance_ref._replace(custom_instance_class_data=custom_instance_class_data)


# History of CodeDeploymentMetadata
# History of CodeLocationDeployData
# 1. Removal of `enable_metrics` field
@whitelist_for_serdes
class CodeDeploymentMetadata(
# 2. Renamed from `CodeDeploymentMetadata` to `CodeLocationDeployData``
@whitelist_for_serdes(storage_name="CodeDeploymentMetadata")
class CodeLocationDeployData(
NamedTuple(
"_CodeDeploymentMetadata",
[
Expand Down Expand Up @@ -127,7 +128,7 @@ def __new__(
"Must supply exactly one of a file name, a package name, or a module name",
)

return super(CodeDeploymentMetadata, cls).__new__(
return super(CodeLocationDeployData, cls).__new__(
cls,
check.opt_str_param(image, "image"),
check.opt_str_param(python_file, "python_file"),
Expand All @@ -143,7 +144,7 @@ def __new__(
check.opt_str_param(agent_queue, "agent_queue"),
)

def with_cloud_context_env(self, cloud_context_env: Dict[str, Any]) -> "CodeDeploymentMetadata":
def with_cloud_context_env(self, cloud_context_env: Dict[str, Any]) -> "CodeLocationDeployData":
return self._replace(cloud_context_env=cloud_context_env)

def get_multipex_server_command(
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.7.10"
__version__ = "1.7.11"
2 changes: 1 addition & 1 deletion dagster-cloud-cli/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_version() -> str:
packages=find_packages(exclude=["dagster_cloud.cli_tests*"]),
include_package_data=True,
install_requires=[
"dagster==1.7.10",
"dagster==1.7.11",
"packaging>=20.9",
"questionary",
"requests",
Expand Down
2 changes: 1 addition & 1 deletion dagster-cloud-examples/dagster_cloud_examples/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.7.10"
__version__ = "1.7.11"
2 changes: 1 addition & 1 deletion dagster-cloud-examples/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def get_version() -> str:
name="dagster-cloud-examples",
version=ver,
packages=find_packages(exclude=["dagster_cloud_examples_tests*"]),
install_requires=["dagster_cloud==1.7.10"],
install_requires=["dagster_cloud==1.7.11"],
extras_require={"tests": ["mypy", "pylint", "pytest"]},
author="Elementl",
author_email="[email protected]",
Expand Down
43 changes: 36 additions & 7 deletions dagster-cloud/dagster_cloud/agent/dagster_cloud_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import ExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union, cast

import dagster._check as check
Expand All @@ -23,7 +24,7 @@
from dagster._utils.merger import merge_dicts
from dagster._utils.typed_dict import init_optional_typeddict
from dagster_cloud_cli.core.errors import raise_http_error
from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata, get_instance_ref_for_user_code
from dagster_cloud_cli.core.workspace import CodeLocationDeployData, get_instance_ref_for_user_code

from dagster_cloud.api.dagster_cloud_api import (
AgentHeartbeat,
Expand Down Expand Up @@ -88,6 +89,10 @@
)
)

LIVENESS_CHECK_INTERVAL_SECONDS = float(
os.getenv("DAGSTER_CLOUD_AGENT_LIVENESS_CHECK_INTERVAL", "15.0")
)


class DagsterCloudAgent:
def __init__(self, pending_requests_limit: int = DEFAULT_PENDING_REQUESTS_LIMIT):
Expand Down Expand Up @@ -127,6 +132,8 @@ def __init__(self, pending_requests_limit: int = DEFAULT_PENDING_REQUESTS_LIMIT)
set()
)

self._last_liveness_check_time = None

def __enter__(self):
return self

Expand Down Expand Up @@ -277,10 +284,32 @@ def run_loop(
f"Failed to check for workspace updates: \n{serializable_error_info_from_exc_info(sys.exc_info())}"
)

self._write_liveness_sentinel_if_overdue()

# Check for any received interrupts
with raise_interrupts_as(KeyboardInterrupt):
time.sleep(SLEEP_INTERVAL_SECONDS)

def _write_liveness_sentinel_if_overdue(self):
if self._last_liveness_check_time is False:
return

now = time.time()
if self._last_liveness_check_time is None:
self._logger.info("Starting liveness sentinel")
elif self._last_liveness_check_time + LIVENESS_CHECK_INTERVAL_SECONDS > now:
return

# Write to a sentinel file to indicate that we've finished our initial
# reconciliation - this is used to indicate that we're ready to
# serve requests
try:
Path("/opt/liveness_sentinel.txt").touch(exist_ok=True)
self._last_liveness_check_time = now
except Exception as e:
self._logger.error(f"Failed to write liveness sentinel and disabling it: {e}")
self._last_liveness_check_time = False

def _check_update_workspace(self, instance, user_code_launcher, upload_all):
curr_time = pendulum.now("UTC")

Expand Down Expand Up @@ -403,8 +432,8 @@ def _upload_outdated_workspace_entries(

for entry in entries:
location_name = entry["locationName"]
deployment_metadata = deserialize_value(
entry["serializedDeploymentMetadata"], CodeDeploymentMetadata
code_location_deploy_data = deserialize_value(
entry["serializedDeploymentMetadata"], CodeLocationDeployData
)
if entry["hasOutdatedData"]:
# Spin up a server for this location and upload its metadata to Cloud
Expand All @@ -413,7 +442,7 @@ def _upload_outdated_workspace_entries(
(deployment_name, location_name, is_branch_deployment)
] = now
upload_metadata[(deployment_name, location_name)] = UserCodeLauncherEntry(
code_deployment_metadata=deployment_metadata,
code_location_deploy_data=code_location_deploy_data,
update_timestamp=float(entry["metadataTimestamp"]),
)

Expand Down Expand Up @@ -541,8 +570,8 @@ def _query_for_workspace_updates(
location_key = (deployment_name, location_name)

all_locations.add(location_key)
deployment_metadata = deserialize_value(
entry["serializedDeploymentMetadata"], CodeDeploymentMetadata
code_location_deploy_data = deserialize_value(
entry["serializedDeploymentMetadata"], CodeLocationDeployData
)

# The GraphQL can return a mix of deployments with TTLs (for example, all
Expand All @@ -554,7 +583,7 @@ def _query_for_workspace_updates(
user_code_launcher, is_branch_deployment
) or location_key in cast(Set[Tuple[str, str]], locations_with_ttl_to_query):
deployment_map[location_key] = UserCodeLauncherEntry(
code_deployment_metadata=deployment_metadata,
code_location_deploy_data=code_location_deploy_data,
update_timestamp=float(entry["metadataTimestamp"]),
)

Expand Down
12 changes: 11 additions & 1 deletion dagster-cloud/dagster_cloud/anomaly_detection/defs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
from typing import Iterable, Optional, Sequence, Union, cast

from dagster import (
Expand Down Expand Up @@ -28,6 +29,7 @@
from dagster._core.definitions.source_asset import SourceAsset
from dagster._core.errors import DagsterError, DagsterInvariantViolationError
from dagster._core.instance import DagsterInstance
from dagster._time import get_current_timestamp
from dagster_cloud_cli.core.graphql_client import (
DagsterCloudGraphQLClient,
create_cloud_webserver_client,
Expand Down Expand Up @@ -110,13 +112,21 @@ def handle_anomaly_detection_inference_failure(
data["__typename"] == "AnomalyDetectionFailure"
and data["message"] == params.model_version.minimum_required_records_msg
):
# Pause evaluation for a day if there are not enough records.
metadata[FRESH_UNTIL_METADATA_KEY] = MetadataValue.timestamp(
get_current_timestamp() + datetime.timedelta(days=1).total_seconds()
)
description = (
f"Anomaly detection failed: {data['message']} Any sensors will wait to "
"re-evaluate this check for a day."
)
# Intercept failure in the case of not enough records, and return a pass to avoid
# being too noisy with failures.
return AssetCheckResult(
passed=True,
severity=severity,
metadata=metadata,
description=data["message"],
description=description,
asset_key=asset_key,
)
raise DagsterCloudAnomalyDetectionFailed(f"Anomaly detection failed: {data['message']}")
Expand Down
14 changes: 7 additions & 7 deletions dagster-cloud/dagster_cloud/api/dagster_cloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster._serdes import whitelist_for_serdes
from dagster._utils.container import ContainerUtilizationMetrics
from dagster._utils.error import SerializableErrorInfo
from dagster_cloud_cli.core.workspace import CodeDeploymentMetadata
from dagster_cloud_cli.core.workspace import CodeLocationDeployData
from typing_extensions import NotRequired

from dagster_cloud.agent import AgentQueuesConfig
Expand Down Expand Up @@ -55,7 +55,7 @@ class DagsterCloudUploadLocationData(NamedTuple):
dagster_library_versions: Optional[Mapping[str, str]] = None


@whitelist_for_serdes
@whitelist_for_serdes(storage_field_names={"code_location_deploy_data": "deployment_metadata"})
class DagsterCloudUploadWorkspaceEntry(NamedTuple):
"""Serialized object uploaded by the Dagster Cloud agent with information about
a repository location - either the serialized DagsterCloudUploadLocationData
Expand All @@ -64,7 +64,7 @@ class DagsterCloudUploadWorkspaceEntry(NamedTuple):
"""

location_name: str
deployment_metadata: CodeDeploymentMetadata
code_location_deploy_data: CodeLocationDeployData
upload_location_data: Optional[DagsterCloudUploadLocationData]
serialized_error_info: Optional[SerializableErrorInfo]

Expand Down Expand Up @@ -274,15 +274,15 @@ def __new__(cls, repo_name, code_pointer, external_repository_data):
)


@whitelist_for_serdes
@whitelist_for_serdes(storage_field_names={"code_location_deploy_data": "code_deployment_metadata"})
class LoadRepositoriesResponse(
NamedTuple(
"_LoadRepositoriesResponse",
[
("repository_datas", Sequence[DagsterCloudRepositoryData]),
("container_image", Optional[str]),
("executable_path", Optional[str]),
("code_deployment_metadata", Optional[CodeDeploymentMetadata]),
("code_location_deploy_data", Optional[CodeLocationDeployData]),
("dagster_library_versions", Optional[Mapping[str, str]]),
],
)
Expand All @@ -292,7 +292,7 @@ def __new__(
repository_datas,
container_image,
executable_path,
code_deployment_metadata=None,
code_location_deploy_data=None,
dagster_library_versions: Optional[Mapping[str, str]] = None,
):
return super(cls, LoadRepositoriesResponse).__new__(
Expand All @@ -305,7 +305,7 @@ def __new__(
check.opt_str_param(container_image, "container_image"),
check.opt_str_param(executable_path, "executable_path"),
check.opt_inst_param(
code_deployment_metadata, "code_deployment_metadata", CodeDeploymentMetadata
code_location_deploy_data, "code_location_deploy_data", CodeLocationDeployData
),
check.opt_nullable_mapping_param(dagster_library_versions, "dagster_library_versions"),
)
Expand Down
Loading

0 comments on commit c491a0b

Please sign in to comment.