Add utility scripts for prototyping (#90)
* Add utility scripts for prototyping

- gen_mappings: creates current_mapping.py, a fallback set of resource requests for individual packages
- simulate_predict: end-to-end test of gantry's prediction functionality; outputs the accuracy of the model
- bulk_collect: takes the last N jobs completed in gitlab and sends them to gantry, simulating a webhook POST

all files contain instructions for use

* add job stage to webhook simulator

* add instructions for generating pipeline locally

* rename folder
cmelone authored Sep 16, 2024
1 parent f946b56 commit 980a522
Showing 5 changed files with 347 additions and 0 deletions.
63 changes: 63 additions & 0 deletions dev/bulk_collect.py
@@ -0,0 +1,63 @@
# simulates a bulk sending of webhooks to gantry
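# usage sketch (assumes gantry is running at GANTRY_URL and both tokens are set;
# the placeholder values below are yours to fill in):
#   export GITLAB_API_TOKEN=<gitlab api token>
#   export GITLAB_WEBHOOK_TOKEN=<webhook token gantry expects on /v1/collect>
#   python dev/bulk_collect.py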

import asyncio
import os
from datetime import datetime

import aiohttp

# how many pages of jobs to retrieve from gitlab (100 jobs per page)
NUM_GL_PAGES = 3
GANTRY_URL = "http://localhost:8080"


async def get_gitlab_jobs():
    headers = {"PRIVATE-TOKEN": os.environ["GITLAB_API_TOKEN"]}
    responses = []

    for i in range(1, NUM_GL_PAGES + 1):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"https://gitlab.spack.io/api/v4/projects/2/jobs?page={i}&per_page=100",
                headers=headers,
            ) as response:
                resp = await response.json()
                responses += resp
    return responses


async def webhook(job):
    headers = {
        "X-Gitlab-Event": "Job Hook",
        "X-Gitlab-Token": os.environ["GITLAB_WEBHOOK_TOKEN"],
    }
    job["build_status"] = job["status"]
    job["build_name"] = job["name"]
    job["build_id"] = job["id"]
    job["build_stage"] = job["stage"]
    if job["started_at"] is None or job["finished_at"] is None:
        return
    else:
        # weirdly gitlab job api datetime format is different from the webhook format
        dt = datetime.strptime(job["started_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
        job["build_started_at"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")
        dt = datetime.strptime(job["finished_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
        job["build_finished_at"] = dt.strftime("%Y-%m-%d %H:%M:%S UTC")

    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{GANTRY_URL}/v1/collect", headers=headers, json=job
        ) as response:
            if response.status != 200:
                print(f"failed to send job {job['id']} to gantry: {response.status}")


async def main():
    print("retrieving list of jobs...")
    jobs = await get_gitlab_jobs()
    for job in jobs:
        await webhook(job)


if __name__ == "__main__":
    asyncio.run(main())
112 changes: 112 additions & 0 deletions dev/gen_mappings.py
@@ -0,0 +1,112 @@
# this script generates the (old) default resource mappings for the spackages in CI
# these values are used as a fallback source for the prediction model
# current_mapping.py will be created at OUTPUT
# be sure SPACK_ROOT is set and you are on the latest commit of develop
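# usage sketch (paths are illustrative; edit GANTRY_PATH below first):
#   export SPACK_ROOT=$HOME/spack
#   python dev/gen_mappings.py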

import os
from decimal import Decimal, InvalidOperation

import yaml

GANTRY_PATH = "/path/to/gantry"
OUTPUT = f"{GANTRY_PATH}/gantry/routes/prediction/current_mapping.py"


# https://github.com/kubernetes-client/python/blob/master/kubernetes/utils/quantity.py
# Apache License 2.0
def parse_quantity(quantity):
    """
    Parse kubernetes canonical form quantity like 200Mi to a decimal number.
    Supported SI suffixes:
    base1024: Ki | Mi | Gi | Ti | Pi | Ei
    base1000: n | u | m | "" | k | M | G | T | P | E
    Input:
    quantity: string. kubernetes canonical form quantity
    Returns:
    Decimal
    Raises:
    ValueError on invalid or unknown input
    """
    if isinstance(quantity, (int, float, Decimal)):
        return Decimal(quantity)

    exponents = {
        "n": -3,
        "u": -2,
        "m": -1,
        "K": 1,
        "k": 1,
        "M": 2,
        "G": 3,
        "T": 4,
        "P": 5,
        "E": 6,
    }

    quantity = str(quantity)
    number = quantity
    suffix = None
    if len(quantity) >= 2 and quantity[-1] == "i":
        if quantity[-2] in exponents:
            number = quantity[:-2]
            suffix = quantity[-2:]
    elif len(quantity) >= 1 and quantity[-1] in exponents:
        number = quantity[:-1]
        suffix = quantity[-1:]

    try:
        number = Decimal(number)
    except InvalidOperation:
        raise ValueError("Invalid number format: {}".format(number))

    if suffix is None:
        return number

    if suffix.endswith("i"):
        base = 1024
    elif len(suffix) == 1:
        base = 1000
    else:
        raise ValueError("{} has unknown suffix".format(quantity))

    # handle SI inconsistency
    if suffix == "ki":
        raise ValueError("{} has unknown suffix".format(quantity))

    if suffix[0] not in exponents:
        raise ValueError("{} has unknown suffix".format(quantity))

    exponent = Decimal(exponents[suffix[0]])
    return number * (base**exponent)


with open(
    f"{os.environ['SPACK_ROOT']}/share/spack/gitlab/cloud_pipelines/configs/linux/"
    "ci.yaml"
) as f:
    data = yaml.safe_load(f)

pkg_mappings = {}

for mapping in data["ci"]["pipeline-gen"][1]["submapping"]:
    vars = mapping["build-job"]["variables"]

    for pkg in mapping["match"]:
        pkg_mappings[pkg] = {
            "build_jobs": int(vars["SPACK_BUILD_JOBS"]),
            "cpu_request": float(parse_quantity(vars["KUBERNETES_CPU_REQUEST"])),
            "mem_request": float(parse_quantity(vars["KUBERNETES_MEMORY_REQUEST"])),
        }

# write the values of pkg_mappings to a file

# does not preserve old file contents
with open(OUTPUT, "w") as f:
    f.write("# fmt: off\n")
    f.write("# flake8: noqa\n")
    f.write(f"pkg_mappings = {pkg_mappings}\n")

print(f"Updated {OUTPUT} with new package mappings")
18 changes: 18 additions & 0 deletions dev/generate_local/README.md
@@ -0,0 +1,18 @@
# Testing CI Generation Locally

If you want to test Gantry's prediction functionality on your local machine without needing an entire K8s cluster, you can do so in a few commands:

```bash
cd gantry/dev/generate_local
spack env activate .
mkdir mirror && spack mirror create -d mirror zlib
# add local mirror to spack.yaml
spack mirror add local_filesystem file://$PWD/mirror
spack ci generate --output-file pipeline.yaml --artifacts-root tmp
```

As you can see in `spack.yaml`, we're using `zlib` as an example spec to predict usage. This can be changed if you'd prefer to experiment with a heavier build.

The default Gantry endpoint is `localhost:8080`, so be sure to start your server and point `DB_FILE` to a database with sufficient training data. If you're on a Mac, the AWS data likely doesn't have any samples with `apple-clang` as a compiler, so you might have to update the database with `compiler_name='apple-clang'` (and the corresponding version) to get predictions.
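
If you need to do that, a quick (and destructive) way is a direct update of the `jobs` table — a sketch only; the compiler version and the `where` filter here are hypothetical:

```bash
sqlite3 "$DB_FILE" \
  "update jobs set compiler_name='apple-clang', compiler_version='15.0.0' where compiler_name='gcc';"
```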

Once the generate process is done, you can see in `pipeline.yaml` that the `KUBERNETES_*` variables were set.
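
For reference, the injected variables in a generated job entry might look roughly like this (job name abbreviated, values illustrative rather than real predictions):

```yaml
zlib-build-job:
  variables:
    KUBERNETES_CPU_REQUEST: "1.5"
    KUBERNETES_MEMORY_REQUEST: "2000M"
```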
16 changes: 16 additions & 0 deletions dev/generate_local/spack.yaml
@@ -0,0 +1,16 @@
spack:
  view: false
  specs:
  - zlib
  ci:
    pipeline-gen:
    - dynamic-mapping:
        endpoint: http://0.0.0.0:8080/v1/allocation
        timeout: 1
        verify_ssl: false
        attributes:
          allow:
          - variables
          require:
          - variables

138 changes: 138 additions & 0 deletions dev/simulate_predict.py
@@ -0,0 +1,138 @@
# setup:
# 1. copy the latest gantry db from s3 to a directory and export its path as DB_FILE
#    (db #1, the full copy read by this script)
# 2. copy the file to a different path and, in a different terminal session, set DB_FILE
#    to that copy (db #2, the one the gantry web server will use)

# choose how many jobs to predict and set JOBS_TO_PREDICT
# in db #2, run `delete from jobs order by id desc limit JOBS_TO_PREDICT;`
# to ensure that we aren't predicting from our training data
# then run the gantry web server `python -m gantry`

# run this script to simulate the prediction of the last JOBS_TO_PREDICT jobs
# the script will print the average of the memory and cpu ratios of the predictions
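# an illustrative walk-through (bucket name and paths are hypothetical):
#   aws s3 cp s3://<gantry-bucket>/gantry.db full.db    # db #1, read by this script
#   export DB_FILE=$PWD/full.db
#   cp full.db server.db                                 # db #2, used by the gantry server
#   sqlite3 server.db "delete from jobs order by id desc limit 4000;"
#   # in another terminal: export DB_FILE=$PWD/server.db && python -m gantry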

import asyncio
import json
import os
from urllib.parse import quote

import aiohttp
import aiosqlite

JOBS_TO_PREDICT = 4000
GANTRY_URL = "http://localhost:8080"


def dict_to_spec(variants: dict) -> str:
    """Given a dict in name: value format, return a spec's concrete variants string."""
    spec_parts = []

    for name, value in variants.items():
        if isinstance(value, bool):
            # convert True/False to +/~ notation
            if value:
                spec_parts.append(f"+{name}")
            else:
                spec_parts.append(f"~{name}")
        elif isinstance(value, list):
            # join lists with commas
            spec_parts.append(f"{name}={','.join(value)}")
        else:
            # add name=value pairs
            spec_parts.append(f"{name}={value}")

    # join all parts into a single string with no space between `+` or `~` prefixes
    return " ".join(spec_parts).replace(" +", "+").replace(" ~", "~")


async def get_jobs():
    db = await aiosqlite.connect(os.environ["DB_FILE"])

    async with db.execute(
        f"select * from jobs order by id desc limit {JOBS_TO_PREDICT}"
    ) as cursor:
        jobs = await cursor.fetchall()

    await db.close()
    return jobs


async def predict(job, session):
    # fields can be removed from this mapping if you don't need them
    j = {
        "id": job[0],
        "pod": job[1],
        "node": job[2],
        "start": job[3],
        "end": job[4],
        "gitlab_id": job[5],
        "job_status": job[6],
        "ref": job[7],
        "pkg_name": job[8],
        "pkg_version": job[9],
        "pkg_variants": job[10],
        "compiler_name": job[11],
        "compiler_version": job[12],
        "arch": job[13],
        "stack": job[14],
        "build_jobs": job[15],
        "cpu_request": job[16],
        "cpu_limit": job[17],
        "cpu_mean": job[18],
        "cpu_median": job[19],
        "cpu_max": job[20],
        "cpu_min": job[21],
        "cpu_stddev": job[22],
        "mem_request": job[23],
        "mem_limit": job[24],
        "mem_mean": job[25],
        "mem_median": job[26],
        "mem_max": job[27],
        "mem_min": job[28],
        "mem_stddev": job[29],
    }

    # convert json variants to the spec format that gantry understands
    var = dict_to_spec(json.loads(j["pkg_variants"]))

    spec = (
        f"{j['pkg_name']}@{j['pkg_version']} {var}%{j['compiler_name']}"
        f"@{j['compiler_version']}"
    )

    async with session.get(f"{GANTRY_URL}/v1/allocation?spec={quote(spec)}") as resp:
        re = await resp.text()
        try:
            re = json.loads(re)
        except Exception:
            print(f"error: {re} for spec {spec}")
            exit()

    mem_prediction = re["variables"]["KUBERNETES_MEMORY_REQUEST"]
    # remove the M suffix, e.g. 200M -> 200
    mem_prediction = int(mem_prediction[:-1])
    cpu_prediction = float(re["variables"]["KUBERNETES_CPU_REQUEST"])

    mem_usage = j["mem_mean"] / 1_000_000  # bytes to MB

    mem_ratio = mem_usage / mem_prediction
    cpu_ratio = j["cpu_mean"] / cpu_prediction

    return mem_ratio, cpu_ratio


async def main():
    jobs = await get_jobs()
    mem_preds = []
    cpu_preds = []
    async with aiohttp.ClientSession() as session:
        for job in jobs:
            pred = await predict(job, session)
            mem_preds.append(pred[0])
            cpu_preds.append(pred[1])

    print(f"average memory ratio: {sum(mem_preds)/len(mem_preds)}")
    print(f"average cpu ratio: {sum(cpu_preds)/len(cpu_preds)}")


if __name__ == "__main__":
    asyncio.run(main())
