From afd939dc4e7dd86564db16fa12acc4f2110e60f8 Mon Sep 17 00:00:00 2001
From: clee2000 <44682903+clee2000@users.noreply.github.com>
Date: Fri, 13 Jan 2023 09:31:29 -0800
Subject: [PATCH] Delete github-status-sync and hud-query-proxy (#1272)

Removing the github-status-sync and hud-query-proxy lambdas, as they
appear to have been used for Grafana. I would appreciate input from
someone who knows how Grafana worked.

This is some lingering cleanup that's part of https://github.com/pytorch/pytorch/issues/78951
---
 .github/scripts/generate_ci_workflows.py      |   8 -
 .github/workflows/lint.yml                    |   1 -
 .gitignore                                    |   2 -
 aws/lambda/github-status-sync/Makefile        |  10 -
 aws/lambda/github-status-sync/README.md       |  71 ---
 .../github-status-sync/cli-requirements.txt   |   2 -
 .../github-status-sync/lambda_function.py     | 467 ------
 .../github-status-sync/requirements.txt       |   6 -
 .../github-status-sync/update_triggers.py     | 150 ------
 aws/lambda/hud-query-proxy/Makefile           |  10 -
 aws/lambda/hud-query-proxy/lambda_function.py | 239 ---------
 aws/lambda/hud-query-proxy/requirements.txt   |   9 -
 mypy.ini                                      |   1 -
 tools/scripts/update_pending_hud.py           | 160 ------
 14 files changed, 1136 deletions(-)
 delete mode 100644 aws/lambda/github-status-sync/Makefile
 delete mode 100644 aws/lambda/github-status-sync/README.md
 delete mode 100644 aws/lambda/github-status-sync/cli-requirements.txt
 delete mode 100644 aws/lambda/github-status-sync/lambda_function.py
 delete mode 100644 aws/lambda/github-status-sync/requirements.txt
 delete mode 100644 aws/lambda/github-status-sync/update_triggers.py
 delete mode 100644 aws/lambda/hud-query-proxy/Makefile
 delete mode 100644 aws/lambda/hud-query-proxy/lambda_function.py
 delete mode 100644 aws/lambda/hud-query-proxy/requirements.txt
 delete mode 100755 tools/scripts/update_pending_hud.py

diff --git a/.github/scripts/generate_ci_workflows.py b/.github/scripts/generate_ci_workflows.py
index d4c11986bc..83a5e63b93 100755
--- a/.github/scripts/generate_ci_workflows.py
+++ b/.github/scripts/generate_ci_workflows.py
@@ -45,14 +45,6 @@ def generate_workflow_file(self, workflow_template: jinja2.Template) -> None:
         lambda_name="rds-proxy",
         timeout=3,
     ),
-    # This can't be deployed from GitHub's runners since it installs incompatible
-    # binaries when downloading dependencies
-    # CIWorkflow(
-    #     template="deploy_lambda.yml.j2",
-    #     name="github-status-sync",
-    #     lambda_name="ossci-job-status-sync",
-    #     timeout=5 * 60,
-    # ),
     CIWorkflow(
         template="metrics_pytorch_org.yml.j2", name="metrics-pytorch-org", timeout=3
     ),
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 940e24d89c..263bb61ec2 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -27,7 +27,6 @@ jobs:
       - name: Install dependencies
        run: |
          pip install --requirement=.mypy_requirements.txt
-         pip install -r aws/lambda/github-status-sync/requirements.txt
      - name: Run mypy
        env:
          MYPY_FORCE_COLOR: 1
diff --git a/.gitignore b/.gitignore
index 96f6aba318..7b0fab5ae9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,11 +28,9 @@ terraform.rc
 
 aws/websites/metrics.pytorch.org/vars.yml
 
