Skip to content

Commit

Permalink
[ch] pr_commits, some dr ci queries (#5626)
Browse files Browse the repository at this point in the history
The Dr. CI queries are a bit slow (~15s for the recent-jobs query).


still gated, did some brief checking on Dr. CI comments but I think it
could use more checking

Dr. CI tests will need to be rewritten for clickhouse mocks

I'm a little tempted to just switch over and not gatekeep for Dr. CI

For comparing:

```
curl "http://localhost:3000/api/drci/drci" --data 'repo=pytorch' > new.json
```
```
curl --request POST \
    --url 'https://www.torch-ci.com/api/drci/drci' \
    --header 'Authorization: i9ju23nf39nunu3asn' \
    --data 'repo=pytorch' > orig.json
```
```


import json
from library import REPO_ROOT
import re

def failures(thing):
    """Normalize a Dr. CI results dict in place so two JSON dumps diff cleanly.

    For every pr_number -> classification -> list of failure dicts:
      - sorts each failure list by job "id" for a stable ordering
      - replaces None failure_captures / failure_context / failure_lines with []
      - replaces any timestamp-shaped value with the literal string "date" so
        backend-specific timestamp formats don't show up as spurious diffs

    Args:
        thing: mapping of pr_number -> {classification: [failure dicts]};
               mutated in place, nothing is returned.
    """
    # Two timestamp shapes are collapsed: "... .000000000" (presumably the
    # ClickHouse nanosecond format — confirm) and ISO-8601 "YYYY-MM-DDTHH:MM:SS".
    # Compiled once here instead of on every value in the innermost loop.
    nano_ts = re.compile(r"(.*) (.*)\.000000000")
    iso_ts = re.compile(r"(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d).*")
    for pr_num in thing:
        for classification in thing[pr_num]:
            ordered = sorted(thing[pr_num][classification], key=lambda x: x["id"])
            thing[pr_num][classification] = ordered
            for failure in ordered:
                # One backend emits null where the other emits an empty list.
                for key in ("failure_captures", "failure_context", "failure_lines"):
                    if failure[key] is None:
                        failure[key] = []
                for key, value in failure.items():
                    if nano_ts.match(str(value)) or iso_ts.match(str(value)):
                        failure[key] = "date"


if __name__ == "__main__":
    orig_file = REPO_ROOT / "../test-infra/torchci/orig.json"
    new_file = REPO_ROOT / "../test-infra/torchci/new.json"

    with open(orig_file, "r") as f:
        orig = json.load(f)

    with open(new_file, "r") as f:
        new = json.load(f)

    failures(orig)
    failures(new)

    with open(orig_file, "w") as f:
        json.dump(orig, f, indent=2)

    with open(new_file, "w") as f:
        json.dump(new, f, indent=2)
```
  • Loading branch information
clee2000 authored Sep 6, 2024
1 parent 7254ee1 commit 174648d
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 121 deletions.
45 changes: 20 additions & 25 deletions torchci/clickhouse_queries/commit_failed_jobs/query.sql
Original file line number Diff line number Diff line change
@@ -1,41 +1,36 @@
-- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
-- This query is used by Dr.CI to get all the failed jobs from the base commit. They can then be
-- used to decide if a failure is due to broken trunk
with relevant_pushes as (
select
p.head_commit.timestamp,
p.after
from commons.push p
p.head_commit.timestamp as timestamp,
p.head_commit.'id' as after
from default.push p final
where
ARRAY_CONTAINS(
SPLIT(:shas, ','), p.after
)
p.head_commit.'id' in {shas: Array(String)}
)
SELECT
j.id,
j.id as id,
j.name AS jobName,
CONCAT(w.name, ' / ', j.name) AS name,
j.runner_name AS runnerName,
w.head_commit.author.email as authorEmail,
j.conclusion,
j.completed_at,
j.html_url,
j.head_sha,
w.head_commit.'author'.'email' as authorEmail,
j.conclusion as conclusion,
j.completed_at as completed_at,
j.html_url as html_url,
j.head_sha as head_sha,
p.timestamp AS head_sha_timestamp,
j.head_branch,
j.torchci_classification.captures AS failure_captures,
IF(j.torchci_classification.line IS NULL, null, ARRAY_CREATE(j.torchci_classification.line)) AS failure_lines,
j.torchci_classification.context AS failure_context,
j._event_time AS time,
j.head_branch as head_branch,
j.torchci_classification.'captures' AS failure_captures,
IF(j.torchci_classification.'line' = '', [], [j.torchci_classification.'line']) AS failure_lines,
j.torchci_classification.'context' AS failure_context,
j.created_at AS time
FROM
commons.workflow_run w
JOIN commons.workflow_job j ON w.id = j.run_id HINT(join_broadcast = true)
default.workflow_run w final
JOIN default.workflow_job j final ON w.id = j.run_id
-- Do a left join here because the push table won't have any information about
-- commits from forked repo
LEFT JOIN relevant_pushes p ON p.after = j.head_sha HINT(join_strategy = lookup)
LEFT JOIN relevant_pushes p ON p.after = j.head_sha
WHERE
ARRAY_CONTAINS(
SPLIT(: shas, ','),
j.head_sha
)
j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in {shas: Array(String)})
and w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in {shas: Array(String)})
AND j.conclusion IN ('failure', 'cancelled')
51 changes: 28 additions & 23 deletions torchci/clickhouse_queries/pr_commits/query.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
-- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
-- This query is used by the HUD's /pull page to populate the list of historical commits
-- made against a given PR.
-- This improves upon the default github commits view because it allows HUD to show jobs
Expand All @@ -8,37 +7,43 @@ WITH
-- Get all PRs that were merged into master, and get all the SHAs for commits from that PR which CI jobs ran against
-- We need the shas because some jobs (like trunk) don't have a PR they explicitly ran against, but they _were_ run against
-- a commit from a PR
pr_shas AS (
shas as (
SELECT DISTINCT
FORMAT_ISO8601(
PARSE_TIMESTAMP_ISO8601(p.head_commit.timestamp),
'America/Los_Angeles'
) as timestamp,
r.pull_requests[1].number AS pr_number,
p.head_commit.id AS sha,
p.head_commit.message,
r.pull_requests[1].'number' AS pr_number,
r.head_sha AS sha,
CONCAT(
'https://github.com/',
:owner,
{owner: String},
'/',
:repo,
{repo: String},
'/pull/',
r.pull_requests[1].number
) AS pr_url,
p.head_commit.url AS commit_url,
r.pull_requests[1].'number'
) AS pr_url
FROM
commons.push p
JOIN commons.workflow_run r ON p.head_commit.id = r.head_sha HINT(join_strategy=lookup)
default.workflow_run r final
WHERE
1 = 1
AND LENGTH(r.pull_requests) = 1
AND r.repository.owner.login = :owner
AND r.pull_requests[1].head.repo.name = :repo
AND r.pull_requests[1].number = :pr_num

LENGTH(r.pull_requests) = 1
AND r.repository.'owner'.'login' = {owner: String}
AND r.pull_requests[1].'head'.'repo'.'name' = {repo: String}
AND r.pull_requests[1].'number' = {pr_num: Int64}
and r.id in (select id from materialized_views.workflow_run_by_pr_num where pr_number = {pr_num: Int64})
),
shas_with_info AS (
SELECT DISTINCT
p.head_commit.'timestamp' as timestamp,
s.pr_number AS pr_number,
s.pr_url as pr_url,
p.head_commit.'id' AS sha,
p.head_commit.'message' as message,
p.head_commit.url AS commit_url
FROM
default.push p final
JOIN shas s ON p.head_commit.id = s.sha
-- Make the query faster by using a materialized view with a more relevant primary key
where p.head_commit.'timestamp' in (select timestamp from materialized_views.push_by_sha where id in (select sha from shas))
)
SELECT
*
FROM
pr_shas
shas_with_info
ORDER BY timestamp
95 changes: 47 additions & 48 deletions torchci/clickhouse_queries/recent_pr_workflows_query/query.sql
Original file line number Diff line number Diff line change
@@ -1,84 +1,83 @@
-- !!! Query is not converted to CH syntax yet. Delete this line when it gets converted
-- This workflow is used by Dr.CI to get all the jobs from pull requests. The failures will then be
-- classified into new failures and unrelated failures such as broken trunk, flaky, unstable, etc.
WITH relevant_shas as (
select j.head_sha
from workflow_job j
where
PARSE_TIMESTAMP_ISO8601(j.completed_at) > (
CURRENT_TIMESTAMP() - MINUTES(: numMinutes)
)
AND :prNumber = 0
union
select pr.head.sha as head_sha
from commons.pull_request pr
where pr.number = :prNumber
select head_sha
from materialized_views.workflow_job_by_completed_at
where completed_at > now() - Interval {numMinutes: Int64} MINUTES
and {prNumber: Int64} = 0
union all
select pr.head.'sha' as head_sha
from default.pull_request pr final
where pr.number = {prNumber: Int64}
),
recent_prs AS (
SELECT
distinct pull_request.head.sha AS sha,
distinct pull_request.head.'sha' AS sha,
pull_request.number AS number,
push.head_commit.timestamp AS timestamp,
push.head_commit.'timestamp' AS timestamp
FROM
relevant_shas r
JOIN commons.pull_request pull_request ON r.head_sha = pull_request.head.sha HINT(join_broadcast = true)
JOIN default.pull_request pull_request final ON r.head_sha = pull_request.head.'sha'
-- Do a left join here because the push table won't have any information about
-- commits from forked repo
LEFT JOIN commons.push push ON r.head_sha = push.after HINT(join_strategy = lookup)
LEFT JOIN default.push push final ON r.head_sha = push.head_commit.'id'
WHERE
pull_request.base.repo.full_name =: repo
pull_request.base.'repo'.'full_name' = {repo: String}
)
SELECT
w.id AS workflowId,
w.workflow_id as workflowUniqueId,
j.id,
j.id as id,
j.runner_name AS runnerName,
w.head_commit.author.email as authorEmail,
w.head_commit.'author'.'email' as authorEmail,
CONCAT(w.name, ' / ', j.name) AS name,
j.name AS jobName,
j.conclusion,
j.completed_at,
j.html_url,
j.head_branch,
j.conclusion as conclusion,
j.completed_at as completed_at,
j.html_url as html_url,
j.head_branch as head_branch,
recent_prs.number AS pr_number,
recent_prs.sha AS head_sha,
recent_prs.timestamp AS head_sha_timestamp,
j.torchci_classification.captures AS failure_captures,
j.torchci_classification.'captures' AS failure_captures,
IF(
j.torchci_classification.line IS NULL,
null,
ARRAY_CREATE(j.torchci_classification.line)
j.torchci_classification.'line' = '',
[],
[j.torchci_classification.'line']
) AS failure_lines,
j.torchci_classification.context AS failure_context,
j._event_time AS time
j.torchci_classification.'context' AS failure_context,
j.created_at AS time
FROM
commons.workflow_run w
JOIN (
commons.workflow_job j
JOIN recent_prs ON j.head_sha = recent_prs.sha HINT(join_strategy = lookup)
) ON w.id = j.run_id HINT(join_broadcast = true)
UNION
default.workflow_run w final
JOIN default.workflow_job j final ON w.id = j.run_id
JOIN recent_prs ON j.head_sha = recent_prs.sha
where
w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
and j.id in (select id from materialized_views.workflow_job_by_head_sha where head_sha in (select sha from recent_prs))
UNION all
SELECT
null AS workflowId,
w.workflow_id as workflowUniqueId,
w.id,
w.id as id,
null AS runnerName,
w.head_commit.author.email as authorEmail,
w.head_commit.'author'.'email' as authorEmail,
w.name AS name,
w.name AS jobName,
w.conclusion,
w.completed_at,
w.html_url,
w.head_branch,
w.conclusion as conclusion,
null as completed_at,
w.html_url as html_url,
w.head_branch as head_branch,
recent_prs.number AS pr_number,
w.head_sha,
w.head_sha AS head_sha,
recent_prs.timestamp AS head_sha_timestamp,
null AS failure_captures,
null AS failure_lines,
null AS failure_context,
w._event_time as time
[] AS failure_captures,
[] AS failure_lines,
[] AS failure_context,
w.created_at as time
FROM
commons.workflow_run w
JOIN recent_prs ON w.head_sha = recent_prs.sha HINT(join_broadcast = true)
default.workflow_run w final
JOIN recent_prs ON w.head_sha = recent_prs.sha
WHERE
w.id in (select id from materialized_views.workflow_run_by_head_sha where head_sha in (select sha from recent_prs))
ORDER BY
time DESC
62 changes: 40 additions & 22 deletions torchci/lib/fetchPR.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,24 @@
import { Octokit } from "octokit";
import rocksetVersions from "rockset/prodVersions.json";
import { enableClickhouse, queryClickhouseSaved } from "./clickhouse";
import getRocksetClient from "./rockset";
import { PRData } from "./types";

