Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start queueing rebuilds for the latest version of each crate, if we had docs #2645

Merged
merged 3 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ enum CommandLine {
repository_stats_updater: Toggle,
#[arg(long = "cdn-invalidator", default_value = "enabled", value_enum)]
cdn_invalidator: Toggle,
#[arg(long = "queue-rebuilds", default_value = "enabled", value_enum)]
queue_rebuilds: Toggle,
},

StartBuildServer {
Expand Down Expand Up @@ -192,13 +194,17 @@ impl CommandLine {
metric_server_socket_addr,
repository_stats_updater,
cdn_invalidator,
queue_rebuilds,
} => {
if repository_stats_updater == Toggle::Enabled {
docs_rs::utils::daemon::start_background_repository_stats_updater(&ctx)?;
}
if cdn_invalidator == Toggle::Enabled {
docs_rs::utils::daemon::start_background_cdn_invalidator(&ctx)?;
}
if queue_rebuilds == Toggle::Enabled {
docs_rs::utils::daemon::start_background_queue_rebuild(&ctx)?;
}

start_background_metrics_webserver(Some(metric_server_socket_addr), &ctx)?;

Expand Down
191 changes: 186 additions & 5 deletions src/build_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ use crate::{cdn, BuildPackageSummary};
use crate::{Config, Index, InstanceMetrics, RustwideBuilder};
use anyhow::Context as _;
use fn_error_context::context;
use futures_util::stream::TryStreamExt;
use futures_util::{stream::TryStreamExt, StreamExt};
use sqlx::Connection as _;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tracing::{debug, error, info};
use tracing::{debug, error, info, instrument};

// Threshold priority used to decide whether a crate belongs in the rebuild-queue-list.
// If a crate is in the rebuild-queue-list, it won't be in the build-queue-list.
/// The static priority for background rebuilds.
///
/// Used when queueing rebuilds, and when rendering them
/// collapsed in the UI.
/// `queue_rebuilds` also counts every queue entry whose priority is
/// greater than or equal to this value as an already queued rebuild.
///
/// For normal build priorities we use smaller values.
pub(crate) const REBUILD_PRIORITY: i32 = 20;

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize)]
Expand Down Expand Up @@ -656,12 +658,191 @@ impl BuildQueue {
}
}

