Skip to content

Commit

Permalink
SubmitCog base class to extract common functionality;
Browse files Browse the repository at this point in the history
run command only runs a single file now
  • Loading branch information
ngc92 committed Jan 19, 2025
1 parent f86fd96 commit c70bb1b
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 254 deletions.
179 changes: 52 additions & 127 deletions src/discord-cluster-manager/cogs/github_cog.py
Original file line number Diff line number Diff line change
@@ -1,147 +1,72 @@
import json

import discord
from consts import GPUType
from cogs.submit_cog import ProgressReporter, SubmitCog
from consts import GitHubGPU, GPUType
from discord import app_commands
from discord.ext import commands
from github_runner import GitHubRun
from leaderboard_eval import amd_requirements, nvidia_requirements
from report import generate_report
from run_eval import CompileResult, FullResult, RunResult
from utils import build_task_config, send_discord_message, setup_logging
from utils import setup_logging

logger = setup_logging()


class GitHubCog(commands.Cog):
class GitHubCog(SubmitCog):
def __init__(self, bot):
self.bot = bot
self.run_submission = bot.run_group.command(
name="github", description="Run a script using GitHub Actions"
)(self.run_submission)

@app_commands.describe(
script="The Python script file to run",
gpu_type="Choose the GPU type for GitHub Actions",
)
@app_commands.choices(
gpu_type=[
app_commands.Choice(name="NVIDIA", value="nvidia"),
app_commands.Choice(name="AMD", value="amd"),
]
)
async def run_submission(
self,
interaction: discord.Interaction,
script: discord.Attachment,
gpu_type: app_commands.Choice[str],
reference_script: discord.Attachment = None,
reference_code: str = None,
) -> tuple[discord.Thread, FullResult]:
if not script.filename.endswith((".py", ".cu", ".cuh", ".cpp")):
await send_discord_message(
interaction, "Please provide a Python (.py) or CUDA (.cu / .cuh / .cpp) file"
)
return None, None

thread = await self.bot.create_thread(interaction, gpu_type.name, "GitHub Job")
await thread.send(f"Processing `{script.filename}` with {gpu_type.name}...")

try:
script_content = (await script.read()).decode("utf-8")
selected_gpu = GPUType.AMD if gpu_type.value == "amd" else GPUType.NVIDIA
lang = "py" if script.filename.endswith(".py") else "cu"

if reference_script is not None or reference_code is not None:
reference_content = (
reference_code
if reference_code is not None
else (await reference_script.read()).decode("utf-8")
)
else:
reference_content = None

config = build_task_config(
lang=lang,
reference_content=reference_content,
submission_content=script_content,
arch=None,
)

artifacts = await self.execute_github_run(
gpu_type=selected_gpu,
config=config,
thread=thread,
)

logs = artifacts["run-result"]["result.json"].decode("utf-8")
data = json.loads(logs)
if "compile" in data and data["compile"] is not None:
comp = CompileResult(**data["compile"])
else:
comp = None
run = RunResult(**data["run"])
result = FullResult(success=True, error="", compile=comp, run=run)
await generate_report(thread, result)
return thread, result

except Exception as e:
logger.error(f"Error processing request: {str(e)}", exc_info=True)
if thread:
await thread.send(f"Error processing request: {str(e)}")
raise

async def execute_github_run(
self,
gpu_type: GPUType,
config: dict,
thread: discord.Thread,
) -> dict:
super().__init__(bot, name="GitHub", gpus=GitHubGPU)

def _get_arch(self, gpu_type: app_commands.Choice[str]):
    """Return the target compute architecture for a GitHub run.

    GitHub runs are not built for a specific SM architecture, so this
    always yields ``None`` no matter which GPU the user selected.
    (Presumably consumed by the SubmitCog base when building the task
    config — confirm against SubmitCog.)
    """
    return None

async def _run_submission(
self, config: dict, gpu_type: GPUType, status: ProgressReporter
) -> FullResult:
selected_gpu = GPUType.AMD if gpu_type.value == "amd" else GPUType.NVIDIA

lang = config["lang"]
if lang == "cu" and gpu_type == GPUType.AMD:
if lang == "cu" and selected_gpu == GPUType.AMD:
# TODO implement HIP
raise ValueError("Cannot use CUDA runs with AMD GPUs")
raise NotImplementedError("Cannot use CUDA runs with AMD GPUs")

lang_name = {"py": "Python", "cu": "CUDA"}[lang]