-aws/lambda/hud-query-proxy/python/*
 aws/lambda/github-webhook-rds-sync/python/*
 aws/lambda/github-webhook-rds-sync/hooks/*
 aws/lambda/rds-proxy/python/*
-aws/lambda/github-status-sync/python/*
 
 .vercel
 
diff --git a/aws/lambda/github-status-sync/Makefile b/aws/lambda/github-status-sync/Makefile
deleted file mode 100644
index 4b378daacd..0000000000
--- a/aws/lambda/github-status-sync/Makefile
+++ /dev/null
@@ -1,10 +0,0 @@
-deployment:
-	pip install --target ./python -r requirements.txt
-	cd python && zip -r ../deployment.zip .
-	zip -g deployment.zip lambda_function.py
-
-manual_update:
-	aws lambda update-function-code --function-name ossci-job-status-sync --zip-file fileb://deployment.zip
-
-clean:
-	rm -rf deployment.zip python
diff --git a/aws/lambda/github-status-sync/README.md b/aws/lambda/github-status-sync/README.md
deleted file mode 100644
index 9996c039fa..0000000000
--- a/aws/lambda/github-status-sync/README.md
+++ /dev/null
@@ -1,71 +0,0 @@
-This lambda updates data in S3 that is used by hud.pytorch.org on the main status page. It lists out the top N commits for a branch along with the statuses for each.
-
-## Configuration
-
-Mandatory:
-
-- `app_id` - the GitHub App ID to authenticate with (e.g. `1234351`)
-- `private_key` - a private key generated from the GitHub App with newlines replaced with `|` characters
-- `bucket` - s3 bucket to write to (e.g. `ossci-job-status`), the Lambda should be configured to have write access to the S3 `bucket`
-
-Optional:
-
-- `branches` - comma separated list of branches to handle: `master,main,nightly,viable/strict,release/1.10`
-- `repo` - GitHub repository (e.g. `vision`)
-- `user` - GitHub username (e.g. `pytorch`)
-- `history_size` - number of commits to fetch in the past (e.g. `100`)
-
-### `update_triggers.py`
-
-These can optionally be configured via an EventBridge event (which would let you sync multiple repos at different rates from a single lambda). Use `update_triggers.py` to configure how you want the Lambda to run and execute it with the relevant AWS credentials.
-
-```bash
-pip install -r cli-requirements.txt
-export ACCOUNT_ID=1234
-python update_triggers.py
-```
-
-### Manual events
-
-You can also add events via the AWS console:
-
-1. In the Lambda triggers configuration page, add a new EventBridge trigger to run on a schedule (e.g. `rate(1 minute)`).
-2. Click on the EventBridge event and go to "Edit" it
-3. In "Select targets" expand "Configure input" and choose "Constant (JSON text)". Paste in something like this
-
-   ```json
-   {
-     "branches": "master,main,nightly,viable/strict,release/1.10",
-     "user": "pytorch",
-     "repo": "pytorch",
-     "history_size": 100
-   }
-   ```
-
-4. "Update" to save the changes, monitor the logs to ensure the Lambda is functioning correctly
-
-## Local Development
-
-Use the environment variables above along with `DEBUG=1` to run locally.
-
-```bash
-# One-time setup
-export DEBUG=1
-export app_id=1234
-export bucket=ossci-job-status
-export private_key=$(cat key.pem | tr '\n' '|')
-
-# Run and debug
-python lambda_function.py
-```
-
-**Note**: The `cryptography` package relies on binaries, so you can only deploy this from a Linux machine (doing it from macOS will result in errors reading ELF headers at import time)
-
-## Manual invocation
-
-You can also use `update_triggers.py` to manually call the lambda for one of the rules defined in `update_triggers.py`.
-
-```bash
-# e.g. sync the pytorch/pytorch/master branch
-python update_triggers.py invoke --rule sync-pytorch-pytorch
-```
diff --git a/aws/lambda/github-status-sync/cli-requirements.txt b/aws/lambda/github-status-sync/cli-requirements.txt
deleted file mode 100644
index d5bb6ee0f1..0000000000
--- a/aws/lambda/github-status-sync/cli-requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-boto3==1.16.52
-Click==7.0
\ No newline at end of file
diff --git a/aws/lambda/github-status-sync/lambda_function.py b/aws/lambda/github-status-sync/lambda_function.py
deleted file mode 100644
index 853bb2aaad..0000000000
--- a/aws/lambda/github-status-sync/lambda_function.py
+++ /dev/null
@@ -1,467 +0,0 @@
-import asyncio
-import aiohttp
-import math
-import os
-import datetime
-import re
-import boto3  # type: ignore
-import json
-import io
-import gzip
-import os
-from cryptography.hazmat.backends import default_backend
-import jwt
-import requests
-import time
-from typing import *
-
-
-BUCKET = os.getenv("bucket", "ossci-job-status")
-APP_ID = int(os.environ["app_id"])
-
-# The private key needs to maintain its newlines, get it via
-# $ cat key.pem | tr '\n' '|' | pbcopy
-PRIVATE_KEY = os.environ["private_key"].replace("|", "\n")
-
-
-def app_headers() -> Dict[str, str]:
-    cert_bytes = PRIVATE_KEY.encode()
-    private_key = default_backend().load_pem_private_key(cert_bytes, None)  # type: ignore
-
-    time_since_epoch_in_seconds = int(time.time())
-
-    payload = {
-        # issued at time
-        "iat": time_since_epoch_in_seconds,
-        # JWT expiration time (10 minute maximum)
-        "exp": time_since_epoch_in_seconds + (10 * 60),
-        # GitHub App's identifier
-        "iss": APP_ID,
-    }
-
-    actual_jwt = jwt.encode(payload, private_key, algorithm="RS256")
-    headers = {
-        "Authorization": f"Bearer {actual_jwt}",
-        "Accept": "application/vnd.github.machine-man-preview+json",
-    }
-    return headers
-
-
-def jprint(obj: Any) -> None:
-    print(json.dumps(obj, indent=2))
-
-
-def installation_id(user: str) -> int:
-    r_bytes = requests.get(
-        "https://api.github.com/app/installations", headers=app_headers()
-    )
-    r = json.loads(r_bytes.content.decode())
-    for item in r:
-        if item["account"]["login"] == user:
-            return int(item["id"])
-
-    raise RuntimeError(f"User {user} not found in {r}")
-
-
-def user_token(user: str) -> str:
-    """
-    Authorize this request with the GitHub app set by the 'app_id' and
-    'private_key' environment variables.
-    1. Get the installation ID for the user that has installed the app
-    2. Request a new token for that user
-    3.
Return it so it can be used in future API requests - """ - # Hardcode the installation to PyTorch so we can always get a valid ID key - id = installation_id("pytorch") - url = f"https://api.github.com/app/installations/{id}/access_tokens" - r_bytes = requests.post(url, headers=app_headers()) - r = json.loads(r_bytes.content.decode()) - token = str(r["token"]) - return token - - -if "AWS_KEY_ID" in os.environ and "AWS_SECRET_KEY" in os.environ: - # Use keys for local development - session = boto3.Session( - aws_access_key_id=os.environ.get("AWS_KEY_ID"), - aws_secret_access_key=os.environ.get("AWS_SECRET_KEY"), - ) -else: - # In the Lambda, use permissions on the Lambda's role - session = boto3.Session() -s3 = session.resource("s3") - - -def compress_query(query: str) -> str: - query = query.replace("\n", "") - query = re.sub("\s+", " ", query) - return query - - -def head_commit_query(user: str, repo: str, branches: List[str]) -> str: - """ - Fetch the head commit for a list of branches - """ - - def branch_part(branch: str, num: int) -> str: - return f""" - r{num}: repository(name: "{repo}", owner: "{user}") {{ - ref(qualifiedName:"refs/heads/{branch}") {{ - name - target {{ - ... on Commit {{ - oid - }} - }} - }} - }} - """ - - parts = [branch_part(branch, i) for i, branch in enumerate(branches)] - return "{" + "\n".join(parts) + "}" - - -def extract_gha(suites: List[Dict[str, Any]]) -> List[Dict[str, str]]: - jobs = [] - for suite in suites: - suite = suite["node"] - if suite["workflowRun"] is None: - # If no jobs were triggered this will be empty - continue - workflow = suite["workflowRun"]["workflow"]["name"] - for run in suite["checkRuns"]["nodes"]: - conclusion = run["conclusion"] - if conclusion is None: - if run["status"].lower() == "queued": - conclusion = "queued" - elif run["status"].lower() == "in_progress": - conclusion = "pending" - else: - raise RuntimeError(f"unexpected run {run}") - jobs.append( - { - "name": f"{workflow} / {run['name']}", - "status": conclusion.lower(), - "url": run["detailsUrl"], - } - ) - - return jobs - - -def extract_status(contexts: List[Dict[str, Any]]) -> List[Dict[str, str]]: - jobs = [] - for context in contexts: - jobs.append( - { - "name": context["context"], - "status": context["state"].lower(), - "url": context["targetUrl"], - } - ) - - return jobs - - -def extract_jobs(raw_commits: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - commits = [] - - for raw_commit in raw_commits: - if raw_commit["status"] is None: - # Will be none if no non-GHA jobs were triggered - status = [] - else: - status = extract_status(raw_commit["status"]["contexts"]) - gha = extract_gha(raw_commit["checkSuites"]["edges"]) - jobs = status + gha - - if raw_commit["author"]["user"] is None: - author = raw_commit["author"]["name"] - else: - author = raw_commit["author"]["user"]["login"] - commits.append( - { - "sha": raw_commit["oid"], - "headline": raw_commit["messageHeadline"], - "body": raw_commit["messageBody"], - "author": author, - "date": raw_commit["authoredDate"], - "jobs": jobs, - } - ) - return commits - - -class BranchHandler: - def __init__( - self, - gql: Any, - user: str, - repo: str, - name: str, - head: str, - history_size: int, - fetch_size: int, - ): - self.gql = gql - self.user = user - self.repo = repo - self.name = name - self.head = head - self.fetch_size = fetch_size - self.history_size = history_size - - def write_to_s3(self, data: Any) -> None: - content = json.dumps(data, default=str) - buf = io.BytesIO() - gzipfile = 
gzip.GzipFile(fileobj=buf, mode="w") - gzipfile.write(content.encode()) - gzipfile.close() - bucket = s3.Bucket(BUCKET) - prefix = f"v5/{self.user}/{self.repo}/{self.name.replace('/', '_')}.json" - bucket.put_object( - Key=prefix, - Body=buf.getvalue(), - ContentType="application/json", - ContentEncoding="gzip", - Expires="0", - ) - print(f"Wrote {len(data)} commits from {self.name} to {prefix}") - - def query(self, offset: int) -> str: - after = "" - # The cursor for fetches are formatted like after: " ", but - # the first commit isn't included, so shift all the offsets and don't - # use an "after" for the first batch - if offset > 0: - after = f', after: "{self.head} {offset - 1}"' - - return f""" - {{ - repository(name: "{self.repo}", owner: "{self.user}") {{ - ref(qualifiedName:"refs/heads/{self.name}") {{ - name - target {{ - ... on Commit {{ - history(first:{self.fetch_size}{after}) {{ - nodes {{ - oid - messageBody - messageHeadline - author {{ - name - user {{ - login - }} - }} - authoredDate - checkSuites(first:100) {{ - edges {{ - node {{ - checkRuns(first:100) {{ - nodes {{ - name - status - conclusion - detailsUrl - }} - }} - workflowRun {{ - workflow {{ - name - }} - }} - }} - }} - }} - status {{ - contexts {{ - context - state - targetUrl - }} - }} - }} - }} - }} - }} - }} - }} - }} - """ - - def check_response(self, gql_response: Any) -> None: - # Just check that this path in the dict exists - gql_response["data"]["repository"]["ref"]["target"]["history"]["nodes"] - - async def run(self) -> None: - """ - Fetch history for the branch (in batches) and merge them all together - """ - # GitHub's API errors out if you try to fetch too much data at once, so - # split up the 100 commits into batches of 'self.fetch_size' - fetches = math.ceil(self.history_size / self.fetch_size) - - async def fetch(i: int) -> Any: - try: - return await self.gql.query( - self.query(offset=self.fetch_size * i), verify=self.check_response - ) - except Exception as e: - print( - f"Error: {e}\nFailed to fetch {self.user}/{self.repo}/{self.name} on batch {i} / {fetches}" - ) - return None - - coros = [fetch(i) for i in range(fetches)] - result = await asyncio.gather(*coros) - raw_commits = [] - - print(f"Parsing results {self.name}") - # Merge all the batches - for r in result: - if r is None: - continue - try: - commits_batch = r["data"]["repository"]["ref"]["target"]["history"][ - "nodes" - ] - raw_commits += commits_batch - except Exception as e: - # Errors here are expected if the branch has less than HISTORY_SIZE - # commits (GitHub will just time out). There's no easy way to find - # this number ahead of time and avoid errors, but if we had that - # then we could delete this try-catch. 
- print(f"Error: Didn't find history in commit batch: {e}\n{r}") - - # Pull out the data and format it - commits = extract_jobs(raw_commits) - - print(f"Writing results for {self.name} to S3") - - # Store gzip'ed data to S3 - self.write_to_s3(commits) - - -class GraphQL: - def __init__(self, session: aiohttp.ClientSession) -> None: - self.session = session - - def log_rate_limit(self, headers: Any) -> None: - remaining = headers.get("X-RateLimit-Remaining") - used = headers.get("X-RateLimit-Used") - total = headers.get("X-RateLimit-Limit") - reset_timestamp = int(headers.get("X-RateLimit-Reset", 0)) - reset = datetime.datetime.fromtimestamp(reset_timestamp).strftime( - "%a, %d %b %Y %H:%M:%S" - ) - - print( - f"[rate limit] Used {used}, {remaining} / {total} remaining, reset at {reset}" - ) - - async def query( - self, - query: str, - verify: Optional[Callable[[Any], None]] = None, - retries: int = 5, - ) -> Any: - """ - Run an authenticated GraphQL query - """ - # Remove unnecessary white space - query = compress_query(query) - if retries <= 0: - raise RuntimeError(f"Query {query[:100]} failed, no retries left") - - url = "https://api.github.com/graphql" - try: - async with self.session.post(url, json={"query": query}) as resp: - self.log_rate_limit(resp.headers) - r = await resp.json() - if "data" not in r: - raise RuntimeError(r) - if verify is not None: - verify(r) - return r - except Exception as e: - print( - f"Retrying query {query[:100]}, remaining attempts: {retries - 1}\n{e}" - ) - return await self.query(query, verify=verify, retries=retries - 1) - - -async def main( - user: str, repo: str, branches: List[str], history_size: int, fetch_size: int -) -> None: - """ - Grab a list of all the head commits for each branch, then fetch all the jobs - for the last 'history_size' commits on that branch - """ - async with aiohttp.ClientSession( - headers={ - "Authorization": "token {}".format(user_token(user)), - "Accept": "application/vnd.github.machine-man-preview+json", - } - ) as aiosession: - gql = GraphQL(aiosession) - print(f"Querying branches: {branches}") - heads = await gql.query(head_commit_query(user, repo, branches)) - handlers = [] - - for head in heads["data"].values(): - sha = head["ref"]["target"]["oid"] - branch = head["ref"]["name"] - handlers.append( - BranchHandler(gql, user, repo, branch, sha, history_size, fetch_size) - ) - - await asyncio.gather(*[h.run() for h in handlers]) - - -def lambda_handler(event: Any, context: Any) -> None: - """ - 'event' here is the payload configured from EventBridge (or set manually - via environment variables) - """ - data: Dict[str, Any] = { - "branches": None, - "user": None, - "repo": None, - "history_size": None, - "fetch_size": None, - } - - for key in data.keys(): - if key in os.environ: - data[key] = os.environ[key] - else: - data[key] = event[key] - - if any(x is None for x in data.values()): - raise RuntimeError( - "Data missing from configuration, it must be set as an environment " - f"variable or as the input JSON payload in the Lambda event:\n{data}" - ) - - data["history_size"] = int(data["history_size"]) - data["fetch_size"] = int(data["fetch_size"]) - data["branches"] = data["branches"].split(",") - - # return - asyncio.run(main(**data)) - - -if os.getenv("DEBUG", "0") == "1": - # For local development - lambda_handler( - { - "branches": "release/1.10", - "user": "pytorch", - "repo": "pytorch", - "history_size": 100, - "fetch_size": 10, - }, - None, - ) - diff --git a/aws/lambda/github-status-sync/requirements.txt 
b/aws/lambda/github-status-sync/requirements.txt deleted file mode 100644 index 424ce4b666..0000000000 --- a/aws/lambda/github-status-sync/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -boto3==1.16.52 -aiohttp==3.7.4 -cryptography==35.0.0 -requests==2.24.0 -types-requests==2.25.0 -PyJWT==2.4.0 diff --git a/aws/lambda/github-status-sync/update_triggers.py b/aws/lambda/github-status-sync/update_triggers.py deleted file mode 100644 index 1591093638..0000000000 --- a/aws/lambda/github-status-sync/update_triggers.py +++ /dev/null @@ -1,150 +0,0 @@ -import json -import os -import boto3 -import click - -eb = boto3.client("events") -lb = boto3.client("lambda") - - -def jprint(o): - print(json.dumps(o, indent=2, default=str)) - - -REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1") -ACCOUNT_ID = os.environ["ACCOUNT_ID"] -LAMBDA_ARN = f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:ossci-job-status-sync" - - -def generate_input(repo, branches, user="pytorch", history_size=100, fetch_size=10): - return { - "branches": ",".join(branches), - "user": user, - "repo": repo, - "history_size": history_size, - "fetch_size": fetch_size, - } - - -EVENT_TARGETS = { - "sync-pytorch-audio": { - "schedule": "rate(1 hour)", - "input": generate_input(repo="audio", branches=["main", "nightly", "release/0.10"]), - }, - "sync-pytorch-examples": { - "schedule": "rate(4 hours)", - "input": generate_input(repo="examples", branches=["master"]), - }, - "sync-pytorch-tutorials": { - "schedule": "rate(4 hours)", - "input": generate_input(repo="tutorials", branches=["master"]), - }, - "sync-pytorch-text": { - "schedule": "rate(2 hours)", - "input": generate_input(repo="text", branches=["main", "nightly", "release/0.11"]), - }, - "sync-pytorch-vision": { - "schedule": "rate(1 hour)", - "input": generate_input(repo="vision", branches=["main", "nightly", "release/0.11"]), - }, - "sync-pytorch-pytorch": { - "schedule": "rate(1 minute)", - "input": generate_input(repo="pytorch", branches=["master", "main"]), - }, - "sync-pytorch-pytorch-slow": { - "schedule": "rate(1 hour)", - "input": generate_input( - repo="pytorch", branches=["nightly", "viable/strict", "release/1.11"], - ), - }, - "sync-pytorch-lightning": { - "schedule": "rate(4 hours)", - "input": generate_input( - user="PyTorchLightning", - repo="pytorch-lightning", - branches=["master"], - fetch_size=4, - ), - }, - "sync-pytorch-torchx": { - "schedule": "rate(4 hours)", - "input": generate_input(repo="torchx", branches=["main"],), - }, -} - - -@click.group() -def cli(): - """ - Tool to manage CloudEvents triggers for the syncing job behind hud.pytorch.org. 
- - To use, you must set the ACCOUNT_ID environment variable: - - # You can also get the ID from any ARN on the account - $ aws sts get-caller-identity --query Account --output text - 123456 - $ export ACCOUNT_ID=123456 - """ - pass - - -@cli.command() -@click.option("--rule") -def invoke(rule): - data = EVENT_TARGETS[rule]["input"] - print(f"Sending to {LAMBDA_ARN}:") - print(json.dumps(data, indent=2)) - result = lb.invoke(FunctionName=LAMBDA_ARN, Payload=json.dumps(data).encode()) - print(result) - - -@cli.command() -def list(): - """ - Show all triggering rules for the lambda - """ - rules = eb.list_rule_names_by_target(TargetArn=LAMBDA_ARN)["RuleNames"] - for name in rules: - targets = eb.list_targets_by_rule(Rule=name) - for target in targets["Targets"]: - input = json.loads(target["Input"]) - print(f"Input for {name} ({target['Id']}):") - jprint(input) - print("") - - -@cli.command() -def update(): - """ - Remove and re-add all triggering rules for the lambda - """ - rules = eb.list_rule_names_by_target(TargetArn=LAMBDA_ARN)["RuleNames"] - for name in rules: - # Clear out targets - targets = eb.list_targets_by_rule(Rule=name) - ids = [t["Id"] for t in targets["Targets"]] - eb.remove_targets(Rule=name, Ids=ids) - - # Delete the rule - eb.delete_rule(Name=name) - print(f"Deleted rule {name}") - - # Add the rules specified above - for name, data in EVENT_TARGETS.items(): - eb.put_rule(Name=name, ScheduleExpression=data["schedule"], State="ENABLED") - - # Install a target on the rule - r = eb.put_targets( - Rule=name, - Targets=[ - {"Arn": LAMBDA_ARN, "Id": "update", "Input": json.dumps(data["input"])} - ], - ) - if r["FailedEntryCount"] == 0: - print(f"Updated {name}") - else: - print(f"Failed to update {name}") - - -if __name__ == "__main__": - cli() diff --git a/aws/lambda/hud-query-proxy/Makefile b/aws/lambda/hud-query-proxy/Makefile deleted file mode 100644 index 2319134da8..0000000000 --- a/aws/lambda/hud-query-proxy/Makefile +++ /dev/null @@ -1,10 +0,0 @@ -deployment: - pip install --target ./python -r requirements.txt - cd python && zip -r ../deployment.zip . - zip -g deployment.zip lambda_function.py utils.py - -manual_update: - aws lambda update-function-code --function-name hud-query-proxy --zip-file fileb://deployment.zip - -clean: - rm -rf deployment.zip python diff --git a/aws/lambda/hud-query-proxy/lambda_function.py b/aws/lambda/hud-query-proxy/lambda_function.py deleted file mode 100644 index 5f0a91cb6d..0000000000 --- a/aws/lambda/hud-query-proxy/lambda_function.py +++ /dev/null @@ -1,239 +0,0 @@ -""" -Query the database backing metrics.pytorch.org for recent commits and statuses, -then push these to S3 at ossci-job-status/single/{branch}.tar.gz - -This fetches the 100 most recent commits for the branch (GitHub doesn't send -push events for stacks of commits so we'd need to recover those manually), then -gets their GitHub Actions job statuses and status events (CircleCI, CodeCov, -Jenkins, etc.) 
- -Use environment variables 'num_commits' and 'branches' to configure the lambda -""" -import os -import re -import sys -import gzip -import json -import io -import socket -import boto3 - -from typing import * - -import asyncio -import aiomysql - - -def eprint(*args) -> None: - print(*args, file=sys.stderr) - - -async def run_query(query: str, params: Any = None) -> List[Dict[str, Any]]: - eprint(f"Executing '{query}' with params: {params}") - - my_con = await aiomysql.connect( - host=os.environ["db_host"], - port=socket.getservbyname("mysql"), - user=os.environ["db_user"], - password=os.environ["db_password"], - db="pytorch", - loop=asyncio.get_event_loop(), - ) - async with my_con.cursor(aiomysql.DictCursor) as cur: - await cur.execute(query, params) - return await cur.fetchall() - - -PR_RE = re.compile(r"(.*)\(#([0-9]+)\)$") -NUM_COMMITS = int(os.getenv("num_commits", 100)) -BRANCHES = os.getenv("branches", "master,main,release/1.9,nightly").split(",") - - -async def get_commits(branch: str) -> List[Dict[str, Any]]: - results = await run_query( - f""" - select head_commit_timestamp, - head_commit_id, - head_commit_author_username, - head_commit_message - from push_event - where ref = %s - order by head_commit_timestamp desc - limit %s; - """, - (branch, NUM_COMMITS), - ) - for row in results: - message = row["head_commit_message"] - title = message.split("\n")[0].strip() - del row["head_commit_message"] - match = PR_RE.search(title) - if match: - groups = match.groups() - if len(groups) == 2: - title = groups[0].strip() - pr = groups[1] - else: - pr = "" - else: - pr = "" - - row["head_commit_title"] = title - row["pr"] = pr - row["jobs"] = [] - - return results - - -async def gha_jobs_for_shas(shas: List[str]) -> List[Any]: - placeholders = ["%s"] * len(shas) - query = f""" - select workflow_run.name as "workflow", - workflow_job.name, - workflow_job.head_sha, - workflow_job.status, - workflow_job.started_at, - workflow_job.id, - workflow_job.conclusion - from workflow_job - inner join workflow_run on workflow_job.run_id = workflow_run.id - where workflow_job.head_sha in ({', '.join(placeholders)}); - """ - return await run_query(query, shas) - - -async def circleci_jobs_for_shas(shas: List[str]) -> List[Any]: - placeholders = ["%s"] * len(shas) - query = f""" - select context, - sha, - updated_at, - target_url, - state - from status_event - where sha in ({', '.join(placeholders)}); - """ - return await run_query(query, shas) - - -def deduplicate_jobs(commit: Dict[str, Any]) -> None: - deduplicated_jobs: Dict[str, Any] = {} - for job in commit["jobs"]: - old_entry = deduplicated_jobs.get(job["name"]) - if old_entry is not None: - if job["time"] > old_entry["time"]: - deduplicated_jobs[job["name"]] = job - else: - deduplicated_jobs[job["name"]] = job - - commit["jobs"] = deduplicated_jobs - - -def compact_data(commits: Dict[str, Any]) -> None: - # There could be re-runs, get rid of them and only keep the latest one - for commit in commits.values(): - deduplicate_jobs(commit) - - job_names = set() - for commit in commits.values(): - for job in commit["jobs"].values(): - job_names.add(job["name"]) - - for commit in commits.values(): - full_status: Dict[str, Any] = {} - for name in job_names: - job = commit["jobs"].get(name) - if job is None: - full_status[name] = None - else: - full_status[name] = [job["status"], job["url"]] - commit["jobs"] = full_status - - -async def main(branch): - commits = await get_commits(f"refs/heads/{branch}") - commits = {commit["head_commit_id"]: commit for 
commit in commits} - shas = list(commits.keys()) - - async def add_gha(): - gha = await gha_jobs_for_shas(shas) - for job in gha: - status = "" - if job["status"] != "completed": - status = "pending" - else: - status = job["conclusion"] - commits[job["head_sha"]]["jobs"].append( - { - "name": f"{job['workflow']} / {job['name']}", - "time": job["started_at"], - "url": f"https://github.com/pytorch/pytorch/runs/{job['id']}?check_suite_focus=true", - "status": status, - } - ) - - async def add_circleci(): - circleci = await circleci_jobs_for_shas(shas) - for job in circleci: - commits[job["sha"]]["jobs"].append( - { - "name": job["context"], - "time": job["updated_at"], - "url": job["target_url"], - "status": job["state"], - } - ) - - await asyncio.gather(add_gha(), add_circleci()) - - compact_data(commits) - - return [commit for commit in commits.values()] - - -def update_branch(branch): - data = asyncio.get_event_loop().run_until_complete(main(branch)) - - # Default response is huge due to large job names, use gzip to make them - # small - content = json.dumps(data, default=str) - buf = io.BytesIO() - gzipfile = gzip.GzipFile(fileobj=buf, mode="w") - gzipfile.write(content.encode()) - gzipfile.close() - - session = boto3.Session( - aws_access_key_id=os.environ.get("aws_key_id"), - aws_secret_access_key=os.environ.get("aws_access_key"), - ) - s3 = session.resource("s3") - - bucket = s3.Bucket("ossci-job-status") - name = f"single/{branch.replace('/', '_')}.json.gz" - bucket.put_object( - Key=name, - Body=buf.getvalue(), - ContentType="application/json", - ContentEncoding="gzip", - ) - print(f"Wrote object for {name}") - return "ok" - - -def lambda_handler(events, context): - exception = None - results = {} - for branch in BRANCHES: - branch = branch.strip() - results[branch] = "failed" - try: - results[branch] = update_branch(branch) - except Exception as e: - exception = e - - if exception is not None: - raise exception - return json.dumps(results, indent=2) - - -print(json.dumps(lambda_handler(None, None), indent=2, default=str)) diff --git a/aws/lambda/hud-query-proxy/requirements.txt b/aws/lambda/hud-query-proxy/requirements.txt deleted file mode 100644 index eed3ef13e9..0000000000 --- a/aws/lambda/hud-query-proxy/requirements.txt +++ /dev/null @@ -1,9 +0,0 @@ -# To install the lambda layers: -# -# pip install --target ./python -r requirements.txt -# zip -r layer.zip ./python -# -# Then upload the layer via aws console or aws cli -# Update the lambda's layer reference to the newly uploaded layer - -aiomysql==0.0.21 \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index acbc935b4c..87296efec3 100644 --- a/mypy.ini +++ b/mypy.ini @@ -8,5 +8,4 @@ show_column_numbers = True strict = True files = - aws/lambda/github-status-sync/lambda_function.py, torchci/scripts/reverts.py diff --git a/tools/scripts/update_pending_hud.py b/tools/scripts/update_pending_hud.py deleted file mode 100755 index e8c9ff2192..0000000000 --- a/tools/scripts/update_pending_hud.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env python3 -# Update pending GH Runs/CircleCI jobs in HUD -# Copyright (c) 2021-present, Facebook, Inc. 
-import boto3 -import botocore -import json -import os -import re - -from typing import Any, Dict, List, Optional, Union -from urllib.request import urlopen, Request -from urllib.error import HTTPError -from datetime import datetime, timedelta - -s3 = boto3.resource('s3') -bucket_name = 'ossci-job-status' - - -def s3_get_json(bucket, path, empty_obj): - try: - return json.loads(s3.Object(bucket, path).get()['Body'].read().decode('utf-8')) - except botocore.exceptions.ClientError: - return empty_obj - - -def json_dumps(obj): - return json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': ')) - - -def gh_fetch_json(url: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: - headers = {'Accept': 'application/vnd.github.v3+json'} - token = os.environ.get("GITHUB_TOKEN") - if token is not None and url.startswith('https://api.github.com/'): - headers['Authorization'] = f'token {token}' - if params is not None and len(params) > 0: - url += '?' + '&'.join(f"{name}={val}" for name, val in params.items()) - try: - with urlopen(Request(url, headers=headers)) as data: - return json.load(data) - except HTTPError as err: - if err.code == 403 and all(key in err.headers for key in ['X-RateLimit-Limit', 'X-RateLimit-Used']): - print(f"Rate limit exceeded: {err.headers['X-RateLimit-Used']}/{err.headers['X-RateLimit-Limit']}") - raise - - -def get_circleci_token() -> str: - token_file_path = os.path.join(os.getenv('HOME'), '.circleci_token') - token = os.getenv('CIRCLECI_TOKEN') - if token is not None: - return token - if not os.path.exists(token_file_path): - return None - with open(token_file_path) as f: - return f.read().strip() - - -def circleci_fetch_json(url: str) -> Union[Dict[str, Any], List[Dict[str, Any]]]: - token = get_circleci_token() - headers = {'Accept': 'application/json'} - if token is not None: - headers['Circle-Token'] = token - with urlopen(Request(url, headers=headers)) as data: - return json.load(data) - - -def circleci_get_job_status(org: str, project: str, job_id: int) -> Dict[str, Any]: - rc = circleci_fetch_json(f"https://circleci.com/api/v2/project/gh/{org}/{project}/job/{job_id}") - assert isinstance(rc, dict) - return rc - - -def gh_fetch_multipage_json(url: str, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: - if params is None: - params = {} - assert "page" not in params - page_idx, rc, prev_len, params = 1, [], -1, params.copy() - while len(rc) > prev_len: - prev_len = len(rc) - params["page"] = page_idx - page_idx += 1 - page_json = gh_fetch_json(url, params) - rc += page_json - return rc - - -def gh_get_ref_statuses(org: str, project: str, ref: str) -> Dict[str, Any]: - url = f'https://api.github.com/repos/{org}/{project}/commits/{ref}/status' - params = {"page": 1, "per_page": 100} - nrc = rc = gh_fetch_json(url, params) - while "statuses" in nrc and len(nrc["statuses"]) == 100: - params["page"] += 1 - nrc = gh_fetch_json(url, params) - if "statuses" in nrc: - rc["statuses"] += nrc["statuses"] - return rc - -def gh_get_runs_status(org: str, project: str, run_id: str) -> List[Dict[str, Any]]: - url = f'https://api.github.com/repos/{org}/{project}/check-runs/{run_id}' - return gh_fetch_json(url) - - -def map_circle_status(status: str) -> str: - if status in ["running", "queued"]: - return "pending" - if status in ["infrastructure_fail", "failed"]: - return "failure" - return status - - -def map_ghrun_status(status: str) -> str: - if status == "completed": - return "success" - return status - - -def update_pending(branch: str = 
"master") -> None: - commit_index = s3_get_json(bucket_name, f'{branch}/index.json', []) - for idx, item in enumerate(commit_index): - commit_id = item['id'] - title = item['message'].split("\n")[0] - timestamp = datetime.fromisoformat(item['timestamp']).replace(tzinfo=None) - if datetime.utcnow() - timestamp < timedelta(hours=5): - print(f"[{idx}/{len(commit_index)}] {title} ( {commit_id} ) is skipped as it was merged less than 5 hours ago") - continue - has_pending, has_updates = False, False - job_statuses = s3_get_json(bucket_name, f'{branch}/{commit_id}.json', {}) - for (name, value) in job_statuses.items(): - status = value['status'] - build_url = value['build_url'] - if status not in ['success', 'skipped', 'error', 'failure']: - circle_match = re.match("https://circleci.com/gh/pytorch/pytorch/(\\d+)\\?", build_url) - ghrun_match = re.match("https://github.com/pytorch/pytorch/runs/(\\d+)", build_url) - if circle_match is not None: - job_id = int(circle_match.group(1)) - job_status = circleci_get_job_status("pytorch", "pytorch", job_id) - circle_status = map_circle_status(job_status['status']) - if status != circle_status: - job_statuses[name]['status'] = circle_status - has_updates = True - continue - if ghrun_match is not None: - run_id = int(ghrun_match.group(1)) - check_status = gh_get_runs_status("pytorch", "pytorch", run_id) - ghrun_status = map_ghrun_status(check_status['status']) - if status != ghrun_status: - job_statuses[name]['status'] = ghrun_status - has_updates = True - continue - has_pending = True - if has_updates: - print(f"[{idx}/{len(commit_index)}] {title} ( {commit_id} ) has updates") - s3.Object(bucket_name, f'{branch}/{commit_id}.json').put(Body=json_dumps(job_statuses)) - elif has_pending: - print(f"[{idx}/{len(commit_index)}] {title} ( {commit_id} ) has pending statuses") - else: - print(f"[{idx}/{len(commit_index)}] were processed") - - -if __name__ == '__main__': - update_pending()