/// Queue rebuilds as configured.
///
/// The idea is to rebuild:
/// * the latest release of each crate
/// * when the newest nightly date among its builds is older than
///   `config.rebuild_up_to_date`
/// * and there was a successful build for that release, that included
///   documentation (`rustdoc_status = TRUE`).
/// * starting with the oldest nightly versions.
/// * skipping releases that already have a build queued.
///
/// The number of candidates fetched is limited to
/// `config.max_queued_rebuilds` minus the number of queue entries that
/// already have a priority of [`REBUILD_PRIORITY`] or higher. Candidates
/// that are skipped because a build is already queued are not replaced
/// in the same run.
///
/// This might exclude releases from rebuilds that
/// * previously failed but would succeed with a newer nightly version
/// * previously failed but would succeed just with a retry.
///
/// # Errors
///
/// Returns an error when `config.max_queued_rebuilds` or
/// `config.rebuild_up_to_date` is not set, or when a database query or
/// a build-queue operation fails.
#[instrument(skip_all)]
pub async fn queue_rebuilds(
    conn: &mut sqlx::PgConnection,
    config: &Config,
    build_queue: &AsyncBuildQueue,
) -> Result<()> {
    // Validate the configuration up front and report a missing setting as
    // an error instead of panicking inside a public function.
    let max_queued_rebuilds = config
        .max_queued_rebuilds
        .context("config.max_queued_rebuilds not set")?;
    let rebuild_up_to_date = config
        .rebuild_up_to_date
        .context("config.rebuild_up_to_date not set")?;

    let already_queued_rebuilds = sqlx::query_scalar!(
        r#"SELECT COUNT(*) as "count!" FROM queue WHERE priority >= $1"#,
        REBUILD_PRIORITY
    )
    .fetch_one(&mut *conn)
    .await?;

    // `u16 -> i64` is lossless; the difference may be negative when the
    // queue already holds more rebuilds than the configured maximum.
    let rebuilds_to_queue = i64::from(max_queued_rebuilds) - already_queued_rebuilds;

    if rebuilds_to_queue <= 0 {
        info!("not queueing rebuilds; queue limit reached");
        return Ok(());
    }

    let mut results = sqlx::query!(
        "SELECT i.* FROM (
SELECT
c.name,
r.version,
max(b.rustc_nightly_date) as rustc_nightly_date

FROM crates AS c
INNER JOIN releases AS r ON c.latest_version_id = r.id
INNER JOIN builds AS b ON r.id = b.rid

WHERE
r.rustdoc_status = TRUE

GROUP BY c.name, r.version
) as i
WHERE i.rustc_nightly_date < $1
ORDER BY i.rustc_nightly_date ASC
LIMIT $2",
        rebuild_up_to_date,
        rebuilds_to_queue,
    )
    .fetch(&mut *conn);

    while let Some(row) = results.next().await {
        let row = row?;

        // Don't add a second queue entry for a release that is already waiting.
        if !build_queue
            .has_build_queued(&row.name, &row.version)
            .await?
        {
            info!("queueing rebuild for {} {}...", &row.name, &row.version);
            build_queue
                .add_crate(&row.name, &row.version, REBUILD_PRIORITY, None)
                .await?;
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
use crate::test::FakeBuild;

use super::*;
use chrono::Utc;
use chrono::{NaiveDate, Utc};
use std::time::Duration;

/// The configured threshold (2020-01-01) lies before the date in the fake
/// build's rustc version string (2020-10-15), so `queue_rebuilds` is
/// expected to leave the queue empty.
#[test]
fn test_dont_rebuild_when_new() {
    crate::test::async_wrapper(|env| async move {
        env.override_config(|cfg| {
            cfg.rebuild_up_to_date = Some(NaiveDate::from_ymd_opt(2020, 1, 1).unwrap());
            cfg.max_queued_rebuilds = Some(100);
        });

        let recent_build =
            FakeBuild::default().rustc_version("rustc 1.84.0-nightly (e7c0d2750 2020-10-15)");
        env.async_fake_release()
            .await
            .name("foo")
            .version("0.1.0")
            .builds(vec![recent_build])
            .create_async()
            .await?;

        // Nothing is queued before the rebuild pass ...
        let queue = env.async_build_queue().await;
        assert!(queue.queued_crates().await?.is_empty());

        let mut db_conn = env.async_db().await.async_conn().await;
        queue_rebuilds(&mut db_conn, &env.config(), &queue).await?;

        // ... and nothing is queued afterwards either.
        assert!(queue.queued_crates().await?.is_empty());

        Ok(())
    })
}

/// The configured threshold (2024-01-01) lies after the date in the fake
/// build's rustc version string (2020-10-15), so `queue_rebuilds` is
/// expected to queue exactly that release with `REBUILD_PRIORITY`.
#[test]
fn test_rebuild_when_old() {
    crate::test::async_wrapper(|env| async move {
        env.override_config(|cfg| {
            cfg.rebuild_up_to_date = Some(NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
            cfg.max_queued_rebuilds = Some(100);
        });

        let outdated_build =
            FakeBuild::default().rustc_version("rustc 1.84.0-nightly (e7c0d2750 2020-10-15)");
        env.async_fake_release()
            .await
            .name("foo")
            .version("0.1.0")
            .builds(vec![outdated_build])
            .create_async()
            .await?;

        // The queue starts out empty.
        let queue = env.async_build_queue().await;
        assert!(queue.queued_crates().await?.is_empty());

        let mut db_conn = env.async_db().await.async_conn().await;
        queue_rebuilds(&mut db_conn, &env.config(), &queue).await?;

        // Exactly one entry, for the fake release, at rebuild priority.
        let queued = queue.queued_crates().await?;
        assert_eq!(queued.len(), 1);
        let entry = &queued[0];
        assert_eq!(entry.name, "foo");
        assert_eq!(entry.version, "0.1.0");
        assert_eq!(entry.priority, REBUILD_PRIORITY);

        Ok(())
    })
}

/// With `max_queued_rebuilds = 1` and two entries already queued at
/// `REBUILD_PRIORITY`, `queue_rebuilds` is expected to add nothing, even
/// though the fake release would otherwise qualify for a rebuild.
#[test]
fn test_dont_rebuild_when_full() {
    crate::test::async_wrapper(|env| async move {
        env.override_config(|cfg| {
            cfg.rebuild_up_to_date = Some(NaiveDate::from_ymd_opt(2024, 1, 1).unwrap());
            cfg.max_queued_rebuilds = Some(1);
        });

        // Pre-fill the queue beyond the configured limit.
        let prefill_queue = env.async_build_queue().await;
        for name in ["foo1", "foo2"] {
            prefill_queue
                .add_crate(name, "0.1.0", REBUILD_PRIORITY, None)
                .await?;
        }

        let outdated_build =
            FakeBuild::default().rustc_version("rustc 1.84.0-nightly (e7c0d2750 2020-10-15)");
        env.async_fake_release()
            .await
            .name("foo")
            .version("0.1.0")
            .builds(vec![outdated_build])
            .create_async()
            .await?;

        let queue = env.async_build_queue().await;
        assert_eq!(queue.queued_crates().await?.len(), 2);

        let mut db_conn = env.async_db().await.async_conn().await;
        queue_rebuilds(&mut db_conn, &env.config(), &queue).await?;

        // Still only the two pre-filled entries.
        assert_eq!(queue.queued_crates().await?.len(), 2);

        Ok(())
    })
}

#[test]
fn test_add_duplicate_doesnt_fail_last_priority_wins() {
crate::test::async_wrapper(|env| async move {
Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{cdn::CdnKind, storage::StorageKind};
use anyhow::{anyhow, bail, Context, Result};
use chrono::NaiveDate;
use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
use tracing::trace;
use url::Url;
Expand Down Expand Up @@ -113,6 +114,10 @@ pub struct Config {
pub(crate) build_default_memory_limit: Option<usize>,
pub(crate) include_default_targets: bool,
pub(crate) disable_memory_limit: bool,

// automatic rebuild configuration
pub(crate) max_queued_rebuilds: Option<u16>,
pub(crate) rebuild_up_to_date: Option<NaiveDate>,
}

impl Config {
Expand Down Expand Up @@ -230,6 +235,8 @@ impl Config {
"DOCSRS_BUILD_WORKSPACE_REINITIALIZATION_INTERVAL",
86400,
)?),
max_queued_rebuilds: maybe_env("DOCSRS_MAX_QUEUED_REBUILDS")?,
rebuild_up_to_date: maybe_env("DOCSRS_REBUILD_UP_TO_DATE")?,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! documentation of crates for the Rust Programming Language.
#![allow(clippy::cognitive_complexity)]

pub use self::build_queue::{AsyncBuildQueue, BuildQueue};
pub use self::build_queue::{queue_rebuilds, AsyncBuildQueue, BuildQueue};
pub use self::config::Config;
pub use self::context::Context;
pub use self::docbuilder::PackageKind;
Expand Down
32 changes: 31 additions & 1 deletion src/utils/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This daemon will start web server, track new packages and build them

use crate::{
cdn,
cdn, queue_rebuilds,
utils::{queue_builder, report_error},
web::start_web_server,
AsyncBuildQueue, Config, Context, Index, RustwideBuilder,
Expand Down Expand Up @@ -91,6 +91,35 @@ pub fn start_background_repository_stats_updater(context: &dyn Context) -> Resul
Ok(())
}

/// Register the background task that periodically calls [`queue_rebuilds`].
///
/// The task is handed to `async_cron` under the name
/// "background queue rebuilder" together with a one-hour duration. Each
/// run fetches a connection from the pool and calls [`queue_rebuilds`].
///
/// When `max_queued_rebuilds` or `rebuild_up_to_date` is not configured,
/// no task is registered and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error when the config cannot be obtained from `context`,
/// or, if the rebuild configuration is complete, when the runtime, the
/// pool or the build queue cannot be obtained.
pub fn start_background_queue_rebuild(context: &dyn Context) -> Result<(), Error> {
    let config = context.config()?;

    // Check the configuration first, so a deployment that has not
    // configured rebuilds neither acquires nor fails on resources
    // (runtime, pool, build queue) that it would never use.
    if config.max_queued_rebuilds.is_none() || config.rebuild_up_to_date.is_none() {
        info!("rebuild config incomplete, skipping rebuild queueing");
        return Ok(());
    }

    let runtime = context.runtime()?;
    let pool = context.pool()?;
    let build_queue = runtime.block_on(context.async_build_queue())?;

    async_cron(
        &runtime,
        "background queue rebuilder",
        Duration::from_secs(60 * 60),
        move || {
            // Each run gets its own handles; the closure keeps the originals.
            let pool = pool.clone();
            let build_queue = build_queue.clone();
            let config = config.clone();
            async move {
                let mut conn = pool.get_async().await?;
                queue_rebuilds(&mut conn, &config, &build_queue).await?;
                Ok(())
            }
        },
    );
    Ok(())
}

pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Error> {
let metrics = context.instance_metrics()?;
let config = context.config()?;
Expand Down Expand Up @@ -183,6 +212,7 @@ pub fn start_daemon<C: Context + Send + Sync + 'static>(

start_background_repository_stats_updater(&*context)?;
start_background_cdn_invalidator(&*context)?;
start_background_queue_rebuild(&*context)?;

// NOTE: if a error occurred earlier in `start_daemon`, the server will _not_ be joined -
// instead it will get killed when the process exits.
Expand Down
Loading