Skip to content

Fix verify tests #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/nvidia_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
- name: Create input files
shell: bash
run: |
# install jq
apt-get update && apt-get install -y jq
# Extract the payload content without printing it
PAYLOAD=$(jq -r '.inputs.payload' $GITHUB_EVENT_PATH)

Expand Down
15 changes: 10 additions & 5 deletions src/discord-cluster-manager/cogs/verify_run_cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,21 @@ async def trigger_run(self, interaction: discord.Interaction, gpu: GPU, reporter
sub_code = create_mock_attachment(
"submission.py", Path("examples/identity_py/submission.py").read_text()
)
logger.info(f"sub_code: {sub_code}")
Copy link
Preview

Copilot AI May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] This debug logging statement may not be needed in production. Consider removing it or lowering its log level.

Suggested change
logger.info(f"sub_code: {sub_code}")
logger.debug(f"sub_code: {sub_code}")

Copilot uses AI. Check for mistakes.

logger.info(f"sub_code.text: {Path('examples/identity_py/submission.py').read_text()}")
task = make_task("examples/identity_py")
else:
elif lang == "cu":
sub_code = create_mock_attachment(
"test.cu", Path("examples/identity_cuda/submission.cu").read_text()
"submission.cu", Path("examples/identity_cuda/submission.cu").read_text()
)
task = make_task("examples/identity_cuda")
else:
raise ValueError(f"Invalid language: {lang}")

