Skip to content

Commit

Permalink
listing of pulled images works
Browse files Browse the repository at this point in the history
  • Loading branch information
bcumming committed Feb 16, 2024
1 parent 5aa2020 commit 929347a
Showing 1 changed file with 82 additions and 67 deletions.
149 changes: 82 additions & 67 deletions img
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,39 @@ def colorize(string, color):
else:
return string

def print_error_and_exit(message):
print(f"{colorize('error', 'red')} {message}", file=sys.stderr)
def log_error(message):
print(f"{colorize('[error]', 'red')} {message}", file=sys.stderr)
exit(1)

def exit_with_success():
exit(0)

def log(message):
print(f"{colorize('[log]', 'yellow')} {message}", file=sys.stderr)

def make_argparser():
parser = argparse.ArgumentParser(description=("Interact with the uenv artifactory"))
parser.add_argument("--no-color", action="store_true",
help="disable color output")
parser.add_argument("--build", action="store_true",
help="enable undeployed builds",
required=False)

subparsers = parser.add_subparsers(dest="command")

find_parser = subparsers.add_parser("find", help="find uenv in the CSCS registry")
find_parser.add_argument("-s", "--system", required=False, type=str)
find_parser.add_argument("-a", "--uarch", required=False, type=str)
find_parser.add_argument("--build", action="store_true", help="enable undeployed builds", required=False)
find_parser.add_argument("uenv", nargs="?", default=None, type=str)

pull_parser = subparsers.add_parser("pull", help="pull a uenv from the CSCS registry")
pull_parser.add_argument("-s", "--system", required=False, type=str)
pull_parser.add_argument("-a", "--uarch", required=False, type=str)
pull_parser.add_argument("--build", action="store_true", help="enable undeployed builds", required=False)
pull_parser.add_argument("uenv", nargs="?", default=None, type=str)

pull_parser = subparsers.add_parser("list", help="list cached images")
pull_parser.add_argument("-s", "--system", required=False, type=str)
pull_parser.add_argument("-a", "--uarch", required=False, type=str)
pull_parser.add_argument("uenv", nargs="?", default=None, type=str)
list_parser = subparsers.add_parser("list", help="list cached images")
list_parser.add_argument("-s", "--system", required=False, type=str)
list_parser.add_argument("-a", "--uarch", required=False, type=str)
list_parser.add_argument("uenv", nargs="?", default=None, type=str)

return parser

Expand All @@ -92,7 +94,6 @@ def get_options(args):
options["name"] = args.uenv
options["uarch"] = args.uarch

options["repo"] = "deploy" if not args.build else "build"
return options


Expand All @@ -106,9 +107,8 @@ def get_filter(args):

if options["uarch"] is not None:
img_filter["uarch"] = options["uarch"]
repo = options["repo"]

return repo, img_filter
return img_filter

class LockType(Enum):
READ = 1
Expand All @@ -130,12 +130,15 @@ class Lock():
# acquire exclusive lock
fcntl.flock(self._lock, fcntl.LOCK_EX)

log(f"aquired lock {self._lockfile}")

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
fcntl.flock(self._lock, fcntl.LOCK_UN) # Release the lock
self._lock.close()
log(f"released lock {self._lockfile}")


class Record:
Expand All @@ -162,14 +165,13 @@ class Record:

@classmethod
def fromjson(cls, raw: dict):
print(raw)
system = raw["system"]
uarch = raw["uarch"]
name = raw["name"]
version = raw["version"]
tag = raw["tag"]
date = raw["date"]
size_bytes = raw["size_bytes"]
date = to_datetime(raw["date"])
size_bytes = raw["size"]
sha256 = raw["sha256"]

return cls(system, uarch, name, version, tag, date, size_bytes, sha256)
Expand Down Expand Up @@ -280,34 +282,34 @@ def print_records(records):
class DataStore:
def __init__(self):
# all images store with (key,value) = (sha256,Record)
self.images = {}
self._images = {}

self.store = {"system": {}, "uarch": {}, "name": {}, "version": {}, "tag": {}}
self._store = {"system": {}, "uarch": {}, "name": {}, "version": {}, "tag": {}}

