From 9172dd8dac62a6803c3057d0c2cbffa91eb56a9e Mon Sep 17 00:00:00 2001 From: bcumming Date: Sun, 18 Feb 2024 21:19:30 +0100 Subject: [PATCH] refactor --- .gitignore | 2 + img | 533 +++++++---------------------------------------- lib/datastore.py | 130 ++++++++++++ lib/flock.py | 36 ++++ lib/jfrog.py | 75 +++++++ lib/oras.py | 24 +++ lib/record.py | 112 ++++++++++ lib/terminal.py | 51 +++++ 8 files changed, 511 insertions(+), 452 deletions(-) create mode 100644 lib/datastore.py create mode 100644 lib/flock.py create mode 100644 lib/jfrog.py create mode 100644 lib/oras.py create mode 100644 lib/record.py create mode 100644 lib/terminal.py diff --git a/.gitignore b/.gitignore index 218fb7b..f7268a9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ test/tmp # site generated by mkdocs site + +__pycache__ diff --git a/img b/img index 0e9eb5e..b7648db 100755 --- a/img +++ b/img @@ -1,77 +1,42 @@ -#!/usr/bin/env python3 - -# for handling http requests with the middleware +#!/usr/bin/env python import argparse -from datetime import datetime, timezone -from enum import Enum -import fcntl -import json import os -import requests -import subprocess +import pathlib import sys -UENV_CLI_API_VERSION=1 - -# Choose whether to use colored output. -# - by default colored output is ON -# - if the flag --no-color is passed it is OFF -# - if the environment variable NO_COLOR is set it is OFF -def use_colored_output(cli_arg): - # The --no-color argument overrides all environment variables if passed. - if cli_arg: - return False - - # Check the env. var NO_COLOR and disable color if set. - if os.environ.get('NO_COLOR') is not None: - color_var = os.environ.get('NO_COLOR') - if len(color_var)>0 and color_var != "0": - return False - - return True - -def colorize(string, color): - colors = { - "red": "31", - "green": "32", - "yellow": "33", - "blue": "34", - "magenta": "35", - "cyan": "36", - "white": "37", - } - if colored_output: - return f"\033[1;{colors[color]}m{string}\033[0m" - else: - return string - -def log_error(message): - print(f"{colorize('[error]', 'red')} {message}", file=sys.stderr) - exit(1) - -def exit_with_success(): - exit(0) +prefix = pathlib.Path(__file__).parent.resolve() +libpath = prefix / 'lib' +sys.path = [libpath.as_posix()] + sys.path -def log(message): - print(f"{colorize('[log]', 'yellow')} {message}", file=sys.stderr) +import flock +import jfrog +import oras +import datastore +import terminal def make_argparser(): parser = argparse.ArgumentParser(description=("Interact with the uenv artifactory")) - parser.add_argument("--no-color", action="store_true", + parser.add_argument("--no-color", + action="store_true", help="disable color output") + parser.add_argument("-r", "--repo", + required=False, default=None, type=str, + help="the local repository") subparsers = parser.add_subparsers(dest="command") find_parser = subparsers.add_parser("find", help="find uenv in the CSCS registry") find_parser.add_argument("-s", "--system", required=False, type=str) find_parser.add_argument("-a", "--uarch", required=False, type=str) - find_parser.add_argument("--build", action="store_true", help="enable undeployed builds", required=False) + find_parser.add_argument("--build", action="store_true", + help="enable undeployed builds", required=False) find_parser.add_argument("uenv", nargs="?", default=None, type=str) pull_parser = subparsers.add_parser("pull", help="pull a uenv from the CSCS registry") pull_parser.add_argument("-s", "--system", required=False, type=str) pull_parser.add_argument("-a", "--uarch", required=False, type=str) - pull_parser.add_argument("--build", action="store_true", help="enable undeployed builds", required=False) + pull_parser.add_argument("--build", action="store_true", required=False, + help="enable undeployed builds") pull_parser.add_argument("uenv", nargs="?", default=None, type=str) list_parser = subparsers.add_parser("list", help="list cached images") @@ -79,6 +44,11 @@ def make_argparser(): list_parser.add_argument("-a", "--uarch", required=False, type=str) list_parser.add_argument("uenv", nargs="?", default=None, type=str) + create_parser = subparsers.add_parser("create", + help="create a local file system repository") + create_parser.add_argument("--exists-ok", action="store_true", required=False, + help="no error if the local registry exists") + return parser def get_options(args): @@ -96,7 +66,6 @@ def get_options(args): return options - def get_filter(args): options = get_options(args) img_filter = {"system": options["system"]} @@ -110,148 +79,6 @@ def get_filter(args): return img_filter -class LockType(Enum): - READ = 1 - WRITE = 2 - -class Lock(): - def __init__(self, path: str, type: LockType): - self._lockfile = f"{path}.lock" - - # open the file - self._lock = open(self._lockfile, "a") - - # acquire lock - self._type = type - if self._type==LockType.READ: - # acquire shared lock - fcntl.flock(self._lock, fcntl.LOCK_SH) - else: - # acquire exclusive lock - fcntl.flock(self._lock, fcntl.LOCK_EX) - - log(f"aquired lock {self._lockfile}") - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - fcntl.flock(self._lock, fcntl.LOCK_UN) # Release the lock - self._lock.close() - log(f"released lock {self._lockfile}") - - -class Record: - - def __init__(self, system: str, uarch: str, name: str, version: str, tag: str, date: str, size_bytes: int, sha256: str): - self._system = system - self._uarch = uarch - self._name = name - self._version = version - self._tag = tag - self._date = date - self._bytes = size_bytes - self._sha256 = sha256 - - # build/eiger/zen2/cp2k/2023/1133706947 - @classmethod - def frompath(cls, path: str, date: str, size_bytes: int, sha256: str): - fields = path.split("/") - if len(fields) != 5: - raise ValueError("Record must have exactly 5 fields") - - system, uarch, name, version, tag = fields - return cls(system, uarch, name, version, tag, date, size_bytes, sha256) - - @classmethod - def fromjson(cls, raw: dict): - system = raw["system"] - uarch = raw["uarch"] - name = raw["name"] - version = raw["version"] - tag = raw["tag"] - date = to_datetime(raw["date"]) - size_bytes = raw["size"] - sha256 = raw["sha256"] - - return cls(system, uarch, name, version, tag, date, size_bytes, sha256) - - def __eq__(self, other): - if not isinstance(other, Record): - return False - return self.sha256==other.sha256 - - def __lt__(self, other): - if self.system < other.system: return True - if other.system < self.system: return False - if self.uarch < other.uarch: return True - if other.uarch < self.uarch: return False - if self.name < other.name: return True - if other.name < self.name: return False - if self.version < other.version: return True - if other.version< self.version: return False - if self.tag < other.tag: return True - #if other.tag < self.tag: return False - return False - - def __str__(self): - return f"{self.name}/{self.version}:{self.tag} @ {self.system}:{self.uarch}" - - def __repr__(self): - return f"Record({self.system}, {self.uarch}, {self.name}, {self.version}, {self.tag})" - - @property - def system(self): - return self._system - - @property - def uarch(self): - return self._uarch - - @property - def name(self): - return self._name - - @property - def date(self): - return self._date - - @property - def version(self): - return self._version - - @property - def tag(self): - return self._tag - - @property - def sha256(self): - return self._sha256 - - @property - def size(self): - return self._bytes - - @property - def datestring(self): - return self.date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' - - @property - def path(self): - return f"{self.system}/{self.uarch}/{self.name}{self.version}/{self.tag}" - - @property - def dictionary(self): - return { - "system": self.system, - "uarch": self.uarch, - "name": self.name, - "date": self.datestring, - "version": self.version, - "tag": self.tag, - "sha256": self.sha256, - "size": self.size - } def relative_path_from_record(record): return f"{record.sha256}" @@ -262,7 +89,7 @@ def relative_jfrog_from_record(record): # pretty print a list of Record def print_records(records): if len(records)>0: - print(colorize(f"{'uenv/version:tag':40}{'uarch':6}{'date':10} {'sha256':16} {'size':<10}", "yellow")) + print(terminal.colorize(f"{'uenv/version:tag':40}{'uarch':6}{'date':10} {'sha256':16} {'size':<10}", "yellow")) for r in records: namestr = f"{r.name}/{r.version}" tagstr = f"{r.tag}" @@ -279,195 +106,6 @@ def print_records(records): size_str = f"{(S/(1024*1024*1024)):<.1f} GB" print(f"{label:<40}{r.uarch:6}{datestr:10} {r.sha256[:16]:16} {size_str:<10}") -class DataStore: - def __init__(self): - # all images store with (key,value) = (sha256,Record) - self._images = {} - - self._store = {"system": {}, "uarch": {}, "name": {}, "version": {}, "tag": {}} - - def add_record(self, r: Record, overwrite: bool = False): - # test for collisions - if (not overwrite) and (self._images.get(r.sha256, None) is not None): - raise ValueError(f"an image with the hash {r.sha256} already exists") - - sha = r.sha256 - self._images[sha] = r - self._store["system"] .setdefault(r.system, []).append(sha) - self._store["uarch"] .setdefault(r.uarch, []).append(sha) - self._store["name"] .setdefault(r.name, []).append(sha) - self._store["version"].setdefault(r.version, []).append(sha) - self._store["tag"] .setdefault(r.tag, []).append(sha) - - def find_records(self, **constraints): - if not constraints: - raise ValueError("At least one constraint must be provided") - - for field in constraints: - if field not in self._store: - raise ValueError(f"Invalid field: {field}. Must be one of 'system', 'uarch', 'name', 'version', 'tag'") - - # Find matching records for each constraint - matching_records_sets = [ - set(self._store[field].get(value, [])) for field, value in constraints.items() - ] - - # Intersect all sets of matching records - if matching_records_sets: - unique = set.intersection(*matching_records_sets) - else: - unique = set() - - results = [self._images[sha] for sha in unique] - results.sort(reverse=True) - return results - - @property - def images(self): - return self._images - - def get_record(self, sha256: str) -> Record: - return self._images.get(sha256, None) - - # Convert to a dictionary that can be written to file as JSON - # The serialisation and deserialisation are central: able to represent - # uenv that are available in both JFrog and filesystem directory tree. - def serialise(self, version: int=UENV_CLI_API_VERSION): - return { - "API_VERSION": version, - "images": [img.dictionary for img in self._images.values()] - } - - # Convert to a dictionary that can be written to file as JSON - # The serialisation and deserialisation are central: able to represent - # uenv that are available in both JFrog and filesystem directory tree. - @classmethod - def deserialise(cls, datastore): - result = cls() - for img in datastore["images"]: - result.add_record(Record.from_dictionary(img)) - -class FileSystemCache(): - def __init__(self, path: str, create: bool=False): - self._path = path - self._index = path + "/index.json" - - if not os.path.exists(self._index): - # error: cache does not exists - raise FileNotFoundError(f"filesystem cache not found {self._path}") - - with open(self._index, "r") as fid: - raw = json.loads(fid.read()) - self._database = DataStore() - for img in raw["images"]: - self._database.add_record(Record.fromjson(img)) - - @staticmethod - def create_if_missing(path: str): - if not os.path.exists(path): - # LOG: f"FileSyStemCache: creating path {path}" - os.makedirs(path) - index_file = f"{path}/index.json" - if not os.path.exists(index_file): - # LOG: f"FileSyStemCache: creating empty index {index_file}" - empty_config = { "API_VERSION": UENV_CLI_API_VERSION, "images": [] } - with open(index_file, "w") as f: - # default serialisation is str to serialise the pathlib.PosixPath - f.write(json.dumps(empty_config, sort_keys=True, indent=2, default=str)) - f.write("\n") - - # LOG: f"FileSyStemCache: available {index_file}" - - @property - def database(self): - return self._database - - def add_record(self, record: Record): - self._database.add_record(record) - - # The path where an image would be stored - # will return a path even for images that are not stored - def image_path(self, r: Record) -> str: - return self._path + "/images/" + r.sha256 - - # Return the full record for a given hash - # Returns None if no image with that hash is stored in the repo. - def get_record(self, sha256: str): - return self._database.get_record(sha256) - - def publish(self): - with open(self._index, "w") as f: - # default serialisation is str to serialise the pathlib.PosixPath - f.write(json.dumps(self._database.serialise(), sort_keys=True, indent=2, default=str)) - f.write("\n") - -# The https://cicd-ext-mw.cscs.ch/uenv/list API endpoint returns -# a list of images in the jfrog uenv. -# -#{ -# "results": -# [ -# { -# "repo" : "uenv", -# "path" : "build/clariden/zen3/prgenv-gnu/23.11/1094139948", -# "name" : "manifest.json", -# "created" : "2023-12-04T09:05:44.034Z", -# "size" : "123683707", -# "sha256" : "134c04d01bb3583726804a094b144d3637997877ef6162d1fe19eabff3c72c3a", -# "stats" : [{ -# "downloaded" : "2023-12-11T17:56:59.052Z", -# "downloads" : 11 -# }] -# }, -# ... -# ], -# "range" : -# { -# "start_pos" : 0, -# "end_pos" : 22, -# "total" : 22 -# } -#} -# - -def query_jfrog() -> tuple: - try: - # GET request to the middleware - url = "https://cicd-ext-mw.cscs.ch/uenv/list" - log(f"querying jfrog at {url}") - response = requests.get(url) - response.raise_for_status() - - raw_records = response.json() - - deploy_database = DataStore() - build_database = DataStore() - - for record in raw_records["results"]: - path = record["path"] - - date = to_datetime(record["created"]) - sha256 = record["sha256"] - size = record["size"] - if path.startswith("build/"): - r = Record.frompath(path[len("build/"):], date, size, sha256) - build_database.add_record(r) - if path.startswith("deploy/"): - r = Record.frompath(path[len("deploy/"):], date, size, sha256) - deploy_database.add_record(r) - - - return (deploy_database, build_database) - - except Exception as error: - raise RuntimeError("unable to access the JFrog uenv API.") - -def to_datetime(date: str): - # In Python 3.6, datetime.fromisoformat is not available. - # Manually parsing the string. - dt_format = '%Y-%m-%dT%H:%M:%S.%fZ' - return datetime.strptime(date, dt_format).replace(tzinfo=timezone.utc) - # return dictionary {"name", "version", "tag"} from a uenv description string # "prgenv_gnu" -> ("prgenv_gnu", None, None) # "prgenv_gnu/23.11" -> ("prgenv_gnu", "23.11", None) @@ -485,30 +123,13 @@ def parse_uenv_string(desc: str) -> dict: return {"name": name, "version": version, "tag": tag} -def run_oras_command(args): - try: - command = ['oras'] + args - - log(f"calling oras: {' '.join(command)}") - result = subprocess.run( - command, - stdout=subprocess.PIPE, # Capture standard output - stderr=subprocess.PIPE, # Capture standard error - check=True, # Raise exception if command fails - encoding='utf-8' # Decode output from bytes to string - ) - - # Print standard output - log("Output:\n{result.stdout}") - - except subprocess.CalledProcessError as e: - # Print error message along with captured standard error - log_error("An error occurred:\n", e.stderr) - # the path used to store a users cached images and meta data -def uenv_repo_path(): +def uenv_repo_path(path: str=None) -> str: + if path is not None: + return path + # check whether the image path has been explicitly set: - path = os.environ.get('UENV_IMAGE_PATH') + path = os.environ.get('UENV_REPO_PATH') if path is not None: return path @@ -517,7 +138,7 @@ def uenv_repo_path(): if path is not None: return path + "/.uenv-images" - return None + terminal.error("No repository path available: set UENV_REPO_PATH or use the --repo flag") # return the relative path of an image def record_path(record): @@ -531,54 +152,52 @@ def uenv_record_path(record): return store if __name__ == "__main__": + parser = make_argparser() args = parser.parse_args() if args.command is None: parser.print_help() - sys.exit() - - global colored_output - colored_output = use_colored_output(args.no_color) + sys.exit(0) - log(f"args.command: {args.command}") + terminal.info(f"command mode: {args.command}") if args.command in ["find", "pull"]: img_filter = get_filter(args) - log(f"filter for remote repo {img_filter}") - log(f"using {'build' if args.build else 'deploy'} remote repo") + terminal.info(f"filter for remote repo {img_filter}") + terminal.info(f"using {'build' if args.build else 'deploy'} remote repo") - deploy, build = query_jfrog() - log(f"downloaded jfrog meta data: build->{len(build.images)} images, deploy->{len(deploy.images)}") + try: + deploy, build = jfrog.query() + except RuntimeError as err: + terminal.error(f"{str(err)}") + terminal.info(f"downloaded jfrog meta data: build->{len(build.images)} images, deploy->{len(deploy.images)}") remote_database = build if args.build else deploy records = remote_database.find_records(**img_filter) if args.command == "find": - print_records(records) - - # verify that there is at least one image that matches the query - if len(records)==0: + if len(records)>0: + print_records(records) + else: print("no images match the query") - exit_with_success() - - if args.command == "pull": + elif args.command == "pull": # verify that there is at least one image that matches the query if len(records)==0: - log_error(f"no images match the query {args.uenv}") + terminal.error(f"no images match the query {args.uenv}") # check that there is only one uenv name if len(set([r.name for r in records]))>1: print_records(records) print() - log_error(f"ambiguous uenv {args.uenv}") + terminal.error(f"ambiguous uenv {args.uenv}") # check that there is only one uenv name if len(set([r.uarch for r in records]))>1: print_records(records) print() - log_error( + terminal.error( "more than one uarch matches the the requested uenv. " "Specify the desired uarch with the --uarch flag") @@ -586,46 +205,56 @@ if __name__ == "__main__": t = records[0] jfrog_address = f"{base}/{relative_jfrog_from_record(t)}" - log(f"pulling {t} from {jfrog_address} {t.size/(1024*1024):.0f} MB") + terminal.info(f"pulling {t} from {jfrog_address} {t.size/(1024*1024):.0f} MB") + + repo_path = uenv_repo_path(args.repo) + terminal.info(f"repo path: {repo_path}") - repo_path = uenv_repo_path() - with Lock(f"{repo_path}/index.json", LockType.WRITE) as lk: - cache = FileSystemCache(repo_path) + with flock.Lock(f"{repo_path}/index.json", flock.Lock.WRITE) as lk: + cache = datastore.FileSystemCache(repo_path) image_path = cache.image_path(t) # if the record isn't already in the filesystem repo download it if cache.get_record(t.sha256) is None: - log(f"downloading {t.sha256}") + terminal.info(f"downloading {t.sha256}") # download the image using oras - run_oras_command(["pull", "-o", image_path, jfrog_address]) + oras.run_command(["pull", "-o", image_path, jfrog_address]) # add the record to the cache - log(f"updating file system cache") + terminal.info(f"updating file system cache") cache.add_record(t) # publish the updated index - log(f"publishing file system cache") + terminal.info(f"publishing file system cache") cache.publish() else: - log(f"image {t.sha256} is already in the cache") - log(f"image downloaded at {image_path}/store.squashfs") + terminal.info(f"image {t.sha256} is already in the cache") + terminal.info(f"image downloaded at {image_path}/store.squashfs") - exit_with_success() + sys.exit(0) - if args.command == "list": - repo_path = uenv_repo_path() - log(f"using repo path {repo_path}") - img_filter = get_filter(args) + elif args.command == "list": + repo_path = uenv_repo_path(args.repo) + terminal.info(f"repo path: {repo_path}") - #try: - # FileSystemCache.create_if_missing(repo_path) - #except Exception as err: - # log_error(f"unable to find or initialise the local registry: {str(err)}") + img_filter = get_filter(args) - with Lock(f"{repo_path}/index.json", LockType.READ) as lk: - fscache = FileSystemCache(repo_path) + with flock.Lock(f"{repo_path}/index.json", flock.Lock.READ) as lk: + fscache = datastore.FileSystemCache(repo_path) records = fscache.database.find_records(**img_filter) print_records(records) - #for sha, image in fscache.database.images.items(): - # print(sha, image) + sys.exit(0) + + if args.command == "create": + repo_path = uenv_repo_path(args.repo) + terminal.info(f"repo path: {repo_path}") + + try: + datastore.FileSystemCache.create(repo_path, exists_ok=args.exists_ok) + except Exception as err: + terminal.error(f"unable to find or initialise the local registry: {str(err)}") + + sys.exit(0) + + diff --git a/lib/datastore.py b/lib/datastore.py new file mode 100644 index 0000000..9b04187 --- /dev/null +++ b/lib/datastore.py @@ -0,0 +1,130 @@ +import os + +import json +from record import Record +import terminal + +UENV_CLI_API_VERSION=1 + +class DataStore: + def __init__(self): + # all images store with (key,value) = (sha256,Record) + self._images = {} + + self._store = {"system": {}, "uarch": {}, "name": {}, "version": {}, "tag": {}} + + def add_record(self, r: Record, overwrite: bool = False): + # test for collisions + if (not overwrite) and (self._images.get(r.sha256, None) is not None): + raise ValueError(f"an image with the hash {r.sha256} already exists") + + sha = r.sha256 + self._images[sha] = r + self._store["system"] .setdefault(r.system, []).append(sha) + self._store["uarch"] .setdefault(r.uarch, []).append(sha) + self._store["name"] .setdefault(r.name, []).append(sha) + self._store["version"].setdefault(r.version, []).append(sha) + self._store["tag"] .setdefault(r.tag, []).append(sha) + + def find_records(self, **constraints): + if not constraints: + raise ValueError("At least one constraint must be provided") + + for field in constraints: + if field not in self._store: + raise ValueError(f"Invalid field: {field}. Must be one of 'system', 'uarch', 'name', 'version', 'tag'") + + # Find matching records for each constraint + matching_records_sets = [ + set(self._store[field].get(value, [])) for field, value in constraints.items() + ] + + # Intersect all sets of matching records + if matching_records_sets: + unique = set.intersection(*matching_records_sets) + else: + unique = set() + + results = [self._images[sha] for sha in unique] + results.sort(reverse=True) + return results + + @property + def images(self): + return self._images + + def get_record(self, sha256: str) -> Record: + return self._images.get(sha256, None) + + # Convert to a dictionary that can be written to file as JSON + # The serialisation and deserialisation are central: able to represent + # uenv that are available in both JFrog and filesystem directory tree. + def serialise(self, version: int=UENV_CLI_API_VERSION): + return { + "API_VERSION": version, + "images": [img.dictionary for img in self._images.values()] + } + + # Convert to a dictionary that can be written to file as JSON + # The serialisation and deserialisation are central: able to represent + # uenv that are available in both JFrog and filesystem directory tree. + @classmethod + def deserialise(cls, datastore): + result = cls() + for img in datastore["images"]: + result.add_record(Record.from_dictionary(img)) + +class FileSystemCache(): + def __init__(self, path: str): + self._path = path + self._index = path + "/index.json" + + if not os.path.exists(self._index): + # error: cache does not exists + raise FileNotFoundError(f"filesystem cache not found {self._path}") + + with open(self._index, "r") as fid: + raw = json.loads(fid.read()) + self._database = DataStore() + for img in raw["images"]: + self._database.add_record(Record.fromjson(img)) + + @staticmethod + def create(path: str, exists_ok: bool=False): + if not os.path.exists(path): + terminal.info(f"FileSyStemCache: creating path {path}") + os.makedirs(path) + index_file = f"{path}/index.json" + if not os.path.exists(index_file): + terminal.info(f"FileSyStemCache: creating empty index {index_file}") + empty_config = { "API_VERSION": UENV_CLI_API_VERSION, "images": [] } + with open(index_file, "w") as f: + # default serialisation is str to serialise the pathlib.PosixPath + f.write(json.dumps(empty_config, sort_keys=True, indent=2, default=str)) + f.write("\n") + + terminal.info(f"FileSyStemCache: available {index_file}") + + @property + def database(self): + return self._database + + def add_record(self, record: Record): + self._database.add_record(record) + + # The path where an image would be stored + # will return a path even for images that are not stored + def image_path(self, r: Record) -> str: + return self._path + "/images/" + r.sha256 + + # Return the full record for a given hash + # Returns None if no image with that hash is stored in the repo. + def get_record(self, sha256: str): + return self._database.get_record(sha256) + + def publish(self): + with open(self._index, "w") as f: + # default serialisation is str to serialise the pathlib.PosixPath + f.write(json.dumps(self._database.serialise(), sort_keys=True, indent=2, default=str)) + f.write("\n") + diff --git a/lib/flock.py b/lib/flock.py new file mode 100644 index 0000000..7827999 --- /dev/null +++ b/lib/flock.py @@ -0,0 +1,36 @@ +import fcntl +import time + +import terminal + +class Lock(): + READ = 1 + WRITE = 2 + def __init__(self, path: str, type: int): + self._lockfile = f"{path}.lock" + + # open the file + self._lock = open(self._lockfile, "a") + + self._time = time.time() + + # acquire lock + self._type = type + if self._type==Lock.READ: + # acquire shared lock + fcntl.flock(self._lock, fcntl.LOCK_SH) + else: + # acquire exclusive lock + fcntl.flock(self._lock, fcntl.LOCK_EX) + + terminal.info(f"aquired lock {self._lockfile} at {self._time}") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + fcntl.flock(self._lock, fcntl.LOCK_UN) # Release the lock + self._lock.close() + endtime = time.time() + terminal.info(f"released lock {self._lockfile} at {endtime}, held for {(endtime - self._time)*1000:.2f} ms") + diff --git a/lib/jfrog.py b/lib/jfrog.py new file mode 100644 index 0000000..7de40a4 --- /dev/null +++ b/lib/jfrog.py @@ -0,0 +1,75 @@ +from datetime import datetime, timezone +import requests + +from datastore import DataStore +from record import Record +import terminal + + +def to_datetime(date: str): + # In Python 3.6, datetime.fromisoformat is not available. + # Manually parsing the string. + dt_format = '%Y-%m-%dT%H:%M:%S.%fZ' + return datetime.strptime(date, dt_format).replace(tzinfo=timezone.utc) + +# The https://cicd-ext-mw.cscs.ch/uenv/list API endpoint returns +# a list of images in the jfrog uenv. +# +#{ +# "results": +# [ +# { +# "repo" : "uenv", +# "path" : "build/clariden/zen3/prgenv-gnu/23.11/1094139948", +# "name" : "manifest.json", +# "created" : "2023-12-04T09:05:44.034Z", +# "size" : "123683707", +# "sha256" : "134c04d01bb3583726804a094b144d3637997877ef6162d1fe19eabff3c72c3a", +# "stats" : [{ +# "downloaded" : "2023-12-11T17:56:59.052Z", +# "downloads" : 11 +# }] +# }, +# ... +# ], +# "range" : +# { +# "start_pos" : 0, +# "end_pos" : 22, +# "total" : 22 +# } +#} +# + +def query() -> tuple: + try: + # GET request to the middleware + url = "https://cicd-ext-mw.cscs.ch/uenv/list" + terminal.info(f"querying jfrog at {url}") + response = requests.get(url) + response.raise_for_status() + + raw_records = response.json() + + deploy_database = DataStore() + build_database = DataStore() + + for record in raw_records["results"]: + path = record["path"] + + date = to_datetime(record["created"]) + sha256 = record["sha256"] + size = record["size"] + if path.startswith("build/"): + r = Record.frompath(path[len("build/"):], date, size, sha256) + build_database.add_record(r) + if path.startswith("deploy/"): + r = Record.frompath(path[len("deploy/"):], date, size, sha256) + deploy_database.add_record(r) + + return (deploy_database, build_database) + + except Exception as error: + raise RuntimeError("unable to access the JFrog uenv API.") + + diff --git a/lib/oras.py b/lib/oras.py new file mode 100644 index 0000000..716c650 --- /dev/null +++ b/lib/oras.py @@ -0,0 +1,24 @@ +import subprocess + +import terminal + +def run_command(args): + try: + command = ['oras'] + args + + terminal.info(f"calling oras: {' '.join(command)}") + result = subprocess.run( + command, + stdout=subprocess.PIPE, # Capture standard output + stderr=subprocess.PIPE, # Capture standard error + check=True, # Raise exception if command fails + encoding='utf-8' # Decode output from bytes to string + ) + + # Print standard output + terminal.info("Output:\n{result.stdout}") + + except subprocess.CalledProcessError as e: + # Print error message along with captured standard error + terminal.error("An error occurred:\n", e.stderr) + diff --git a/lib/record.py b/lib/record.py new file mode 100644 index 0000000..edde10b --- /dev/null +++ b/lib/record.py @@ -0,0 +1,112 @@ +class Record: + + def __init__(self, system: str, uarch: str, name: str, version: str, tag: str, date: str, size_bytes: int, sha256: str): + self._system = system + self._uarch = uarch + self._name = name + self._version = version + self._tag = tag + self._date = date + self._bytes = size_bytes + self._sha256 = sha256 + + # build/eiger/zen2/cp2k/2023/1133706947 + @classmethod + def frompath(cls, path: str, date: str, size_bytes: int, sha256: str): + fields = path.split("/") + if len(fields) != 5: + raise ValueError("Record must have exactly 5 fields") + + system, uarch, name, version, tag = fields + return cls(system, uarch, name, version, tag, date, size_bytes, sha256) + + @classmethod + def fromjson(cls, raw: dict): + system = raw["system"] + uarch = raw["uarch"] + name = raw["name"] + version = raw["version"] + tag = raw["tag"] + date = to_datetime(raw["date"]) + size_bytes = raw["size"] + sha256 = raw["sha256"] + + return cls(system, uarch, name, version, tag, date, size_bytes, sha256) + + def __eq__(self, other): + if not isinstance(other, Record): + return False + return self.sha256==other.sha256 + + def __lt__(self, other): + if self.system < other.system: return True + if other.system < self.system: return False + if self.uarch < other.uarch: return True + if other.uarch < self.uarch: return False + if self.name < other.name: return True + if other.name < self.name: return False + if self.version < other.version: return True + if other.version< self.version: return False + if self.tag < other.tag: return True + #if other.tag < self.tag: return False + return False + + def __str__(self): + return f"{self.name}/{self.version}:{self.tag} @ {self.system}:{self.uarch}" + + def __repr__(self): + return f"Record({self.system}, {self.uarch}, {self.name}, {self.version}, {self.tag})" + + @property + def system(self): + return self._system + + @property + def uarch(self): + return self._uarch + + @property + def name(self): + return self._name + + @property + def date(self): + return self._date + + @property + def version(self): + return self._version + + @property + def tag(self): + return self._tag + + @property + def sha256(self): + return self._sha256 + + @property + def size(self): + return self._bytes + + @property + def datestring(self): + return self.date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + + @property + def path(self): + return f"{self.system}/{self.uarch}/{self.name}{self.version}/{self.tag}" + + @property + def dictionary(self): + return { + "system": self.system, + "uarch": self.uarch, + "name": self.name, + "date": self.datestring, + "version": self.version, + "tag": self.tag, + "sha256": self.sha256, + "size": self.size + } + diff --git a/lib/terminal.py b/lib/terminal.py new file mode 100644 index 0000000..aba3985 --- /dev/null +++ b/lib/terminal.py @@ -0,0 +1,51 @@ +import sys + +colored_output = True + +# Choose whether to use colored output. +# - by default colored output is ON +# - if the flag --no-color is passed it is OFF +# - if the environment variable NO_COLOR is set it is OFF +def use_colored_output(cli_arg): + colored_output = True + + # The --no-color argument overrides all environment variables if passed. + if cli_arg: + colored_output = False + return + + # Check the env. var NO_COLOR and disable color if set. + if os.environ.get('NO_COLOR') is not None: + color_var = os.environ.get('NO_COLOR') + if len(color_var)>0 and color_var != "0": + colored_output = False + return + + colored_output = True + +def colorize(string, color): + colors = { + "red": "31", + "green": "32", + "yellow": "33", + "blue": "34", + "magenta": "35", + "cyan": "36", + "white": "37", + } + if colored_output: + return f"\033[1;{colors[color]}m{string}\033[0m" + else: + return string + +def error(message): + print(f"{colorize('[error]', 'red')} {message}", file=sys.stderr) + exit(1) + +def exit_with_success(): + exit(0) + +def info(message): + print(f"{colorize('[log]', 'yellow')} {message}", file=sys.stderr) + +