logger.info(f"Attempting to trigger GitHub action for {lang_name} on {gpu_type.name}")
logger.info(f"Attempting to trigger GitHub action for {lang_name} on {selected_gpu.name}")

workflow_file = gpu_type.value
workflow_file = selected_gpu.value
run = GitHubRun(workflow_file)

try:
payload = json.dumps(config)

inputs = {"payload": payload}
if lang == "py":
if gpu_type == GPUType.NVIDIA:
inputs["requirements"] = nvidia_requirements
else:
inputs["requirements"] = amd_requirements

if not await run.trigger(inputs):
await thread.send(
"Failed to trigger GitHub Action. Please check the configuration."
)
return {}

status_msg = await thread.send(
"**Running on GitHub...**\n" "> ⏳ Waiting for workflow to start..."
)
await run.wait_for_completion(lambda x: self.wait_callback(x, thread, status_msg))
await thread.send(f"Running completed with status: {run.status}")

return await run.download_artifacts()

except Exception as e:
logger.error(f"Error in trigger_github_action: {str(e)}", exc_info=True)
raise

async def wait_callback(self, run: GitHubRun, thread: discord.Thread, msg: discord.Message):
message = (
f"**Running on GitHub...**\n"
f"> Workflow [{run.run_id}]({run.html_url}): {run.status}\n"
f"> ⏳ {run.elapsed_time.total_seconds():.1f} seconds\n"
)
payload = json.dumps(config)

inputs = {"payload": payload}
if lang == "py":
if selected_gpu == GPUType.NVIDIA:
inputs["requirements"] = nvidia_requirements
else:
inputs["requirements"] = amd_requirements

if not await run.trigger(inputs):
raise RuntimeError("Failed to trigger GitHub Action. Please check the configuration.")

await status.push("⏳ Waiting for workflow to start...")
await run.wait_for_completion(lambda x: self.wait_callback(x, status))
await status.update(f"Workflow [{run.run_id}]({run.html_url}) completed")
await status.push("Downloading artifacts...")

artifacts = await run.download_artifacts()
logs = artifacts["run-result"]["result.json"].decode("utf-8")

await msg.edit(content=message)
await status.update("Downloading artifacts... done")

data = json.loads(logs)
if "compile" in data and data["compile"] is not None:
comp = CompileResult(**data["compile"])
else:
comp = None
run = RunResult(**data["run"])
return FullResult(success=True, error="", compile=comp, run=run)

async def wait_callback(self, run: GitHubRun, status: ProgressReporter):
    """Progress hook invoked while polling a GitHub workflow run.

    Pushes a one-line status update showing the workflow link, its
    current state, and how long it has been running.
    """
    elapsed_s = run.elapsed_time.total_seconds()
    await status.update(
        f"⏳ Workflow [{run.run_id}]({run.html_url}): {run.status} "
        f"({elapsed_s:.1f}s)"
    )
11 changes: 5 additions & 6 deletions src/discord-cluster-manager/cogs/leaderboard_cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime
from enum import Enum
from io import StringIO
from typing import List, Optional
from typing import Callable, List, Optional