def add_record(self, r: Record, overwrite: bool = False):
# test for collisions
if (not overwrite) and (self.images.get(r.sha256, None) is not None):
if (not overwrite) and (self._images.get(r.sha256, None) is not None):
raise ValueError(f"an image with the hash {r.sha256} already exists")

sha = r.sha256
self.images[sha] = r
self.store["system"] .setdefault(r.system, []).append(sha)
self.store["uarch"] .setdefault(r.uarch, []).append(sha)
self.store["name"] .setdefault(r.name, []).append(sha)
self.store["version"].setdefault(r.version, []).append(sha)
self.store["tag"] .setdefault(r.tag, []).append(sha)
self._images[sha] = r
self._store["system"] .setdefault(r.system, []).append(sha)
self._store["uarch"] .setdefault(r.uarch, []).append(sha)
self._store["name"] .setdefault(r.name, []).append(sha)
self._store["version"].setdefault(r.version, []).append(sha)
self._store["tag"] .setdefault(r.tag, []).append(sha)

def find_records(self, **constraints):
if not constraints:
raise ValueError("At least one constraint must be provided")

for field in constraints:
if field not in self.store:
if field not in self._store:
raise ValueError(f"Invalid field: {field}. Must be one of 'system', 'uarch', 'name', 'version', 'tag'")

# Find matching records for each constraint
matching_records_sets = [
set(self.store[field].get(value, [])) for field, value in constraints.items()
set(self._store[field].get(value, [])) for field, value in constraints.items()
]

# Intersect all sets of matching records
Expand All @@ -316,20 +318,24 @@ class DataStore:
else:
unique = set()

results = [self.images[sha] for sha in unique]
results = [self._images[sha] for sha in unique]
results.sort(reverse=True)
return results

@property
def images(self):
return self._images

def get_record(self, sha256: str) -> Record:
return self.images.get(sha256, None)
return self._images.get(sha256, None)

# Convert to a dictionary that can be written to file as JSON
# The serialisation and deserialisation are central: able to represent
# uenv that are available in both JFrog and filesystem directory tree.
def serialise(self, version: int=UENV_CLI_API_VERSION):
return {
"API_VERSION": version,
"images": [img.dictionary for img in self.images.values()]
"images": [img.dictionary for img in self._images.values()]
}

# Convert to a dictionary that can be written to file as JSON
Expand All @@ -354,7 +360,7 @@ class FileSystemCache():
raw = json.loads(fid.read())
self._database = DataStore()
for img in raw["images"]:
self._database.add_record(Record(img))
self._database.add_record(Record.fromjson(img))

@staticmethod
def create_if_missing(path: str):
Expand Down Expand Up @@ -382,7 +388,7 @@ class FileSystemCache():
# The path where an image would be stored
# will return a path even for images that are not stored
def image_path(self, r: Record) -> str:
return self._path + "/images/" + r.path
return self._path + "/images/" + r.sha256

# Return the full record for a given hash
# Returns None if no image with that hash is stored in the repo.
Expand Down Expand Up @@ -428,6 +434,7 @@ def query_jfrog() -> tuple:
try:
# GET request to the middleware
url = "https://cicd-ext-mw.cscs.ch/uenv/list"
log(f"querying jfrog at {url}")
response = requests.get(url)
response.raise_for_status()

Expand Down Expand Up @@ -482,8 +489,7 @@ def run_oras_command(args):
try:
command = ['oras'] + args

#print(f"{colorize('running oras', 'yellow')}: {' '.join(command)}")

log(f"calling oras: {' '.join(command)}")
result = subprocess.run(
command,
stdout=subprocess.PIPE, # Capture standard output
Expand All @@ -493,11 +499,11 @@ def run_oras_command(args):
)

# Print standard output
print("Output:\n", result.stdout)
log("Output:\n{result.stdout}")

except subprocess.CalledProcessError as e:
# Print error message along with captured standard error
print("An error occurred:\n", e.stderr)
log_error("An error occurred:\n", e.stderr)

# the path used to store a users cached images and meta data
def uenv_repo_path():
Expand All @@ -509,7 +515,7 @@ def uenv_repo_path():
# if not, try to use the path $SCRATCH/.uenv-images/, if SCRATCH exists
path = os.environ.get('SCRATCH')
if path is not None:
return path
return path + "/.uenv-images"