export default async function fetchPR(
async function fetchHistoricalCommits(
owner: string,
repo: string,
prNumber: string,
octokit: Octokit
): Promise<PRData> {
// We pull data from both Rockset and Github to get all commits, including
// the ones that have been force merged out of the git history.
// Rockset is the primary source, GitHub covers anything newer that might
// have been missed.
const rocksetClient = getRocksetClient();
const [pull, commits, historicalCommits] = await Promise.all([
octokit.rest.pulls.get({
prNumber: string
) {
if (enableClickhouse()) {
return await queryClickhouseSaved("pr_commits", {
pr_num: prNumber,
owner,
repo,
pull_number: parseInt(prNumber),
}),
octokit.paginate(octokit.rest.pulls.listCommits, {
owner,
repo,
pull_number: parseInt(prNumber),
per_page: 100,
}),
rocksetClient.queryLambdas.executeQueryLambda(
});
}
const rocksetClient = getRocksetClient();
return (
await rocksetClient.queryLambdas.executeQueryLambda(
"commons",
"pr_commits",
rocksetVersions.commons.pr_commits as string,
Expand All @@ -49,12 +41,38 @@ export default async function fetchPR(
},
],
}
),
)
).results!;
}

export default async function fetchPR(
owner: string,
repo: string,
prNumber: string,
octokit: Octokit
): Promise<PRData> {
// We pull data from both Rockset and Github to get all commits, including
// the ones that have been force merged out of the git history.
// Rockset is the primary source, GitHub covers anything newer that might
// have been missed.
const [pull, commits, historicalCommits] = await Promise.all([
octokit.rest.pulls.get({
owner,
repo,
pull_number: parseInt(prNumber),
}),
octokit.paginate(octokit.rest.pulls.listCommits, {
owner,
repo,
pull_number: parseInt(prNumber),
per_page: 100,
}),
fetchHistoricalCommits(owner, repo, prNumber),
]);
const title = pull.data.title;
const body = pull.data.body ?? "";

let shas = historicalCommits.results!.map((commit) => {
let shas = historicalCommits.map((commit) => {
return { sha: commit.sha, title: commit.message.split("\n")[0] };
});

Expand Down
23 changes: 23 additions & 0 deletions torchci/lib/fetchRecentWorkflows.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import rocksetVersions from "rockset/prodVersions.json";
import { enableClickhouse, queryClickhouseSaved } from "./clickhouse";
import getRocksetClient from "./rockset";
import { RecentWorkflowsData } from "./types";

Expand All @@ -7,6 +8,23 @@ export async function fetchRecentWorkflows(
prNumber: string = "0",
numMinutes: string = "30"
): Promise<RecentWorkflowsData[]> {
if (enableClickhouse()) {
const res = await queryClickhouseSaved("recent_pr_workflows_query", {
numMinutes,
prNumber,
repo,
});
for (const row of res) {
// Check for time 0 since CH uses default value
if (row["head_sha_timestamp"] == "1970-01-01 00:00:00.000000000") {
row["head_sha_timestamp"] = null;
}
if (row["completed_at"] == "1970-01-01 00:00:00.000000000") {
row["completed_at"] = null;
}
}
return res;
}
const rocksetClient = getRocksetClient();
const recentWorkflowsQuery =
await rocksetClient.queryLambdas.executeQueryLambda(
Expand Down Expand Up @@ -39,6 +57,11 @@ export async function fetchRecentWorkflows(
export async function fetchFailedJobsFromCommits(
shas: string[]
): Promise<RecentWorkflowsData[]> {
if (enableClickhouse()) {
return await queryClickhouseSaved("commit_failed_jobs", {
shas,
});
}
const rocksetClient = getRocksetClient();
const commitFailedJobsQuery =
await rocksetClient.queryLambdas.executeQueryLambda(
Expand Down
Loading

0 comments on commit 174648d

Please sign in to comment.