Skip to content

Commit

Permalink
Remove review_prefs from the DB
Browse files Browse the repository at this point in the history
It will be stored in-memory only.
  • Loading branch information
Kobzol committed Feb 18, 2025
1 parent 74d4dde commit 0f17701
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 183 deletions.
1 change: 1 addition & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,5 @@ CREATE UNIQUE INDEX IF NOT EXISTS review_prefs_user_id ON review_prefs(user_id);
"
ALTER TABLE review_prefs ADD COLUMN IF NOT EXISTS max_assigned_prs INTEGER DEFAULT NULL;
",
"ALTER TABLE review_prefs DROP COLUMN assigned_prs;",
];
2 changes: 1 addition & 1 deletion src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3050,7 +3050,7 @@ async fn project_items_by_status(
}

/// Retrieve all pull requests in status OPEN that are not drafts
pub async fn retrieve_pull_requests(
pub async fn retrieve_open_pull_requests(
repo: &Repository,
client: &GithubClient,
) -> anyhow::Result<Vec<(User, i32)>> {
Expand Down
98 changes: 36 additions & 62 deletions src/handlers/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::{
config::{AssignConfig, WarnNonDefaultBranchException},
github::{self, Event, FileDiff, Issue, IssuesAction, Selection},
handlers::{pr_tracking::has_user_capacity, Context, GithubClient, IssuesEvent},
handlers::{Context, GithubClient, IssuesEvent},
interactions::EditIssueBody,
};
use anyhow::{bail, Context as _};
Expand Down Expand Up @@ -560,22 +560,22 @@ pub(super) async fn handle_command(
}
let db_client = ctx.db.get().await;
if is_self_assign(&name, &event.user().login) {
let work_queue = has_user_capacity(&db_client, &name).await;
if work_queue.is_err() {
// NOTE: disabled for now, just log
log::warn!(
"[#{}] PR self-assign failed, DB reported that user {} has no review capacity. Ignoring.",
issue.number,
name
);
// issue
// .post_comment(
// &ctx.github,
// &REVIEWER_HAS_NO_CAPACITY.replace("{username}", &name),
// )
// .await?;
// return Ok(());
}
// let work_queue = has_user_capacity(&db_client, &name).await;
// if work_queue.is_err() {
// // NOTE: disabled for now, just log
// log::warn!(
// "[#{}] PR self-assign failed, DB reported that user {} has no review capacity. Ignoring.",
// issue.number,
// name
// );
// // issue
// // .post_comment(
// // &ctx.github,
// // &REVIEWER_HAS_NO_CAPACITY.replace("{username}", &name),
// // )
// // .await?;
// // return Ok(());
// }

name.to_string()
} else {
Expand Down Expand Up @@ -802,7 +802,7 @@ impl fmt::Display for FindReviewerError {
/// auto-assign groups, or rust-lang team names. It must have at least one
/// entry.
async fn find_reviewer_from_names(
db: &DbClient,
_db: &DbClient,
teams: &Teams,
config: &AssignConfig,
issue: &Issue,
Expand Down Expand Up @@ -841,24 +841,24 @@ async fn find_reviewer_from_names(
}

// filter out team members without capacity
let filtered_candidates = filter_by_capacity(db, &candidates)
.await
.expect("Error while filtering out team members");

if filtered_candidates.is_empty() {
// NOTE: disabled for now, just log
log::info!("[#{}] Filtered list of PR assignee is empty", issue.number);
// return Err(FindReviewerError::AllReviewersFiltered {
// initial: names.to_vec(),
// filtered: names.to_vec(),
// });
}

log::info!(
"[#{}] Filtered list of candidates: {:?}",
issue.number,
filtered_candidates
);
// let filtered_candidates = filter_by_capacity(db, &candidates)
// .await
// .expect("Error while filtering out team members");
//
// if filtered_candidates.is_empty() {
// // NOTE: disabled for now, just log
// log::info!("[#{}] Filtered list of PR assignee is empty", issue.number);
// // return Err(FindReviewerError::AllReviewersFiltered {
// // initial: names.to_vec(),
// // filtered: names.to_vec(),
// // });
// }
//
// log::info!(
// "[#{}] Filtered list of candidates: {:?}",
// issue.number,
// filtered_candidates
// );

// Return unfiltered list of candidates
Ok(candidates
Expand All @@ -868,32 +868,6 @@ async fn find_reviewer_from_names(
.to_string())
}

/// Filter out candidates not having review capacity
async fn filter_by_capacity(
db: &DbClient,
candidates: &HashSet<&str>,
) -> anyhow::Result<HashSet<String>> {
let usernames = candidates
.iter()
.map(|c| *c)
.collect::<Vec<&str>>()
.join(",");

let q = format!(
"
SELECT username
FROM review_prefs r
JOIN users on users.user_id=r.user_id
AND username = ANY('{{ {} }}')
AND CARDINALITY(r.assigned_prs) < LEAST(COALESCE(r.max_assigned_prs,1000000))",
usernames
);
let result = db.query(&q, &[]).await.context("Select DB error")?;
let candidates: HashSet<String> = result.iter().map(|row| row.get("username")).collect();
log::info!("DB returned these candidates: {:?}", candidates);
Ok(candidates)
}

/// Returns a list of candidate usernames (from relevant teams) to choose as a reviewer.
fn candidate_reviewers_from_names<'a>(
teams: &'a Teams,
Expand Down
36 changes: 15 additions & 21 deletions src/handlers/pr_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
//! - Adds the PR to the workqueue of one team member (after the PR has been assigned or reopened)
//! - Removes the PR from the workqueue of one team member (after the PR has been unassigned or closed)
use super::assign::{FindReviewerError, REVIEWER_HAS_NO_CAPACITY, SELF_ASSIGN_HAS_NO_CAPACITY};
use crate::github::{User, UserId};
use crate::{
config::ReviewPrefsConfig,
github::{IssuesAction, IssuesEvent},
handlers::Context,
ReviewPrefs,
};
use std::collections::{HashMap, HashSet};
use tracing as log;
Expand All @@ -29,6 +27,12 @@ pub struct ReviewerWorkqueue {
reviewers: HashMap<UserId, HashSet<PullRequestNumber>>,
}

impl ReviewerWorkqueue {
pub fn new(reviewers: HashMap<UserId, HashSet<PullRequestNumber>>) -> Self {
Self { reviewers }
}
}

pub(super) enum ReviewPrefsInput {
Assigned { assignee: User },
Unassigned { assignee: User },
Expand Down Expand Up @@ -123,25 +127,15 @@ pub(super) async fn handle_input<'a>(
Ok(())
}

// Check user review capacity.
// Returns error if SQL query fails or user has no capacity
pub async fn has_user_capacity(
db: &crate::db::PooledClient,
assignee: &str,
) -> anyhow::Result<ReviewPrefs, FindReviewerError> {
let q = "
SELECT username, r.*
FROM review_prefs r
JOIN users ON users.user_id = r.user_id
WHERE username = $1
AND CARDINALITY(r.assigned_prs) < LEAST(COALESCE(r.max_assigned_prs,1000000));";
let rec = db.query_one(q, &[&assignee]).await;
if let Err(_) = rec {
return Err(FindReviewerError::ReviewerHasNoCapacity {
username: assignee.to_string(),
});
}
Ok(rec.unwrap().into())
/// Get pull request assignments for a team member
pub async fn get_assigned_prs(ctx: &Context, user_id: UserId) -> HashSet<PullRequestNumber> {
ctx.workqueue
.read()
.await
.reviewers
.get(&user_id)
.cloned()
.unwrap_or_default()
}

/// Add a PR to the workqueue of a team member.
Expand Down
79 changes: 14 additions & 65 deletions src/handlers/pull_requests_assignment_update.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use std::collections::HashMap;

use crate::db::notifications::record_username;
use crate::github::retrieve_pull_requests;
use crate::github::{retrieve_open_pull_requests, UserId};
use crate::handlers::pr_tracking::{PullRequestNumber, ReviewerWorkqueue};
use crate::jobs::Job;
use crate::ReviewPrefs;
use anyhow::Context as _;
use async_trait::async_trait;
use tokio_postgres::Client as DbClient;
use std::collections::{HashMap, HashSet};

pub struct PullRequestAssignmentUpdate;

Expand All @@ -17,70 +13,23 @@ impl Job for PullRequestAssignmentUpdate {
}

async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
let db = ctx.db.get().await;
let gh = &ctx.github;

tracing::trace!("starting pull_request_assignment_update");

let rust_repo = gh.repository("rust-lang/rust").await?;
let prs = retrieve_pull_requests(&rust_repo, &gh).await?;

// delete all PR assignments before populating
init_table(&db).await?;

// aggregate by user first
let aggregated = prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
let (_, prs) = acc.entry(user.id).or_insert_with(|| (user, Vec::new()));
prs.push(pr);
acc
});
let rust_repo = gh.repository("kobzol/bors-kindergarten").await?;
let prs = retrieve_open_pull_requests(&rust_repo, &gh).await?;

// populate the table
for (_user_id, (assignee, prs)) in &aggregated {
let assignee_id = assignee.id;
let _ = record_username(&db, assignee_id, &assignee.login).await;
create_team_member_workqueue(&db, assignee_id, &prs).await?;
}
// Aggregate PRs by user
let aggregated: HashMap<UserId, HashSet<PullRequestNumber>> =
prs.into_iter().fold(HashMap::new(), |mut acc, (user, pr)| {
let prs = acc.entry(user.id).or_default();
prs.insert(pr as PullRequestNumber);
acc
});
tracing::info!("PR assignments\n{aggregated:?}");
*ctx.workqueue.write().await = ReviewerWorkqueue::new(aggregated);

Ok(())
}
}

/// Truncate the review prefs table
async fn init_table(db: &DbClient) -> anyhow::Result<u64> {
let res = db
.execute("UPDATE review_prefs SET assigned_prs='{}';", &[])
.await?;
Ok(res)
}

/// Create a team member work queue
async fn create_team_member_workqueue(
db: &DbClient,
user_id: u64,
prs: &Vec<i32>,
) -> anyhow::Result<u64, anyhow::Error> {
let q = "
INSERT INTO review_prefs (user_id, assigned_prs) VALUES ($1, $2)
ON CONFLICT (user_id)
DO UPDATE SET assigned_prs = $2
WHERE review_prefs.user_id=$1";
db.execute(q, &[&(user_id as i64), prs])
.await
.context("Insert DB error")
}

/// Get pull request assignments for a team member
pub async fn get_review_prefs(db: &DbClient, user_id: u64) -> anyhow::Result<ReviewPrefs> {
let q = "
SELECT username,r.*
FROM review_prefs r
JOIN users on r.user_id=users.user_id
WHERE r.user_id = $1;";
let row = db
.query_one(q, &[&(user_id as i64)])
.await
.context("Error retrieving review preferences")
.unwrap();
Ok(row.into())
}
42 changes: 18 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::new_without_default)]

