Commit 7d89999: matrix for ray and dagster versions
danielgafni committed Oct 22, 2024
1 parent 8f1de50
Showing 10 changed files with 137 additions and 84 deletions.
43 changes: 27 additions & 16 deletions .github/workflows/CI.yml
@@ -19,7 +19,7 @@ env:

jobs:
test:
name: Test Python ${{ matrix.py }} - KubeRay ${{ matrix.kuberay }}
name: Test Python ${{ matrix.py }} - Ray ${{ matrix.ray }} - Dagster ${{ matrix.dagster }} - KubeRay ${{ matrix.kuberay }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
@@ -31,20 +31,16 @@ jobs:
- "3.11"
- "3.10"
- "3.9"
ray:
- "2.37.0"
- "2.24.0"
dagster:
- "1.8.12"
kuberay:
- "1.1.0"
- "1.2.2"
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 0.4.18
enable-cache: true
- name: Set up Python ${{ matrix.py }}
run: uv python install ${{ matrix.py }}
- name: Install dependencies
run: uv sync --all-extras --dev
- uses: azure/[email protected]
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -53,14 +49,27 @@ jobs:
with:
start: false
driver: docker
#- uses: mxschmitt/action-tmate@v3
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 0.4.25
enable-cache: true
- name: Set up Python ${{ matrix.py }}
run: uv python install ${{ matrix.py }} && uv python pin ${{ matrix.py }} && uv venv --python ${{ matrix.py }}
- name: Override ray==${{ matrix.ray }} dagster==${{ matrix.dagster }}
id: override
run: uv add --no-sync "ray[all]==${{ matrix.ray }}" "dagster==${{ matrix.dagster }}" || echo SKIP=1 >> $GITHUB_OUTPUT
- name: Install dependencies
run: uv sync --all-extras --dev
if: ${{ steps.override.outputs.SKIP != '1' }}
- name: Run tests
env:
PYTEST_KUBERAY_VERSIONS: "${{ matrix.kuberay }}"
run: uv run pytest -v .
if: ${{ steps.override.outputs.SKIP != '1' }}

lint:
name: lint ${{ matrix.py }} - ${{ matrix.os }}
name: Lint ${{ matrix.py }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
@@ -76,10 +85,10 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 0.4.18
version: 0.4.25
enable-cache: true
- name: Set up Python ${{ matrix.py }}
run: uv python install ${{ matrix.py }}
run: uv python install ${{ matrix.py }} && uv python pin ${{ matrix.py }}
- name: Install dependencies
run: uv sync --all-extras --dev
- name: Run pre-commit hooks
@@ -108,10 +117,12 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: 0.4.18
version: 0.4.25
enable-cache: true
- name: Set up Python
run: uv python install 3.11.9
run: uv python install $PYTHON_VERSION && uv python pin $PYTHON_VERSION
env:
PYTHON_VERSION: 3.11.9
- name: Generate Version
run: export VERSION=$(uv run dunamai from any --style pep440) && echo "Version is $VERSION" && echo "VERSION=$VERSION" >> $GITHUB_ENV
- name: Replace version in code
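Note on the new "Override ray== / dagster==" step: it uses a GitHub Actions step output as a guard. If `uv add --no-sync` cannot resolve the requested Ray/Dagster pin for the current Python version, the step appends `SKIP=1` to `$GITHUB_OUTPUT`, and the later install and test steps are skipped via `if: ${{ steps.override.outputs.SKIP != '1' }}`. A minimal Python sketch of that mechanism (a hypothetical helper for illustration, not part of the workflow):

```python
import os
import subprocess

def pin_or_skip(ray_version: str, dagster_version: str) -> None:
    """Pin ray/dagster like the CI step; on failure, mark the matrix combo as skipped."""
    result = subprocess.run(
        ["uv", "add", "--no-sync", f"ray[all]=={ray_version}", f"dagster=={dagster_version}"]
    )
    if result.returncode != 0:
        # Equivalent of `echo SKIP=1 >> $GITHUB_OUTPUT` in the workflow step
        with open(os.environ["GITHUB_OUTPUT"], "a") as fh:
            fh.write("SKIP=1\n")
```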
18 changes: 14 additions & 4 deletions Dockerfile
@@ -14,7 +14,7 @@ COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/b

# install uv (https://github.com/astral-sh/uv)
# docs for using uv with Docker: https://docs.astral.sh/uv/guides/integration/docker/
COPY --from=ghcr.io/astral-sh/uv:0.4.18 /uv /bin/uv
COPY --from=ghcr.io/astral-sh/uv:0.4.25 /uv /bin/uv

ENV UV_PROJECT_ENVIRONMENT=/usr/local/
ENV DAGSTER_HOME=/opt/dagster/dagster_home
@@ -27,12 +27,12 @@ WORKDIR /src
COPY pyproject.toml uv.lock ./

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --all-extras --no-dev --no-install-project
uv sync --frozen --all-extras --no-dev --no-install-project --inexact

FROM base-prod AS base-dev

# Node.js is needed for pyright in CI
ARG NODE_VERSION=20.7.0
ARG NODE_VERSION=23.0.0
ARG NODE_PACKAGE=node-v$NODE_VERSION-linux-x64
ARG NODE_HOME=/opt/$NODE_PACKAGE
ENV NODE_PATH $NODE_HOME/lib/node_modules
@@ -41,6 +41,16 @@ RUN --mount=type=cache,target=/cache/downloads \
curl https://nodejs.org/dist/v$NODE_VERSION/$NODE_PACKAGE.tar.gz -o /cache/downloads/$NODE_PACKAGE.tar.gz \
&& tar -xzC /opt/ -f /cache/downloads/$NODE_PACKAGE.tar.gz


RUN mkdir dagster_ray && touch dagster_ray/__init__.py && touch README.md
COPY dagster_ray/_version.py dagster_ray/_version.py

# Install specific Dagster and Ray versions (for integration tests)
ARG RAY_VERSION=2.35.0
ARG DAGSTER_VERSION=1.8.12
RUN --mount=type=cache,target=/root/.cache/uv \
uv add --no-sync "ray[all]==$RAY_VERSION" "dagster==$DAGSTER_VERSION"

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --all-extras --no-install-project

@@ -51,4 +61,4 @@ FROM base-${BUILD_DEPENDENCIES} AS final
COPY . .

# finally install all our code
RUN uv sync --frozen --all-extras
RUN uv sync --frozen --all-extras --inexact
17 changes: 13 additions & 4 deletions dagster_ray/executor.py
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, cast

import dagster
from dagster import (
_check as check,
)
@@ -19,7 +20,9 @@
StepHandler,
StepHandlerContext,
)
from dagster._core.remote_representation.origin import RemoteJobOrigin
from dagster._utils.merger import merge_dicts
from packaging.version import Version
from pydantic import Field

from dagster_ray.config import RayExecutionConfig, RayJobSubmissionClientConfig
@@ -149,10 +152,16 @@ def launch_step(self, step_handler_context: StepHandlerContext) -> Iterator[Dags
"dagster/op": step_key,
"dagster/run-id": step_handler_context.execute_step_args.run_id,
}
if run.external_job_origin:
labels["dagster/code-location"] = (
run.external_job_origin.repository_origin.code_location_origin.location_name
)

if Version(dagster.__version__) >= Version("1.8.12"):
remote_job_origin = run.remote_job_origin # type: ignore
else:
remote_job_origin = run.external_job_origin # type: ignore

remote_job_origin = cast(Optional[RemoteJobOrigin], remote_job_origin)

if remote_job_origin:
labels["dagster/code-location"] = remote_job_origin.repository_origin.code_location_origin.location_name

user_provided_config = RayExecutionConfig.from_tags({**step_handler_context.step_tags[step_key]})

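Background for the executor change: Dagster renamed `DagsterRun.external_job_origin` to `remote_job_origin` around 1.8.12, so the executor now picks the attribute at runtime with `packaging.version.Version`. A standalone sketch of the same compatibility shim (illustrative only; the helper name is hypothetical):

```python
from typing import Optional

import dagster
from packaging.version import Version

def get_code_location_name(run: dagster.DagsterRun) -> Optional[str]:
    """Return the code-location name across Dagster versions (sketch of the shim above)."""
    if Version(dagster.__version__) >= Version("1.8.12"):
        # Dagster >= 1.8.12 exposes the origin as `remote_job_origin`
        origin = run.remote_job_origin
    else:
        # Older Dagster releases call it `external_job_origin`
        origin = run.external_job_origin
    if origin is None:
        return None
    return origin.repository_origin.code_location_origin.location_name
```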
10 changes: 4 additions & 6 deletions dagster_ray/kuberay/configs.py
@@ -30,14 +30,14 @@
"containers": [
{
"volumeMounts": [
# {"mountPath": "/tmp/ray", "name": "log-volume"},
{"mountPath": "/tmp/ray", "name": "ray-logs"},
],
"name": "head",
"imagePullPolicy": "Always",
},
],
"volumes": [
{"name": "log-volume", "emptyDir": {}},
{"name": "ray-logs", "emptyDir": {}},
],
"affinity": {},
"tolerations": [],
@@ -58,15 +58,13 @@
"imagePullSecrets": [],
"containers": [
{
"volumeMounts": [
# {"mountPath": "/tmp/ray", "name": "log-volume"}
],
"volumeMounts": [{"mountPath": "/tmp/ray", "name": "ray-logs"}],
"name": "worker",
"imagePullPolicy": "Always",
}
],
"volumes": [
{"name": "log-volume", "emptyDir": {}},
{"name": "ray-logs", "emptyDir": {}},
],
"affinity": {},
"tolerations": [],
15 changes: 9 additions & 6 deletions dagster_ray/pipes.py
@@ -210,7 +210,6 @@ def run( # type: ignore
ray_job (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
"""
from ray.job_submission import JobStatus

with open_pipes_session(
context=context,
@@ -224,11 +223,7 @@

try:
self._read_messages(context, job_id)
status = self._wait_for_completion(context, job_id)

if status in {JobStatus.FAILED, JobStatus.STOPPED}:
raise RuntimeError(f"RayJob {job_id} failed with status {status}")

self._wait_for_completion(context, job_id)
return PipesClientCompletedInvocation(session)

except DagsterExecutionInterruptedError:
@@ -281,12 +276,20 @@ def _read_messages(self, context: OpExecutionContext, job_id: str) -> None:
)

def _wait_for_completion(self, context: OpExecutionContext, job_id: str) -> "JobStatus":
from ray.job_submission import JobStatus

context.log.info(f"[pipes] Waiting for RayJob {job_id} to complete...")

while True:
status = self.client.get_job_status(job_id)

if status.is_terminal():
if status in {JobStatus.FAILED, JobStatus.STOPPED}:
job_details = self.client.get_job_info(job_id)
raise RuntimeError(
f"[pipes] RayJob {job_id} failed with status {status}. Message:\n{job_details.message}"
)

return status

time.sleep(self.poll_interval)
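The Pipes client now raises from `_wait_for_completion` when the RayJob reaches `FAILED` or `STOPPED`, including the message returned by `get_job_info`. A self-contained sketch of the same polling loop against a plain `JobSubmissionClient` (not the library code):

```python
import time

from ray.job_submission import JobStatus, JobSubmissionClient

def wait_for_completion(client: JobSubmissionClient, job_id: str, poll_interval: float = 5.0) -> JobStatus:
    """Poll a Ray job until a terminal status, raising with the job's message on failure."""
    while True:
        status = client.get_job_status(job_id)
        if status.is_terminal():
            if status in {JobStatus.FAILED, JobStatus.STOPPED}:
                details = client.get_job_info(job_id)
                raise RuntimeError(
                    f"Job {job_id} failed with status {status}. Message:\n{details.message}"
                )
            return status
        time.sleep(poll_interval)
```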
19 changes: 14 additions & 5 deletions dagster_ray/run_launcher.py
@@ -1,17 +1,20 @@
import logging
import sys
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

import dagster
from dagster import _check as check
from dagster._cli.api import ExecuteRunArgs # type: ignore
from dagster._config.config_schema import UserConfigSchema
from dagster._core.events import EngineEventData
from dagster._core.launcher import LaunchRunContext, ResumeRunContext, RunLauncher
from dagster._core.launcher.base import CheckRunHealthResult, WorkerStatus
from dagster._core.remote_representation.origin import RemoteJobOrigin
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._grpc.types import ResumeRunArgs
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from dagster._utils.error import serializable_error_info_from_exc_info
from packaging.version import Version
from pydantic import Field

from dagster_ray.config import RayExecutionConfig, RayJobSubmissionClientConfig
@@ -137,10 +140,16 @@ def _launch_ray_job(self, submission_id: str, entrypoint: str, run: DagsterRun):
"dagster/job": job_origin.job_name,
"dagster/run-id": run.run_id,
}
if run.external_job_origin:
labels["dagster/code-location"] = (
run.external_job_origin.repository_origin.code_location_origin.location_name
)

if Version(dagster.__version__) >= Version("1.8.12"):
remote_job_origin = run.remote_job_origin # type: ignore
else:
remote_job_origin = run.external_job_origin # type: ignore

remote_job_origin = cast(Optional[RemoteJobOrigin], remote_job_origin)

if remote_job_origin:
labels["dagster/code-location"] = remote_job_origin.repository_origin.code_location_origin.location_name

cfg_from_tags = RayLauncherConfig.from_tags(run.tags)

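The run launcher builds the same `dagster/*` labels as the executor. One plausible way such labels end up on a submitted Ray job is via the `metadata` argument of `JobSubmissionClient.submit_job`; the sketch below is hedged, with placeholder address, entrypoint, and values, and is not necessarily how dagster-ray wires it:

```python
from ray.job_submission import JobSubmissionClient

# Placeholder address, entrypoint, and label values -- for illustration only.
client = JobSubmissionClient("http://localhost:8265")
labels = {
    "dagster/job": "example_job",
    "dagster/run-id": "00000000-0000-0000-0000-000000000000",
    "dagster/code-location": "example_location",
}
submission_id = client.submit_job(
    entrypoint="echo 'placeholder entrypoint'",
    metadata=labels,  # labels travel with the job as submission metadata
)
print(submission_id)
```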
46 changes: 30 additions & 16 deletions tests/kuberay/conftest.py
@@ -14,7 +14,7 @@
from dagster_ray.kuberay.client import RayClusterClient
from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS
from tests import ROOT_DIR
from tests.kuberay.utils import NAMESPACE, get_random_free_port
from tests.kuberay.utils import NAMESPACE


@pytest.fixture(scope="session")
@@ -28,6 +28,9 @@ def kuberay_helm_repo():

@pytest.fixture(scope="session")
def dagster_ray_image():
import dagster
import ray

"""
Either returns the image name from the environment variable PYTEST_DAGSTER_RAY_IMAGE
or builds the image and returns it
@@ -36,7 +39,11 @@ def dagster_ray_image():
if PYTEST_DAGSTER_RAY_IMAGE is None:
# build the local image
python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
image = f"local/dagster-ray:py-{python_version}"
ray_version = ray.__version__
dagster_version = dagster.__version__

image = f"local/dagster-ray:py-{python_version}-{ray_version}-{dagster_version}"

subprocess.run(
[
"docker",
@@ -47,6 +54,10 @@ def dagster_ray_image():
"BUILD_DEPENDENCIES=dev",
"--build-arg",
f"PYTHON_VERSION={python_version}",
"--build-arg",
f"RAY_VERSION={ray_version}",
"--build-arg",
f"DAGSTER_VERSION={dagster_version}",
"-t",
image,
str(ROOT_DIR),
@@ -145,7 +156,7 @@ def k8s_with_raycluster(
k8s_with_kuberay: AClusterManager,
head_group_spec: Dict[str, Any],
worker_group_specs: List[Dict[str, Any]],
) -> Iterator[Tuple[dict[str, int], AClusterManager]]:
) -> Iterator[Tuple[dict[str, str], AClusterManager]]:
# create a RayCluster
config.load_kube_config(str(k8s_with_kuberay.kubeconfig))

@@ -155,6 +166,8 @@

client.create(
body={
"kind": "RayCluster",
"apiVersion": "ray.io/v1",
"metadata": {"name": PERSISTENT_RAY_CLUSTER_NAME},
"spec": {
"headGroupSpec": head_group_spec,
@@ -164,21 +177,22 @@
namespace=NAMESPACE,
)

redis_port = get_random_free_port()
dashboard_port = get_random_free_port()

with k8s_with_kuberay.port_forwarding(
target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
source_port=redis_port,
target_port=10001,
client.wait_until_ready(
name=PERSISTENT_RAY_CLUSTER_NAME,
namespace=NAMESPACE,
), k8s_with_kuberay.port_forwarding(
target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
source_port=dashboard_port,
target_port=8265,
timeout=600,
)

with client.port_forward(
name=PERSISTENT_RAY_CLUSTER_NAME,
namespace=NAMESPACE,
):
yield {"redis": redis_port, "dashboard": dashboard_port}, k8s_with_kuberay
local_dashboard_port=0,
local_gcs_port=0,
) as (dashboard_port, redis_port):
yield (
{"gcs": f"ray://localhost:{redis_port}", "dashboard": f"http://localhost:{dashboard_port}"},
k8s_with_kuberay,
)

client.delete(
name=PERSISTENT_RAY_CLUSTER_NAME,
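The reworked fixture delegates port-forwarding to the RayCluster client's `port_forward` context manager (ephemeral local ports via `local_dashboard_port=0` / `local_gcs_port=0`) and yields ready-to-use endpoint URLs. A hedged sketch of how a test might consume that fixture, assuming the shape shown above (not an actual test in the repo):

```python
import ray
from ray.job_submission import JobSubmissionClient

def test_raycluster_endpoints(k8s_with_raycluster):
    """Hedged sketch of consuming the fixture above; not an actual test in the repo."""
    endpoints, _cluster_manager = k8s_with_raycluster

    # "gcs" is a Ray client address such as "ray://localhost:<forwarded-port>"
    ray.init(endpoints["gcs"])

    # "dashboard" is an HTTP URL such as "http://localhost:<forwarded-port>"
    job_client = JobSubmissionClient(endpoints["dashboard"])
    assert job_client.list_jobs() is not None

    ray.shutdown()
```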