return None

Expand All @@ -534,14 +540,19 @@ if __name__ == "__main__":
global colored_output
colored_output = use_colored_output(args.no_color)

log(f"args.command: {args.command}")

if args.command in ["find", "pull"]:
img_filter = get_filter(args)
log(f"filter for remote repo {img_filter}")
log(f"using {'build' if args.build else 'deploy'} remote repo")

deploy, build = query_jfrog()
#print(deploy.serialise())
#print(build.serialise())
database = {"build": build, "deploy": deploy}
log(f"downloaded jfrog meta data: build->{len(build.images)} images, deploy->{len(deploy.images)}")

remote_database = build if args.build else deploy

repo, img_filter = get_filter(args)
records = database[repo].find_records(**img_filter)
records = remote_database.find_records(**img_filter)

if args.command == "find":
print_records(records)
Expand All @@ -555,62 +566,66 @@ if __name__ == "__main__":
if args.command == "pull":
# verify that there is at least one image that matches the query
if len(records)==0:
print_error_and_exit(f"no images match the query {args.uenv}")
log_error(f"no images match the query {args.uenv}")

# check that there is only one uenv name
if len(set([r.name for r in records]))>1:
print_records(records)
print()
print_error_and_exit(f"ambiguous uenv {args.uenv}")
log_error(f"ambiguous uenv {args.uenv}")

# check that there is only one uenv name
if len(set([r.uarch for r in records]))>1:
print_records(records)
print()
print_error_and_exit(
log_error(
"more than one uarch matches the the requested uenv. "
"Specify the desired uarch with the --uarch flag")

base = f"jfrog.svc.cscs.ch/uenv/{repo}"
base = f"jfrog.svc.cscs.ch/uenv/{'build' if args.build else 'deploy'}"
t = records[0]
jfrog_address = f"{base}/{relative_jfrog_from_record(t)}"

print(f"{t} from {jfrog_address} {t.size/(1024*1024):.0f} MB")
log(f"pulling {t} from {jfrog_address} {t.size/(1024*1024):.0f} MB")

repo_path = uenv_repo_path()
with Lock(f"{repo_path}/index.json", LockType.READ) as lk:
with Lock(f"{repo_path}/index.json", LockType.WRITE) as lk:
cache = FileSystemCache(repo_path)
print(cache.database)
print(f"{t.dictionary}")

base_path = cache.image_path(t)
image_path = base_path + "/store.squashfs"
print(f"{base_path}")
print(f"{image_path}")
image_path = cache.image_path(t)

# if the record isn't already in the filesystem repo download it
if cache.get_record(t.sha256) is None:
print(f" downloading {t.sha256}")
run_oras_command(["pull", "-o", base_path, jfrog_address])
print(f" ... available at {path}/store.squashfs")

#else:
#print_error_and_exit("set UENV_IMAGE_PATH to specify where uenv images should be stored")
log(f"downloading {t.sha256}")
# download the image using oras
run_oras_command(["pull", "-o", image_path, jfrog_address])
# add the record to the cache
log(f"updating file system cache")
cache.add_record(t)
# publish the updated index
log(f"publishing file system cache")
cache.publish()
else:
log(f"image {t.sha256} is already in the cache")
log(f"image downloaded at {image_path}/store.squashfs")

exit_with_success()

if args.command == "list":
repo_path = uenv_repo_path()
print(f"repo {colorize(repo_path, 'yellow')}")
#cache = FileSystemCache(repo_path, create=True)
#print(cache.database)
log(f"using repo path {repo_path}")
img_filter = get_filter(args)

try:
FileSystemCache.create_if_missing(repo_path)
except Exception as err:
print_error_and_exit(f"unable to find or initialise the local registry: {str(err)}")
#try:
# FileSystemCache.create_if_missing(repo_path)
#except Exception as err:
# log_error(f"unable to find or initialise the local registry: {str(err)}")

with Lock(f"{repo_path}/index.json", LockType.READ) as lk:
fscache = FileSystemCache(repo_path)
print(fscache.database)

records = fscache.database.find_records(**img_filter)
print_records(records)

#for sha, image in fscache.database.images.items():
# print(sha, image)

0 comments on commit 929347a

Please sign in to comment.