return await submit_leaderboard(
interaction,
-1,
sub_code,
sub_code.read(),
gpu,
reporter=reporter,
task=task,
Expand All @@ -77,6 +81,7 @@ async def verify_github_run(
reporter,
lang: str,
) -> bool:
logger.info(f"Verifying run for {gpu.name} {lang}")
result = await self.trigger_run(interaction, gpu, reporter, lang)
return result.success
#
Expand Down Expand Up @@ -297,8 +302,8 @@ async def verify_runs(self, interaction: discord.Interaction):
amd = get_gpu_by_name("mi300")
t4 = get_gpu_by_name("T4")

reporter = MultiProgressReporter("Verifying")
await reporter.show(interaction)
reporter = MultiProgressReporter(interaction, "Verifying")
await reporter.show()

results = await asyncio.gather(
self.verify_github_run(interaction, nvidia, reporter.add_run("NVIDIA-PY"), "py"),
Expand Down
2 changes: 2 additions & 0 deletions src/discord-cluster-manager/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class RankCriterion(Enum):
]
MODAL_CUDA_INCLUDE_DIRS = ["/ThunderKittens/include"]

DEFAULT_GITHUB_TIMEOUT_MINUTES = 10 # Default timeout for GitHub launcher in minutes

NVIDIA_REQUIREMENTS = """
numpy
torch
Expand Down
238 changes: 136 additions & 102 deletions src/discord-cluster-manager/launchers/github.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import asyncio
import datetime
import json
import math
import pprint
import tempfile
import zipfile
from typing import Awaitable, Callable, Optional

import requests
from consts import AMD_REQUIREMENTS, GPU, NVIDIA_REQUIREMENTS, GitHubGPU
from consts import (
AMD_REQUIREMENTS,
DEFAULT_GITHUB_TIMEOUT_MINUTES,
GPU,
NVIDIA_REQUIREMENTS,
GitHubGPU,
SubmissionMode,
)
from github import Github, UnknownObjectException, WorkflowRun
from report import RunProgressReporter
from run_eval import CompileResult, EvalResult, FullResult, RunResult, SystemInfo
Expand All @@ -18,105 +26,6 @@
logger = setup_logging()


class GitHubLauncher(Launcher):
    """Launcher that executes submissions on GitHub Actions runners.

    Picks the workflow file and runner for the requested GPU, dispatches a
    workflow run carrying the submission as a JSON payload, waits for it to
    complete, and parses the downloaded `run-result` artifact back into a
    ``FullResult``.
    """

    def __init__(self, repo: str, token: str):
        # repo: GitHub repository in "owner/name" form.
        # token: GitHub API token with permission to dispatch workflows.
        super().__init__(name="GitHub", gpus=GitHubGPU)
        self.repo = repo
        self.token = token
        # Serializes workflow dispatches so that a freshly-triggered run can
        # be identified unambiguously (see race-condition note below).
        self.trigger_limit = asyncio.Semaphore(1)

    async def run_submission(
        self, config: dict, gpu_type: GPU, status: RunProgressReporter
    ) -> FullResult:
        """Run one submission end-to-end on a GitHub-hosted runner.

        Args:
            config: Task configuration; must contain at least ``lang``
                (``"py"`` or ``"cu"``) and is forwarded verbatim as the
                workflow's JSON payload.
            gpu_type: Target GPU; selects the workflow file and runner.
            status: Progress reporter used to surface updates to the user.

        Returns:
            Parsed ``FullResult``; ``success=False`` with an error message
            when the `run-result` artifact could not be downloaded.

        Raises:
            ValueError: If the GPU type is not supported.
            NotImplementedError: For CUDA submissions on AMD GPUs.
            RuntimeError: If the workflow could not be triggered.
        """
        gpu_vendor = None
        # Map GPU type -> workflow file, runner label, and pip requirements.
        if gpu_type.value in ["MI300", "MI250"]:
            selected_workflow = "amd_workflow.yml"
            runner_name = {"MI300": "amdgpu-mi300-x86-64", "MI250": "amdgpu-mi250-x86-64"}[
                gpu_type.value
            ]
            gpu_vendor = "AMD"
            requirements = AMD_REQUIREMENTS
        elif gpu_type.value == "NVIDIA":
            selected_workflow = "nvidia_workflow.yml"
            gpu_vendor = "NVIDIA"
            requirements = NVIDIA_REQUIREMENTS
        else:
            raise ValueError(f"Invalid GPU type: {gpu_type.value}")

        lang = config["lang"]
        if lang == "cu" and gpu_vendor == "AMD":
            # TODO implement HIP
            raise NotImplementedError("Cannot use CUDA runs with AMD GPUs")

        lang_name = {"py": "Python", "cu": "CUDA"}[lang]

        logger.info(f"Attempting to trigger GitHub action for {lang_name} on {selected_workflow}")
        run = GitHubRun(self.repo, self.token, selected_workflow)
        logger.info(f"Successfully created GitHub run: {run.run_id}")

        payload = json.dumps(config)

        # Workflow dispatch inputs; requirements only apply to Python runs,
        # and only the AMD workflow takes an explicit runner label.
        inputs = {"payload": payload}
        if lang == "py":
            inputs["requirements"] = requirements
        if gpu_vendor == "AMD":
            inputs["runner"] = runner_name

        async with self.trigger_limit: # DO NOT REMOVE, PREVENTS A RACE CONDITION
            if not await run.trigger(inputs):
                raise RuntimeError(
                    "Failed to trigger GitHub Action. Please check the configuration."
                )

        await status.push("⏳ Waiting for workflow to start...")
        logger.info("Waiting for workflow to start...")
        await run.wait_for_completion(lambda x: self.wait_callback(x, status))
        await status.update(f"Workflow [{run.run_id}]({run.html_url}) completed")
        logger.info(f"Workflow [{run.run_id}]({run.html_url}) completed")
        await status.push("Downloading artifacts...")
        logger.info("Downloading artifacts...")

        artifacts = await run.download_artifacts()
        if "run-result" not in artifacts:
            # Missing artifact is reported as a failed result rather than an
            # exception so callers always receive a reportable FullResult.
            logger.error("Could not find `run-result` among artifacts: %s", artifacts.keys())
            await status.push("Downloading artifacts... failed")
            return FullResult(
                success=False, error="Could not download artifacts", runs={}, system=SystemInfo()
            )

        logs = artifacts["run-result"]["result.json"].decode("utf-8")

        await status.update("Downloading artifacts... done")
        logger.info("Downloading artifacts... done")

        data = json.loads(logs)
        runs = {}
        # convert json back to EvalResult structures, which requires
        # special handling for datetime and our dataclasses.
        for k, v in data["runs"].items():
            if "compilation" in v and v["compilation"] is not None:
                comp = CompileResult(**v["compilation"])
            else:
                comp = None
            run = RunResult(**v["run"])
            res = EvalResult(
                start=datetime.datetime.fromisoformat(v["start"]),
                end=datetime.datetime.fromisoformat(v["end"]),
                compilation=comp,
                run=run,
            )
            runs[k] = res

        system = SystemInfo(**data.get("system", {}))
        return FullResult(success=True, error="", runs=runs, system=system)

    async def wait_callback(self, run: "GitHubRun", status: RunProgressReporter):
        """Progress callback: report workflow status and elapsed time."""
        await status.update(
            f"⏳ Workflow [{run.run_id}]({run.html_url}): {run.status} "
            f"({run.elapsed_time.total_seconds():.1f}s)"
        )


class GitHubRun:
def __init__(self, repo: str, token: str, workflow_file: str):
gh = Github(token)
Expand Down Expand Up @@ -228,8 +137,12 @@ async def trigger(self, inputs: dict) -> bool:
return False

async def wait_for_completion(
self, callback: Callable[["GitHubRun"], Awaitable[None]], timeout_minutes: int = 10
self,
callback: Callable[["GitHubRun"],
Awaitable[None]],
timeout_minutes: int = DEFAULT_GITHUB_TIMEOUT_MINUTES
):
logger.info(f"the timeout is {timeout_minutes}")
if self.run is None:
raise ValueError("Run needs to be triggered before a status check!")

Expand Down Expand Up @@ -303,5 +216,126 @@ async def download_artifacts(self) -> dict:
f"Status code: {response.status_code}"
)

logger.info("Download artifacts for run %s: %s", self.run_id, list(extracted.keys()))
logger.info("Download artifacts for run %s: %s",
self.run_id,
list(extracted.keys()))
return extracted

class GitHubLauncher(Launcher):
    """Launcher that executes submissions on GitHub Actions runners.

    Picks the workflow file and runner for the requested GPU, dispatches a
    workflow run carrying the submission as a JSON payload, waits for it to
    complete (timeout derived from the submission mode), and parses the
    downloaded `run-result` artifact back into a ``FullResult``.
    """

    def __init__(self, repo: str, token: str):
        # repo: GitHub repository in "owner/name" form.
        # token: GitHub API token with permission to dispatch workflows.
        super().__init__(name="GitHub", gpus=GitHubGPU)
        self.repo = repo
        self.token = token
        # Serializes workflow dispatches so that a freshly-triggered run can
        # be identified unambiguously (see race-condition note below).
        self.trigger_limit = asyncio.Semaphore(1)

    async def run_submission(
        self, config: dict, gpu_type: GPU, status: RunProgressReporter
    ) -> FullResult:
        """Run one submission end-to-end on a GitHub-hosted runner.

        Args:
            config: Task configuration; must contain at least ``lang``
                (``"py"`` or ``"cu"``) and is forwarded verbatim as the
                workflow's JSON payload.
            gpu_type: Target GPU; selects the workflow file and runner.
            status: Progress reporter used to surface updates to the user.

        Returns:
            Parsed ``FullResult``; ``success=False`` with an error message
            when the `run-result` artifact could not be downloaded.

        Raises:
            ValueError: If the GPU type is not supported.
            NotImplementedError: For CUDA submissions on AMD GPUs.
            RuntimeError: If the workflow could not be triggered.
        """
        # Single source of truth for workflow/runner/requirements/vendor;
        # raises ValueError for unsupported GPU types.
        selected_workflow, runner_name, requirements, gpu_vendor = (
            self._select_workflow_params(gpu_type)
        )
        # DEBUG, not INFO: the config contains the full submission sources.
        logger.debug(f"run_submission config: {config}")
        lang = config["lang"]
        if lang == "cu" and gpu_vendor == "AMD":
            # TODO implement HIP
            raise NotImplementedError("Cannot use CUDA runs with AMD GPUs")

        lang_name = {"py": "Python", "cu": "CUDA"}[lang]

        logger.info(f"Attempting to trigger GitHub action for {lang_name} on {selected_workflow}")
        run = GitHubRun(self.repo, self.token, selected_workflow)
        logger.info(f"Successfully created GitHub run: {run.run_id}")

        payload = json.dumps(config)

        # Workflow dispatch inputs; requirements only apply to Python runs,
        # and only the AMD workflow takes an explicit runner label.
        inputs = {"payload": payload}
        if lang == "py":
            inputs["requirements"] = requirements
        if gpu_vendor == "AMD":
            inputs["runner"] = runner_name

        async with self.trigger_limit:  # DO NOT REMOVE, PREVENTS A RACE CONDITION
            if not await run.trigger(inputs):
                raise RuntimeError(
                    "Failed to trigger GitHub Action. Please check the configuration."
                )

        await status.push("⏳ Waiting for workflow to start...")
        logger.info("Waiting for workflow to start...")

        timeout_minutes = self._compute_timeout_minutes(config)
        await run.wait_for_completion(
            lambda x: self.wait_callback(x, status), timeout_minutes=timeout_minutes
        )
        await status.update(f"Workflow [{run.run_id}]({run.html_url}) completed")
        logger.info(f"Workflow [{run.run_id}]({run.html_url}) completed")
        await status.push("⏳ Downloading artifacts...")
        return await self._handle_artifacts(run, status)

    async def wait_callback(self, run: "GitHubRun", status: RunProgressReporter):
        """Progress callback: report workflow status and elapsed time."""
        await status.update(
            f"⏳ Workflow [{run.run_id}]({run.html_url}): {run.status} "
            f"({run.elapsed_time.total_seconds():.1f}s)"
        )

    def _select_workflow_params(self, gpu_type: GPU) -> tuple[str, Optional[str], str, str]:
        """
        Map a GPU type to its workflow file, runner label (if any),
        pip requirements, and vendor name.

        Returns:
            ``(workflow_file, runner_name_or_None, requirements, vendor)``.

        Raises:
            ValueError: If the GPU type is not supported.
        """
        if gpu_type.value in ["MI300", "MI250"]:
            runner = {
                "MI300": "amdgpu-mi300-x86-64",
                "MI250": "amdgpu-mi250-x86-64",
            }[gpu_type.value]
            return "amd_workflow.yml", runner, AMD_REQUIREMENTS, "AMD"
        if gpu_type.value == "NVIDIA":
            return "nvidia_workflow.yml", None, NVIDIA_REQUIREMENTS, "NVIDIA"
        raise ValueError(f"Invalid GPU type: {gpu_type.value}")

    def _compute_timeout_minutes(self, config: dict) -> int:
        """
        Compute the workflow timeout in minutes from the submission mode.

        Falls back to ``DEFAULT_GITHUB_TIMEOUT_MINUTES`` when the mode is
        unknown or its per-mode timeout is missing or zero.
        """
        mode = config.get("mode")
        sec_map = {
            SubmissionMode.TEST.value: config.get("test_timeout"),
            SubmissionMode.BENCHMARK.value: config.get("benchmark_timeout"),
            SubmissionMode.LEADERBOARD.value: config.get("ranked_timeout"),
        }
        # `or` (not an `is None` check) so that 0 also falls back to the default.
        seconds = sec_map.get(mode) or DEFAULT_GITHUB_TIMEOUT_MINUTES * 60
        return math.ceil(seconds / 60)

    async def _handle_artifacts(
        self,
        run: GitHubRun,
        status: RunProgressReporter
    ) -> FullResult:
        """
        Download the `run-result` artifact and parse it into a FullResult.

        Returns a failed FullResult (rather than raising) when the artifact
        is missing, so callers always receive a reportable result object.
        """
        logger.info("Downloading artifacts...")
        artifacts = await run.download_artifacts()
        if "run-result" not in artifacts:
            logger.error("Could not find `run-result` among artifacts: %s", artifacts.keys())
            await status.push("Downloading artifacts... failed")
            return FullResult(
                success=False,
                error="Could not download artifacts",
                runs={},
                system=SystemInfo(),
            )
        logs = artifacts["run-result"]["result.json"].decode("utf-8")
        await status.update("✅ Downloading artifacts... done")
        logger.info("Downloading artifacts... done")
        data = json.loads(logs)
        runs: dict[str, EvalResult] = {}
        # Convert JSON back into EvalResult structures; datetimes are stored
        # as ISO strings and nested dataclasses as plain dicts.
        for key, v in data.get("runs", {}).items():
            comp = CompileResult(**v["compilation"]) if v.get("compilation") else None
            res_run = RunResult(**v["run"])
            runs[key] = EvalResult(
                start=datetime.datetime.fromisoformat(v["start"]),
                end=datetime.datetime.fromisoformat(v["end"]),
                compilation=comp,
                run=res_run,
            )
        system = SystemInfo(**data.get("system", {}))
        return FullResult(success=True, error="", runs=runs, system=system)
8 changes: 7 additions & 1 deletion src/discord-cluster-manager/run_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from typing import Optional, Protocol, Union

from consts import CUDA_FLAGS, ExitCode, Timeout
import logging
logger = logging.getLogger(__name__)


@dataclasses.dataclass
Expand Down Expand Up @@ -390,6 +392,8 @@ def run_cuda_script( # # noqa: C901
"""
start = datetime.datetime.now()
try:
for source in sources:
logger.info(f"Source file: {source}")
# Write submission files to directory
_create_files(sources)
_create_files(headers)
Expand Down Expand Up @@ -448,7 +452,9 @@ def run_pytorch_script( # noqa: C901
start = datetime.datetime.now()
try:
assert main in sources.keys()

print("sources are ")
print(sources)
Comment on lines +455 to +456
Copy link
Preview

Copilot AI May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Replace this print statement with a logging call (e.g., logger.debug) for consistent logging.

Suggested change
print("sources are ")
print(sources)
logger.info("sources are:")
logger.info(sources)

Copilot uses AI. Check for mistakes.

logging.info(f"sources are {sources}")
Comment on lines +455 to +457
Copy link
Preview

Copilot AI May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Replace this print statement with a logging call (e.g., logger.debug) for consistent logging.

Suggested change
print("sources are ")
print(sources)
logging.info(f"sources are {sources}")
logger.info(f"sources are {sources}")

Copilot uses AI. Check for mistakes.

# Write submission files to directory
_create_files(sources)

Expand Down
1 change: 1 addition & 0 deletions src/discord-cluster-manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ def build_task_config(
"sources": sources,
"headers": headers,
"include_dirs": task.config.include_dirs,
**common,
}


Expand Down
Loading