Skip to content

Commit

Permalink
matrix for ray and dagster versions
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 22, 2024
1 parent 8f1de50 commit d46c68b
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 41 deletions.
13 changes: 11 additions & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ env:

jobs:
test:
name: Test Python ${{ matrix.py }} - KubeRay ${{ matrix.kuberay }}
name: Test Python ${{ matrix.py }} - Ray ${{ matrix.kuberay }} - Dagster ${{ matrix.dagster }} - KubeRay ${{ matrix.kuberay }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
Expand All @@ -31,6 +31,13 @@ jobs:
- "3.11"
- "3.10"
- "3.9"
ray:
- "2.37.0"
- "2.24.0"
- "2.12.0"
dagster:
- "1.8.12"
- "1.7.16"
kuberay:
- "1.1.0"
- "1.2.2"
Expand All @@ -57,10 +64,12 @@ jobs:
- name: Run tests
env:
PYTEST_KUBERAY_VERSIONS: "${{ matrix.kuberay }}"
PYTEST_RAY_VERSION: "${{ matrix.ray }}"
PYTEST_DAGSTER_VERSION: "${{ matrix.dagster }}"
run: uv run pytest -v .

lint:
name: lint ${{ matrix.py }} - ${{ matrix.os }}
name: Lint ${{ matrix.py }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
Expand Down
18 changes: 14 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ COPY --from=bitnami/kubectl:1.30.3 /opt/bitnami/kubectl/bin/kubectl /usr/local/b

# install uv (https://github.com/astral-sh/uv)
# docs for using uv with Docker: https://docs.astral.sh/uv/guides/integration/docker/
COPY --from=ghcr.io/astral-sh/uv:0.4.18 /uv /bin/uv
COPY --from=ghcr.io/astral-sh/uv:0.4.25 /uv /bin/uv

ENV UV_PROJECT_ENVIRONMENT=/usr/local/
ENV DAGSTER_HOME=/opt/dagster/dagster_home
Expand All @@ -27,7 +27,7 @@ WORKDIR /src
COPY pyproject.toml uv.lock ./

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --all-extras --no-dev --no-install-project
uv sync --frozen --all-extras --no-dev --no-install-project --inexact

FROM base-prod AS base-dev

Expand All @@ -41,8 +41,18 @@ RUN --mount=type=cache,target=/cache/downloads \
curl https://nodejs.org/dist/v$NODE_VERSION/$NODE_PACKAGE.tar.gz -o /cache/downloads/$NODE_PACKAGE.tar.gz \
&& tar -xzC /opt/ -f /cache/downloads/$NODE_PACKAGE.tar.gz


RUN mkdir dagster_ray && touch dagster_ray/__init__.py && touch README.md
COPY dagster_ray/_version.py dagster_ray/_version.py

# Install specific Dagster and Ray versions (for integration tests)
ARG RAY_VERSION=2.35.0
ARG DAGSTER_VERSION=1.8.12
RUN --mount=type=cache,target=/root/.cache/uv \
uv add --no-sync "ray[all]==$RAY_VERSION" "dagster==$DAGSTER_VERSION"

RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --all-extras --no-install-project
uv sync --frozen --all-extras --no-install-project --inexact

# -------------------------------------------------------------
FROM base-${BUILD_DEPENDENCIES} AS final
Expand All @@ -51,4 +61,4 @@ FROM base-${BUILD_DEPENDENCIES} AS final
COPY . .

# finally install all our code
RUN uv sync --frozen --all-extras
RUN uv sync --frozen --all-extras --inexact
10 changes: 4 additions & 6 deletions dagster_ray/kuberay/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
"containers": [
{
"volumeMounts": [
# {"mountPath": "/tmp/ray", "name": "log-volume"},
{"mountPath": "/tmp/ray", "name": "ray-logs"},
],
"name": "head",
"imagePullPolicy": "Always",
},
],
"volumes": [
{"name": "log-volume", "emptyDir": {}},
{"name": "ray-logs", "emptyDir": {}},
],
"affinity": {},
"tolerations": [],
Expand All @@ -58,15 +58,13 @@
"imagePullSecrets": [],
"containers": [
{
"volumeMounts": [
# {"mountPath": "/tmp/ray", "name": "log-volume"}
],
"volumeMounts": [{"mountPath": "/tmp/ray", "name": "ray-logs"}],
"name": "worker",
"imagePullPolicy": "Always",
}
],
"volumes": [
{"name": "log-volume", "emptyDir": {}},
{"name": "ray-logs", "emptyDir": {}},
],
"affinity": {},
"tolerations": [],
Expand Down
15 changes: 9 additions & 6 deletions dagster_ray/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def run( # type: ignore
ray_job (Dict[str, Any]): RayJob specification. `API reference <https://ray-project.github.io/kuberay/reference/api/#rayjob>`_.
extras (Optional[Dict[str, Any]]): Additional information to pass to the Pipes session.
"""
from ray.job_submission import JobStatus

with open_pipes_session(
context=context,
Expand All @@ -224,11 +223,7 @@ def run( # type: ignore

try:
self._read_messages(context, job_id)
status = self._wait_for_completion(context, job_id)

if status in {JobStatus.FAILED, JobStatus.STOPPED}:
raise RuntimeError(f"RayJob {job_id} failed with status {status}")

self._wait_for_completion(context, job_id)
return PipesClientCompletedInvocation(session)

except DagsterExecutionInterruptedError:
Expand Down Expand Up @@ -281,12 +276,20 @@ def _read_messages(self, context: OpExecutionContext, job_id: str) -> None:
)

def _wait_for_completion(self, context: OpExecutionContext, job_id: str) -> "JobStatus":
from ray.job_submission import JobStatus

context.log.info(f"[pipes] Waiting for RayJob {job_id} to complete...")

while True:
status = self.client.get_job_status(job_id)

if status.is_terminal():
if status in {JobStatus.FAILED, JobStatus.STOPPED}:
job_details = self.client.get_job_info(job_id)
raise RuntimeError(
f"[pipes] RayJob {job_id} failed with status {status}. Message:\n{job_details.message}"
)

return status

time.sleep(self.poll_interval)
Expand Down
46 changes: 30 additions & 16 deletions tests/kuberay/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dagster_ray.kuberay.client import RayClusterClient
from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS
from tests import ROOT_DIR
from tests.kuberay.utils import NAMESPACE, get_random_free_port
from tests.kuberay.utils import NAMESPACE


@pytest.fixture(scope="session")
Expand All @@ -28,6 +28,9 @@ def kuberay_helm_repo():

@pytest.fixture(scope="session")
def dagster_ray_image():
import dagster
import ray

"""
Either returns the image name from the environment variable PYTEST_DAGSTER_RAY_IMAGE
or builds the image and returns it
Expand All @@ -36,7 +39,11 @@ def dagster_ray_image():
if PYTEST_DAGSTER_RAY_IMAGE is None:
# build the local image
python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
image = f"local/dagster-ray:py-{python_version}"
ray_version = os.getenv("PYTEST_RAY_VERSION") or ray.__version__
dagster_version = os.getenv("PYTEST_DAGSTER_VERSION") or dagster.__version__

image = f"local/dagster-ray:py-{python_version}-{ray_version}-{dagster_version}"

subprocess.run(
[
"docker",
Expand All @@ -47,6 +54,10 @@ def dagster_ray_image():
"BUILD_DEPENDENCIES=dev",
"--build-arg",
f"PYTHON_VERSION={python_version}",
"--build-arg",
f"RAY_VERSION={ray_version}",
"--build-arg",
f"DAGSTER_VERSION={dagster_version}",
"-t",
image,
str(ROOT_DIR),
Expand Down Expand Up @@ -145,7 +156,7 @@ def k8s_with_raycluster(
k8s_with_kuberay: AClusterManager,
head_group_spec: Dict[str, Any],
worker_group_specs: List[Dict[str, Any]],
) -> Iterator[Tuple[dict[str, int], AClusterManager]]:
) -> Iterator[Tuple[dict[str, str], AClusterManager]]:
# create a RayCluster
config.load_kube_config(str(k8s_with_kuberay.kubeconfig))

Expand All @@ -155,6 +166,8 @@ def k8s_with_raycluster(

client.create(
body={
"kind": "RayCluster",
"apiVersion": "ray.io/v1",
"metadata": {"name": PERSISTENT_RAY_CLUSTER_NAME},
"spec": {
"headGroupSpec": head_group_spec,
Expand All @@ -164,21 +177,22 @@ def k8s_with_raycluster(
namespace=NAMESPACE,
)

redis_port = get_random_free_port()
dashboard_port = get_random_free_port()

with k8s_with_kuberay.port_forwarding(
target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
source_port=redis_port,
target_port=10001,
client.wait_until_ready(
name=PERSISTENT_RAY_CLUSTER_NAME,
namespace=NAMESPACE,
), k8s_with_kuberay.port_forwarding(
target=f"svc/{PERSISTENT_RAY_CLUSTER_NAME}-head-svc",
source_port=dashboard_port,
target_port=8265,
timeout=600,
)

with client.port_forward(
name=PERSISTENT_RAY_CLUSTER_NAME,
namespace=NAMESPACE,
):
yield {"redis": redis_port, "dashboard": dashboard_port}, k8s_with_kuberay
local_dashboard_port=0,
local_gcs_port=0,
) as (dashboard_port, redis_port):
yield (
{"gcs": f"ray://localhost:{redis_port}", "dashboard": f"http://localhost:{dashboard_port}"},
k8s_with_kuberay,
)

client.delete(
name=PERSISTENT_RAY_CLUSTER_NAME,
Expand Down
13 changes: 7 additions & 6 deletions tests/kuberay/test_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from dagster_ray import PipesRayJobClient
from dagster_ray.kuberay.client import RayJobClient
from dagster_ray.kuberay.pipes import PipesKubeRayJobClient
from tests.test_pipes import LOCAL_SCRIPT_PATH

ENTRYPOINT = "python /src/tests/scripts/remote_job.py"

RAY_JOB = {
"apiVersion": "ray.io/v1",
Expand All @@ -26,7 +27,7 @@
},
"spec": {
"activeDeadlineSeconds": 10800,
"entrypoint": "python /src/tests/scripts/remote_job.py",
"entrypoint": ENTRYPOINT,
"entrypointNumCpus": 0.1,
"rayClusterSpec": {
"autoscalerOptions": {
Expand Down Expand Up @@ -118,11 +119,11 @@ def my_asset(context: AssetExecutionContext, pipes_kube_rayjob_client: PipesKube


@pytest.fixture(scope="session")
def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, int], AClusterManager]):
ports, k8s = k8s_with_raycluster
def pipes_ray_job_client(k8s_with_raycluster: Tuple[dict[str, str], AClusterManager]):
hosts, k8s = k8s_with_raycluster
return PipesRayJobClient(
client=JobSubmissionClient(
address=f"https://localhost:{ports['dashboard']}",
address=hosts["dashboard"],
)
)

Expand All @@ -132,7 +133,7 @@ def test_ray_job_pipes(pipes_ray_job_client: PipesRayJobClient, capsys):
def my_asset(context: AssetExecutionContext, pipes_ray_job_client: PipesRayJobClient):
result = pipes_ray_job_client.run(
context=context,
submit_job_params={"entrypoint": f"{sys.executable} {LOCAL_SCRIPT_PATH}"},
submit_job_params={"entrypoint": ENTRYPOINT, "entrypoint_num_cpus": 0.1},
extras={"foo": "bar"},
).get_materialize_result()

Expand Down
2 changes: 1 addition & 1 deletion tests/kuberay/test_raycluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dagster_ray.kuberay import KubeRayCluster, RayClusterClientResource, RayClusterConfig, cleanup_kuberay_clusters
from dagster_ray.kuberay.client import RayClusterClient
from dagster_ray.kuberay.ops import CleanupKuberayClustersConfig
from tests.kuberay.conftest import NAMESPACE, get_random_free_port
from tests.kuberay.utils import NAMESPACE, get_random_free_port


@pytest.fixture(scope="session")
Expand Down

0 comments on commit d46c68b

Please sign in to comment.