diff --git a/examples/experimental/multiple-applications-per-pod/README.md b/examples/experimental/multiple-applications-per-pod/README.md
index 03e1fa45..e5ef9074 100644
--- a/examples/experimental/multiple-applications-per-pod/README.md
+++ b/examples/experimental/multiple-applications-per-pod/README.md
@@ -153,13 +153,21 @@ flux resource list
 What we are seeing in the above is the set of resources that need to be shared across the containers (brokers). We don't want to oversubscribe, or for example, tell any specific broker that it can use all the resources while we tell the same to the others. We have to be careful that we use the Python install that is alongside the Flux install. Note that *you should not run this* but I want to show you how the queue was started. You can issue `--help` to see all the options to customize:
 
 ```bash
+# You can look at help...
 /mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start --help
-# This is how it was started using the defaults (do not run this again)
+# But DO NOT RUN THIS! It's already running.
+# This is how the fluxion controller was started using the defaults
 /mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start
 ```
 
-To submit a job, (and you can do this from any of the flux container brokers) - it will be hitting a web service that the Python script is exposing from the queue!
+You might want to watch the main fluxion controller in a different terminal before submitting the job:
+
+```bash
+kubectl logs flux-sample-0-wxxkp -f
+```
+
+Then from your interactive terminal, let's submit a job! You can do this from any of the flux container brokers - the submit hits a web service that the Python script exposes from the queue.
 
 ```bash
 /mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --help
@@ -181,14 +189,63 @@ And then we see from where we submit:
 And from the Fluxion service script:
 
 ```console
-{'command': ['ior'], 'cpu': '4', 'container': 'ior'}
-🙏️ Requesting to submit: ior
-✅️ Match of jobspec to Fluxion graph success!
-10.244.0.18 - - [01/Jul/2024 23:55:42] "POST /submit HTTP/1.1" 200 -
-👉️ Job on ior 1 is complete.
-✅️ Cancel of jobid 1 success!
+INFO:werkzeug:Press CTRL+C to quit
+INFO:fluxion_controller:{'command': ['ior'], 'cpu': '4', 'container': 'ior', 'duration': None, 'workdir': None}
+INFO:fluxion_controller:{'t_depend': 1720132156.9946244, 't_run': 1720132157.0078795, 't_cleanup': 1720132157.5837069, 't_inactive': 1720132157.5849578, 'duration': 3600.0, 'expiration': 0.0, 'name': 'ior', 'cwd': '', 'queue': '', 'project': '', 'bank': '', 'ntasks': 1, 'ncores': 4, 'nnodes': 1, 'priority': 16, 'ranks': '3', 'nodelist': 'flux-sample-3', 'success': True, 'result': 'COMPLETED', 'waitstatus': 0, 'id': JobID(13180046671872), 't_submit': 1720132156.9826128, 't_remaining': 0.0, 'state': 'INACTIVE', 'username': 'root', 'userid': 0, 'urgency': 16, 'runtime': 0.5758273601531982, 'status': 'COMPLETED', 'returncode': 0, 'dependencies': [], 'annotations': {}, 'exception': {'occurred': False, 'severity': '', 'type': '', 'note': ''}, 'container': 'ior', 'fluxion': 1}
+INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:29:18] "POST /submit HTTP/1.1" 200 -
+INFO:fluxion_controller:👉️ Job on ior 1 is complete.
+INFO:fluxion_controller:✅️ Cancel of jobid 1 success!
+```
+
+Let's try submitting a job to the lammps application broker (container) now (note that the container's working directory has the input files):
+
+```bash
+/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --cpu 4 --container lammps lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
+```
+
+This one actually takes more than a second to run, so wait for it, and you'll eventually see fluxion detect that it is finished and clean up:
+
+```console
+INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:30:41] "POST /submit HTTP/1.1" 200 -
+INFO:fluxion_controller:👉️ Job on lammps 2 is complete.
+INFO:fluxion_controller:✅️ Cancel of jobid 2 success!
+```
+
+Let's now list jobs for one container...
+
+```bash
+/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs --container lammps
+```
+```console
+                                       Jobs for Lammps
+┏━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┓
+┃ Container ┃ Id       ┃ Name ┃ Status               ┃ Nodes ┃ Cores ┃ Runtime ┃ Returncode ┃
+┡━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━┩
+│    lammps │ ƒXBNj4c7 │ lmp  │ INACTIVE (COMPLETED) │ 1     │ 4     │ 27      │          0 │
+└───────────┴──────────┴──────┴──────────────────────┴───────┴───────┴─────────┴────────────┘
+```
+
+Or all containers!
+
+
+```bash
+/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs
+```
+```console
+⭐️ Found application queue: index 0
+⭐️ Found application chatterbug: index 3
+⭐️ Found application ior: index 2
+⭐️ Found application lammps: index 1
+                              Jobs for Queue, Chatterbug, Ior, Lammps
+┏━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┓
+┃ Container ┃ Id       ┃ Name       ┃ Status               ┃ Nodes ┃ Cores ┃ Runtime ┃ Returncode ┃
+┡━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━┩
+│     queue │ ƒ4WEiYej │ python3.11 │ RUN                  │ 1     │ 1     │ 45      │            │
+│       ior │ ƒU1BWHuH │ ior        │ INACTIVE (COMPLETED) │ 1     │ 4     │ 0       │          0 │
+│    lammps │ ƒXBNj4c7 │ lmp        │ INACTIVE (COMPLETED) │ 1     │ 4     │ 27      │          0 │
+└───────────┴──────────┴────────────┴──────────────────────┴───────┴───────┴─────────┴────────────┘
 ```
 
-I am calling this "pancake elasticity" since we can theoretically deploy many application containers and then use them when needed, essentially expanding the one running out (resource wise) while the others remain flat (not using resources). This isn't entirely ready yet (still testing) but a lot of the automation is in place.
+Lammps will show up like that when it is finished. I am calling this "pancake elasticity" since we can theoretically deploy many application containers and then use them only when needed, essentially expanding the one that is running (resource-wise) while the others remain flat (not using resources). This isn't entirely ready yet (still testing) but a lot of the automation is in place. It's super cool, and will likely inspire the next round of work for thinking about scheduling and fluxion.
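For reference, the `submit` command above is only a thin client around the controller's Flask endpoint. The snippet below is a minimal, hypothetical sketch (not part of the example files) of issuing the same request directly with `requests`, using the default service host that the script assumes and the same payload fields the CLI sends:

```python
# Minimal sketch: POST a job straight to the fluxion controller's /submit endpoint.
# The payload fields mirror what the CLI sends; the host is the default
# MiniCluster service address assumed by the controller script.
import requests

host = "flux-sample-0.flux-service.default.svc.cluster.local:5000"
payload = {
    "command": ["ior"],   # argv for the application to run
    "cpu": "4",           # total cores requested across nodes
    "container": "ior",   # which application broker should run it
    "duration": None,     # optional runtime limit in seconds
    "workdir": None,      # optional working directory
}

response = requests.post(f"http://{host}/submit", json=payload)
print(response.json())
```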
diff --git a/examples/experimental/multiple-applications-per-pod/fluxion_controller.py b/examples/experimental/multiple-applications-per-pod/fluxion_controller.py
index d8e44a5a..aae3061b 100644
--- a/examples/experimental/multiple-applications-per-pod/fluxion_controller.py
+++ b/examples/experimental/multiple-applications-per-pod/fluxion_controller.py
@@ -5,8 +5,13 @@
 import argparse
 import requests
 import threading
+import logging
 from flask import Flask, jsonify, request
+from rich.console import Console
+from rich.table import Table
+
+
 try:
     import flux
     import flux.job
@@ -28,6 +33,8 @@
 ctrl = None
 
 app = Flask(__name__)
+logging.basicConfig(level=logging.DEBUG)
+
 
 @app.route("/submit", methods=["POST"])
 def submit_job():
@@ -37,12 +44,19 @@ def submit_job():
     global ctrl
     data = request.get_json()
-    print(data)
+    app.logger.info(data)
     for required in ["command", "cpu", "container"]:
         if required not in data or not data[required]:
             return jsonify({"error": f"{required} is required."})
 
-    response = ctrl.submit_job(data["container"], data["cpu"], data["command"])
+    response = ctrl.submit_job(
+        data["container"],
+        data["cpu"],
+        data["command"],
+        workdir=data["workdir"],
+        duration=data["duration"],
+    )
+    app.logger.info(response)
     return jsonify(response)
 
@@ -133,12 +147,11 @@ def __init__(
     def populate_jobs(self):
         """
         Given running queues, populate with current jobs
+
+        TBA: this will handle restoring from shutdown state.
+        Not supported yet.
         """
         pass
-        # TODO how do we do this? We essentially need to restore state
-        import IPython
-
-        IPython.embed()
 
     def discover_containers(self):
         """
@@ -249,7 +262,9 @@ def run(self):
             # Get the status of the job from the handle
             info = flux.job.get_job(handle, jobset["container"])
             if info["result"] == "COMPLETED":
-                print(f"👉️ Job on {container} {jobset['fluxion']} is complete.")
+                app.logger.info(
+                    f"👉️ Job on {container} {jobset['fluxion']} is complete."
+                )
                 self.cancel(jobset["fluxion"])
                 continue
             # Otherwise add back to jobs set
@@ -267,16 +282,94 @@ def cancel(self, jobid):
         try:
             response = self.cli.cancel(jobid=jobid)
             if response.status == fluxion_pb2.CancelResponse.ResultType.CANCEL_SUCCESS:
-                print(f"✅️ Cancel of jobid {jobid} success!")
+                app.logger.info(f"✅️ Cancel of jobid {jobid} success!")
             else:
-                print(f"Issue with cancel, return code {response.status}")
+                app.logger.info(f"Issue with cancel, return code {response.status}")
         except:
-            print(f"✅️ jobid {jobid} is already inactive.")
+            app.logger.info(f"✅️ jobid {jobid} is already inactive.")
+
+    def submit_error(self, message):
+        """
+        Given a message, print (for the developer log) and return as json
+        """
+        print(message)
+        return {"error": message}
+
+    def list_jobs(self, containers):
+        """
+        List jobs for one or more containers
+        """
+        if not containers:
+            containers = list(self.handles.keys())
+        if not containers:
+            sys.exit(
+                "One or more application target containers are required (--container)"
+            )
+
+        # Create a pretty table!
+        names = ", ".join(x.capitalize() for x in containers)
+        table = Table(title=f"Jobs for {names}")
+
+        # These are the header columns
+        table.add_column("Container", justify="right", style="cyan", no_wrap=True)
+        table.add_column("Id", style="magenta")
+        table.add_column("Name", style="magenta")
+        table.add_column("Status", style="magenta")
+        table.add_column("Nodes", style="magenta")
+        table.add_column("Cores", style="magenta")
+        table.add_column("Runtime", style="magenta")
+        table.add_column("Returncode", justify="right", style="green")
+
+        # Add each container's jobs to the shared table
+        for container in containers:
+            table = self.list_container_jobs(container, table)
+
+        console = Console()
+        console.print(table)
+
+    def list_container_jobs(self, container, table):
+        """
+        List jobs for a single container, adding to a single table
+        """
+        # Allow failure and continue
+        if container not in self.handles:
+            print(f"Application container handle for {container} does not exist.")
+            return
+
+        # Our broker hook to the container
+        handle = self.handles[container]
+        jobs = flux.job.job_list(handle).get()["jobs"]
+
+        for info in jobs:
+            job = flux.job.get_job(handle, info["id"])
+            status = f"{job['state']} ({job['status']})"
+            if job["status"] == job["state"]:
+                status = job["state"]
+            runtime = str(int(job["runtime"]))
+            jobid = str(job["id"])
+            table.add_row(
+                container,
+                jobid,
+                job["name"],
+                status,
+                str(job["nnodes"]),
+                str(job["ncores"]),
+                runtime,
+                str(job["returncode"]),
+            )
+        return table
 
-    def submit_job(self, container, cpu_count, command):
+    def submit_job(
+        self,
+        container,
+        cpu_count,
+        command,
+        workdir=None,
+        duration=None,
+        environment=None,
+    ):
         """
         Demo of submitting a job. We will want a more robust way to do this.
 
-        TODO: add working directory, duration, environment, etc.
         This currently just asks for the command and total cores across nodes.
         We let fluxion decide how to distribute that across physical nodes.
@@ -291,7 +384,7 @@ def submit_job(self, container, cpu_count, command):
         # They are asking for a broker container handle that doesn't exist
         if container not in self.handles:
             choices = ",".join(list(self.handles.keys()))
-            sys.exit(
+            return self.submit_error(
                 f"Application container handle for {container} does not exist - choices are {choices}."
             )
 
@@ -303,17 +396,23 @@ def submit_job(self, container, cpu_count, command):
         print(f"🙏️ Requesting to submit: {' '.join(command)}")
         jobspec["tasks"][0]["command"] = command
 
+        # Add additional system parameters
+        if duration is not None:
+            jobspec["attributes"]["system"]["duration"] = duration
+        if workdir is not None:
+            jobspec["attributes"]["system"]["cwd"] = workdir
+        if environment is not None and isinstance(environment, dict):
+            jobspec["attributes"]["system"]["environment"] = environment
+
         # This asks fluxion if we can schedule it
         self.cli = FluxionClient(host=self.fluxion_host)
         response = self.cli.match(json.dumps(jobspec))
         if response.status == fluxion_pb2.MatchResponse.ResultType.MATCH_SUCCESS:
             print("✅️ Match of jobspec to Fluxion graph success!")
         else:
-            msg = (
+            return self.submit_error(
                 f"Issue with match, return code {response.status}, cannot schedule now"
             )
-            print(msg)
-            return {"error": msg}
 
         # We need the exact allocation to pass forward to the container broker
         alloc = json.loads(response.allocation)
@@ -381,8 +480,11 @@ def submit_job(self, container, cpu_count, command):
         # Wait until it's running (and thus don't submit other jobs)
         # This assumes running one client to submit, and prevents race
         jobid = fluxjob.get_id()
+        print(f"⭐️ Submit job {jobid} to container {container}")
+
         while True:
             info = flux.job.get_job(handle, jobid)
+            print(f"Job is in state {info['state']}")
 
             # These should be all states that come before running or finished
             if info["state"] in ["DEPEND", "PRIORITY", "SCHED"]:
@@ -432,21 +534,36 @@ def get_parser():
     start = subparsers.add_parser(
         "start", description="initialize and start fluxion (only do this once)!"
     )
+    jobs = subparsers.add_parser(
+        "jobs",
+        description="list jobs for a specific application broker",
+        formatter_class=argparse.RawTextHelpFormatter,
+    )
     submit = subparsers.add_parser(
         "submit",
         description="submit a JobSpec for a specific application broker",
         formatter_class=argparse.RawTextHelpFormatter,
     )
-    submit.add_argument("--container", help="Application container to submit to")
+    # Submit enforces just one container
+    for command in [submit, jobs]:
+        command.add_argument(
+            "-c", "--container", help="Application container to target", action="append"
+        )
     submit.add_argument("--cpu", help="Total CPU across N nodes to request under slot")
+    submit.add_argument("--workdir", help="Working directory for application")
+    submit.add_argument(
+        "--timeout",
+        help="Total runtime seconds (timeout) for application, defaults to 3600",
+        type=int,
+    )
     submit.add_argument(
+        "--host",
         help="MiniCluster hostname running the service",
         default="flux-sample-0.flux-service.default.svc.cluster.local:5000",
     )
-    for command in [start, submit]:
+    for command in [start, submit, jobs]:
         command.add_argument("--fluxion-host", help="Fluxion service host")
         command.add_argument(
             "--resource-dir", help="MiniCluster resource (R) directory"
         )
@@ -477,11 +594,22 @@ def main():
     if args.command == "start":
         ctrl.init_fluxion()
 
+    elif args.command == "jobs":
+        ctrl.list_jobs(args.container)
+
     # The submit issues a post to the running server
     elif args.command == "submit":
+        if not args.container or len(args.container) > 1:
+            sys.exit("Submit requires exactly one container.")
         response = requests.post(
             f"http://{args.host}/submit",
-            json={"command": command, "cpu": args.cpu, "container": args.container},
+            json={
+                "command": command,
+                "cpu": args.cpu,
+                "container": args.container[0],
+                "duration": args.timeout,
+                "workdir": args.workdir,
+            },
         )
         print(response.json())
diff --git a/examples/experimental/multiple-applications-per-pod/minicluster.yaml b/examples/experimental/multiple-applications-per-pod/minicluster.yaml
index 8425b492..0d12eecc 100644
--- a/examples/experimental/multiple-applications-per-pod/minicluster.yaml
+++ b/examples/experimental/multiple-applications-per-pod/minicluster.yaml
@@ -29,7 +29,7 @@ spec:
       pre: |
         yum install -y git wget
        /mnt/flux/view/bin/python3.11 -m ensurepip
-        /mnt/flux/view/bin/python3.11 -m pip install Flask requests
+        /mnt/flux/view/bin/python3.11 -m pip install Flask requests rich
         /mnt/flux/view/bin/python3.11 -m pip install -e "git+https://github.com/converged-computing/fluxion.git#egg=fluxion&subdirectory=python/v1"
         wget -O /mnt/flux/view/fluxion_controller.py https://raw.githubusercontent.com/flux-framework/flux-operator/multiple-applications-per-pod/examples/experimental/multiple-applications-per-pod/fluxion_controller.py
         # By the time we get here, the other brokers have started.
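For reference, `rich` joins the pip installs above because the new `jobs` command renders its listing with a `rich` table. A minimal standalone sketch of that pattern, with made-up row values rather than real job data, looks like this:

```python
# Standalone sketch of the rich table pattern used by the jobs command.
# The row values here are illustrative only.
from rich.console import Console
from rich.table import Table

table = Table(title="Jobs for Lammps")
table.add_column("Container", justify="right", style="cyan", no_wrap=True)
table.add_column("Id", style="magenta")
table.add_column("Status", style="magenta")
table.add_column("Returncode", justify="right", style="green")
table.add_row("lammps", "ƒXBNj4c7", "INACTIVE (COMPLETED)", "0")

Console().print(table)
```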