diff --git a/gantry/clients/prometheus.py b/gantry/clients/prometheus.py index a3db981..720c8cc 100644 --- a/gantry/clients/prometheus.py +++ b/gantry/clients/prometheus.py @@ -1,3 +1,4 @@ +import json import logging import math import statistics @@ -5,6 +6,8 @@ import aiohttp +from gantry.util.spec import spec_variants + class IncompleteData(Exception): pass @@ -97,6 +100,198 @@ def prettify_res(self, response: dict) -> dict: for result in response["data"]["result"] ] + async def get_job_annotations(self, job_id: int, time: float) -> dict: + """ + args: + job_id: job id + time: when to query (unix timestamp) + returns: dict of annotations + """ + + res = await self.query( + type="single", + query={ + "metric": "kube_pod_annotations", + "filters": {"annotation_gitlab_ci_job_id": job_id}, + }, + time=time, + ) + + if not res: + raise IncompleteData("annotation data is missing") + + annotations = res[0]["labels"] + + return { + "pod": annotations["pod"], + # if build jobs is not set, defaults to 16 due to spack config + "build_jobs": annotations.get( + "annotation_metrics_spack_job_build_jobs", 16 + ), + "arch": annotations["annotation_metrics_spack_job_spec_arch"], + "pkg_name": annotations["annotation_metrics_spack_job_spec_pkg_name"], + "pkg_version": annotations["annotation_metrics_spack_job_spec_pkg_version"], + "pkg_variants": json.dumps( + spec_variants(annotations["annotation_metrics_spack_job_spec_variants"]) + ), + "compiler_name": annotations[ + "annotation_metrics_spack_job_spec_compiler_name" + ], + "compiler_version": annotations[ + "annotation_metrics_spack_job_spec_compiler_version" + ], + "stack": annotations["annotation_metrics_spack_ci_stack_name"], + } + + async def get_job_resources(self, pod: str, time: float) -> tuple[dict, str]: + """ + args: + job_id: job id + pod: pod name + time: when to query (unix timestamp) + returns: dict of resources and node hostname + """ + + requests = process_resources( + await self.query( + type="single", + query={ + "metric": "kube_pod_container_resource_requests", + "filters": {"container": "build", "pod": pod}, + }, + time=time, + ) + ) + + limits_res = await self.query( + type="single", + query={ + "metric": "kube_pod_container_resource_limits", + "filters": {"container": "build", "pod": pod}, + }, + time=time, + ) + + if not limits_res: + raise IncompleteData("missing limits") + + # instead of needing to fetch the node where the pod ran from kube_pod_info + # we can grab it from kube_pod_container_resource_limits + # weirdly, it's not available in kube_pod_labels or annotations + # https://github.com/kubernetes/kube-state-metrics/issues/1148 + node = limits_res[0]["labels"]["node"] + limits = process_resources(limits_res) + + return ( + { + "cpu_request": requests["cpu"]["value"], + "mem_request": requests["memory"]["value"], + "cpu_limit": limits.get("cpu", {}).get("value"), + "mem_limit": limits["memory"]["value"], + }, + node, + ) + + async def get_job_usage(self, pod: str, start: float, end: float) -> dict: + """ + Gets resource usage attributes for a job. + + args: + pod: pod name + start: start time (unix timestamp) + end: end time (unix timestamp) + returns: dict of usage stats + """ + + mem_usage = process_usage( + await self.query( + type="range", + query={ + "metric": "container_memory_working_set_bytes", + "filters": {"container": "build", "pod": pod}, + }, + start=start, + end=end, + ) + ) + + cpu_usage = process_usage( + await self.query( + type="range", + custom_query=( + f"rate(container_cpu_usage_seconds_total{{" + f"pod='{pod}', container='build'}}[90s])" + ), + start=start, + end=end, + ) + ) + + return { + "cpu_mean": cpu_usage["mean"], + "cpu_median": cpu_usage["median"], + "cpu_max": cpu_usage["max"], + "cpu_min": cpu_usage["min"], + "cpu_stddev": cpu_usage["stddev"], + "mem_mean": mem_usage["mean"], + "mem_median": mem_usage["median"], + "mem_max": mem_usage["max"], + "mem_min": mem_usage["min"], + "mem_stddev": mem_usage["stddev"], + } + + async def get_node_uuid(self, hostname: str, time: float) -> dict: + """ + args: + hostname: node hostname + time: time to query (unix timestamp) + returns: dict of node info (UUID as of now) + """ + + res = await self.query( + type="single", + query={ + "metric": "kube_node_info", + "filters": {"node": hostname}, + }, + time=time, + ) + + if not res: + raise IncompleteData(f"node info is missing. hostname={hostname}") + + return res[0]["labels"]["system_uuid"] + + async def get_node_labels(self, hostname: str, time: float) -> dict: + """ + args: + hostname: node hostname + time: time to query (unix timestamp) + returns: dict of node labels + """ + + res = await self.query( + type="single", + query={ + "metric": "kube_node_labels", + "filters": {"node": hostname}, + }, + time=time, + ) + + if not res: + raise IncompleteData(f"node labels are missing. hostname={hostname}") + + labels = res[0]["labels"] + + return { + "cores": float(labels["label_karpenter_k8s_aws_instance_cpu"]), + "mem": float(labels["label_karpenter_k8s_aws_instance_memory"]), + "arch": labels["label_kubernetes_io_arch"], + "os": labels["label_kubernetes_io_os"], + "instance_type": labels["label_node_kubernetes_io_instance_type"], + } + def query_to_str(metric: str, filters: dict) -> str: """ @@ -107,20 +302,19 @@ def query_to_str(metric: str, filters: dict) -> str: return f"{metric}{{{filters_str}}}" -def process_resources(res: dict, job_id: int) -> dict: +def process_resources(res: dict) -> dict: """ Processes the resource limits and requests from a Prometheus response into readable format. args: res: Prometheus response - job_id: job id for error logging returns: dict with {resource: {unit: value}} format """ if not res: - raise IncompleteData(f"resource data is missing for job {job_id}") + raise IncompleteData("resource data is missing") processed = {} for item in res: @@ -133,21 +327,20 @@ def process_resources(res: dict, job_id: int) -> dict: return processed -def process_usage(res: dict, job_id: int) -> dict: +def process_usage(res: dict) -> dict: """ Processes the usage data from a Prometheus response into readable format. This could either be CPU usage or memory usage. args: res: Prometheus response - job_id: job id for error logging returns: dict with {statistic: value} format """ if not res: # sometimes prometheus reports no data for a job if the time range is too small - raise IncompleteData(f"usage data is missing for job {job_id}") + raise IncompleteData("usage data is missing") usage = [float(value) for timestamp, value in res[0]["values"]] @@ -165,6 +358,6 @@ def process_usage(res: dict, job_id: int) -> dict: or sum_stats["mean"] == 0 or math.isnan(sum_stats["stddev"]) ): - raise IncompleteData(f"usage data is invalid for job {job_id}") + raise IncompleteData("usage data is invalid") return sum_stats diff --git a/gantry/db/__init__.py b/gantry/db/__init__.py new file mode 100644 index 0000000..dab0a74 --- /dev/null +++ b/gantry/db/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa +from .get import * +from .insert import * diff --git a/gantry/db/get.py b/gantry/db/get.py new file mode 100644 index 0000000..8ef7977 --- /dev/null +++ b/gantry/db/get.py @@ -0,0 +1,41 @@ +import logging + +import aiosqlite + + +async def get_node(db: aiosqlite.Connection, uuid: str) -> int | None: + """return the primary key if found, otherwise return None""" + + async with db.execute("select id from nodes where uuid = ?", (uuid,)) as cursor: + if cur_node := await cursor.fetchone(): + return cur_node[0] + + return None + + +async def job_exists(db: aiosqlite.Connection, job_id: int) -> bool: + """return if the job exists in the database""" + + async with db.execute("select id from jobs where job_id = ?", (job_id,)) as cursor: + if await cursor.fetchone(): + logging.warning( + f""" + job {job_id} already in database. + check why multiple requests are being sent. + """ + ) + return True + + return False + + +async def ghost_exists(db: aiosqlite.Connection, job_id: int) -> bool: + """return if the ghost job exists in the database""" + + async with db.execute( + "select id from ghost_jobs where job_id = ?", (job_id,) + ) as cursor: + if await cursor.fetchone(): + return True + + return False diff --git a/gantry/db/insert.py b/gantry/db/insert.py new file mode 100644 index 0000000..da35620 --- /dev/null +++ b/gantry/db/insert.py @@ -0,0 +1,69 @@ +import aiosqlite + +from gantry.db.get import get_node + + +def insert_dict(table: str, input: dict, ignore=False) -> tuple[str, tuple]: + """ + crafts an sqlite insert statement from a dictionary. + + args: + table: name of the table to insert into + input: dictionary of values to insert + ignore: whether to ignore duplicate entries + + returns: tuple of (query, values) + """ + + columns = ", ".join(input.keys()) + values = ", ".join(["?" for _ in range(len(input))]) + query = f"INSERT INTO {table} ({columns}) VALUES ({values})" + + if ignore: + query = query.replace("INSERT", "INSERT OR IGNORE") + + # using a tuple of values from the dictionary + values_tuple = tuple(input.values()) + return query, values_tuple + + +async def insert_ghost(db: aiosqlite.Connection, job_id: int) -> None: + """Inserts a ghost job into the database.""" + + await db.execute(("insert into ghost_jobs (name) values (?)"), (job_id,)) + + +async def insert_node(db: aiosqlite.Connection, node: dict) -> int: + """Inserts a node into the database.""" + + async with db.execute( + *insert_dict( + "nodes", + node, + # deal with races + ignore=True, + ) + ) as cursor: + pk = cursor.lastrowid + + if pk == 0: + # the ignore part of the query was triggered, some other call + # must have inserted the node before this one + pk = await get_node(db, node["uuid"]) + + return pk + + +async def insert_job(db: aiosqlite.Connection, job: dict) -> int: + """Inserts a job into the database.""" + + async with db.execute( + *insert_dict( + "jobs", + job, + # if the job somehow gets added into the db (pod+id being unique) + # then ignore the insert + ignore=True, + ) + ) as cursor: + return cursor.lastrowid diff --git a/gantry/models/__init__.py b/gantry/models/__init__.py index 57e9b66..73d8633 100644 --- a/gantry/models/__init__.py +++ b/gantry/models/__init__.py @@ -1,3 +1,2 @@ # flake8: noqa -from .build import Build -from .vm import VM +from .job import Job diff --git a/gantry/models/build.py b/gantry/models/build.py deleted file mode 100644 index 169a6b8..0000000 --- a/gantry/models/build.py +++ /dev/null @@ -1,241 +0,0 @@ -import json -import logging -import re -from datetime import datetime - -import aiosqlite - -from gantry.clients.gitlab import GitlabClient -from gantry.util.misc import insert_dict, setattrs, spec_variants -from gantry.clients.prometheus import ( - IncompleteData, - PrometheusClient, - process_resources, - process_usage, -) - - -class Build: - def __init__( - self, - status: str, - name: str, - id: int, - start: str, - end: str, - retries: int, - ref: str, - ): - self.status = status - self.name = name - self.id = id - self.start = datetime.fromisoformat(start).timestamp() - self.end = datetime.fromisoformat(end).timestamp() - self.retries = retries - self.ref = ref - - @property - def midpoint(self) -> float: - """Returns the midpoint of the job in unix time.""" - # prometheus is not guaranteed to have data at the exact start and end times - # instead of creating an arbitrary buffer, ask for data in the middle of the job - return (self.start + self.end) / 2 - - async def is_ghost(self, db: aiosqlite.Connection, gl: GitlabClient) -> bool: - """Returns the job's ghost status.""" - - # prevent duplicate jobs from being inserted into the database - async with db.execute( - "select job_id from ghost_jobs where job_id = ?", (self.id,) - ) as cursor: - if await cursor.fetchone(): - # ghost job is already in the database - return True - - log = await gl.job_log(self.id) - ghost = "No need to rebuild" in log - - if ghost: - await db.execute(("insert into ghost_jobs (name) values (?)"), (self.id,)) - - return ghost - - async def in_db(self, db: aiosqlite.Connection) -> bool: - """Checks if the job is already in the db.""" - - async with db.execute( - "select job_id from builds where job_id = ?", (self.id,) - ) as cursor: - found = bool(await cursor.fetchone()) - - if found: - logging.warning(f"job {self.id} already in database") - - return found - - async def get_annotations(self, prometheus: PrometheusClient): - """Fetches the annotations and assigns multiple attributes.""" - - annotations_res = await prometheus.query( - type="single", - query={ - "metric": "kube_pod_annotations", - "filters": {"annotation_gitlab_ci_job_id": self.id}, - }, - time=self.midpoint, - ) - - if not annotations_res: - raise IncompleteData(f"missing annotations for job {self.id}") - - annotations = annotations_res[0]["labels"] - - setattrs( - self, - pod=annotations["pod"], - # if build jobs is not set, defaults to 16 due to spack settings - build_jobs=annotations.get("annotation_metrics_spack_job_build_jobs", 16), - arch=annotations["annotation_metrics_spack_job_spec_arch"], - pkg_name=annotations["annotation_metrics_spack_job_spec_pkg_name"], - pkg_version=annotations["annotation_metrics_spack_job_spec_pkg_version"], - pkg_variants=spec_variants( - annotations["annotation_metrics_spack_job_spec_variants"] - ), - compiler_name=annotations[ - "annotation_metrics_spack_job_spec_compiler_name" - ], - compiler_version=annotations[ - "annotation_metrics_spack_job_spec_compiler_version" - ], - stack=annotations["annotation_metrics_spack_ci_stack_name"], - ) - - async def get_resources(self, prometheus: PrometheusClient): - """fetches pod requests and limits, and also sets the node hostname""" - requests = process_resources( - await prometheus.query( - type="single", - query={ - "metric": "kube_pod_container_resource_requests", - "filters": {"container": "build", "pod": self.pod}, - }, - time=self.midpoint, - ), - self.id, - ) - - limits_res = await prometheus.query( - type="single", - query={ - "metric": "kube_pod_container_resource_limits", - "filters": {"container": "build", "pod": self.pod}, - }, - time=self.midpoint, - ) - - if not limits_res: - raise IncompleteData(f"missing limits for job {self.id}") - - # instead of needing to fetch the node where the pod ran from kube_pod_info - # we can grab it from kube_pod_container_resource_limits - # weirdly, it's not available in kube_pod_labels or annotations - # https://github.com/kubernetes/kube-state-metrics/issues/1148 - - self.node = limits_res[0]["labels"]["node"] - limits = process_resources(limits_res, self.id) - - setattrs( - self, - cpu_request=requests["cpu"]["value"], - mem_request=requests["memory"]["value"], - cpu_limit=limits.get("cpu", {}).get("value"), - mem_limit=limits["memory"]["value"], - ) - - async def get_usage(self, prometheus: PrometheusClient): - """Sets resource usage attributes.""" - - mem_usage = process_usage( - await prometheus.query( - type="range", - query={ - "metric": "container_memory_working_set_bytes", - "filters": {"container": "build", "pod": self.pod}, - }, - start=self.start, - end=self.end, - ), - self.id, - ) - - cpu_usage = process_usage( - await prometheus.query( - type="range", - custom_query=( - f"rate(container_cpu_usage_seconds_total{{" - f"pod='{self.pod}', container='build'}}[90s])" - ), - start=self.start, - end=self.end, - ), - self.id, - ) - - setattrs( - self, - cpu_mean=cpu_usage["mean"], - cpu_median=cpu_usage["median"], - cpu_max=cpu_usage["max"], - cpu_min=cpu_usage["min"], - cpu_stddev=cpu_usage["stddev"], - mem_mean=mem_usage["mean"], - mem_median=mem_usage["median"], - mem_max=mem_usage["max"], - mem_min=mem_usage["min"], - mem_stddev=mem_usage["stddev"], - ) - - async def insert(self, db: aiosqlite.Connection, vm_id: int) -> int: - """Inserts the build into the database and returns its id.""" - - async with db.execute( - *insert_dict( - "builds", - { - "pod": self.pod, - "vm": vm_id, - "start": self.start, - "end": self.end, - "job_id": self.id, - "job_status": self.status, - "retries": self.retries, - "ref": self.ref, - "pkg_name": self.pkg_name, - "pkg_version": self.pkg_version, - "pkg_variants": json.dumps(self.pkg_variants), # dict to string - "compiler_name": self.compiler_name, - "compiler_version": self.compiler_version, - "arch": self.arch, - "stack": self.stack, - "build_jobs": self.build_jobs, - "cpu_request": self.cpu_request, - "cpu_limit": self.cpu_limit, - "cpu_mean": self.cpu_mean, - "cpu_median": self.cpu_median, - "cpu_max": self.cpu_max, - "cpu_min": self.cpu_min, - "cpu_stddev": self.cpu_stddev, - "mem_request": self.mem_request, - "mem_limit": self.mem_limit, - "mem_mean": self.mem_mean, - "mem_median": self.mem_median, - "mem_max": self.mem_max, - "mem_min": self.mem_min, - "mem_stddev": self.mem_stddev, - }, - # if the job somehow gets added into the db (pod+id being unique) - # then ignore the insert - ignore=True, - ) - ) as cursor: - return cursor.lastrowid diff --git a/gantry/models/job.py b/gantry/models/job.py new file mode 100644 index 0000000..64b2f77 --- /dev/null +++ b/gantry/models/job.py @@ -0,0 +1,40 @@ +import re +from datetime import datetime + + +class Job: + def __init__( + self, + status: str, + name: str, + id: int, + start: str, + end: str, + ref: str, + ): + self.status = status + self.name = name + self.id = id + self.start = datetime.fromisoformat(start).timestamp() + self.end = datetime.fromisoformat(end).timestamp() + self.ref = ref + + @property + def midpoint(self) -> float: + """Returns the midpoint of the job in unix time.""" + # prometheus is not guaranteed to have data at the exact start and end times + # instead of creating an arbitrary buffer, ask for data in the middle of the job + return (self.start + self.end) / 2 + + @property + def valid_build_name(self) -> bool: + """validates the job name.""" + + # example: plumed@2.9.0 /i4u7p6u %gcc@11.4.0 + # arch=linux-ubuntu20.04-neoverse_v1 E4S ARM Neoverse V1 + job_name_pattern = re.compile( + r"([^/ ]+)@([^/ ]+) /([^%]+) %([^ ]+) ([^ ]+) (.+)" + ) + job_name_match = job_name_pattern.match(self.name) + # groups: 1: name, 2: version, 3: hash, 4: compiler, 5: arch, 6: stack + return bool(job_name_match) diff --git a/gantry/models/vm.py b/gantry/models/vm.py deleted file mode 100644 index 59fe864..0000000 --- a/gantry/models/vm.py +++ /dev/null @@ -1,107 +0,0 @@ -import aiosqlite - -from gantry.util.misc import insert_dict, setattrs -from gantry.clients.prometheus import IncompleteData, PrometheusClient - -MB_IN_BYTES = 1_000_000 - - -class VM: - def __init__(self, hostname: str, query_time: float): - """ - args: - hostname: the hostname of the VM - query_time: any point during VM runtime, usually grabbed from build - """ - self.hostname = hostname - self.query_time = query_time - - async def db_id( - self, db: aiosqlite.Connection, prometheus: PrometheusClient - ) -> int | None: - """ - Returns the id of the vm if it exists in the database, otherwise returns None. - Also sets the uuid of the vm. - """ - vm_info = await prometheus.query( - type="single", - query={ - "metric": "kube_node_info", - "filters": {"node": self.hostname}, - }, - time=self.query_time, - ) - - if not vm_info: - raise IncompleteData(f"missing vm info for {self.hostname}") - - self.uuid = vm_info[0]["labels"]["system_uuid"] - - # look for the vm in the database - async with db.execute( - "select id from vms where uuid = ?", (self.uuid,) - ) as cursor: - old_vm = await cursor.fetchone() - - if old_vm: - return old_vm[0] - - return None - - async def get_labels(self, prometheus: PrometheusClient): - """Sets multiple attributes of the VM based on its labels.""" - - vm_labels_res = await prometheus.query( - type="single", - query={ - "metric": "kube_node_labels", - "filters": {"node": self.hostname}, - }, - time=self.query_time, - ) - - if not vm_labels_res: - raise IncompleteData(f"missing vm labels for {self.hostname}") - - labels = vm_labels_res[0]["labels"] - - setattrs( - self, - cores=float(labels["label_karpenter_k8s_aws_instance_cpu"]), - mem=float(labels["label_karpenter_k8s_aws_instance_memory"]), - arch=labels["label_kubernetes_io_arch"], - os=labels["label_kubernetes_io_os"], - instance_type=labels["label_node_kubernetes_io_instance_type"], - ) - - async def insert(self, db: aiosqlite.Connection) -> int: - """Inserts the VM into the database and returns its id.""" - async with db.execute( - *insert_dict( - "vms", - { - "uuid": self.uuid, - "hostname": self.hostname, - "cores": self.cores, - # convert to bytes to be consistent with other resource metrics - "mem": self.mem * MB_IN_BYTES, - "arch": self.arch, - "os": self.os, - "instance_type": self.instance_type, - }, - # deal with races - ignore=True, - ) - ) as cursor: - pk = cursor.lastrowid - - if pk == 0: - # the ignore part of the query was triggered, some other call - # must have inserted the vm before this one - async with db.execute( - "select id from vms where uuid = ?", (self.uuid,) - ) as cursor: - pk_res = await cursor.fetchone() - pk = pk_res[0] - - return pk diff --git a/gantry/routes/collection.py b/gantry/routes/collection.py index b6c48a8..25e119f 100644 --- a/gantry/routes/collection.py +++ b/gantry/routes/collection.py @@ -2,15 +2,17 @@ import aiosqlite -from gantry.models import VM, Build +from gantry import db from gantry.clients.gitlab import GitlabClient from gantry.clients.prometheus import IncompleteData, PrometheusClient -from gantry.util.spec import valid_build_name +from gantry.models import Job +MB_IN_BYTES = 1_000_000 -async def fetch_build( + +async def fetch_job( payload: dict, - db: aiosqlite.Connection, + db_conn: aiosqlite.Connection, gitlab: GitlabClient, prometheus: PrometheusClient, ) -> None: @@ -27,68 +29,100 @@ async def fetch_build( returns: None in order to accomodate a 200 response for the webhook. """ - build = Build( + job = Job( status=payload["build_status"], name=payload["build_name"], id=payload["build_id"], start=payload["build_started_at"], end=payload["build_finished_at"], - retries=payload["retries_count"], ref=payload["ref"], ) # perform checks to see if we should collect data for this job if ( - build.status not in ("success",) - or not valid_build_name(build.name) # is not a build job - or await build.in_db(db) # job already in the database - or await build.is_ghost(db, gitlab) + job.status != "success" + or not job.valid_build_name # is not a build job + or await db.job_exists(db_conn, job.id) # job already in the database + or await db.ghost_exists(db_conn, job.id) # ghost already in db ): return + # check if the job is a ghost + job_log = await gitlab.job_log(job.id) + is_ghost = "No need to rebuild" in job_log + if is_ghost: + db.insert_ghost(db_conn, job.id) + return + try: - await build.get_annotations(prometheus) - await build.get_resources(prometheus) - await build.get_usage(prometheus) - vm_id = await fetch_vm(db, prometheus, build.node, build.midpoint) + annotations = await prometheus.get_job_annotations(job.id, job.midpoint) + resources, node_hostname = await prometheus.get_job_resources( + annotations["pod"], job.midpoint + ) + usage = await prometheus.get_job_usage(annotations["pod"], job.start, job.end) + node_id = await fetch_node(db_conn, prometheus, node_hostname, job.midpoint) except IncompleteData as e: # missing data, skip this job - logging.error(e) + logging.error(f"{e} job={job.id}") return - await build.insert(db, vm_id) - # vm and build will get saved at the same time to make sure - # we don't accidentally commit a vm without a build - await db.commit() + await db.insert_job( + db_conn, + { + "node": node_id, + "start": job.start, + "end": job.end, + "job_id": job.id, + "job_status": job.status, + "ref": job.ref, + **annotations, + **resources, + **usage, + }, + ) + + # job and node will get saved at the same time to make sure + # we don't accidentally commit a node without a job + await db_conn.commit() return -async def fetch_vm( - db: aiosqlite.Connection, +async def fetch_node( + db_conn: aiosqlite.Connection, prometheus: PrometheusClient, hostname: dict, query_time: float, ) -> int: """ - Finds an existing VM in the database or inserts a new one. + Finds an existing node in the database or inserts a new one. args: db: an active aiosqlite connection prometheus: - hostname: the hostname of the VM - query_time: any point during VM runtime, usually grabbed from build + hostname: the hostname of the node + query_time: any point during node runtime, usually grabbed from job - returns: id of the inserted or existing VM + returns: id of the inserted or existing node """ - vm = VM( - hostname=hostname, - query_time=query_time, - ) - # do not proceed if the VM exists - if existing_vm := await vm.db_id(db, prometheus): - return existing_vm - - await vm.get_labels(prometheus) - return await vm.insert(db) + node_uuid = await prometheus.get_node_uuid(hostname, query_time) + + # do not proceed if the node exists + if existing_node := await db.get_node(db_conn, node_uuid): + return existing_node + + node_labels = await prometheus.get_node_labels(hostname, query_time) + return await db.insert_node( + db_conn, + { + "uuid": node_uuid, + "hostname": hostname, + "cores": node_labels["cores"], + # convert to bytes to be consistent with other resource metrics + "mem": node_labels["mem"] * MB_IN_BYTES, + "arch": node_labels["arch"], + "os": node_labels["os"], + "instance_type": node_labels["instance_type"], + }, + ) diff --git a/gantry/util/misc.py b/gantry/util/misc.py deleted file mode 100644 index 0ff0892..0000000 --- a/gantry/util/misc.py +++ /dev/null @@ -1,4 +0,0 @@ -def setattrs(_self, **kwargs): - """Sets multiple attributes of an object from a dictionary.""" - for k, v in kwargs.items(): - setattr(_self, k, v) diff --git a/gantry/util/spec.py b/gantry/util/spec.py index 9376ece..eb1b33d 100644 --- a/gantry/util/spec.py +++ b/gantry/util/spec.py @@ -1,5 +1,3 @@ -import re - def spec_variants(spec: str) -> dict: """Given a spec's concrete variants, return a dict in name: value format.""" # example: +adios2~advanced_debug patches=02253c7,acb3805,b724e6a use_vtkm=on @@ -29,18 +27,3 @@ def spec_variants(spec: str) -> dict: variants[part[1:]] = False return variants - -def valid_build_name(name): - """Returns True if the job is a build job, False otherwise.""" - - # example: plumed@2.9.0 /i4u7p6u %gcc@11.4.0 - # arch=linux-ubuntu20.04-neoverse_v1 E4S ARM Neoverse V1 - job_name_pattern = re.compile( - r"([^/ ]+)@([^/ ]+) /([^%]+) %([^ ]+) ([^ ]+) (.+)" - ) - job_name_match = job_name_pattern.match(name) - # groups: 1: name, 2: version, 3: hash, 4: compiler, 5: arch, 6: stack - return bool(job_name_match) - - - diff --git a/gantry/views.py b/gantry/views.py index b311e8d..d9a0bb4 100644 --- a/gantry/views.py +++ b/gantry/views.py @@ -1,9 +1,10 @@ +import asyncio import json import os from aiohttp import web -from gantry.routes.collection import fetch_build +from gantry.routes.collection import fetch_job routes = web.RouteTableDef() @@ -21,7 +22,12 @@ async def collect_job(request: web.Request) -> web.Response: if request.headers.get("X-Gitlab-Event") != "Job Hook": return web.Response(status=400, text="invalid event type") - await fetch_build( - payload, request.app["db"], request.app["gitlab"], request.app["prometheus"] + # will return immediately, but will not block the event loop + # allowing fetch_job to run in the background + asyncio.ensure_future( + fetch_job( + payload, request.app["db"], request.app["gitlab"], request.app["prometheus"] + ) ) + return web.Response(status=200)