use crate::github::PullRequestDetails;
use crate::github::{PullRequestDetails, UserId};

use anyhow::Context;
use handlers::HandlerError;
Expand Down Expand Up @@ -128,43 +128,37 @@ impl From<anyhow::Error> for WebhookError {
#[derive(Debug, Serialize)]
pub struct ReviewPrefs {
pub id: uuid::Uuid,
pub username: String,
pub user_id: i64,
pub assigned_prs: Vec<i32>,
pub max_assigned_prs: Option<i32>,
}

impl ReviewPrefs {
fn to_string(&self) -> String {
let capacity = match self.max_assigned_prs {
Some(max) => format!("{}", max),
None => String::from("Not set (i.e. unlimited)"),
};
let prs = self
.assigned_prs
.iter()
.map(|pr| format!("#{}", pr))
.collect::<Vec<String>>()
.join(", ");
format!(
"Username: {}\nAssigned PRs: {}\nReview capacity: {}",
self.username, prs, capacity
)
}
}

impl From<tokio_postgres::row::Row> for ReviewPrefs {
fn from(row: tokio_postgres::row::Row) -> Self {
Self {
id: row.get("id"),
username: row.get("username"),
user_id: row.get("user_id"),
assigned_prs: row.get("assigned_prs"),
max_assigned_prs: row.get("max_assigned_prs"),
}
}
}

/// Get team member review preferences.
/// If they are missing, returns `Ok(None)`.
pub async fn get_review_prefs(
db: &tokio_postgres::Client,
user_id: UserId,
) -> anyhow::Result<Option<ReviewPrefs>> {
let query = "
SELECT id, user_id, max_assigned_prs
FROM review_prefs
WHERE review_prefs.user_id = $1;";
let row = db
.query_opt(query, &[&(user_id as i64)])
.await
.context("Error retrieving review preferences")?;
Ok(row.map(|r| r.into()))
}

pub fn deserialize_payload<T: serde::de::DeserializeOwned>(v: &str) -> anyhow::Result<T> {
let mut deserializer = serde_json::Deserializer::from_str(&v);
let res: Result<T, _> = serde_path_to_error::deserialize(&mut deserializer);
Expand Down
Loading

0 comments on commit 0f17701

Please sign in to comment.