From 174648de9057bc0e1af8c14c11458d53fba76517 Mon Sep 17 00:00:00 2001
From: clee2000 <44682903+clee2000@users.noreply.github.com>
Date: Fri, 6 Sep 2024 15:50:52 -0700
Subject: [PATCH] [ch] pr_commits, some dr ci queries (#5626)

The Dr. CI queries are a bit slow (15s for the recent jobs query).

This is still gated behind enableClickhouse(). I did some brief checking on
the Dr. CI comments, but I think it could use more checking. The Dr. CI
tests will need to be rewritten to use ClickHouse mocks (see the sketch at
the end of this message). I'm a little tempted to just switch over and not
gatekeep Dr. CI.

For comparing, hit the new local endpoint and the current production
endpoint:

```
curl "http://localhost:3000/api/drci/drci" --data 'repo=pytorch' > new.json
```

```
curl --request POST \
  --url 'https://www.torch-ci.com/api/drci/drci' \
  --header 'Authorization: i9ju23nf39nunu3asn' \
  --data 'repo=pytorch' > orig.json
```

Then normalize both outputs so they can be diffed directly:

```
import json
import re

from library import REPO_ROOT


def failures(thing):
    # Sort each classification list and normalize nulls/timestamps so the
    # two backends' outputs can be compared directly.
    for pr_num in thing:
        for classification in thing[pr_num]:
            thing[pr_num][classification] = sorted(
                thing[pr_num][classification], key=lambda x: x["id"]
            )
            for failure in thing[pr_num][classification]:
                if failure["failure_captures"] is None:
                    failure["failure_captures"] = []
                if failure["failure_context"] is None:
                    failure["failure_context"] = []
                if failure["failure_lines"] is None:
                    failure["failure_lines"] = []
                for key, value in failure.items():
                    # Both backends render dates differently, so replace
                    # either timestamp format with a placeholder.
                    if re.match(r"(.*) (.*)\.000000000", str(value)):
                        failure[key] = "date"
                    if re.match(r"(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d).*", str(value)):
                        failure[key] = "date"


if __name__ == "__main__":
    orig_file = REPO_ROOT / "../test-infra/torchci/orig.json"
    new_file = REPO_ROOT / "../test-infra/torchci/new.json"

    with open(orig_file, "r") as f:
        orig = json.load(f)
    with open(new_file, "r") as f:
        new = json.load(f)

    failures(orig)
    failures(new)

    with open(orig_file, "w") as f:
        json.dump(orig, f, indent=2)
    with open(new_file, "w") as f:
        json.dump(new, f, indent=2)
```
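One possible shape for the ClickHouse mocks mentioned above, as a sketch only
(it assumes the existing jest setup; enableClickhouse and queryClickhouseSaved
are the real exports from torchci/lib/clickhouse, but the canned rows are
illustrative, not a final test design):

```
// In a jest test file (the `jest` global comes from the test runner).
import * as clickhouse from "../lib/clickhouse";

// Force the ClickHouse path on, then stub the saved-query helper so each
// test can feed Dr. CI canned rows instead of hitting a live backend.
jest.spyOn(clickhouse, "enableClickhouse").mockReturnValue(true);
// Sketch: the empty array is a placeholder for per-test fixture rows.
jest.spyOn(clickhouse, "queryClickhouseSaved").mockResolvedValue([]);
```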
---
 .../commit_failed_jobs/query.sql                   | 45 ++++-----
 .../clickhouse_queries/pr_commits/query.sql        | 51 +++++-----
 .../recent_pr_workflows_query/query.sql            | 95 +++++++++----------
 torchci/lib/fetchPR.ts                             | 62 +++++++-----
 torchci/lib/fetchRecentWorkflows.ts                | 23 +++++
 torchci/pages/api/drci/drci.ts                     | 11 ++-
 6 files changed, 166 insertions(+), 121 deletions(-)

diff --git a/torchci/clickhouse_queries/commit_failed_jobs/query.sql b/torchci/clickhouse_queries/commit_failed_jobs/query.sql
index 509f97043b..6a153faaad 100644
--- a/torchci/clickhouse_queries/commit_failed_jobs/query.sql
+++ b/torchci/clickhouse_queries/commit_failed_jobs/query.sql
@@ -1,41 +1,36 @@
--- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
 -- This query is used by Dr.CI to get all the failed jobs from the base commit. They can then be
 -- used to decide if a failure is due to broken trunk
 with relevant_pushes as (
   select
-    p.head_commit.timestamp,
-    p.after
-  from commons.push p
+    p.head_commit.timestamp as timestamp,
+    p.head_commit.'id' as after
+  from default.push p final
   where
-    ARRAY_CONTAINS(
-      SPLIT(:shas, ','), p.after
-    )
+    p.head_commit.'id' in {shas: Array(String)}
 )
 SELECT
-  j.id,
+  j.id as id,
   j.name AS jobName,
   CONCAT(w.name, ' / ', j.name) AS name,
   j.runner_name AS runnerName,
-  w.head_commit.author.email as authorEmail,
-  j.conclusion,
-  j.completed_at,
-  j.html_url,
-  j.head_sha,
+  w.head_commit.'author'.'email' as authorEmail,
+  j.conclusion as conclusion,
+  j.completed_at as completed_at,
+  j.html_url as html_url,
+  j.head_sha as head_sha,
   p.timestamp AS head_sha_timestamp,
-  j.head_branch,
-  j.torchci_classification.captures AS failure_captures,
-  IF(j.torchci_classification.line IS NULL, null, ARRAY_CREATE(j.torchci_classification.line)) AS failure_lines,
-  j.torchci_classification.context AS failure_context,
-  j._event_time AS time,
+  j.head_branch as head_branch,
+  j.torchci_classification.'captures' AS failure_captures,
+  IF(j.torchci_classification.'line' = '', [], [j.torchci_classification.'line']) AS failure_lines,
+  j.torchci_classification.'context' AS failure_context,
+  j.created_at AS time
 FROM
-  commons.workflow_run w
-  JOIN commons.workflow_job j ON w.id = j.run_id HINT(join_broadcast = true)
+  default.workflow_run w final
+  JOIN default.workflow_job j final ON w.id = j.run_id
   -- Do a left join here because the push table won't have any information about
   -- commits from forked repo
-  LEFT JOIN relevant_pushes p ON p.after = j.head_sha HINT(join_strategy = lookup)
+  LEFT JOIN relevant_pushes p ON p.after = j.head_sha
 WHERE
-  ARRAY_CONTAINS(
-    SPLIT(: shas, ','),
-    j.head_sha
-  )
+  j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in {shas: Array(String)})
+  and w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in {shas: Array(String)})
   AND j.conclusion IN ('failure', 'cancelled')
diff --git a/torchci/clickhouse_queries/pr_commits/query.sql b/torchci/clickhouse_queries/pr_commits/query.sql
index 835d2f30fc..e0acb8b7cc 100644
--- a/torchci/clickhouse_queries/pr_commits/query.sql
+++ b/torchci/clickhouse_queries/pr_commits/query.sql
@@ -1,4 +1,3 @@
--- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
 -- This query is used by the HUD's /pull page to populate the list of historical commits
 -- made against a given PR.
 -- This improves upon the default github commits view because it allows HUD to show jobs
@@ -8,37 +7,43 @@ WITH
 -- Get all PRs that were merged into master, and get all the SHAs for commits from that PR which CI jobs ran against
 -- We need the shas because some jobs (like trunk) don't have a PR they explicitly ran against, but they _were_ run against
 -- a commit from a PR
-pr_shas AS (
+shas as (
   SELECT DISTINCT
-    FORMAT_ISO8601(
-      PARSE_TIMESTAMP_ISO8601(p.head_commit.timestamp),
-      'America/Los_Angeles'
-    ) as timestamp,
-    r.pull_requests[1].number AS pr_number,
-    p.head_commit.id AS sha,
-    p.head_commit.message,
+    r.pull_requests[1].'number' AS pr_number,
+    r.head_sha AS sha,
     CONCAT(
       'https://github.com/',
-      :owner,
+      {owner: String},
       '/',
-      :repo,
+      {repo: String},
       '/pull/',
-      r.pull_requests[1].number
-    ) AS pr_url,
-    p.head_commit.url AS commit_url,
+      r.pull_requests[1].'number'
+    ) AS pr_url
   FROM
-    commons.push p
-    JOIN commons.workflow_run r ON p.head_commit.id = r.head_sha HINT(join_strategy=lookup)
+    default.workflow_run r final
   WHERE
-    1 = 1
-    AND LENGTH(r.pull_requests) = 1
-    AND r.repository.owner.login = :owner
-    AND r.pull_requests[1].head.repo.name = :repo
-    AND r.pull_requests[1].number = :pr_num
-
+    LENGTH(r.pull_requests) = 1
+    AND r.repository.'owner'.'login' = {owner: String}
+    AND r.pull_requests[1].'head'.'repo'.'name' = {repo: String}
+    AND r.pull_requests[1].'number' = {pr_num: Int64}
+    and r.id in (select id from materialized_views.workflow_run_by_pr_num where pr_number = {pr_num: Int64})
+),
+shas_with_info AS (
+  SELECT DISTINCT
+    p.head_commit.'timestamp' as timestamp,
+    s.pr_number AS pr_number,
+    s.pr_url as pr_url,
+    p.head_commit.'id' AS sha,
+    p.head_commit.'message' as message,
+    p.head_commit.url AS commit_url
+  FROM
+    default.push p final
+    JOIN shas s ON p.head_commit.id = s.sha
+  -- Make the query faster by using a materialized view with a more relevant primary key
+  where p.head_commit.'timestamp' in (select timestamp from materialized_views.push_by_sha where id in (select sha from shas))
 )
 SELECT
   *
 FROM
-  pr_shas
+  shas_with_info
 ORDER BY
   timestamp
diff --git a/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql b/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
index 73dca76139..114b5ab358 100644
--- a/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
+++ b/torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
@@ -1,84 +1,83 @@
--- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
 -- This workflow is used by Dr.CI to get all the jobs from pull requests. The failures will then be
 -- classified into new failures and unrelated failures such as broken trunk, flaky, unstable, etc.
 WITH relevant_shas as (
-  select j.head_sha
-  from workflow_job j
-  where
-    PARSE_TIMESTAMP_ISO8601(j.completed_at) > (
-      CURRENT_TIMESTAMP() - MINUTES(: numMinutes)
-    )
-    AND :prNumber = 0
-  union
-  select pr.head.sha as head_sha
-  from commons.pull_request pr
-  where pr.number = :prNumber
+  select head_sha
+  from materialized_views.workflow_job_by_completed_at
+  where completed_at > now() - Interval {numMinutes: Int64} MINUTES
+    and {prNumber: Int64} = 0
+  union all
+  select pr.head.'sha' as head_sha
+  from default.pull_request pr final
+  where pr.number = {prNumber: Int64}
 ),
 recent_prs AS (
   SELECT
-    distinct pull_request.head.sha AS sha,
+    distinct pull_request.head.'sha' AS sha,
     pull_request.number AS number,
-    push.head_commit.timestamp AS timestamp,
+    push.head_commit.'timestamp' AS timestamp
   FROM
     relevant_shas r
-    JOIN commons.pull_request pull_request ON r.head_sha = pull_request.head.sha HINT(join_broadcast = true)
+    JOIN default.pull_request pull_request final ON r.head_sha = pull_request.head.'sha'
     -- Do a left join here because the push table won't have any information about
     -- commits from forked repo
-    LEFT JOIN commons.push push ON r.head_sha = push.after HINT(join_strategy = lookup)
+    LEFT JOIN default.push push final ON r.head_sha = push.head_commit.'id'
   WHERE
-    pull_request.base.repo.full_name =: repo
+    pull_request.base.'repo'.'full_name' = {repo: String}
 )
 SELECT
   w.id AS workflowId,
   w.workflow_id as workflowUniqueId,
-  j.id,
+  j.id as id,
   j.runner_name AS runnerName,
-  w.head_commit.author.email as authorEmail,
+  w.head_commit.'author'.'email' as authorEmail,
   CONCAT(w.name, ' / ', j.name) AS name,
   j.name AS jobName,
-  j.conclusion,
-  j.completed_at,
-  j.html_url,
-  j.head_branch,
+  j.conclusion as conclusion,
+  j.completed_at as completed_at,
+  j.html_url as html_url,
+  j.head_branch as head_branch,
   recent_prs.number AS pr_number,
   recent_prs.sha AS head_sha,
   recent_prs.timestamp AS head_sha_timestamp,
-  j.torchci_classification.captures AS failure_captures,
+  j.torchci_classification.'captures' AS failure_captures,
   IF(
-    j.torchci_classification.line IS NULL,
-    null,
-    ARRAY_CREATE(j.torchci_classification.line)
+    j.torchci_classification.'line' = '',
+    [],
+    [j.torchci_classification.'line']
   ) AS failure_lines,
-  j.torchci_classification.context AS failure_context,
-  j._event_time AS time
+  j.torchci_classification.'context' AS failure_context,
+  j.created_at AS time
 FROM
-  commons.workflow_run w
-  JOIN (
-    commons.workflow_job j
-    JOIN recent_prs ON j.head_sha = recent_prs.sha HINT(join_strategy = lookup)
-  ) ON w.id = j.run_id HINT(join_broadcast = true)
-UNION
+  default.workflow_run w final
+  JOIN default.workflow_job j final ON w.id = j.run_id
+  JOIN recent_prs ON j.head_sha = recent_prs.sha
+where
+  w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
+  and j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in (select sha from recent_prs))
+UNION all
 SELECT
   null AS workflowId,
   w.workflow_id as workflowUniqueId,
-  w.id,
+  w.id as id,
   null AS runnerName,
-  w.head_commit.author.email as authorEmail,
+  w.head_commit.'author'.'email' as authorEmail,
   w.name AS name,
   w.name AS jobName,
-  w.conclusion,
-  w.completed_at,
-  w.html_url,
-  w.head_branch,
+  w.conclusion as conclusion,
+  null as completed_at,
+  w.html_url as html_url,
+  w.head_branch as head_branch,
   recent_prs.number AS pr_number,
-  w.head_sha,
+  w.head_sha AS head_sha,
   recent_prs.timestamp AS head_sha_timestamp,
-  null AS failure_captures,
-  null AS failure_lines,
-  null AS failure_context,
-  w._event_time as time
+  [] AS failure_captures,
+  [] AS failure_lines,
+  [] AS failure_context,
+  w.created_at as time
 FROM
-  commons.workflow_run w
-  JOIN recent_prs ON w.head_sha = recent_prs.sha HINT(join_broadcast = true)
+  default.workflow_run w final
+  JOIN recent_prs ON w.head_sha = recent_prs.sha
+WHERE
+  w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
 ORDER BY
   time DESC
diff --git a/torchci/lib/fetchPR.ts b/torchci/lib/fetchPR.ts
index 0c64e42ded..31af4cba01 100644
--- a/torchci/lib/fetchPR.ts
+++ b/torchci/lib/fetchPR.ts
@@ -1,32 +1,24 @@
 import { Octokit } from "octokit";
 import rocksetVersions from "rockset/prodVersions.json";
+import { enableClickhouse, queryClickhouseSaved } from "./clickhouse";
 import getRocksetClient from "./rockset";
 import { PRData } from "./types";
 
-export default async function fetchPR(
+async function fetchHistoricalCommits(
   owner: string,
   repo: string,
-  prNumber: string,
-  octokit: Octokit
-): Promise<PRData> {
-  // We pull data from both Rockset and Github to get all commits, including
-  // the ones that have been force merged out of the git history.
-  // Rockset is the primary source, GitHub covers anything newer that might
-  // have been missed.
-  const rocksetClient = getRocksetClient();
-  const [pull, commits, historicalCommits] = await Promise.all([
-    octokit.rest.pulls.get({
+  prNumber: string
+) {
+  if (enableClickhouse()) {
+    return await queryClickhouseSaved("pr_commits", {
+      pr_num: prNumber,
       owner,
       repo,
-      pull_number: parseInt(prNumber),
-    }),
-    octokit.paginate(octokit.rest.pulls.listCommits, {
-      owner,
-      repo,
-      pull_number: parseInt(prNumber),
-      per_page: 100,
-    }),
-    rocksetClient.queryLambdas.executeQueryLambda(
+    });
+  }
+  const rocksetClient = getRocksetClient();
+  return (
+    await rocksetClient.queryLambdas.executeQueryLambda(
       "commons",
       "pr_commits",
       rocksetVersions.commons.pr_commits as string,
@@ -49,12 +41,38 @@
         },
       ],
     }
-    ),
+    )
+  ).results!;
+}
+
+export default async function fetchPR(
+  owner: string,
+  repo: string,
+  prNumber: string,
+  octokit: Octokit
+): Promise<PRData> {
+  // We pull data from both Rockset and Github to get all commits, including
+  // the ones that have been force merged out of the git history.
+  // Rockset is the primary source, GitHub covers anything newer that might
+  // have been missed.
+  const [pull, commits, historicalCommits] = await Promise.all([
+    octokit.rest.pulls.get({
+      owner,
+      repo,
+      pull_number: parseInt(prNumber),
+    }),
+    octokit.paginate(octokit.rest.pulls.listCommits, {
+      owner,
+      repo,
+      pull_number: parseInt(prNumber),
+      per_page: 100,
+    }),
+    fetchHistoricalCommits(owner, repo, prNumber),
   ]);
 
   const title = pull.data.title;
   const body = pull.data.body ?? "";
""; - let shas = historicalCommits.results!.map((commit) => { + let shas = historicalCommits.map((commit) => { return { sha: commit.sha, title: commit.message.split("\n")[0] }; }); diff --git a/torchci/lib/fetchRecentWorkflows.ts b/torchci/lib/fetchRecentWorkflows.ts index cbc8ab007e..8bdac73be7 100644 --- a/torchci/lib/fetchRecentWorkflows.ts +++ b/torchci/lib/fetchRecentWorkflows.ts @@ -1,4 +1,5 @@ import rocksetVersions from "rockset/prodVersions.json"; +import { enableClickhouse, queryClickhouseSaved } from "./clickhouse"; import getRocksetClient from "./rockset"; import { RecentWorkflowsData } from "./types"; @@ -7,6 +8,23 @@ export async function fetchRecentWorkflows( prNumber: string = "0", numMinutes: string = "30" ): Promise { + if (enableClickhouse()) { + const res = await queryClickhouseSaved("recent_pr_workflows_query", { + numMinutes, + prNumber, + repo, + }); + for (const row of res) { + // Check for time 0 since CH uses default value + if (row["head_sha_timestamp"] == "1970-01-01 00:00:00.000000000") { + row["head_sha_timestamp"] = null; + } + if (row["completed_at"] == "1970-01-01 00:00:00.000000000") { + row["completed_at"] = null; + } + } + return res; + } const rocksetClient = getRocksetClient(); const recentWorkflowsQuery = await rocksetClient.queryLambdas.executeQueryLambda( @@ -39,6 +57,11 @@ export async function fetchRecentWorkflows( export async function fetchFailedJobsFromCommits( shas: string[] ): Promise { + if (enableClickhouse()) { + return await queryClickhouseSaved("commit_failed_jobs", { + shas, + }); + } const rocksetClient = getRocksetClient(); const commitFailedJobsQuery = await rocksetClient.queryLambdas.executeQueryLambda( diff --git a/torchci/pages/api/drci/drci.ts b/torchci/pages/api/drci/drci.ts index ab090e87f6..097219cce1 100644 --- a/torchci/pages/api/drci/drci.ts +++ b/torchci/pages/api/drci/drci.ts @@ -527,7 +527,7 @@ function constructResultsJobsSections( } output += "\n"; - if (job.failure_captures) { + if (job.failure_captures && job.failure_captures.length > 0) { output += ` \`${job.failure_captures[0]}\`\n`; } } @@ -836,8 +836,13 @@ export async function getWorkflowJobsStatuses( for (const job of prInfo.jobs) { if ( - (job.conclusion === undefined || job.conclusion === null) && - (job.completed_at === undefined || job.completed_at === null) + (job.conclusion === undefined || + job.conclusion === null || + job.conclusion === "") && + (job.completed_at === undefined || + job.completed_at === null || + job.completed_at === "" || + job.completed_at === "1970-01-01 00:00:00.000000000") // Time 0 ) { pending++; } else if (job.conclusion === "failure" || job.conclusion === "cancelled") {