adding jobs list to fluxion controller
Signed-off-by: vsoch <[email protected]>
vsoch committed Jul 4, 2024
1 parent 654c07a commit f380ce4
Showing 3 changed files with 214 additions and 29 deletions.
75 changes: 66 additions & 9 deletions examples/experimental/multiple-applications-per-pod/README.md
@@ -153,13 +153,21 @@ flux resource list
What we are seeing above is the set of resources that need to be shared across the containers (brokers). We don't want to oversubscribe - for example, to tell any specific broker that it can use all the resources while telling the others the same. We have to be careful to use the Python install that is alongside the Flux install. Note that *you should not run this* - I just want to show you how the queue was started. You can issue `--help` to see all the options to customize:

```bash
# You can look at help...
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start --help

# But DO NOT RUN THIS! It's already running.
# This is how the fluxion controller was started using the defaults (do not run this again)
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py start
```

You might want to watch the main fluxion controller in a different terminal before submitting the job:

```bash
kubectl logs flux-sample-0-wxxkp -f
```

Then from your interactive terminal, let's submit a job! You can do this from any of the flux container brokers - the submit command hits a web service that the Python script exposes from the queue!

```bash
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --help
```

@@ -181,14 +189,63 @@ And then we see from where we submit:
And from the Fluxion service script:

```console
INFO:werkzeug:Press CTRL+C to quit
INFO:fluxion_controller:{'command': ['ior'], 'cpu': '4', 'container': 'ior', 'duration': None, 'workdir': None}
INFO:fluxion_controller:{'t_depend': 1720132156.9946244, 't_run': 1720132157.0078795, 't_cleanup': 1720132157.5837069, 't_inactive': 1720132157.5849578, 'duration': 3600.0, 'expiration': 0.0, 'name': 'ior', 'cwd': '', 'queue': '', 'project': '', 'bank': '', 'ntasks': 1, 'ncores': 4, 'nnodes': 1, 'priority': 16, 'ranks': '3', 'nodelist': 'flux-sample-3', 'success': True, 'result': 'COMPLETED', 'waitstatus': 0, 'id': JobID(13180046671872), 't_submit': 1720132156.9826128, 't_remaining': 0.0, 'state': 'INACTIVE', 'username': 'root', 'userid': 0, 'urgency': 16, 'runtime': 0.5758273601531982, 'status': 'COMPLETED', 'returncode': 0, 'dependencies': [], 'annotations': {}, 'exception': {'occurred': False, 'severity': '', 'type': '', 'note': ''}, 'container': 'ior', 'fluxion': 1}
INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:29:18] "POST /submit HTTP/1.1" 200 -
INFO:fluxion_controller:👉️ Job on ior 1 is complete.
INFO:fluxion_controller:✅️ Cancel of jobid 1 success!
```
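Under the hood, that submit command is just an HTTP POST to the Flask service the fluxion controller exposes. As a rough sketch (using the default MiniCluster service host from `--help`; adjust it for your cluster), the equivalent request from Python looks something like this:

```python
import requests

# Default service host used by fluxion_controller.py (an assumption - adjust for your cluster)
host = "flux-sample-0.flux-service.default.svc.cluster.local:5000"

# The same payload the CLI builds: command is a list, cpu is the total cores
# requested under the slot, and container selects the application broker.
# duration and workdir are optional and default to None.
payload = {
    "command": ["ior"],
    "cpu": "4",
    "container": "ior",
    "duration": None,
    "workdir": None,
}

response = requests.post(f"http://{host}/submit", json=payload)
print(response.json())
```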

Let's try submitting a job to the lammps application broker (container) now (note that the container's working directory has the input files):

```bash
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py submit --cpu 4 --container lammps lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
```

This one actually takes more than a second to run, so wait for it, and you'll eventually see the fluxion controller detect that it is finished and clean up:

```console
INFO:werkzeug:10.244.0.27 - - [04/Jul/2024 22:30:41] "POST /submit HTTP/1.1" 200 -
INFO:fluxion_controller:👉️ Job on lammps 2 is complete.
INFO:fluxion_controller:✅️ Cancel of jobid 2 success!
```

Let's now list jobs for one container...

```bash
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs --container lammps
```
```console
Jobs for Lammps
┏━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Container ┃ Id ┃ Name ┃ Status ┃ Nodes ┃ Cores ┃ Runtime ┃ Returncode ┃
┡━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━┩
│ lammps │ ƒXBNj4c7 │ lmp │ INACTIVE (COMPLETED) │ 1 │ 4 │ 27 │ 0 │
└───────────┴──────────┴──────┴──────────────────────┴───────┴───────┴─────────┴────────────┘
```

Or all containers!


```bash
/mnt/flux/view/bin/python3.11 /mnt/flux/view/fluxion_controller.py jobs
```
```console
⭐️ Found application queue: index 0
⭐️ Found application chatterbug: index 3
⭐️ Found application ior: index 2
⭐️ Found application lammps: index 1
Jobs for Queue, Chatterbug, Ior, Lammps
┏━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Container ┃ Id ┃ Name ┃ Status ┃ Nodes ┃ Cores ┃ Runtime ┃ Returncode ┃
┡━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━┩
│ queue │ ƒ4WEiYej │ python3.11 │ RUN │ 1 │ 1 │ 45 │ │
│ ior │ ƒU1BWHuH │ ior │ INACTIVE (COMPLETED) │ 1 │ 4 │ 0 │ 0 │
│ lammps │ ƒXBNj4c7 │ lmp │ INACTIVE (COMPLETED) │ 1 │ 4 │ 27 │ 0 │
└───────────┴──────────┴────────────┴──────────────────────┴───────┴───────┴─────────┴────────────┘
```
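For reference, the `jobs` command does not go through the web service - the controller connects to each application broker handle and lists jobs with the standard flux-core Python bindings. A minimal sketch of what happens per broker (run against the local broker here purely for illustration):

```python
import flux
import flux.job

# Connect to a broker (the controller keeps one handle per application container)
handle = flux.Flux()

# List the jobs known to this broker, then fetch the details shown in the table
for info in flux.job.job_list(handle).get()["jobs"]:
    job = flux.job.get_job(handle, info["id"])
    print(job["id"], job["name"], job["state"], job["status"], job["returncode"])
```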

Lammps will show up like that when it is finished. I am calling this "pancake elasticity" since we can theoretically deploy many application containers and then use them when needed, essentially expanding (resource-wise) the one that is running while the others remain flat (not using resources). This isn't entirely ready yet (still testing) but a lot of the automation is in place.

It's so super cool!! :D This is likely going to inspire the next round of work for thinking about scheduling and fluxion.
examples/experimental/multiple-applications-per-pod/fluxion_controller.py
@@ -5,8 +5,13 @@
import argparse
import requests
import threading
import logging
from flask import Flask, jsonify, request

from rich.console import Console
from rich.table import Table


try:
import flux
import flux.job
@@ -28,6 +33,8 @@
ctrl = None
app = Flask(__name__)

logging.basicConfig(level=logging.DEBUG)


@app.route("/submit", methods=["POST"])
def submit_job():
@@ -37,12 +44,19 @@ def submit_job():
global ctrl

data = request.get_json()
app.logger.info(data)
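# Validate required fields before handing the request off to the controller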
for required in ["command", "cpu", "container"]:
if required not in data or not data[required]:
return jsonify({"error": f"{required} is required."})

response = ctrl.submit_job(
data["container"],
data["cpu"],
data["command"],
workdir=data["workdir"],
duration=data["duration"],
)
app.logger.info(response)
return jsonify(response)


@@ -133,12 +147,11 @@ def __init__(
def populate_jobs(self):
"""
Given running queues, populate with current jobs
Not supported yet.
"""
pass
# TODO how do we do this? We essentially need to restore state

def discover_containers(self):
"""
@@ -249,7 +262,9 @@ def run(self):
# Get the status of the job from the handle
info = flux.job.get_job(handle, jobset["container"])
if info["result"] == "COMPLETED":
print(f"👉️ Job on {container} {jobset['fluxion']} is complete.")
app.logger.info(
f"👉️ Job on {container} {jobset['fluxion']} is complete."
)
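# Release the fluxion allocation now that the wrapped Flux job has finished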
self.cancel(jobset["fluxion"])
continue
# Otherwise add back to jobs set
@@ -267,16 +282,94 @@ def cancel(self, jobid):
try:
response = self.cli.cancel(jobid=jobid)
if response.status == fluxion_pb2.CancelResponse.ResultType.CANCEL_SUCCESS:
print(f"✅️ Cancel of jobid {jobid} success!")
app.logger.info(f"✅️ Cancel of jobid {jobid} success!")
else:
print(f"Issue with cancel, return code {response.status}")
app.logger.info(f"Issue with cancel, return code {response.status}")
except:
print(f"✅️ jobid {jobid} is already inactive.")
app.logger.info(f"✅️ jobid {jobid} is already inactive.")

def submit_error(self, message):
"""
Given a message, print (for the developer log) and return as json
"""
print(message)
return {"error": message}

def list_jobs(self, containers):
"""
List jobs for one or more containers
"""
if not containers:
containers = list(self.handles.keys())
if not containers:
sys.exit(
"One or more application target containers are required (--container)"
)

# Create a pretty table!
names = ", ".join(x.capitalize() for x in containers)
table = Table(title=f"Jobs for {names}")

# These are the header columns
table.add_column("Container", justify="right", style="cyan", no_wrap=True)
table.add_column("Id", style="magenta")
table.add_column("Name", style="magenta")
table.add_column("Status", style="magenta")
table.add_column("Nodes", style="magenta")
table.add_column("Cores", style="magenta")
table.add_column("Runtime", style="magenta")
table.add_column("Returncode", justify="right", style="green")

# Add jobs from each requested container broker to the shared table
for container in containers:
table = self.list_container_jobs(container, table)

console = Console()
console.print(table)

def list_container_jobs(self, container, table):
"""
List jobs for a single container, adding to a single table
"""
# Allow failure and continue
if container not in self.handles:
print(f"Application container handle for {container} does not exist.")
return table

# Our broker hook to the container
handle = self.handles[container]
jobs = flux.job.job_list(handle).get()["jobs"]

for info in jobs:
job = flux.job.get_job(handle, info["id"])
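# Show 'STATE (STATUS)' unless the two match (a running job shows just RUN)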
status = f"{job['state']} ({job['status']})"
if job["status"] == job["state"]:
status = job["state"]
runtime = str(int(job["runtime"]))
jobid = str(job["id"])
table.add_row(
container,
jobid,
job["name"],
status,
str(job["nnodes"]),
str(job["ncores"]),
runtime,
str(job["returncode"]),
)
return table

def submit_job(
self,
container,
cpu_count,
command,
workdir=None,
duration=None,
environment=None,
):
"""
Demo of submitting a job. We will want a more robust way to do this.
TODO: add working directory, duration, environment, etc.
This currently just asks for the command and total cores across nodes.
We let fluxion decide how to distribute that across physical nodes.
@@ -291,7 +384,7 @@ def submit_job(self, container, cpu_count, command):
# They are asking for a broker container handle that doesn't exist
if container not in self.handles:
choices = ",".join(list(self.handles.keys()))
return self.submit_error(
f"Application container handle for {container} does not exist - choices are {choices}."
)

@@ -303,17 +396,23 @@ def submit_job(self, container, cpu_count, command):
print(f"🙏️ Requesting to submit: {' '.join(command)}")
jobspec["tasks"][0]["command"] = command

# Add additional system parameters
if duration is not None:
jobspec["attributes"]["system"]["duration"] = duration
if workdir is not None:
jobspec["attributes"]["system"]["cwd"] = workdir
if environment is not None and isinstance(environment, dict):
jobspec["attributes"]["system"]["environment"] = environment

# This asks fluxion if we can schedule it
self.cli = FluxionClient(host=self.fluxion_host)
response = self.cli.match(json.dumps(jobspec))
if response.status == fluxion_pb2.MatchResponse.ResultType.MATCH_SUCCESS:
print("✅️ Match of jobspec to Fluxion graph success!")
else:
return self.submit_error(
f"Issue with match, return code {response.status}, cannot schedule now"
)

# We need the exact allocation to pass forward to the container broker
alloc = json.loads(response.allocation)
@@ -381,8 +480,11 @@ def submit_job(self, container, cpu_count, command):
# Wait until it's running (and thus don't submit other jobs)
# This assumes running one client to submit, and prevents race
jobid = fluxjob.get_id()
print(f"⭐️ Submit job {jobid} to container {container}")

while True:
info = flux.job.get_job(handle, jobid)
print(f"Job is in state {info['state']}")

# These should be all states that come before running or finished
if info["state"] in ["DEPEND", "PRIORITY", "SCHED"]:
@@ -432,21 +534,36 @@ def get_parser():
start = subparsers.add_parser(
"start", description="initialize and start fluxion (only do this once)!"
)
jobs = subparsers.add_parser(
"jobs",
description="list jobs for a specific application broker",
formatter_class=argparse.RawTextHelpFormatter,
)
submit = subparsers.add_parser(
"submit",
description="submit a JobSpec for a specific application broker",
formatter_class=argparse.RawTextHelpFormatter,
)

submit.add_argument("--container", help="Application container to submit to")
# Both submit and jobs accept --container (append); submit enforces exactly one in main()
for command in [submit, jobs]:
command.add_argument(
"-c", "--container", help="Application container to target", action="append"
)
submit.add_argument("--cpu", help="Total CPU across N nodes to request under slot")
submit.add_argument("--workdir", help="Working directory for application")
submit.add_argument(
"--timeout",
help="Total runtime seconds (timeout) for application, defaults to 3600",
type=int,
)
submit.add_argument(
"--host",
help="MiniCluster hostname running the service",
default="flux-sample-0.flux-service.default.svc.cluster.local:5000",
)

for command in [start, submit, jobs]:
command.add_argument("--fluxion-host", help="Fluxion service host")
command.add_argument(
"--resource-dir", help="MiniCluster resource (R) directory"
@@ -477,11 +594,22 @@ def main():
if args.command == "start":
ctrl.init_fluxion()

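# Listing jobs uses the application broker handles directly (no web service involved)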
elif args.command == "jobs":
ctrl.list_jobs(args.container)

# The submit issues a post to the running server
elif args.command == "submit":
if not args.container or len(args.container) > 1:
sys.exit("Submit requires exactly one container.")
response = requests.post(
f"http://{args.host}/submit",
json={"command": command, "cpu": args.cpu, "container": args.container},
json={
"command": command,
"cpu": args.cpu,
"container": args.container[0],
"duration": args.timeout,
"workdir": args.workdir,
},
)
print(response.json())

@@ -29,7 +29,7 @@ spec:
pre: |
yum install -y git wget
/mnt/flux/view/bin/python3.11 -m ensurepip
/mnt/flux/view/bin/python3.11 -m pip install Flask requests rich
/mnt/flux/view/bin/python3.11 -m pip install -e "git+https://github.com/converged-computing/fluxion.git#egg=fluxion&subdirectory=python/v1"
wget -O /mnt/flux/view/fluxion_controller.py https://raw.githubusercontent.com/flux-framework/flux-operator/multiple-applications-per-pod/examples/experimental/multiple-applications-per-pod/fluxion_controller.py
# By the time we get here, the other brokers have started.
