diff --git a/src/discord-cluster-manager/cogs/github_cog.py b/src/discord-cluster-manager/cogs/github_cog.py index cc826e0..7ecff2e 100644 --- a/src/discord-cluster-manager/cogs/github_cog.py +++ b/src/discord-cluster-manager/cogs/github_cog.py @@ -1,147 +1,72 @@ import json -import discord -from consts import GPUType +from cogs.submit_cog import ProgressReporter, SubmitCog +from consts import GitHubGPU, GPUType from discord import app_commands -from discord.ext import commands from github_runner import GitHubRun from leaderboard_eval import amd_requirements, nvidia_requirements -from report import generate_report from run_eval import CompileResult, FullResult, RunResult -from utils import build_task_config, send_discord_message, setup_logging +from utils import setup_logging logger = setup_logging() -class GitHubCog(commands.Cog): +class GitHubCog(SubmitCog): def __init__(self, bot): - self.bot = bot - self.run_submission = bot.run_group.command( - name="github", description="Run a script using GitHub Actions" - )(self.run_submission) - - @app_commands.describe( - script="The Python script file to run", - gpu_type="Choose the GPU type for GitHub Actions", - ) - @app_commands.choices( - gpu_type=[ - app_commands.Choice(name="NVIDIA", value="nvidia"), - app_commands.Choice(name="AMD", value="amd"), - ] - ) - async def run_submission( - self, - interaction: discord.Interaction, - script: discord.Attachment, - gpu_type: app_commands.Choice[str], - reference_script: discord.Attachment = None, - reference_code: str = None, - ) -> tuple[discord.Thread, FullResult]: - if not script.filename.endswith((".py", ".cu", ".cuh", ".cpp")): - await send_discord_message( - interaction, "Please provide a Python (.py) or CUDA (.cu / .cuh / .cpp) file" - ) - return None, None - - thread = await self.bot.create_thread(interaction, gpu_type.name, "GitHub Job") - await thread.send(f"Processing `{script.filename}` with {gpu_type.name}...") - - try: - script_content = (await 
script.read()).decode("utf-8") - selected_gpu = GPUType.AMD if gpu_type.value == "amd" else GPUType.NVIDIA - lang = "py" if script.filename.endswith(".py") else "cu" - - if reference_script is not None or reference_code is not None: - reference_content = ( - reference_code - if reference_code is not None - else (await reference_script.read()).decode("utf-8") - ) - else: - reference_content = None - - config = build_task_config( - lang=lang, - reference_content=reference_content, - submission_content=script_content, - arch=None, - ) - - artifacts = await self.execute_github_run( - gpu_type=selected_gpu, - config=config, - thread=thread, - ) - - logs = artifacts["run-result"]["result.json"].decode("utf-8") - data = json.loads(logs) - if "compile" in data and data["compile"] is not None: - comp = CompileResult(**data["compile"]) - else: - comp = None - run = RunResult(**data["run"]) - result = FullResult(success=True, error="", compile=comp, run=run) - await generate_report(thread, result) - return thread, result - - except Exception as e: - logger.error(f"Error processing request: {str(e)}", exc_info=True) - if thread: - await thread.send(f"Error processing request: {str(e)}") - raise - - async def execute_github_run( - self, - gpu_type: GPUType, - config: dict, - thread: discord.Thread, - ) -> dict: + super().__init__(bot, name="GitHub", gpus=GitHubGPU) + + def _get_arch(self, gpu_type: app_commands.Choice[str]): + return None + + async def _run_submission( + self, config: dict, gpu_type: GPUType, status: ProgressReporter + ) -> FullResult: + selected_gpu = GPUType.AMD if gpu_type.value == "amd" else GPUType.NVIDIA + lang = config["lang"] - if lang == "cu" and gpu_type == GPUType.AMD: + if lang == "cu" and selected_gpu == GPUType.AMD: # TODO implement HIP - raise ValueError("Cannot use CUDA runs with AMD GPUs") + raise NotImplementedError("Cannot use CUDA runs with AMD GPUs") lang_name = {"py": "Python", "cu": "CUDA"}[lang] - logger.info(f"Attempting to trigger 
GitHub action for {lang_name} on {gpu_type.name}") + logger.info(f"Attempting to trigger GitHub action for {lang_name} on {selected_gpu.name}") - workflow_file = gpu_type.value + workflow_file = selected_gpu.value run = GitHubRun(workflow_file) - try: - payload = json.dumps(config) - - inputs = {"payload": payload} - if lang == "py": - if gpu_type == GPUType.NVIDIA: - inputs["requirements"] = nvidia_requirements - else: - inputs["requirements"] = amd_requirements - - if not await run.trigger(inputs): - await thread.send( - "Failed to trigger GitHub Action. Please check the configuration." - ) - return {} - - status_msg = await thread.send( - "**Running on GitHub...**\n" "> ⏳ Waiting for workflow to start..." - ) - await run.wait_for_completion(lambda x: self.wait_callback(x, thread, status_msg)) - await thread.send(f"Running completed with status: {run.status}") - - return await run.download_artifacts() - - except Exception as e: - logger.error(f"Error in trigger_github_action: {str(e)}", exc_info=True) - raise - - async def wait_callback(self, run: GitHubRun, thread: discord.Thread, msg: discord.Message): - message = ( - f"**Running on GitHub...**\n" - f"> Workflow [{run.run_id}]({run.html_url}): {run.status}\n" - f"> ⏳ {run.elapsed_time.total_seconds():.1f} seconds\n" - ) + payload = json.dumps(config) + + inputs = {"payload": payload} + if lang == "py": + if selected_gpu == GPUType.NVIDIA: + inputs["requirements"] = nvidia_requirements + else: + inputs["requirements"] = amd_requirements + + if not await run.trigger(inputs): + raise RuntimeError("Failed to trigger GitHub Action. 
Please check the configuration.") + + await status.push("⏳ Waiting for workflow to start...") + await run.wait_for_completion(lambda x: self.wait_callback(x, status)) + await status.update(f"Workflow [{run.run_id}]({run.html_url}) completed") + await status.push("Downloading artifacts...") + + artifacts = await run.download_artifacts() + logs = artifacts["run-result"]["result.json"].decode("utf-8") - await msg.edit(content=message) + await status.update("Downloading artifacts... done") + + data = json.loads(logs) + if "compile" in data and data["compile"] is not None: + comp = CompileResult(**data["compile"]) + else: + comp = None + run = RunResult(**data["run"]) + return FullResult(success=True, error="", compile=comp, run=run) + + async def wait_callback(self, run: GitHubRun, status: ProgressReporter): + await status.update( + f"⏳ Workflow [{run.run_id}]({run.html_url}): {run.status} " + f"({run.elapsed_time.total_seconds():.1f}s)" + ) diff --git a/src/discord-cluster-manager/cogs/leaderboard_cog.py b/src/discord-cluster-manager/cogs/leaderboard_cog.py index a11c558..fe33d7b 100644 --- a/src/discord-cluster-manager/cogs/leaderboard_cog.py +++ b/src/discord-cluster-manager/cogs/leaderboard_cog.py @@ -2,7 +2,7 @@ from datetime import datetime from enum import Enum from io import StringIO -from typing import List, Optional +from typing import Callable, List, Optional import discord from consts import ( @@ -36,7 +36,7 @@ async def async_submit_cog_job( interaction: discord.Interaction, leaderboard_name: str, script: discord.Attachment, - command, + command: Callable, reference_code, submission_content, cog: commands.Cog, @@ -44,8 +44,7 @@ async def async_submit_cog_job( runner_name: str = "GitHub", ): try: - discord_thread, result = await command.callback( - cog, + discord_thread, result = await command( interaction, script, app_commands.Choice( @@ -173,7 +172,7 @@ async def on_submit_hook( interaction: discord.Interaction, leaderboard_name: str, script: 
discord.Attachment, - command, + command: Callable, cog: commands.Cog, GPUsEnum: Enum, runner_name: str, @@ -251,7 +250,7 @@ async def submit( await send_discord_message(interaction, f"❌ Required {runner_name} cogs not found!") return - runner_command = runner_cog.run_submission + runner_command = runner_cog.submit_leaderboard success = await self.on_submit_hook( interaction, diff --git a/src/discord-cluster-manager/cogs/modal_cog.py b/src/discord-cluster-manager/cogs/modal_cog.py index 277311d..6c2f414 100644 --- a/src/discord-cluster-manager/cogs/modal_cog.py +++ b/src/discord-cluster-manager/cogs/modal_cog.py @@ -1,119 +1,38 @@ import asyncio -from typing import Optional -import discord import modal -from consts import GPU_TO_SM, ModalGPU +from cogs.submit_cog import ProgressReporter, SubmitCog +from consts import GPU_TO_SM, GPUType, ModalGPU from discord import app_commands -from discord.ext import commands -from report import generate_report from run_eval import FullResult -from utils import build_task_config, send_discord_message, setup_logging +from utils import setup_logging logger = setup_logging() -class ModalCog(commands.Cog): +class ModalCog(SubmitCog): def __init__(self, bot): - self.bot = bot + super().__init__(bot, "Modal", gpus=ModalGPU) - self.run_submission = bot.run_group.command( - name="modal", description="Run a script using Modal" - )(self.run_submission) + def _get_arch(self, gpu_type: app_commands.Choice[str]): + return GPU_TO_SM[gpu_type.value.upper()] - @app_commands.describe( - script="The Python script file to run", gpu_type="Choose the GPU type for Modal" - ) - @app_commands.choices( - gpu_type=[app_commands.Choice(name=gpu.name, value=gpu.value) for gpu in ModalGPU] - ) - async def run_submission( - self, - interaction: discord.Interaction, - script: discord.Attachment, - gpu_type: app_commands.Choice[str], - reference_script: Optional[discord.Attachment] = None, - reference_code: str = None, - ) -> tuple[discord.Thread, FullResult]: 
- thread = None - status_msg = None - try: - if not script.filename.endswith((".py", ".cu", ".cuh", ".cpp")): - await send_discord_message( - interaction, - "Please provide a Python (.py) or CUDA (.cu / .cuh / .cpp) file", - ephemeral=True, - ) - return None, None - - if not interaction.response.is_done(): - await interaction.response.defer(ephemeral=True) - - channel = interaction.channel - message = await channel.send(f"Starting Modal job with {gpu_type.name}...") - thread = await message.create_thread(name=f"{gpu_type.name} Modal Job") - - script_content = (await script.read()).decode("utf-8") - status_msg = await thread.send( - "**Running on Modal...**\n> ⏳ Waiting for available GPU..." - ) - - filename = "submission.py" if script.filename.endswith(".py") else "train.cu" - reference_content = None - if reference_script is not None or reference_code is not None: - reference_content = ( - reference_code - if reference_code is not None - else (await reference_script.read()).decode("utf-8") - ) - - config = build_task_config( - lang="py" if filename.endswith(".py") else "cu", - reference_content=reference_content, - submission_content=script_content, - arch=GPU_TO_SM[gpu_type.value.upper()], - ) - - result = await self.handle_modal_execution( - thread, - gpu_type.value, - config, - status_msg, - ) - return thread, result + async def _run_submission( + self, config: dict, gpu_type: GPUType, status: ProgressReporter + ) -> FullResult: + loop = asyncio.get_event_loop() + func_type = "pytorch" if config["lang"] == "py" else "cuda" + func_name = f"run_{func_type}_script_{gpu_type.value.lower()}" - except Exception as e: - logger.error(f"Error processing request: {str(e)}", exc_info=True) - if thread and status_msg: - await status_msg.edit(content="**Running on Modal...**\n> ❌ Job failed!") - await thread.send(f"**Error:** {str(e)}") - raise + logger.info(f"Starting modal run using {func_name}") - async def handle_modal_execution( - self, - thread: discord.Thread, - 
gpu_type: str, - config: dict, - status_msg: discord.Message, - ) -> FullResult: - try: - loop = asyncio.get_event_loop() - func_type = "pytorch" if config["lang"] == "py" else "cuda" - func_name = f"run_{func_type}_script_{gpu_type.lower()}" + await status.push("⏳ Waiting for available GPU...") - result = await loop.run_in_executor( - None, - lambda: modal.Function.lookup("discord-bot-runner", func_name).remote( - config=config - ), - ) + result = await loop.run_in_executor( + None, + lambda: modal.Function.lookup("discord-bot-runner", func_name).remote(config=config), + ) - # Send results - await generate_report(thread, result) - return result + await status.update(" Waiting for available GPU... Done") - except Exception as e: - logger.error(f"Error in handle_modal_execution: {str(e)}", exc_info=True) - await status_msg.edit(content="**Running on Modal...**\n> ❌ Job failed!") - await thread.send(f"**Error:** {str(e)}") - raise + return result diff --git a/src/discord-cluster-manager/cogs/submit_cog.py b/src/discord-cluster-manager/cogs/submit_cog.py new file mode 100644 index 0000000..b72bd93 --- /dev/null +++ b/src/discord-cluster-manager/cogs/submit_cog.py @@ -0,0 +1,206 @@ +from enum import Enum +from typing import Optional, Tuple, Type + +import discord +from discord import app_commands +from discord.ext import commands +from report import generate_report +from run_eval import FullResult +from utils import build_task_config, send_discord_message, setup_logging + +logger = setup_logging() + + +class ProgressReporter: + def __init__(self, status_msg: discord.Message, header: str): + self.header = header + self.lines = [] + self.status = status_msg + + @staticmethod + async def make_reporter(thread: discord.Thread, content: str): + status_msg = await thread.send(f"**{content}**\n") + return ProgressReporter(status_msg, content) + + async def push(self, content: str): + self.lines.append(f"> {content}") + await self._update_message() + + async def update(self, 
new_content: str):
+        self.lines[-1] = f"> {new_content}"
+        await self._update_message()
+
+    async def update_header(self, new_header):
+        self.header = new_header
+        await self._update_message()
+
+    async def _update_message(self):
+        message = str.join("\n", [f"**{self.header}**"] + self.lines)
+        await self.status.edit(content=message, suppress=True)
+
+
+class SubmitCog(commands.Cog):
+    """
+    Base class for code submission / run scheduler cogs.
+
+    Derived classes need to implement a `_get_arch(self, gpu_type: app_commands.Choice[str])`
+    method to translate the selected GPU to an architecture argument for CUDA,
+    and a
+    ```
+    _run_submission(self, config: dict, gpu_type: GPUType,
+        status: ProgressReporter) -> FullResult
+    ```
+    coroutine, which handles the actual submission.
+
+    This base class will register a `run` subcommand with the runner's name, which can be used
+    to run a single (non-leaderboard) script.
+    """
+
+    def __init__(self, bot, name: str, gpus: Type[Enum]):
+        self.bot = bot
+        self.name = name
+
+        choices = [app_commands.Choice(name=c.name, value=c.value) for c in gpus]
+
+        run_fn = self.run_script
+
+        # note: these helpers want to set custom attributes on the function, but `method`
+        # does not allow setting any attributes, so we define this wrapper
+        async def run(
+            interaction: discord.Interaction,
+            script: discord.Attachment,
+            gpu_type: app_commands.Choice[str],
+        ):
+            return await run_fn(interaction, script, gpu_type)
+
+        run = app_commands.choices(gpu_type=choices)(run)
+        run = app_commands.describe(
+            script="The Python/CUDA script file to run",
+            gpu_type=f"Choose the GPU type for {name}",
+        )(run)
+
+        self.run_script = bot.run_group.command(
+            name=self.name.lower(), description=f"Run a script using {self.name}"
+        )(run)
+
+    async def submit_leaderboard(
+        self,
+        interaction: discord.Interaction,
+        script: discord.Attachment,
+        gpu_type: app_commands.Choice[str],
+        reference_code: str,
+    ) -> Tuple[Optional[discord.Thread], 
Optional[FullResult]]: + """ + Function invoked by `leaderboard_cog` to handle a leaderboard run. + """ + thread, result = await self._handle_submission( + interaction, + gpu_type, + script=script, + reference_content=reference_code, + ) + + return thread, result + + async def run_script( + self, + interaction: discord.Interaction, + script: discord.Attachment, + gpu_type: app_commands.Choice[str], + ): + """ + Function invoked by the `run` command to run a single script. + """ + await self._handle_submission( + interaction, + gpu_type, + script=script, + reference_content=None, + ) + + async def _handle_submission( + self, + interaction: discord.Interaction, + gpu_type: app_commands.Choice[str], + script: discord.Attachment, + reference_content: Optional[str], + ) -> Tuple[Optional[discord.Thread], Optional[FullResult]]: + """ + Generic function to handle code submissions. + Args: + interaction: Interaction that started this command. + gpu_type: Which GPU to run on. + script: File that contains the submitted script. + reference_content: String with the reference code, if provided. + + Returns: + if successful, returns the created discord thread, and the result of + the run. + """ + + script_content = await self._validate_input_file(interaction, script) + if script_content is None: + return None, None + + # TODO figure out the correct way to handle messaging here + thread = await self.bot.create_thread(interaction, gpu_type.name, f"{self.name} Job") + await thread.send( + f"Starting {self.name} job for " f"`{script.filename}` with {gpu_type.name}..." + ) + + status = await ProgressReporter.make_reporter(thread, f"Running on {self.name}...") + + config = build_task_config( + lang="py" if script.filename.endswith(".py") else "cu", + reference_content=reference_content, + submission_content=script_content, + arch=self._get_arch(gpu_type), + ) + + result = await self._run_submission(config, gpu_type, status) + await status.update_header(f"Running on {self.name}... 
✅ success") + await generate_report(thread, result, has_tests=reference_content is not None) + + return thread, result + + async def _validate_input_file( + self, + interaction: discord.Interaction, + script: discord.Attachment, + ) -> Optional[str]: + # check file extension + if not script.filename.endswith((".py", ".cu", ".cuh", ".cpp")): + await send_discord_message( + interaction, "Please provide a Python (.py) or CUDA (.cu / .cuh / .cpp) file" + ) + return None + + # load and decode + try: + return (await script.read()).decode("utf-8") + except UnicodeError: + await send_discord_message( + interaction, + f"Could not decode your file `{script.filename}`.\n" f"Is it UTF-8?", + ephemeral=True, + ) + return None + + async def _run_submission( + self, config: dict, gpu_type: app_commands.Choice[str], status: ProgressReporter + ) -> FullResult: + """ + Run a submission specified by `config`. + To be implemented in derived classes. + Args: + config: the config object containing all necessary runner information. + gpu_type: Which GPU to run for. + status: callback object that allows updating the status message in discord + + Returns: + Result of running `config`. 
+ """ + raise NotImplementedError() + + def _get_arch(self, gpu_type: app_commands.Choice[str]): + raise NotImplementedError() diff --git a/src/discord-cluster-manager/report.py b/src/discord-cluster-manager/report.py index d911ff1..77d90f3 100644 --- a/src/discord-cluster-manager/report.py +++ b/src/discord-cluster-manager/report.py @@ -38,7 +38,7 @@ async def _send_split_log(thread: discord.Thread, partial_message: str, header: return "" -async def generate_report(thread: discord.Thread, result: FullResult): # noqa: C901 +async def generate_report(thread: discord.Thread, result: FullResult, has_tests: bool): # noqa: C901 message = "" if not result.success: message += "# Failure\n" @@ -91,23 +91,31 @@ async def generate_report(thread: discord.Thread, result: FullResult): # noqa: return - if not run.passed: - message += "# Testing failed\n" - message += "Command " - message += f"```bash\n{_limit_length(run.command, 1000)}```\n" - message += f"ran successfully in {run.duration:.2} seconds, but did not pass all tests.\n" - - if len(run.stderr.strip()) > 0: - message = await _send_split_log(thread, message, "Program stderr", run.stderr.strip()) - - if len(run.stdout.strip()) > 0: - message = await _send_split_log(thread, message, "Program stdout", run.stdout.strip()) - - if len(message) != 0: - await thread.send(message) - - # TODO dedicated "error" entry in our results that gets populated by check_implementation - return + if has_tests: + if not run.passed: + message += "# Testing failed\n" + message += "Command " + message += f"```bash\n{_limit_length(run.command, 1000)}```\n" + message += ( + f"ran successfully in {run.duration:.2} seconds, but did not pass all tests.\n" + ) + + if len(run.stderr.strip()) > 0: + message = await _send_split_log( + thread, message, "Program stderr", run.stderr.strip() + ) + + if len(run.stdout.strip()) > 0: + message = await _send_split_log( + thread, message, "Program stdout", run.stdout.strip() + ) + + if len(message) != 0: + await 
thread.send(message) + + # TODO dedicated "error" entry in our results that gets + # populated by check_implementation + return # OK, we were successful message += "# Success!\n" @@ -115,7 +123,8 @@ async def generate_report(thread: discord.Thread, result: FullResult): # noqa: message += f"```bash\n{_limit_length(run.command, 1000)}```\n" message += f"ran successfully in {run.duration:.2} seconds.\n" - message = await _send_split_log(thread, message, "Result", pprint.pformat(run.result)) + if has_tests: + message = await _send_split_log(thread, message, "Result", pprint.pformat(run.result)) if len(run.stderr.strip()) > 0: message = await _send_split_log(thread, message, "Program stderr", run.stderr.strip())