
Commit

reorganize functionality around clients rather than models
cmelone committed Jan 25, 2024
1 parent fe2449d commit 7b05c8c
Showing 12 changed files with 432 additions and 416 deletions.
207 changes: 200 additions & 7 deletions gantry/clients/prometheus.py
@@ -1,10 +1,13 @@
import json
import logging
import math
import statistics
import urllib.parse

import aiohttp

from gantry.util.spec import spec_variants


class IncompleteData(Exception):
    pass
@@ -97,6 +100,198 @@ def prettify_res(self, response: dict) -> dict:
            for result in response["data"]["result"]
        ]

    async def get_job_annotations(self, job_id: int, time: float) -> dict:
        """
        args:
            job_id: job id
            time: when to query (unix timestamp)
        returns: dict of annotations
        """

        res = await self.query(
            type="single",
            query={
                "metric": "kube_pod_annotations",
                "filters": {"annotation_gitlab_ci_job_id": job_id},
            },
            time=time,
        )

        if not res:
            raise IncompleteData("annotation data is missing")

        annotations = res[0]["labels"]

        return {
            "pod": annotations["pod"],
            # if build jobs is not set, defaults to 16 due to spack config
            "build_jobs": annotations.get(
                "annotation_metrics_spack_job_build_jobs", 16
            ),
            "arch": annotations["annotation_metrics_spack_job_spec_arch"],
            "pkg_name": annotations["annotation_metrics_spack_job_spec_pkg_name"],
            "pkg_version": annotations["annotation_metrics_spack_job_spec_pkg_version"],
            "pkg_variants": json.dumps(
                spec_variants(annotations["annotation_metrics_spack_job_spec_variants"])
            ),
            "compiler_name": annotations[
                "annotation_metrics_spack_job_spec_compiler_name"
            ],
            "compiler_version": annotations[
                "annotation_metrics_spack_job_spec_compiler_version"
            ],
            "stack": annotations["annotation_metrics_spack_ci_stack_name"],
        }

    async def get_job_resources(self, pod: str, time: float) -> tuple[dict, str]:
        """
        args:
            pod: pod name
            time: when to query (unix timestamp)
        returns: dict of resources and node hostname
        """

        requests = process_resources(
            await self.query(
                type="single",
                query={
                    "metric": "kube_pod_container_resource_requests",
                    "filters": {"container": "build", "pod": pod},
                },
                time=time,
            )
        )

        limits_res = await self.query(
            type="single",
            query={
                "metric": "kube_pod_container_resource_limits",
                "filters": {"container": "build", "pod": pod},
            },
            time=time,
        )

        if not limits_res:
            raise IncompleteData("missing limits")

        # instead of needing to fetch the node where the pod ran from kube_pod_info
        # we can grab it from kube_pod_container_resource_limits
        # weirdly, it's not available in kube_pod_labels or annotations
        # https://github.com/kubernetes/kube-state-metrics/issues/1148
        node = limits_res[0]["labels"]["node"]
        limits = process_resources(limits_res)

        return (
            {
                "cpu_request": requests["cpu"]["value"],
                "mem_request": requests["memory"]["value"],
                "cpu_limit": limits.get("cpu", {}).get("value"),
                "mem_limit": limits["memory"]["value"],
            },
            node,
        )

    async def get_job_usage(self, pod: str, start: float, end: float) -> dict:
        """
        Gets resource usage attributes for a job.
        args:
            pod: pod name
            start: start time (unix timestamp)
            end: end time (unix timestamp)
        returns: dict of usage stats
        """

        mem_usage = process_usage(
            await self.query(
                type="range",
                query={
                    "metric": "container_memory_working_set_bytes",
                    "filters": {"container": "build", "pod": pod},
                },
                start=start,
                end=end,
            )
        )

        cpu_usage = process_usage(
            await self.query(
                type="range",
                custom_query=(
                    f"rate(container_cpu_usage_seconds_total{{"
                    f"pod='{pod}', container='build'}}[90s])"
                ),
                start=start,
                end=end,
            )
        )

        return {
            "cpu_mean": cpu_usage["mean"],
            "cpu_median": cpu_usage["median"],
            "cpu_max": cpu_usage["max"],
            "cpu_min": cpu_usage["min"],
            "cpu_stddev": cpu_usage["stddev"],
            "mem_mean": mem_usage["mean"],
            "mem_median": mem_usage["median"],
            "mem_max": mem_usage["max"],
            "mem_min": mem_usage["min"],
            "mem_stddev": mem_usage["stddev"],
        }

    async def get_node_uuid(self, hostname: str, time: float) -> str:
        """
        args:
            hostname: node hostname
            time: time to query (unix timestamp)
        returns: node system UUID
        """

        res = await self.query(
            type="single",
            query={
                "metric": "kube_node_info",
                "filters": {"node": hostname},
            },
            time=time,
        )

        if not res:
            raise IncompleteData(f"node info is missing. hostname={hostname}")

        return res[0]["labels"]["system_uuid"]

    async def get_node_labels(self, hostname: str, time: float) -> dict:
        """
        args:
            hostname: node hostname
            time: time to query (unix timestamp)
        returns: dict of node labels
        """

        res = await self.query(
            type="single",
            query={
                "metric": "kube_node_labels",
                "filters": {"node": hostname},
            },
            time=time,
        )

        if not res:
            raise IncompleteData(f"node labels are missing. hostname={hostname}")

        labels = res[0]["labels"]

        return {
            "cores": float(labels["label_karpenter_k8s_aws_instance_cpu"]),
            "mem": float(labels["label_karpenter_k8s_aws_instance_memory"]),
            "arch": labels["label_kubernetes_io_arch"],
            "os": labels["label_kubernetes_io_os"],
            "instance_type": labels["label_node_kubernetes_io_instance_type"],
        }


def query_to_str(metric: str, filters: dict) -> str:
    """
@@ -107,20 +302,19 @@ def query_to_str(metric: str, filters: dict) -> str:
return f"{metric}{{{filters_str}}}"


def process_resources(res: dict, job_id: int) -> dict:
def process_resources(res: dict) -> dict:
"""
Processes the resource limits and requests from a Prometheus response into
readable format.
args:
res: Prometheus response
job_id: job id for error logging
returns: dict with {resource: {unit: value}} format
"""

if not res:
raise IncompleteData(f"resource data is missing for job {job_id}")
raise IncompleteData("resource data is missing")

processed = {}
for item in res:
@@ -133,21 +327,20 @@ def process_resources(res: dict, job_id: int) -> dict:
    return processed


def process_usage(res: dict, job_id: int) -> dict:
def process_usage(res: dict) -> dict:
    """
    Processes the usage data from a Prometheus response into readable format.
    This could either be CPU usage or memory usage.
    args:
        res: Prometheus response
        job_id: job id for error logging
    returns: dict with {statistic: value} format
    """

    if not res:
        # sometimes prometheus reports no data for a job if the time range is too small
        raise IncompleteData(f"usage data is missing for job {job_id}")
        raise IncompleteData("usage data is missing")

    usage = [float(value) for timestamp, value in res[0]["values"]]

@@ -165,6 +358,6 @@ def process_usage(res: dict, job_id: int) -> dict:
        or sum_stats["mean"] == 0
        or math.isnan(sum_stats["stddev"])
    ):
        raise IncompleteData(f"usage data is invalid for job {job_id}")
        raise IncompleteData("usage data is invalid")

    return sum_stats
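Taken together, the new client methods form a per-job collection flow: the job's annotations identify the pod, the pod yields resource requests/limits plus the node hostname, and the hostname yields the node UUID and labels. Below is a minimal sketch of that flow, assuming an already-constructed PrometheusClient instance named prometheus and known job start/end timestamps; the collect_job_data helper is hypothetical and not part of this commit.

async def collect_job_data(prometheus, job_id: int, start: float, end: float):
    # annotations are queried at the job's end time (an assumption made for this sketch)
    annotations = await prometheus.get_job_annotations(job_id, end)
    pod = annotations["pod"]

    # requests/limits come back alongside the node the pod ran on
    resources, node_hostname = await prometheus.get_job_resources(pod, end)

    # usage statistics are computed over the job's full runtime
    usage = await prometheus.get_job_usage(pod, start, end)

    # node metadata is keyed off the hostname reported by the limits query
    node_uuid = await prometheus.get_node_uuid(node_hostname, end)
    node_labels = await prometheus.get_node_labels(node_hostname, end)

    return annotations, resources, usage, node_uuid, node_labels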
3 changes: 3 additions & 0 deletions gantry/db/__init__.py
@@ -0,0 +1,3 @@
# flake8: noqa
from .get import *
from .insert import *
41 changes: 41 additions & 0 deletions gantry/db/get.py
@@ -0,0 +1,41 @@
import logging

import aiosqlite


async def get_node(db: aiosqlite.Connection, uuid: str) -> int | None:
    """return the primary key if found, otherwise return None"""

    async with db.execute("select id from nodes where uuid = ?", (uuid,)) as cursor:
        if cur_node := await cursor.fetchone():
            return cur_node[0]

    return None


async def job_exists(db: aiosqlite.Connection, job_id: int) -> bool:
    """return if the job exists in the database"""

    async with db.execute("select id from jobs where job_id = ?", (job_id,)) as cursor:
        if await cursor.fetchone():
            logging.warning(
                f"""
                job {job_id} already in database.
                check why multiple requests are being sent.
                """
            )
            return True

    return False


async def ghost_exists(db: aiosqlite.Connection, job_id: int) -> bool:
    """return if the ghost job exists in the database"""

    async with db.execute(
        "select id from ghost_jobs where job_id = ?", (job_id,)
    ) as cursor:
        if await cursor.fetchone():
            return True

    return False
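A short usage sketch for the new lookup helpers, assuming an open aiosqlite connection; the should_process wrapper is hypothetical and not part of this commit.

async def should_process(db, job_id: int) -> bool:
    # skip jobs that are already recorded or were previously marked as ghosts
    if await job_exists(db, job_id) or await ghost_exists(db, job_id):
        return False
    return True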
69 changes: 69 additions & 0 deletions gantry/db/insert.py
@@ -0,0 +1,69 @@
import aiosqlite

from gantry.db.get import get_node


def insert_dict(table: str, input: dict, ignore=False) -> tuple[str, tuple]:
    """
    crafts an sqlite insert statement from a dictionary.
    args:
        table: name of the table to insert into
        input: dictionary of values to insert
        ignore: whether to ignore duplicate entries
    returns: tuple of (query, values)
    """

    columns = ", ".join(input.keys())
    values = ", ".join(["?" for _ in range(len(input))])
    query = f"INSERT INTO {table} ({columns}) VALUES ({values})"

    if ignore:
        query = query.replace("INSERT", "INSERT OR IGNORE")

    # using a tuple of values from the dictionary
    values_tuple = tuple(input.values())
    return query, values_tuple


async def insert_ghost(db: aiosqlite.Connection, job_id: int) -> None:
    """Inserts a ghost job into the database."""

    await db.execute(("insert into ghost_jobs (name) values (?)"), (job_id,))


async def insert_node(db: aiosqlite.Connection, node: dict) -> int:
    """Inserts a node into the database."""

    async with db.execute(
        *insert_dict(
            "nodes",
            node,
            # deal with races
            ignore=True,
        )
    ) as cursor:
        pk = cursor.lastrowid

    if pk == 0:
        # the ignore part of the query was triggered, some other call
        # must have inserted the node before this one
        pk = await get_node(db, node["uuid"])

    return pk


async def insert_job(db: aiosqlite.Connection, job: dict) -> int:
    """Inserts a job into the database."""

    async with db.execute(
        *insert_dict(
            "jobs",
            job,
            # if the job somehow gets added into the db (pod+id being unique)
            # then ignore the insert
            ignore=True,
        )
    ) as cursor:
        return cursor.lastrowid
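To illustrate what insert_dict produces, here is a small example with a hypothetical node record; the column names are illustrative only, since the actual nodes schema is not part of this diff.

node = {
    "uuid": "ec2abc12-3456-7890-abcd-ef0123456789",
    "hostname": "ip-192-168-1-1",
    "cores": 16.0,
    "mem": 32768.0,
}

query, values = insert_dict("nodes", node, ignore=True)
# query  == "INSERT OR IGNORE INTO nodes (uuid, hostname, cores, mem) VALUES (?, ?, ?, ?)"
# values == ("ec2abc12-3456-7890-abcd-ef0123456789", "ip-192-168-1-1", 16.0, 32768.0)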
3 changes: 1 addition & 2 deletions gantry/models/__init__.py
@@ -1,3 +1,2 @@
# flake8: noqa
from .build import Build
from .vm import VM
from .job import Job