diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e330963 --- /dev/null +++ b/.env.example @@ -0,0 +1,5 @@ +# Copy this file to .env and replace the values with your own +S3_TEMP_ACCESS_KEY=your_access_key +S3_TEMP_SECRET_KEY=your_secret_key +S3_PERM_ACCESS_KEY=your_access_key +S3_PERM_SECRET_KEY=your_secret_key diff --git a/.gitignore b/.gitignore index 68bc17f..dfc8d5a 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,5 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +/tmp \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 1cb37a2..9899461 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,31 +1,14 @@ FROM python:3.11-bookworm +# Install dependencies +COPY requirements.txt /tmp/requirements.txt +RUN pip install -r /tmp/requirements.txt --break-system-packages + # Copy files into container WORKDIR /app -COPY /src/main.py /app -COPY /src/run.sh /app -COPY requirements.txt /app - -# Copy the private SSH key from github provisioning to the container -RUN mkdir /root/.ssh/ - -# RUN echo -e "${SSH_DEPLOY_KEY}" > /root/.ssh/id_rsa -# RUN cat /root/.ssh/id_rsa -# COPY id_rsa /root/.ssh/id_rsa - -# Set permissions for the SSH key -# RUN chmod 600 /root/.ssh/id_rsa - -# Add the Git host to the list of known hosts -# RUN ssh-keyscan -t rsa github.com >> /root/.ssh/known_hosts - -# Clone the repository using the deploy key -# RUN git clone git@github.com:WATonomous/infra-config.git +COPY /src /app -# Install s3cmd -# RUN pip install s3cmd +# Add github.com to known hosts +RUN mkdir /root/.ssh/ && ssh-keyscan -t rsa github.com >> /root/.ssh/known_hosts -# Run asset-management script -RUN chmod +x run.sh -CMD ["./run.sh"] -# CMD ["echo $PATH"] +CMD ["python", "agent.py", "run-agent"] \ No newline at end of file diff --git a/README.md b/README.md index 5889269..d23983f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,27 @@ -# asset-management -Asset management for WATcloud +# WATcloud Asset Management System + +This repo contains the asset management system for WATcloud. +Currently, only the agent implementation is in this repo. +Additional components, including the SDK, the S3 bucket configuration, and deployment code, reside in the internal monorepo [infra-config](https://github.com/WATonomous/infra-config). + +## Useful Links + +- [Asset Manager Frontend](https://cloud.watonomous.ca/docs/utilities/assets) + +## Getting Started (Agent Development) + +Copy the `.env.example` file to `.env` and fill in the necessary information. + +Create `./tmp/deploy-keys` directory and place the required deploy keys in the directory. The list of deploy keys can be configured in `docker-compose.yml`. + +Run the following commands to start the development environment: + +```bash +docker compose up -d --build +``` + +Enter the container: + +```bash +docker compose exec agent bash +``` diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..017cc0d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,36 @@ +# This file is used to assist development + +services: + agent: + build: . + entrypoint: [ "sleep", "infinity" ] + init: true + volumes: + - ./src:/app + - ./tmp/deploy-keys:/deploy-keys:ro + environment: + - | + AGENT_CONFIG={ + "repos": { + "git@github.com:WATonomous/infra-config.git": {"deploy_key_path": "/deploy-keys/infra-config"} + }, + "buckets": { + "temp": { + "endpoint": "https://rgw.watonomous.ca", + "bucket": "asset-temp", + "access_key_env_var": "S3_TEMP_ACCESS_KEY", + "secret_key_env_var": "S3_TEMP_SECRET_KEY" + }, + "perm": { + "endpoint": "https://rgw.watonomous.ca", + "bucket": "asset-perm", + "access_key_env_var": "S3_PERM_ACCESS_KEY", + "secret_key_env_var": "S3_PERM_SECRET_KEY" + } + } + } + # These can be set in the .env file + - S3_TEMP_ACCESS_KEY=${S3_TEMP_ACCESS_KEY:?} + - S3_TEMP_SECRET_KEY=${S3_TEMP_SECRET_KEY:?} + - S3_PERM_ACCESS_KEY=${S3_PERM_ACCESS_KEY:?} + - S3_PERM_SECRET_KEY=${S3_PERM_SECRET_KEY:?} diff --git a/requirements.txt b/requirements.txt index 1db657b..1e9eb6e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,4 @@ -boto3 \ No newline at end of file +boto3>=1.34.136,<2 +GitPython>=3.1.43,<4 +typer>=0.12.3,<1 +requests>=2.32.3,<3 \ No newline at end of file diff --git a/src/agent.py b/src/agent.py new file mode 100644 index 0000000..c2f1a1c --- /dev/null +++ b/src/agent.py @@ -0,0 +1,103 @@ +import logging +import os +from hashlib import sha256 +from tempfile import TemporaryDirectory + +from utils import app, clone_repos, flatten, get_watcloud_uris, get_bucket + + +@app.command() +def run_agent(): + logging.info("Starting agent") + + logging.info("Cloning repos") + repos = list(clone_repos()) + + logging.info("Extracting WATcloud URIs") + watcloud_uris = list( + # sorting to ensure consistent order for testing + sorted(flatten([get_watcloud_uris(repo.working_dir) for repo in repos])) + ) + + logging.info(f"Found {len(watcloud_uris)} WATcloud URIs:") + for uri in watcloud_uris: + logging.info(uri) + + desired_perm_objects = set(uri.sha256 for uri in watcloud_uris) + + temp_bucket = get_bucket("temp") + perm_bucket = get_bucket("perm") + + temp_objects = set(obj.key for obj in temp_bucket.objects.all()) + perm_objects = set(obj.key for obj in perm_bucket.objects.all()) + + logging.info(f"Found {len(temp_objects)} objects in temp bucket") + logging.info(f"Found {len(perm_objects)} objects in perm bucket") + + errors = [] + + if not desired_perm_objects.issubset(temp_objects | perm_objects): + errors.append( + ValueError( + f"Cannot find the following objects in any bucket: {desired_perm_objects - temp_objects - perm_objects}" + ) + ) + + # Objects that need to be copied to perm bucket + to_perm = desired_perm_objects - perm_objects + # Objects that need to be copied from temp bucket to perm bucket + temp_to_perm = to_perm & temp_objects + # Objects that need to be deleted from perm bucket + perm_to_temp = perm_objects - desired_perm_objects + # Objects that need to be deleted from the temp bucket (already exists in another bucket) + delete_from_temp = desired_perm_objects & temp_objects - temp_to_perm + + logging.info( + f"{len(desired_perm_objects&perm_objects)}/{len(desired_perm_objects)} objects are already in the perm bucket" + ) + logging.info(f"Copying {len(temp_to_perm)} object(s) from temp to perm bucket:") + for obj_key in temp_to_perm: + logging.info(obj_key) + logging.info(f"Copying {len(perm_to_temp)} object(s) from perm to temp bucket:") + for obj_key in perm_to_temp: + logging.info(obj_key) + logging.info(f"Deleting {len(delete_from_temp)} redundant object(s) from temp bucket:") + for obj_key in delete_from_temp: + logging.info(obj_key) + + with TemporaryDirectory() as temp_dir: + for obj_key in temp_to_perm: + temp_bucket.download_file(obj_key, os.path.join(temp_dir, obj_key)) + # Verify checksum + with open(os.path.join(temp_dir, obj_key), "rb") as f: + checksum = sha256(f.read()).hexdigest() + if checksum != obj_key: + errors.append( + ValueError( + f"Checksum mismatch for object {obj_key} in temp bucket! Not uploading to perm bucket." + ) + ) + continue + + perm_bucket.upload_file(os.path.join(temp_dir, obj_key), obj_key) + temp_bucket.delete_objects(Delete={"Objects": [{"Key": obj_key}]}) + + for obj_key in perm_to_temp: + perm_bucket.download_file(obj_key, os.path.join(temp_dir, obj_key)) + temp_bucket.upload_file(os.path.join(temp_dir, obj_key), obj_key) + perm_bucket.delete_objects(Delete={"Objects": [{"Key": obj_key}]}) + + for obj_key in delete_from_temp: + temp_bucket.delete_objects(Delete={"Objects": [{"Key": obj_key}]}) + + if errors: + logging.error("Encountered the following errors during execution:") + for error in errors: + logging.error(error) + raise ValueError("Encountered errors during agent execution.") + + logging.info("Agent execution complete") + + +if __name__ == "__main__": + app() diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 7909523..0000000 --- a/src/main.py +++ /dev/null @@ -1,197 +0,0 @@ -import os -import subprocess - -import boto3 - -FILE_PATH = os.path.dirname(os.path.abspath(__file__)) -S3CFG_PERM_PATH = FILE_PATH + "/s3cfg_perm" -S3CFG_TEMP_PATH = FILE_PATH + "/s3cfg_temp" -S3_TEMP = "s3://asset-temp" -S3_PERM = "s3://asset-perm" -TEMP_ASSET_DIR = "temp" - -bucket_map = { - S3_TEMP: S3CFG_TEMP_PATH, - S3_PERM: S3CFG_PERM_PATH -} - -# Setup boto3 connection clients -host_base = "https://rgw.watonomous.ca" - -access_temp = os.getenv("ACCESS_TEMP") -secret_temp = os.getenv("SECRET_TEMP") -client_temp = boto3.resource( - 's3', - endpoint_url=host_base, - aws_access_key_id=access_temp, - aws_secret_access_key=secret_temp, -) -bucket_temp = None -for bucket in client_temp.buckets.all(): - bucket_temp = bucket - -access_perm = os.getenv("ACCESS_PERM") -secret_perm = os.getenv("SECRET_PERM") -client_perm = boto3.resource( - 's3', - endpoint_url=host_base, - aws_access_key_id=access_perm, - aws_secret_access_key=secret_perm, -) -bucket_perm = None -for bucket in client_perm.buckets.all(): - bucket_perm = bucket - -# Helper to use s3cmd -def run_command(cmd): - # Convert cmd string into args form - cmd_args = [] - for word in cmd.split(): - cmd_args.append(word) - - try: - # print(f'Command {cmd_args}:') - result = subprocess.run(cmd_args, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - cwd="infra-config") - return result.stdout - - except subprocess.CalledProcessError as e: - print("Error executing command:", e) - print("Command Output:", e.stdout) - print("Command Error:", e.stderr) - -def get_assets_in_bucket(bucket): - assets = [] - for obj in bucket.objects.all(): - assets.append(obj.key) - - return assets - -def get_assets_in_repo(): - """ - Returns a list of URI hashes in repo. - - URI hashes in repo has the form "watcloud://v1//sha256:...?name=filename.extension", - return just the "..." - """ - raw_output = run_command(f'git grep watcloud://v1/sha256:') - - print("uris:") - print(raw_output) - - # filename -> filepath relative to website/ - uris = [] - if not raw_output: - print("No matching URI found") - return None - - for output in raw_output.split("\n"): - start = output.rfind("watcloud://v1/sha256:") + 21 # watcloud://v1/sha256 is 20 chars - end = start + 64 # sha256 has 256 bits = 64 characters - if len(output[start:end]) == 64: - uris.append(output[start:end]) - - - return uris - -def compare_s3_to_repo(): - """ - Returns 2 lists of filenames for what to remove from s3 and what to upload to s3 - """ - temp_s3_uris = get_assets_in_bucket(bucket_temp) - - perm_s3_uris = get_assets_in_bucket(bucket_perm) - - repo_uris = get_assets_in_repo() - - # Lists containing uris - to_perm = [] - to_temp = [] - - # No changes just exit - if repo_uris == None: - return [], [] - - # Files to add to perm storage - for repo_uri in repo_uris: - if (repo_uri not in perm_s3_uris) and (repo_uri in temp_s3_uris): - to_perm.append(repo_uri) - - # Files to remove from perm storage - for perm_uri in perm_s3_uris: - if perm_uri not in repo_uris: - to_temp.append(perm_uri) - - return to_perm, to_temp - -# def upload_file(filepath, bucket_uri): -# run_command(f's3cmd --config={bucket_map[bucket_uri]} put {filepath} {bucket_uri}') -# print("Uploaded file") - -def upload_file(filepath, bucket): - bucket.upload_file(filepath, os.path.basename(filepath)) - print(f"Uploaded {filepath} to {bucket}") - -def download_file(filename, folder_path, bucket): - bucket.download_file(filename, folder_path) - print(f"Downloaded {filename} to {folder_path}") - -def delete_file(filename, bucket): - objs = bucket_perm.objects.all() - for obj in objs: - print(obj) - bucket.Object(filename).delete() - print(f"Deleted {filename} from {bucket}") - objs = bucket_perm.objects.all() - for obj in objs: - print(obj) - -def transfer_file(filename, from_bucket, to_bucket): - # Download file - download_file(filename, TEMP_ASSET_DIR + "/" + filename, from_bucket) - - # Delete file in the old bucket - # run_command(f's3cmd --config={bucket_map[from_bucket]} del {from_bucket}/{filename}') - delete_file(filename, from_bucket) - - # Upload file to new bucket - upload_file(f'{tmp_dir}/{filename}', to_bucket) - - # Delete local temp file - # os.remove(f'{tmp_dir}/{filename}') - - print("File Transfer Done!") - -def manage_assets(): - """ - Scans repo for asset URIs and checks temp and perm s3 buckets for them. - Asset in repo & asset in temp & asset not in perm: move asset from temp -> perm - Asset not repo & asset in perm: move asset perm -> temp - """ - - move_to_perm, move_to_temp = compare_s3_to_repo() - - print(f'Moving to perm: {move_to_perm}') - print(f'Moving to temp: {move_to_temp}') - - for filename in move_to_perm: - transfer_file(filename, bucket_temp, bucket_perm) - - for filename in move_to_temp: - transfer_file(filename, bucket_perm, bucket_perm) - -def clean_bucket(): - assets = get_assets_in_bucket(S3_PERM) - for asset in assets: - run_command(f's3cmd --config={bucket_map[S3_PERM]} del {S3_PERM}/{asset}') - - assets = get_assets_in_bucket(S3_TEMP) - for asset in assets: - run_command(f's3cmd --config={bucket_map[S3_TEMP]} del {S3_TEMP}/{asset}') - -if __name__ == "__main__": - # Make temp dir for assets if needed. - os.makedirs(TEMP_ASSET_DIR, exist_ok=True) - - # Run script - manage_assets() \ No newline at end of file diff --git a/src/run.sh b/src/run.sh deleted file mode 100644 index d5dbe46..0000000 --- a/src/run.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash - -# Set permissions for the SSH key -chmod 600 /root/.ssh/id_rsa - -# Add the Git host to the list of known hosts -ssh-keyscan -t rsa github.com >> /root/.ssh/known_hosts - -# Clone the repository using the deploy key -git clone -b master git@github.com:WATonomous/infra-config.git - -# Install libraries -pip install -r requirements.txt - -# Get list of branches -cd infra-config -BRANCHES=$(git ls-remote --heads origin | sed 's?.*refs/heads/??') - -# Loop through each branch -for BRANCH in $BRANCHES; do - echo "Managing Assets for: $BRANCH" - git checkout $BRANCH - git pull origin $BRANCH - - cd .. - python3 main.py - cd infra-config -done diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..edc92b4 --- /dev/null +++ b/src/utils.py @@ -0,0 +1,126 @@ +import itertools +import json +import logging +import os +from enum import Enum +from pathlib import Path +from tempfile import NamedTemporaryFile + +import boto3 +import typer +from git import Repo + +from watcloud_uri import WATcloudURI + +AGENT_CONFIG = json.loads(os.environ["AGENT_CONFIG"]) +WORKSPACE = Path("/tmp/workspace") +WORKSPACE.mkdir(exist_ok=True, parents=True) + +flatten = itertools.chain.from_iterable + + +def typer_result_callback(ret, *args, **kwargs): + # This is useful when the return value is a generator + if hasattr(ret, "__iter__"): + ret = list(ret) + + print(ret) + + +app = typer.Typer(result_callback=typer_result_callback) + + +class LogLevel(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + + +@app.callback() +def callback(log_level: LogLevel = LogLevel.INFO): + logging.basicConfig( + level=log_level.value, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S %Z", + ) + + +@app.command() +def clone_repos(): + for repo_url, repo_config in AGENT_CONFIG["repos"].items(): + # Temporary file is required to handle ssh key permissions. + # NamedTemporaryFile is always created with mode 0600: + # https://stackoverflow.com/a/10541972 + with NamedTemporaryFile() as deploy_key_file: + logging.debug( + f"Copying deploy key from {repo_config['deploy_key_path']} to {deploy_key_file.name}" + ) + deploy_key_file.write(Path(repo_config["deploy_key_path"]).read_bytes()) + deploy_key_file.flush() + + repo_path = WORKSPACE / repo_url + + if repo_path.exists(): + logging.debug( + f"Path {repo_path} already exists. Pulling latest changes." + ) + repo = Repo(repo_path) + repo.remote().pull( + env={"GIT_SSH_COMMAND": f"ssh -i {deploy_key_file.name}"} + ) + logging.info(f"Pulled latest changes to {repo.working_dir}") + else: + logging.debug(f"Path {repo_path} does not exist. Cloning repo.") + repo = Repo.clone_from( + repo_url, + repo_path, + env={"GIT_SSH_COMMAND": f"ssh -i {deploy_key_file.name}"}, + ) + logging.info(f"Cloned {repo_url} to {repo.working_dir}") + + yield repo + + +@app.command() +def get_raw_watcloud_uris(repo_path: Path): + repo = Repo(repo_path) + + # -h suppresses filename output + # --only-matching returns only the matched text + out = repo.git.execute( + ["git", "grep", "--only-matching", "-h", "watcloud://[^\"' ]*"] + + [r.name for r in repo.remote().refs] + ) + uris = set(u.strip() for u in out.splitlines() if u.strip()) + + return uris + + +@app.command() +def get_watcloud_uris(repo_path: Path): + raw_uris = get_raw_watcloud_uris(repo_path) + + for uri in raw_uris: + try: + yield WATcloudURI(uri) + except ValueError as e: + logging.debug(f"Skipping invalid WATcloud URI '{uri}': {e}") + + +@app.command() +def get_bucket(bucket_name: str): + return boto3.resource( + "s3", + endpoint_url=AGENT_CONFIG["buckets"][bucket_name]["endpoint"], + aws_access_key_id=os.environ[ + AGENT_CONFIG["buckets"][bucket_name]["access_key_env_var"] + ], + aws_secret_access_key=os.environ[ + AGENT_CONFIG["buckets"][bucket_name]["secret_key_env_var"] + ], + ).Bucket(AGENT_CONFIG["buckets"][bucket_name]["bucket"]) + + +if __name__ == "__main__": + app() diff --git a/src/watcloud_uri.py b/src/watcloud_uri.py new file mode 100644 index 0000000..020e7f4 --- /dev/null +++ b/src/watcloud_uri.py @@ -0,0 +1,52 @@ +import re +import requests +from urllib.parse import urlparse, parse_qs + +RESOLVER_URL_PREFIXES = [ + "https://rgw.watonomous.ca/asset-perm", + "https://rgw.watonomous.ca/asset-temp", +] + + +def extract_sha256(s): + sha256_match = re.search(r"sha256:([a-f0-9]{64})", s) + if not sha256_match: + raise ValueError("Invalid string: does not contain a SHA-256 hash.") + return sha256_match.group(1) + + +class WATcloudURI: + def __init__(self, input_url): + parsed_url = urlparse(input_url) + if parsed_url.scheme != "watcloud": + raise ValueError("Invalid WATCloud URI: protocol must be 'watcloud:'") + if parsed_url.hostname != "v1": + raise ValueError( + f"Invalid WATCloud URI: unsupported version '{parsed_url.hostname}'. Only 'v1' is supported" + ) + + self.sha256 = extract_sha256(parsed_url.path) + query_params = parse_qs(parsed_url.query) + self.name = query_params.get("name", [None])[0] + + def resolve_to_url(self): + for prefix in RESOLVER_URL_PREFIXES: + url = f"{prefix}/{self.sha256}" + response = requests.head(url) + if response.ok: + return url + raise ValueError("Asset not found.") + + def __str__(self): + return f"watcloud://v1/sha256:{self.sha256}?name={self.name}" + + def __repr__(self) -> str: + return f"WATcloudURI({str(self)})" + + def __lt__(self, other): + return self.sha256 < other.sha256 + +if __name__ == "__main__": + # Example usage + uri = WATcloudURI("watcloud://v1/sha256:906f98c1d660a70a6b36ad14c559a9468fe7712312beba1d24650cc379a62360?name=cloud-light.avif") + print(uri.resolve_to_url()) \ No newline at end of file