import discord
from consts import (
Expand Down Expand Up @@ -36,16 +36,15 @@ async def async_submit_cog_job(
interaction: discord.Interaction,
leaderboard_name: str,
script: discord.Attachment,
command,
command: Callable,
reference_code,
submission_content,
cog: commands.Cog,
gpu: AllGPU,
runner_name: str = "GitHub",
):
try:
discord_thread, result = await command.callback(
cog,
discord_thread, result = await command(
interaction,
script,
app_commands.Choice(
Expand Down Expand Up @@ -173,7 +172,7 @@ async def on_submit_hook(
interaction: discord.Interaction,
leaderboard_name: str,
script: discord.Attachment,
command,
command: Callable,
cog: commands.Cog,
GPUsEnum: Enum,
runner_name: str,
Expand Down Expand Up @@ -251,7 +250,7 @@ async def submit(
await send_discord_message(interaction, f"❌ Required {runner_name} cogs not found!")
return

runner_command = runner_cog.run_submission
runner_command = runner_cog.submit_leaderboard

success = await self.on_submit_hook(
interaction,
Expand Down
123 changes: 21 additions & 102 deletions src/discord-cluster-manager/cogs/modal_cog.py
Original file line number Diff line number Diff line change
@@ -1,119 +1,38 @@
import asyncio
from typing import Optional

import discord
import modal
from consts import GPU_TO_SM, ModalGPU
from cogs.submit_cog import ProgressReporter, SubmitCog
from consts import GPU_TO_SM, GPUType, ModalGPU
from discord import app_commands
from discord.ext import commands
from report import generate_report
from run_eval import FullResult
from utils import build_task_config, send_discord_message, setup_logging
from utils import setup_logging

logger = setup_logging()


class ModalCog(commands.Cog):
class ModalCog(SubmitCog):
def __init__(self, bot):
self.bot = bot
super().__init__(bot, "Modal", gpus=ModalGPU)

self.run_submission = bot.run_group.command(
name="modal", description="Run a script using Modal"
)(self.run_submission)
def _get_arch(self, gpu_type: app_commands.Choice[str]):
    """Resolve the SM compute architecture for the chosen Modal GPU.

    Looks the (upper-cased) GPU identifier up in the ``GPU_TO_SM``
    table; raises ``KeyError`` for a GPU with no known mapping.
    """
    gpu_key = gpu_type.value.upper()
    return GPU_TO_SM[gpu_key]

@app_commands.describe(
script="The Python script file to run", gpu_type="Choose the GPU type for Modal"
)
@app_commands.choices(
gpu_type=[app_commands.Choice(name=gpu.name, value=gpu.value) for gpu in ModalGPU]
)
async def run_submission(
self,
interaction: discord.Interaction,
script: discord.Attachment,
gpu_type: app_commands.Choice[str],
reference_script: Optional[discord.Attachment] = None,
reference_code: str = None,
) -> tuple[discord.Thread, FullResult]:
thread = None
status_msg = None
try:
if not script.filename.endswith((".py", ".cu", ".cuh", ".cpp")):
await send_discord_message(
interaction,
"Please provide a Python (.py) or CUDA (.cu / .cuh / .cpp) file",
ephemeral=True,
)
return None, None

if not interaction.response.is_done():
await interaction.response.defer(ephemeral=True)

channel = interaction.channel
message = await channel.send(f"Starting Modal job with {gpu_type.name}...")
thread = await message.create_thread(name=f"{gpu_type.name} Modal Job")

script_content = (await script.read()).decode("utf-8")
status_msg = await thread.send(
"**Running on Modal...**\n> ⏳ Waiting for available GPU..."
)

filename = "submission.py" if script.filename.endswith(".py") else "train.cu"
reference_content = None
if reference_script is not None or reference_code is not None:
reference_content = (
reference_code
if reference_code is not None
else (await reference_script.read()).decode("utf-8")
)

config = build_task_config(
lang="py" if filename.endswith(".py") else "cu",
reference_content=reference_content,
submission_content=script_content,
arch=GPU_TO_SM[gpu_type.value.upper()],
)

result = await self.handle_modal_execution(
thread,
gpu_type.value,
config,
status_msg,
)
return thread, result
async def _run_submission(
self, config: dict, gpu_type: GPUType, status: ProgressReporter
) -> FullResult:
loop = asyncio.get_event_loop()
func_type = "pytorch" if config["lang"] == "py" else "cuda"
func_name = f"run_{func_type}_script_{gpu_type.value.lower()}"

except Exception as e:
logger.error(f"Error processing request: {str(e)}", exc_info=True)
if thread and status_msg:
await status_msg.edit(content="**Running on Modal...**\n> ❌ Job failed!")
await thread.send(f"**Error:** {str(e)}")
raise
logger.info(f"Starting modal run using {func_name}")

async def handle_modal_execution(
self,
thread: discord.Thread,
gpu_type: str,
config: dict,
status_msg: discord.Message,
) -> FullResult:
try:
loop = asyncio.get_event_loop()
func_type = "pytorch" if config["lang"] == "py" else "cuda"
func_name = f"run_{func_type}_script_{gpu_type.lower()}"
await status.push("⏳ Waiting for available GPU...")

result = await loop.run_in_executor(
None,
lambda: modal.Function.lookup("discord-bot-runner", func_name).remote(
config=config
),
)
result = await loop.run_in_executor(
None,
lambda: modal.Function.lookup("discord-bot-runner", func_name).remote(config=config),
)

# Send results
await generate_report(thread, result)
return result
await status.update(" Waiting for available GPU... Done")

except Exception as e:
logger.error(f"Error in handle_modal_execution: {str(e)}", exc_info=True)
await status_msg.edit(content="**Running on Modal...**\n> ❌ Job failed!")
await thread.send(f"**Error:** {str(e)}")
raise
return result
Loading

0 comments on commit c70bb1b

Please sign in to comment.