Collection API (#3)
* Add basic GitHub Actions CI

* rough draft of collection functionality

* Revert "Add basic GitHub Actions CI"

This reverts commit 98586b3.

* line breaks

* improvements to collection

* aiohttp server basics

* refactoring of collection

* isort

* don't depend on dotenv for .env sourcing

Co-authored-by: Alec Scott <[email protected]>

* add stack

* restructure how clients are initialized

* reorganize files into clients/models/routes

* decouple spec utility functions from misc.py

* rename vm -> node, build -> job

* reorganize functionality around clients rather than models

* job_id -> gitlab_id

* make prometheus client more modular

* lessen fatality of not receiving the right hook

* black

* no need to store ghost jobs as they are being collected by KW

* don't try to collect UO-ran jobs [ci skip]

* remove tests [ci skip]

* import clients individually [ci skip]

* version the API [ci skip]

* break up PrometheusClient.query into query_single and query_range [ci skip]

* fix prometheus client types

* fix flake8

---------

Co-authored-by: Alec Scott <[email protected]>
cmelone and alecbcs authored Feb 12, 2024
1 parent 06161a1 commit 89353a1
Showing 23 changed files with 894 additions and 1 deletion.
17 changes: 17 additions & 0 deletions .envrc
@@ -0,0 +1,17 @@
#------------------------------------------------------------------------
# Load Development Spack Environment (If Spack is installed.)
#
# Run 'direnv allow' from within the cloned repository to automatically
# load the spack environment when you enter the directory.
#------------------------------------------------------------------------
if type spack &>/dev/null; then
    . $SPACK_ROOT/share/spack/setup-env.sh
    spack env activate -d .
fi

#------------------------------------------------------------------------
# Load Environment Variables from .env (if file exists)
#------------------------------------------------------------------------
if [ -e .env ]; then
    source .env
fi
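
For reference, a .env file consumed by the block above might define the variables read in gantry/__main__.py further down; this is a sketch with placeholder values, not part of the commit:

# hypothetical .env; variable names taken from gantry/__main__.py
export DB_FILE=db/gantry.db
export GITLAB_URL=https://gitlab.example.com/api/v4
export GITLAB_API_TOKEN=changeme
export PROMETHEUS_URL=https://prometheus.example.com
export PROMETHEUS_COOKIE=changeme  # optional; defaults to an empty string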
3 changes: 3 additions & 0 deletions .flake8
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
extend-ignore = E203, E704
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
__pycache__
.env
spack.lock
.spack-env
db/*.db
47 changes: 47 additions & 0 deletions db/schema.sql
@@ -0,0 +1,47 @@
CREATE TABLE nodes (
    id INTEGER PRIMARY KEY,
    uuid TEXT NOT NULL UNIQUE,
    hostname TEXT NOT NULL,
    cores REAL NOT NULL,
    mem REAL NOT NULL,
    arch TEXT NOT NULL,
    os TEXT NOT NULL,
    instance_type TEXT NOT NULL
);

CREATE TABLE jobs (
    id INTEGER PRIMARY KEY,
    pod TEXT NOT NULL UNIQUE,
    node INTEGER NOT NULL,
    start INTEGER NOT NULL,
    end INTEGER NOT NULL,
    gitlab_id INTEGER NOT NULL UNIQUE,
    job_status TEXT NOT NULL,
    ref TEXT NOT NULL,
    pkg_name TEXT NOT NULL,
    pkg_version TEXT NOT NULL,
    pkg_variants TEXT NOT NULL,
    compiler_name TEXT NOT NULL,
    compiler_version TEXT NOT NULL,
    arch TEXT NOT NULL,
    stack TEXT NOT NULL,
    build_jobs INTEGER NOT NULL,
    cpu_request REAL NOT NULL,
    cpu_limit REAL, -- this can be null because it's currently not set
    cpu_mean REAL NOT NULL,
    cpu_median REAL NOT NULL,
    cpu_max REAL NOT NULL,
    cpu_min REAL NOT NULL,
    cpu_stddev REAL NOT NULL,
    mem_request REAL NOT NULL,
    mem_limit REAL NOT NULL,
    mem_mean REAL NOT NULL,
    mem_median REAL NOT NULL,
    mem_max REAL NOT NULL,
    mem_min REAL NOT NULL,
    mem_stddev REAL NOT NULL,
    FOREIGN KEY (node)
        REFERENCES nodes (id)
        ON UPDATE CASCADE
        ON DELETE CASCADE
);
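
The node foreign key ties each job row to the node it ran on, so per-node efficiency can be computed with a join. A hypothetical query, not part of this commit:

-- example: mean CPU usage relative to the request, per job and node
SELECT jobs.gitlab_id,
       jobs.pkg_name,
       jobs.cpu_mean / jobs.cpu_request AS cpu_efficiency,
       nodes.hostname,
       nodes.instance_type
FROM jobs
JOIN nodes ON jobs.node = nodes.id;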
33 changes: 32 additions & 1 deletion gantry/__main__.py
@@ -1,5 +1,36 @@
import os

import aiosqlite
from aiohttp import web

from gantry.clients.gitlab import GitlabClient
from gantry.clients.prometheus import PrometheusClient
from gantry.views import routes


async def init_db(app: web.Application):
    db = await aiosqlite.connect(os.environ["DB_FILE"])
    await db.execute("PRAGMA foreign_keys = ON;")
    app["db"] = db
    yield
    await db.close()


async def init_clients(app: web.Application):
    app["gitlab"] = GitlabClient(
        os.environ["GITLAB_URL"], os.environ["GITLAB_API_TOKEN"]
    )
    app["prometheus"] = PrometheusClient(
        os.environ["PROMETHEUS_URL"], os.environ.get("PROMETHEUS_COOKIE", "")
    )


def main():
    app = web.Application()
    app.add_routes(routes)
    app.cleanup_ctx.append(init_db)
    app.on_startup.append(init_clients)
    web.run_app(app)


if __name__ == "__main__":
    main()
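
With the environment variables above exported (for example via the .envrc), the service would be launched as a module; a usage note, not part of the diff:

python -m gantry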
Empty file added gantry/clients/__init__.py
31 changes: 31 additions & 0 deletions gantry/clients/gitlab.py
@@ -0,0 +1,31 @@
import aiohttp


class GitlabClient:
    def __init__(self, base_url: str, api_token: str):
        self.base_url = base_url
        self.headers = {"PRIVATE-TOKEN": api_token}

    async def _request(self, url: str, response_type: str) -> dict | str:
        """
        Helper for requests to the Gitlab API.
        args:
            url: the url to request
            response_type: the type of response to expect (json or text)
        returns: the response from Gitlab in the specified format
        """

        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.get(url, headers=self.headers) as resp:
                if response_type == "json":
                    return await resp.json()
                if response_type == "text":
                    return await resp.text()

    async def job_log(self, gl_id: int) -> str:
        """Given a job id, returns the log from that job"""

        url = f"{self.base_url}/jobs/{gl_id}/trace"
        return await self._request(url, "text")
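
A minimal standalone sketch of using this client; hypothetical, since in the app the instance is created by init_clients and stored on the aiohttp application:

import asyncio
import os

from gantry.clients.gitlab import GitlabClient


async def main():
    # assumes GITLAB_URL and GITLAB_API_TOKEN are set as in gantry/__main__.py
    gitlab = GitlabClient(os.environ["GITLAB_URL"], os.environ["GITLAB_API_TOKEN"])
    log = await gitlab.job_log(12345)  # placeholder job id
    print(log[:200])


asyncio.run(main())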
2 changes: 2 additions & 0 deletions gantry/clients/prometheus/__init__.py
@@ -0,0 +1,2 @@
# flake8: noqa
from .prometheus import PrometheusClient
143 changes: 143 additions & 0 deletions gantry/clients/prometheus/job.py
@@ -0,0 +1,143 @@
import json

from gantry.clients.prometheus import util
from gantry.util.spec import spec_variants


class PrometheusJobClient:
    def __init__(self, client):
        self.client = client

    async def get_annotations(self, gl_id: int, time: float) -> dict:
        """
        args:
            gl_id: gitlab job id
            time: when to query (unix timestamp)
        returns: dict of annotations
        """

        res = await self.client.query_single(
            query={
                "metric": "kube_pod_annotations",
                "filters": {"annotation_gitlab_ci_job_id": gl_id},
            },
            time=time,
        )

        if not res:
            raise util.IncompleteData("annotation data is missing")

        annotations = res[0]["labels"]

        return {
            "pod": annotations["pod"],
            # if build jobs is not set, defaults to 16 due to spack config
            "build_jobs": annotations.get(
                "annotation_metrics_spack_job_build_jobs", 16
            ),
            "arch": annotations["annotation_metrics_spack_job_spec_arch"],
            "pkg_name": annotations["annotation_metrics_spack_job_spec_pkg_name"],
            "pkg_version": annotations["annotation_metrics_spack_job_spec_pkg_version"],
            "pkg_variants": json.dumps(
                spec_variants(annotations["annotation_metrics_spack_job_spec_variants"])
            ),
            "compiler_name": annotations[
                "annotation_metrics_spack_job_spec_compiler_name"
            ],
            "compiler_version": annotations[
                "annotation_metrics_spack_job_spec_compiler_version"
            ],
            "stack": annotations["annotation_metrics_spack_ci_stack_name"],
        }

    async def get_resources(self, pod: str, time: float) -> tuple[dict, str]:
        """
        args:
            pod: pod name
            time: when to query (unix timestamp)
        returns: dict of resources and node hostname
        """

        requests = util.process_resources(
            await self.client.query_single(
                query={
                    "metric": "kube_pod_container_resource_requests",
                    "filters": {"container": "build", "pod": pod},
                },
                time=time,
            )
        )

        limits_res = await self.client.query_single(
            query={
                "metric": "kube_pod_container_resource_limits",
                "filters": {"container": "build", "pod": pod},
            },
            time=time,
        )

        if not limits_res:
            raise util.IncompleteData("missing limits")

        # instead of needing to fetch the node where the pod ran from kube_pod_info
        # we can grab it from kube_pod_container_resource_limits
        # weirdly, it's not available in kube_pod_labels or annotations
        # https://github.com/kubernetes/kube-state-metrics/issues/1148
        node = limits_res[0]["labels"]["node"]
        limits = util.process_resources(limits_res)

        return (
            {
                "cpu_request": requests["cpu"]["value"],
                "mem_request": requests["memory"]["value"],
                "cpu_limit": limits.get("cpu", {}).get("value"),
                "mem_limit": limits["memory"]["value"],
            },
            node,
        )

    async def get_usage(self, pod: str, start: float, end: float) -> dict:
        """
        Gets resource usage attributes for a job.
        args:
            pod: pod name
            start: start time (unix timestamp)
            end: end time (unix timestamp)
        returns: dict of usage stats
        """

        mem_usage = util.process_usage(
            await self.client.query_range(
                query={
                    "metric": "container_memory_working_set_bytes",
                    "filters": {"container": "build", "pod": pod},
                },
                start=start,
                end=end,
            )
        )

        cpu_usage = util.process_usage(
            await self.client.query_range(
                query=(
                    f"rate(container_cpu_usage_seconds_total{{"
                    f"pod='{pod}', container='build'}}[90s])"
                ),
                start=start,
                end=end,
            )
        )

        return {
            "cpu_mean": cpu_usage["mean"],
            "cpu_median": cpu_usage["median"],
            "cpu_max": cpu_usage["max"],
            "cpu_min": cpu_usage["min"],
            "cpu_stddev": cpu_usage["stddev"],
            "mem_mean": mem_usage["mean"],
            "mem_median": mem_usage["median"],
            "mem_max": mem_usage["max"],
            "mem_min": mem_usage["min"],
            "mem_stddev": mem_usage["stddev"],
        }
56 changes: 56 additions & 0 deletions gantry/clients/prometheus/node.py
@@ -0,0 +1,56 @@
from gantry.clients.prometheus import util


class PrometheusNodeClient:
    def __init__(self, client):
        self.client = client

    async def get_uuid(self, hostname: str, time: float) -> str:
        """
        args:
            hostname: node hostname
            time: time to query (unix timestamp)
        returns: the node's system UUID
        """

        res = await self.client.query_single(
            query={
                "metric": "kube_node_info",
                "filters": {"node": hostname},
            },
            time=time,
        )

        if not res:
            raise util.IncompleteData(f"node info is missing. hostname={hostname}")

        return res[0]["labels"]["system_uuid"]

    async def get_labels(self, hostname: str, time: float) -> dict:
        """
        args:
            hostname: node hostname
            time: time to query (unix timestamp)
        returns: dict of node labels
        """

        res = await self.client.query_single(
            query={
                "metric": "kube_node_labels",
                "filters": {"node": hostname},
            },
            time=time,
        )

        if not res:
            raise util.IncompleteData(f"node labels are missing. hostname={hostname}")

        labels = res[0]["labels"]

        return {
            "cores": float(labels["label_karpenter_k8s_aws_instance_cpu"]),
            "mem": float(labels["label_karpenter_k8s_aws_instance_memory"]),
            "arch": labels["label_kubernetes_io_arch"],
            "os": labels["label_kubernetes_io_os"],
            "instance_type": labels["label_node_kubernetes_io_instance_type"],
        }
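
Putting the two Prometheus sub-clients together, a collection pass over a finished job might look like the sketch below. This is a hypothetical driver: the actual webhook route is truncated from this page, and nothing is assumed about PrometheusClient beyond the constructor shown in gantry/__main__.py and the query_single/query_range methods used above.

import os

from gantry.clients.prometheus import PrometheusClient
from gantry.clients.prometheus.job import PrometheusJobClient
from gantry.clients.prometheus.node import PrometheusNodeClient


async def collect(gl_id: int, start: float, end: float) -> dict:
    prometheus = PrometheusClient(
        os.environ["PROMETHEUS_URL"], os.environ.get("PROMETHEUS_COOKIE", "")
    )
    job_client = PrometheusJobClient(prometheus)
    node_client = PrometheusNodeClient(prometheus)

    # annotations carry the pod name, which keys the resource queries
    annotations = await job_client.get_annotations(gl_id, end)
    resources, hostname = await job_client.get_resources(annotations["pod"], end)
    usage = await job_client.get_usage(annotations["pod"], start, end)
    uuid = await node_client.get_uuid(hostname, end)
    labels = await node_client.get_labels(hostname, end)

    # one flat record per job plus its node, loosely mirroring db/schema.sql
    return {**annotations, **resources, **usage, "node_uuid": uuid, **labels}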