diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000..dcea6a5
--- /dev/null
+++ b/.envrc
@@ -0,0 +1,17 @@
+#------------------------------------------------------------------------
+# Load Development Spack Environment (If Spack is installed.)
+#
+# Run 'direnv allow' from within the cloned repository to automatically
+# load the spack environment when you enter the directory.
+#------------------------------------------------------------------------
+if type spack &>/dev/null; then
+    . $SPACK_ROOT/share/spack/setup-env.sh
+    spack env activate -d .
+fi
+
+#------------------------------------------------------------------------
+# Load Environment Variables from .env (if file exists)
+#------------------------------------------------------------------------
+if [ -e .env ]; then
+    source .env
+fi
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..f295e07
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+max-line-length = 88
+extend-ignore = E203, E704
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..372e265
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+__pycache__
+.env
+spack.lock
+.spack-env
+db/*.db
diff --git a/db/schema.sql b/db/schema.sql
new file mode 100644
index 0000000..bba3549
--- /dev/null
+++ b/db/schema.sql
@@ -0,0 +1,47 @@
+CREATE TABLE nodes (
+    id INTEGER PRIMARY KEY,
+    uuid TEXT NOT NULL UNIQUE,
+    hostname TEXT NOT NULL,
+    cores REAL NOT NULL,
+    mem REAL NOT NULL,
+    arch TEXT NOT NULL,
+    os TEXT NOT NULL,
+    instance_type TEXT NOT NULL
+);
+
+CREATE TABLE jobs (
+    id INTEGER PRIMARY KEY,
+    pod TEXT NOT NULL UNIQUE,
+    node INTEGER NOT NULL,
+    start INTEGER NOT NULL,
+    end INTEGER NOT NULL,
+    gitlab_id INTEGER NOT NULL UNIQUE,
+    job_status TEXT NOT NULL,
+    ref TEXT NOT NULL,
+    pkg_name TEXT NOT NULL,
+    pkg_version TEXT NOT NULL,
+    pkg_variants TEXT NOT NULL,
+    compiler_name TEXT NOT NULL,
+    compiler_version TEXT NOT NULL,
+    arch TEXT NOT NULL,
+    stack TEXT NOT NULL,
+    build_jobs INTEGER NOT NULL,
+    cpu_request REAL NOT NULL,
+    cpu_limit REAL, -- this can be null because it's currently not set
+    cpu_mean REAL NOT NULL,
+    cpu_median REAL NOT NULL,
+    cpu_max REAL NOT NULL,
+    cpu_min REAL NOT NULL,
+    cpu_stddev REAL NOT NULL,
+    mem_request REAL NOT NULL,
+    mem_limit REAL NOT NULL,
+    mem_mean REAL NOT NULL,
+    mem_median REAL NOT NULL,
+    mem_max REAL NOT NULL,
+    mem_min REAL NOT NULL,
+    mem_stddev REAL NOT NULL,
+    FOREIGN KEY (node)
+        REFERENCES nodes (id)
+        ON UPDATE CASCADE
+        ON DELETE CASCADE
+);
diff --git a/gantry/__main__.py b/gantry/__main__.py
index 491f8ff..ff25d64 100644
--- a/gantry/__main__.py
+++ b/gantry/__main__.py
@@ -1,5 +1,36 @@
+import os
+
+import aiosqlite
+from aiohttp import web
+
+from gantry.clients.gitlab import GitlabClient
+from gantry.clients.prometheus import PrometheusClient
+from gantry.views import routes
+
+
+async def init_db(app: web.Application):
+    db = await aiosqlite.connect(os.environ["DB_FILE"])
+    await db.execute("PRAGMA foreign_keys = ON;")
+    app["db"] = db
+    yield
+    await db.close()
+
+
+async def init_clients(app: web.Application):
+    app["gitlab"] = GitlabClient(
+        os.environ["GITLAB_URL"], os.environ["GITLAB_API_TOKEN"]
+    )
+    app["prometheus"] = PrometheusClient(
+        os.environ["PROMETHEUS_URL"], os.environ.get("PROMETHEUS_COOKIE", "")
+    )
+
+
 def main():
-    print("Hello World")
+    app = web.Application()
+    app.add_routes(routes)
+    app.cleanup_ctx.append(init_db)
+    app.on_startup.append(init_clients)
+    web.run_app(app)
 
 
 if __name__ == "__main__":
diff --git
a/gantry/clients/__init__.py b/gantry/clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gantry/clients/gitlab.py b/gantry/clients/gitlab.py new file mode 100644 index 0000000..97f9500 --- /dev/null +++ b/gantry/clients/gitlab.py @@ -0,0 +1,31 @@ +import aiohttp + + +class GitlabClient: + def __init__(self, base_url: str, api_token: str): + self.base_url = base_url + self.headers = {"PRIVATE-TOKEN": api_token} + + async def _request(self, url: str, response_type: str) -> dict | str: + """ + Helper for requests to the Gitlab API. + + args: + url: the url to request + response_type: the type of response to expect (json or text) + + returns: the response from Gitlab in the specified format + """ + + async with aiohttp.ClientSession(raise_for_status=True) as session: + async with session.get(url, headers=self.headers) as resp: + if response_type == "json": + return await resp.json() + if response_type == "text": + return await resp.text() + + async def job_log(self, gl_id: int) -> str: + """Given a job id, returns the log from that job""" + + url = f"{self.base_url}/jobs/{gl_id}/trace" + return await self._request(url, "text") diff --git a/gantry/clients/prometheus/__init__.py b/gantry/clients/prometheus/__init__.py new file mode 100644 index 0000000..9234832 --- /dev/null +++ b/gantry/clients/prometheus/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from .prometheus import PrometheusClient diff --git a/gantry/clients/prometheus/job.py b/gantry/clients/prometheus/job.py new file mode 100644 index 0000000..9f7162c --- /dev/null +++ b/gantry/clients/prometheus/job.py @@ -0,0 +1,143 @@ +import json + +from gantry.clients.prometheus import util +from gantry.util.spec import spec_variants + + +class PrometheusJobClient: + def __init__(self, client): + self.client = client + + async def get_annotations(self, gl_id: int, time: float) -> dict: + """ + args: + gl_id: gitlab job id + time: when to query (unix timestamp) + returns: dict of annotations + """ + + res = await self.client.query_single( + query={ + "metric": "kube_pod_annotations", + "filters": {"annotation_gitlab_ci_job_id": gl_id}, + }, + time=time, + ) + + if not res: + raise util.IncompleteData("annotation data is missing") + + annotations = res[0]["labels"] + + return { + "pod": annotations["pod"], + # if build jobs is not set, defaults to 16 due to spack config + "build_jobs": annotations.get( + "annotation_metrics_spack_job_build_jobs", 16 + ), + "arch": annotations["annotation_metrics_spack_job_spec_arch"], + "pkg_name": annotations["annotation_metrics_spack_job_spec_pkg_name"], + "pkg_version": annotations["annotation_metrics_spack_job_spec_pkg_version"], + "pkg_variants": json.dumps( + spec_variants(annotations["annotation_metrics_spack_job_spec_variants"]) + ), + "compiler_name": annotations[ + "annotation_metrics_spack_job_spec_compiler_name" + ], + "compiler_version": annotations[ + "annotation_metrics_spack_job_spec_compiler_version" + ], + "stack": annotations["annotation_metrics_spack_ci_stack_name"], + } + + async def get_resources(self, pod: str, time: float) -> tuple[dict, str]: + """ + args: + pod: pod name + time: when to query (unix timestamp) + returns: dict of resources and node hostname + """ + + requests = util.process_resources( + await self.client.query_single( + query={ + "metric": "kube_pod_container_resource_requests", + "filters": {"container": "build", "pod": pod}, + }, + time=time, + ) + ) + + limits_res = await self.client.query_single( + query={ + "metric": 
"kube_pod_container_resource_limits", + "filters": {"container": "build", "pod": pod}, + }, + time=time, + ) + + if not limits_res: + raise util.IncompleteData("missing limits") + + # instead of needing to fetch the node where the pod ran from kube_pod_info + # we can grab it from kube_pod_container_resource_limits + # weirdly, it's not available in kube_pod_labels or annotations + # https://github.com/kubernetes/kube-state-metrics/issues/1148 + node = limits_res[0]["labels"]["node"] + limits = util.process_resources(limits_res) + + return ( + { + "cpu_request": requests["cpu"]["value"], + "mem_request": requests["memory"]["value"], + "cpu_limit": limits.get("cpu", {}).get("value"), + "mem_limit": limits["memory"]["value"], + }, + node, + ) + + async def get_usage(self, pod: str, start: float, end: float) -> dict: + """ + Gets resource usage attributes for a job. + + args: + pod: pod name + start: start time (unix timestamp) + end: end time (unix timestamp) + returns: dict of usage stats + """ + + mem_usage = util.process_usage( + await self.client.query_range( + query={ + "metric": "container_memory_working_set_bytes", + "filters": {"container": "build", "pod": pod}, + }, + start=start, + end=end, + ) + ) + + cpu_usage = util.process_usage( + await self.client.query_range( + query=( + f"rate(container_cpu_usage_seconds_total{{" + f"pod='{pod}', container='build'}}[90s])" + ), + start=start, + end=end, + ) + ) + + return { + "cpu_mean": cpu_usage["mean"], + "cpu_median": cpu_usage["median"], + "cpu_max": cpu_usage["max"], + "cpu_min": cpu_usage["min"], + "cpu_stddev": cpu_usage["stddev"], + "mem_mean": mem_usage["mean"], + "mem_median": mem_usage["median"], + "mem_max": mem_usage["max"], + "mem_min": mem_usage["min"], + "mem_stddev": mem_usage["stddev"], + } diff --git a/gantry/clients/prometheus/node.py b/gantry/clients/prometheus/node.py new file mode 100644 index 0000000..abfb217 --- /dev/null +++ b/gantry/clients/prometheus/node.py @@ -0,0 +1,56 @@ +from gantry.clients.prometheus import util + + +class PrometheusNodeClient: + def __init__(self, client): + self.client = client + + async def get_uuid(self, hostname: str, time: float) -> dict: + """ + args: + hostname: node hostname + time: time to query (unix timestamp) + returns: dict of node info (UUID as of now) + """ + + res = await self.client.query_single( + query={ + "metric": "kube_node_info", + "filters": {"node": hostname}, + }, + time=time, + ) + + if not res: + raise util.IncompleteData(f"node info is missing. hostname={hostname}") + + return res[0]["labels"]["system_uuid"] + + async def get_labels(self, hostname: str, time: float) -> dict: + """ + args: + hostname: node hostname + time: time to query (unix timestamp) + returns: dict of node labels + """ + + res = await self.client.query_single( + query={ + "metric": "kube_node_labels", + "filters": {"node": hostname}, + }, + time=time, + ) + + if not res: + raise util.IncompleteData(f"node labels are missing. 
hostname={hostname}") + + labels = res[0]["labels"] + + return { + "cores": float(labels["label_karpenter_k8s_aws_instance_cpu"]), + "mem": float(labels["label_karpenter_k8s_aws_instance_memory"]), + "arch": labels["label_kubernetes_io_arch"], + "os": labels["label_kubernetes_io_os"], + "instance_type": labels["label_node_kubernetes_io_instance_type"], + } diff --git a/gantry/clients/prometheus/prometheus.py b/gantry/clients/prometheus/prometheus.py new file mode 100644 index 0000000..acb2dd3 --- /dev/null +++ b/gantry/clients/prometheus/prometheus.py @@ -0,0 +1,104 @@ +import logging +import math + +import aiohttp + +from gantry.clients.prometheus import util +from gantry.clients.prometheus.job import PrometheusJobClient +from gantry.clients.prometheus.node import PrometheusNodeClient + + +class PrometheusClient: + def __init__(self, base_url: str, auth_cookie: str = ""): + # cookie will only be used if set + if auth_cookie: + self.cookies = {"_oauth2_proxy": auth_cookie} + else: + self.cookies = {} + + self.base_url = base_url + + async def query_single(self, query: str | dict, time: int) -> list: + """Query Prometheus for a single value + args: + + query: str or dict + if str, the query string + if dict, the metric and filters + example: + "query": { + "metric": "metric_name", + "filters": {"filter1": "value1", "filter2": "value2"} + } + time: int (unix timestamp) + + returns: dict with {label: value} format + """ + + query = util.process_query(query) + url = f"{self.base_url}/query?query={query}&time={time}" + return await self._query(url) + + async def query_range(self, query: str | dict, start: int, end: int) -> list: + """Query Prometheus for a range of values + + args: + query: see query_single + start: int (unix timestamp) + end: int (unix timestamp) + + returns: list of dicts with {label: value} format + """ + + query = util.process_query(query) + # prometheus will only return this many frames + max_resolution = 10_000 + # calculating the max step size to get the desired resolution + step = math.ceil((end - start) / max_resolution) + url = ( + f"{self.base_url}/query_range?" + f"query={query}&" + f"start={start}&" + f"end={end}&" + f"step={step}s" + ) + return await self._query(url) + + async def _query(self, url: str) -> list: + """Query Prometheus with a query string""" + async with aiohttp.ClientSession(raise_for_status=True) as session: + # submit cookie with request + async with session.get(url, cookies=self.cookies) as resp: + try: + return self.prettify_res(await resp.json()) + except aiohttp.ContentTypeError: + logging.error( + """Prometheus query failed with unexpected response. 
+                        The cookie may have expired."""
+                    )
+                    return []
+
+    def prettify_res(self, response: dict) -> list:
+        """Process Prometheus response into an array of dicts with {label: value}"""
+        result_type = response.get("data", {}).get("resultType")
+        values_dict = {
+            "matrix": "values",
+            "vector": "value",
+        }
+
+        if result_type not in values_dict:
+            logging.error(f"Prometheus response type {result_type} not supported")
+            return []
+
+        return [
+            {"labels": result["metric"], "values": result[values_dict[result_type]]}
+            for result in response["data"]["result"]
+        ]
+
+    @property
+    def job(self):
+        return PrometheusJobClient(self)
+
+    @property
+    def node(self):
+        return PrometheusNodeClient(self)
diff --git a/gantry/clients/prometheus/util.py b/gantry/clients/prometheus/util.py
new file mode 100644
index 0000000..e749dae
--- /dev/null
+++ b/gantry/clients/prometheus/util.py
@@ -0,0 +1,90 @@
+import math
+import statistics
+import urllib.parse
+
+
+class IncompleteData(Exception):
+    pass
+
+
+def process_query(query: dict | str) -> str:
+    """
+    Processes query into a string that can be used in a URL.
+    See query_single in prometheus.py for more details on args.
+    """
+    if isinstance(query, dict):
+        query = query_to_str(**query)
+    elif not isinstance(query, str):
+        raise ValueError("query must be a string or dict")
+
+    return urllib.parse.quote(query)
+
+
+def query_to_str(metric: str, filters: dict) -> str:
+    """
+    In: "metric", {key1: value1, key2: value2}
+    Out: "metric{key1="value1", key2="value2"}"
+    """
+    filters_str = ", ".join([f'{key}="{value}"' for key, value in filters.items()])
+    return f"{metric}{{{filters_str}}}"
+
+
+def process_resources(res: list) -> dict:
+    """
+    Processes the resource limits and requests from a Prometheus response into
+    readable format.
+
+    args:
+        res: Prometheus response
+
+    returns: dict with {resource: {unit: value}} format
+    """
+
+    if not res:
+        raise IncompleteData("resource data is missing")
+
+    processed = {}
+    for item in res:
+        # duplicates are ignored by overwriting the previous entry
+        processed[item["labels"]["resource"]] = {
+            "unit": item["labels"]["unit"],
+            "value": float(item["values"][1]),
+        }
+
+    return processed
+
+
+def process_usage(res: list) -> dict:
+    """
+    Processes the usage data from a Prometheus response into readable format.
+    This could either be CPU usage or memory usage.
+ + args: + res: Prometheus response + + returns: dict with {statistic: value} format + """ + + if not res: + # sometimes prometheus reports no data for a job if the time range is too small + raise IncompleteData("usage data is missing") + + usage = [float(value) for timestamp, value in res[0]["values"]] + + sum_stats = { + "mean": statistics.fmean(usage), + # pstdev because we have the whole population + "stddev": statistics.pstdev(usage), + "max": max(usage), + "min": min(usage), + "median": statistics.median(usage), + } + + if ( + sum_stats["stddev"] == 0 + or sum_stats["mean"] == 0 + or math.isnan(sum_stats["stddev"]) + ): + raise IncompleteData("usage data is invalid") + + return sum_stats diff --git a/gantry/db/__init__.py b/gantry/db/__init__.py new file mode 100644 index 0000000..dab0a74 --- /dev/null +++ b/gantry/db/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa +from .get import * +from .insert import * diff --git a/gantry/db/get.py b/gantry/db/get.py new file mode 100644 index 0000000..c597c3e --- /dev/null +++ b/gantry/db/get.py @@ -0,0 +1,43 @@ +import logging + +import aiosqlite + + +async def get_node(db: aiosqlite.Connection, uuid: str) -> int | None: + """return the primary key if found, otherwise return None""" + + async with db.execute("select id from nodes where uuid = ?", (uuid,)) as cursor: + if cur_node := await cursor.fetchone(): + return cur_node[0] + + return None + + +async def job_exists(db: aiosqlite.Connection, gl_id: int) -> bool: + """return if the job exists in the database""" + + async with db.execute( + "select id from jobs where gitlab_id = ?", (gl_id,) + ) as cursor: + if await cursor.fetchone(): + logging.warning( + f""" + job {gl_id} already in database. + check why multiple requests are being sent. + """ + ) + return True + + return False + + +async def ghost_exists(db: aiosqlite.Connection, gl_id: int) -> bool: + """return if the ghost job exists in the database""" + + async with db.execute( + "select id from ghost_jobs where gitlab_id = ?", (gl_id,) + ) as cursor: + if await cursor.fetchone(): + return True + + return False diff --git a/gantry/db/insert.py b/gantry/db/insert.py new file mode 100644 index 0000000..7564ad9 --- /dev/null +++ b/gantry/db/insert.py @@ -0,0 +1,63 @@ +import aiosqlite + +from gantry.db.get import get_node + + +def insert_dict(table: str, input: dict, ignore=False) -> tuple[str, tuple]: + """ + crafts an sqlite insert statement from a dictionary. + + args: + table: name of the table to insert into + input: dictionary of values to insert + ignore: whether to ignore duplicate entries + + returns: tuple of (query, values) + """ + + columns = ", ".join(input.keys()) + values = ", ".join(["?" 
for _ in range(len(input))]) + query = f"INSERT INTO {table} ({columns}) VALUES ({values})" + + if ignore: + query = query.replace("INSERT", "INSERT OR IGNORE") + + # using a tuple of values from the dictionary + values_tuple = tuple(input.values()) + return query, values_tuple + + +async def insert_node(db: aiosqlite.Connection, node: dict) -> int: + """Inserts a node into the database.""" + + async with db.execute( + *insert_dict( + "nodes", + node, + # deal with races + ignore=True, + ) + ) as cursor: + pk = cursor.lastrowid + + if pk == 0: + # the ignore part of the query was triggered, some other call + # must have inserted the node before this one + pk = await get_node(db, node["uuid"]) + + return pk + + +async def insert_job(db: aiosqlite.Connection, job: dict) -> int: + """Inserts a job into the database.""" + + async with db.execute( + *insert_dict( + "jobs", + job, + # if the job somehow gets added into the db (pod+id being unique) + # then ignore the insert + ignore=True, + ) + ) as cursor: + return cursor.lastrowid diff --git a/gantry/models/__init__.py b/gantry/models/__init__.py new file mode 100644 index 0000000..73d8633 --- /dev/null +++ b/gantry/models/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from .job import Job diff --git a/gantry/models/job.py b/gantry/models/job.py new file mode 100644 index 0000000..3c3a794 --- /dev/null +++ b/gantry/models/job.py @@ -0,0 +1,40 @@ +import re +from datetime import datetime + + +class Job: + def __init__( + self, + status: str, + name: str, + gl_id: int, + start: str, + end: str, + ref: str, + ): + self.status = status + self.name = name + self.gl_id = gl_id + self.start = datetime.fromisoformat(start).timestamp() + self.end = datetime.fromisoformat(end).timestamp() + self.ref = ref + + @property + def midpoint(self) -> float: + """Returns the midpoint of the job in unix time.""" + # prometheus is not guaranteed to have data at the exact start and end times + # instead of creating an arbitrary buffer, ask for data in the middle of the job + return (self.start + self.end) / 2 + + @property + def valid_build_name(self) -> bool: + """validates the job name.""" + + # example: plumed@2.9.0 /i4u7p6u %gcc@11.4.0 + # arch=linux-ubuntu20.04-neoverse_v1 E4S ARM Neoverse V1 + job_name_pattern = re.compile( + r"([^/ ]+)@([^/ ]+) /([^%]+) %([^ ]+) ([^ ]+) (.+)" + ) + job_name_match = job_name_pattern.match(self.name) + # groups: 1: name, 2: version, 3: hash, 4: compiler, 5: arch, 6: stack + return bool(job_name_match) diff --git a/gantry/routes/collection.py b/gantry/routes/collection.py new file mode 100644 index 0000000..1fd7ed7 --- /dev/null +++ b/gantry/routes/collection.py @@ -0,0 +1,130 @@ +import logging + +import aiosqlite + +from gantry import db +from gantry.clients.gitlab import GitlabClient +from gantry.clients.prometheus import PrometheusClient +from gantry.clients.prometheus.util import IncompleteData +from gantry.models import Job + +MB_IN_BYTES = 1_000_000 + + +async def fetch_job( + payload: dict, + db_conn: aiosqlite.Connection, + gitlab: GitlabClient, + prometheus: PrometheusClient, +) -> None: + """ + Fetches a job's information from Prometheus and inserts it into the database. + If there is data missing at any point, the function will still return so the webhook + responds as expected. If an exception is thrown, that behavior was unanticipated by + this program and should be investigated. 
+
+    args:
+        payload: a dictionary containing the information from the Gitlab job hook
+        db_conn: an active aiosqlite connection
+
+    returns: None in order to accommodate a 200 response for the webhook.
+    """
+
+    job = Job(
+        status=payload["build_status"],
+        name=payload["build_name"],
+        gl_id=payload["build_id"],
+        start=payload["build_started_at"],
+        end=payload["build_finished_at"],
+        ref=payload["ref"],
+    )
+
+    # perform checks to see if we should collect data for this job
+    if (
+        job.status != "success"
+        or not job.valid_build_name  # is not a build job
+        # uo runners are not in Prometheus
+        or payload["runner"]["description"].startswith("uo")
+        or await db.job_exists(db_conn, job.gl_id)  # job already in the database
+        or await db.ghost_exists(db_conn, job.gl_id)  # ghost already in db
+    ):
+        return
+
+    # check if the job is a ghost
+    job_log = await gitlab.job_log(job.gl_id)
+    is_ghost = "No need to rebuild" in job_log
+    if is_ghost:
+        return
+
+    try:
+        annotations = await prometheus.job.get_annotations(job.gl_id, job.midpoint)
+        resources, node_hostname = await prometheus.job.get_resources(
+            annotations["pod"], job.midpoint
+        )
+        usage = await prometheus.job.get_usage(annotations["pod"], job.start, job.end)
+        node_id = await fetch_node(db_conn, prometheus, node_hostname, job.midpoint)
+    except IncompleteData as e:
+        # missing data, skip this job
+        logging.error(f"{e} job={job.gl_id}")
+        return
+
+    await db.insert_job(
+        db_conn,
+        {
+            "node": node_id,
+            "start": job.start,
+            "end": job.end,
+            "gitlab_id": job.gl_id,
+            "job_status": job.status,
+            "ref": job.ref,
+            **annotations,
+            **resources,
+            **usage,
+        },
+    )
+
+    # job and node will get saved at the same time to make sure
+    # we don't accidentally commit a node without a job
+    await db_conn.commit()
+
+    return
+
+
+async def fetch_node(
+    db_conn: aiosqlite.Connection,
+    prometheus: PrometheusClient,
+    hostname: str,
+    query_time: float,
+) -> int:
+    """
+    Finds an existing node in the database or inserts a new one.
+ + args: + db: an active aiosqlite connection + prometheus: + hostname: the hostname of the node + query_time: any point during node runtime, usually grabbed from job + + returns: id of the inserted or existing node + """ + + node_uuid = await prometheus.node.get_uuid(hostname, query_time) + + # do not proceed if the node exists + if existing_node := await db.get_node(db_conn, node_uuid): + return existing_node + + node_labels = await prometheus.node.get_labels(hostname, query_time) + return await db.insert_node( + db_conn, + { + "uuid": node_uuid, + "hostname": hostname, + "cores": node_labels["cores"], + # convert to bytes to be consistent with other resource metrics + "mem": node_labels["mem"] * MB_IN_BYTES, + "arch": node_labels["arch"], + "os": node_labels["os"], + "instance_type": node_labels["instance_type"], + }, + ) diff --git a/gantry/util/__init__.py b/gantry/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gantry/util/spec.py b/gantry/util/spec.py new file mode 100644 index 0000000..eb1b33d --- /dev/null +++ b/gantry/util/spec.py @@ -0,0 +1,29 @@ +def spec_variants(spec: str) -> dict: + """Given a spec's concrete variants, return a dict in name: value format.""" + # example: +adios2~advanced_debug patches=02253c7,acb3805,b724e6a use_vtkm=on + + variants = {} + # give some padding to + and ~ so we can split on them + spec = spec.replace("+", " +") + spec = spec.replace("~", " ~") + parts = spec.split(" ") + + for part in parts: + if len(part) < 2: + continue + if "=" in part: + name, value = part.split("=") + if "," in value: + # array of the multiple values + variants[name] = value.split(",") + else: + # string of the single value + variants[name] = value + else: + # anything after the first character is the value + if part.startswith("+"): + variants[part[1:]] = True + elif part.startswith("~"): + variants[part[1:]] = False + + return variants diff --git a/gantry/views.py b/gantry/views.py new file mode 100644 index 0000000..b71a738 --- /dev/null +++ b/gantry/views.py @@ -0,0 +1,36 @@ +import asyncio +import json +import logging +import os + +from aiohttp import web + +from gantry.routes.collection import fetch_job + +routes = web.RouteTableDef() + + +@routes.post("/v1/collect") +async def collect_job(request: web.Request) -> web.Response: + try: + payload = await request.json() + except json.decoder.JSONDecodeError: + return web.Response(status=400, text="invalid json") + + if request.headers.get("X-Gitlab-Token") != os.environ["GITLAB_WEBHOOK_TOKEN"]: + return web.Response(status=401, text="invalid token") + + if request.headers.get("X-Gitlab-Event") != "Job Hook": + logging.error(f"invalid event type {request.headers.get('X-Gitlab-Event')}") + # return 200 so gitlab doesn't disable the webhook -- this is not fatal + return web.Response(status=200) + + # will return immediately, but will not block the event loop + # allowing fetch_job to run in the background + asyncio.ensure_future( + fetch_job( + payload, request.app["db"], request.app["gitlab"], request.app["prometheus"] + ) + ) + + return web.Response(status=200) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4620ee1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.isort] +profile = "black" +skip_gitignore = true +color_output = true diff --git a/spack.yaml b/spack.yaml new file mode 100644 index 0000000..44863c0 --- /dev/null +++ b/spack.yaml @@ -0,0 +1,14 @@ +spack: + specs: + - python + - py-aiohttp + - py-pytest + - py-pytest-asyncio + - py-flake8 + 
- py-black + - py-isort + - py-aiosqlite + - sqlite + view: true + concretizer: + unify: true
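A few self-contained usage sketches follow; they are not part of the patch, and any identifiers or values that do not appear above are illustrative. The service reads its SQLite path from the DB_FILE environment variable (see init_db in gantry/__main__.py), so a local development database can be created from db/schema.sql with something like the snippet below; the db/gantry.db filename is only an example (anything matching db/*.db is gitignored).

import sqlite3

# create a local development database from the schema in this patch;
# "db/gantry.db" is an illustrative name -- point DB_FILE at whatever you choose
conn = sqlite3.connect("db/gantry.db")
with open("db/schema.sql") as f:
    conn.executescript(f.read())
conn.close()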
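As a rough illustration of how query_single assembles its request, query_to_str renders the metric/filters dict into a PromQL selector and process_query URL-encodes it; the job id used here is a placeholder.

from gantry.clients.prometheus import util

query = {
    "metric": "kube_pod_annotations",
    "filters": {"annotation_gitlab_ci_job_id": 9999999},  # placeholder id
}
selector = util.query_to_str(**query)
# kube_pod_annotations{annotation_gitlab_ci_job_id="9999999"}
encoded = util.process_query(query)
# kube_pod_annotations%7Bannotation_gitlab_ci_job_id%3D%229999999%22%7D
print(selector, encoded, sep="\n")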
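process_usage expects the shape prettify_res produces for a range query: one series with a list of [timestamp, value] samples. A minimal sketch with made-up numbers:

from gantry.clients.prometheus import util

# fabricated samples in the prettify_res "matrix" shape
res = [{
    "labels": {"pod": "example-pod", "container": "build"},
    "values": [[1700000000, "1.0"], [1700000030, "3.0"], [1700000060, "2.0"]],
}]
print(util.process_usage(res))
# {'mean': 2.0, 'stddev': 0.816..., 'max': 3.0, 'min': 1.0, 'median': 2.0}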
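spec_variants turns the concrete variant string from the job annotations into a dict; this uses the example already quoted in its docstring.

from gantry.util.spec import spec_variants

print(spec_variants("+adios2~advanced_debug patches=02253c7,acb3805,b724e6a use_vtkm=on"))
# {'adios2': True, 'advanced_debug': False,
#  'patches': ['02253c7', 'acb3805', 'b724e6a'], 'use_vtkm': 'on'}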
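insert_dict only crafts the parameterized statement; the caller executes it against the connection. A sketch with placeholder values (the real dicts built in collection.py carry every NOT NULL column from db/schema.sql):

from gantry.db.insert import insert_dict

query, values = insert_dict(
    "nodes",
    {"uuid": "ec2e1234-abcd", "hostname": "ip-192-168-0-1"},  # illustrative values
    ignore=True,
)
print(query)   # INSERT OR IGNORE INTO nodes (uuid, hostname) VALUES (?, ?)
print(values)  # ('ec2e1234-abcd', 'ip-192-168-0-1')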
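The Job model's valid_build_name check and midpoint property can be exercised directly; the build name below is the one from the model's docstring, while the id, timestamps, and ref are placeholders.

from gantry.models import Job

job = Job(
    status="success",
    name=(
        "plumed@2.9.0 /i4u7p6u %gcc@11.4.0 "
        "arch=linux-ubuntu20.04-neoverse_v1 E4S ARM Neoverse V1"
    ),
    gl_id=1234567,  # placeholder gitlab job id
    start="2024-01-01T00:00:00+00:00",
    end="2024-01-01T00:30:00+00:00",
    ref="develop",
)
print(job.valid_build_name)  # True
print(job.midpoint)          # unix timestamp halfway through the job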