Skip to content

Commit

Permalink
backend: finalize v2 migration (v2 phase 4 - final)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Jan 30, 2025
1 parent f475f6f commit 6660e8c
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 18 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 124 additions & 14 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
* LICENSE-AGPL for a copy of the license.
*/

use futures::FutureExt;
use sqlx::Executor;
use std::sync::atomic::Ordering;
use std::time::Duration;

use futures::FutureExt;
use sqlx::{
migrate::{Migrate, MigrateError},
pool::PoolConnection,
PgConnection, Pool, Postgres,
Executor, PgConnection, Pool, Postgres,
};

use windmill_audit::audit_ee::{AuditAuthor, AuditAuthorable};
use windmill_common::utils::generate_lock_id;
use windmill_common::{
Expand Down Expand Up @@ -199,6 +201,24 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
}
});

let db2 = db.clone();
let _ = tokio::task::spawn(async move {
use windmill_common::worker::MIN_VERSION_IS_LATEST;
loop {
if !MIN_VERSION_IS_LATEST.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
if let Err(err) = v2_finalize(&db2).await {
tracing::error!("{err:#}: Could not apply v2 finalize migration, retry in 30s..");
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
tracing::info!("v2 finalization step successfully applied.");
break;
}
});

Ok(())
}

Expand Down Expand Up @@ -250,7 +270,7 @@ async fn fix_flow_versioning_migration(
}

macro_rules! run_windmill_migration {
($migration_job_name:expr, $db:expr, $code:block) => {
($migration_job_name:expr, $db:expr, |$tx:ident| $code:block) => {
{
let migration_job_name = $migration_job_name;
let db: &Pool<Postgres> = $db;
Expand All @@ -264,11 +284,11 @@ macro_rules! run_windmill_migration {
.unwrap_or(false);
if !has_done_migration {
tracing::info!("Applying {migration_job_name} migration");
let mut tx = db.begin().await?;
let mut $tx = db.begin().await?;
let mut r = false;
while !r {
r = sqlx::query_scalar!("SELECT pg_try_advisory_lock(4242)")
.fetch_one(&mut *tx)
.fetch_one(&mut *$tx)
.await
.map_err(|e| {
tracing::error!("Error acquiring {migration_job_name} lock: {e:#}");
Expand Down Expand Up @@ -298,17 +318,17 @@ macro_rules! run_windmill_migration {
"INSERT INTO windmill_migrations (name) VALUES ($1) ON CONFLICT DO NOTHING",
migration_job_name
)
.execute(&mut *tx)
.execute(&mut *$tx)
.await?;
tracing::info!("Finished applying {migration_job_name} migration");
} else {
tracing::debug!("migration {migration_job_name} already done");
}

let _ = sqlx::query("SELECT pg_advisory_unlock(4242)")
.execute(&mut *tx)
.execute(&mut *$tx)
.await?;
tx.commit().await?;
$tx.commit().await?;
tracing::info!("released lock for {migration_job_name}");
} else {
tracing::debug!("migration {migration_job_name} already done");
Expand All @@ -318,6 +338,96 @@ macro_rules! run_windmill_migration {
};
}

/// Finalizes the v1 -> v2 jobs migration ("phase 4 - final"): drops the
/// compatibility layer (sync trigger functions, legacy views, and the
/// `__`-prefixed legacy columns) that was only kept so older workers could
/// keep operating during the transition.
///
/// Each step is wrapped in `run_windmill_migration!`, which takes a pg
/// advisory lock and records the step name in `windmill_migrations`, so every
/// step runs at most once cluster-wide and re-running this function is a
/// no-op for already-applied steps.
async fn v2_finalize(db: &DB) -> Result<(), Error> {
    // Step 1: drop the v2 sync trigger functions (CASCADE also removes the
    // triggers that use them) and the v1 compatibility views.
    run_windmill_migration!("v2_finalize_disable_sync", db, |tx| {
        tx.execute(
            r#"
DROP FUNCTION v2_job_completed_before_insert() CASCADE;
DROP FUNCTION v2_job_flow_runtime_before_insert() CASCADE;
DROP FUNCTION v2_job_flow_runtime_before_update() CASCADE;
DROP FUNCTION v2_job_queue_after_insert() CASCADE;
DROP FUNCTION v2_job_queue_before_insert() CASCADE;
DROP FUNCTION v2_job_queue_before_update() CASCADE;
DROP FUNCTION v2_job_runtime_before_insert() CASCADE;
DROP FUNCTION v2_job_runtime_before_update() CASCADE;
DROP VIEW completed_job, completed_job_view, job, queue, queue_view CASCADE;
"#,
        )
        .await?;
    });
    // Step 2: drop the legacy `__`-prefixed columns from v2_job_queue that
    // mirrored the old `queue` table layout.
    run_windmill_migration!("v2_finalize_job_queue", db, |tx| {
        tx.execute(
            r#"
ALTER TABLE v2_job_queue
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __last_ping CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __flow_status CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __same_worker CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __pre_run_error CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __mem_peak CASCADE,
DROP COLUMN __root_job CASCADE,
DROP COLUMN __leaf_jobs CASCADE,
DROP COLUMN __concurrent_limit CASCADE,
DROP COLUMN __concurrency_time_window_s CASCADE,
DROP COLUMN __timeout CASCADE,
DROP COLUMN __flow_step_id CASCADE,
DROP COLUMN __cache_ttl CASCADE;
"#,
        )
        .await?;
    });
    // Step 3: same cleanup for v2_job_completed (legacy `completed_job`
    // columns).
    run_windmill_migration!("v2_finalize_job_completed", db, |tx| {
        tx.execute(
            r#"
ALTER TABLE v2_job_completed
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __created_at CASCADE,
DROP COLUMN __success CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __is_skipped CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __tag CASCADE,
DROP COLUMN __priority CASCADE;
"#,
        )
        .await?;
    });
    Ok(())
}

async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
// let has_done_migration = sqlx::query_scalar!(
// "SELECT EXISTS(SELECT name FROM windmill_migrations WHERE name = 'fix_job_completed_index')"
Expand Down Expand Up @@ -360,7 +470,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
// tx.commit().await?;
// }

run_windmill_migration!("fix_job_completed_index_2", &db, {
run_windmill_migration!("fix_job_completed_index_2", &db, |tx| {
// sqlx::query(
// "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_completed_job_workspace_id_created_at_new_2 ON completed_job (workspace_id, job_kind, success, is_skipped, is_flow_step, created_at DESC)"
// ).execute(db).await?;
Expand All @@ -380,7 +490,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_job_completed_index_3", &db, {
run_windmill_migration!("fix_job_completed_index_3", &db, |tx| {
sqlx::query("DROP INDEX CONCURRENTLY IF EXISTS index_completed_job_on_schedule_path")
.execute(db)
.await?;
Expand All @@ -398,7 +508,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_job_index_1", &db, {
run_windmill_migration!("fix_job_index_1", &db, |tx| {
let migration_job_name = "fix_job_completed_index_4";
let mut i = 1;
tracing::info!("step {i} of {migration_job_name} migration");
Expand Down Expand Up @@ -479,7 +589,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_labeled_jobs_index", &db, {
run_windmill_migration!("fix_labeled_jobs_index", &db, |tx| {
tracing::info!("Special migration to add index concurrently on job labels 2");
sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS labeled_jobs_on_jobs")
.execute(db)
Expand All @@ -489,7 +599,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
).execute(db).await?;
});

run_windmill_migration!("v2_fix_labeled_jobs_index", &db, {
run_windmill_migration!("v2_fix_labeled_jobs_index", &db, |tx| {
tracing::info!("Special migration to add index concurrently on job labels v2");
sqlx::query!(
"CREATE INDEX CONCURRENTLY v2_labeled_jobs_on_jobs ON v2_job_completed
Expand Down
9 changes: 8 additions & 1 deletion backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
io::Write,
path::{Component, Path, PathBuf},
str::FromStr,
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use windmill_macros::annotations;
Expand Down Expand Up @@ -103,6 +106,8 @@ lazy_static::lazy_static! {
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
}

/// True when the minimum worker version observed across the fleet equals the
/// current binary's version (stored by `update_min_version`); starts `false`
/// until the first version check completes.
pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);

fn format_pull_query(peek: String) -> String {
format!(
"WITH peek AS (
Expand Down Expand Up @@ -658,6 +663,8 @@ pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postg
tracing::info!("Minimal worker version: {min_version}");
}

MIN_VERSION_IS_LATEST.store(min_version == cur_version, Ordering::Relaxed);

*MIN_VERSION_IS_AT_LEAST_1_427.write().await = min_version >= Version::new(1, 427, 0);
*MIN_VERSION_IS_AT_LEAST_1_432.write().await = min_version >= Version::new(1, 432, 0);
*MIN_VERSION_IS_AT_LEAST_1_440.write().await = min_version >= Version::new(1, 440, 0);
Expand Down
39 changes: 36 additions & 3 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,36 +396,66 @@ pub struct WrappedError {

/// Contract for job-result payloads stored on job completion: checks that the
/// payload is usable JSON and extracts optional labels embedded in the result.
pub trait ValidableJson {
    /// Whether the payload holds non-empty / usable JSON.
    fn is_valid_json(&self) -> bool;
    /// Labels found under the result's `wm_labels` key, if present and
    /// well-formed (an array of strings); `None` otherwise.
    fn wm_labels(&self) -> Option<Vec<String>>;
}

/// Deserialization target used to pull the `wm_labels` array out of a job
/// result payload; any other fields present in the result are ignored.
#[derive(serde::Deserialize)]
struct ResultLabels {
    // Labels to persist alongside the completed job.
    wm_labels: Vec<String>,
}

impl ValidableJson for WrappedError {
    /// A `WrappedError` is always treated as valid JSON.
    fn is_valid_json(&self) -> bool {
        true
    }

    /// Error payloads never carry labels.
    fn wm_labels(&self) -> Option<Vec<String>> {
        None
    }
}

impl ValidableJson for Box<RawValue> {
    /// A raw JSON value is considered valid as soon as it is non-empty.
    fn is_valid_json(&self) -> bool {
        let raw = self.get();
        !raw.is_empty()
    }

    /// Attempts to read `wm_labels` out of the raw JSON text; any parse
    /// failure (or absence of the key) yields `None`.
    fn wm_labels(&self) -> Option<Vec<String>> {
        match serde_json::from_str::<ResultLabels>(self.get()) {
            Ok(parsed) => Some(parsed.wm_labels),
            Err(_) => None,
        }
    }
}

impl ValidableJson for Arc<Box<RawValue>> {
impl<T: ValidableJson> ValidableJson for Arc<T> {
fn is_valid_json(&self) -> bool {
!self.get().is_empty()
T::is_valid_json(&self)
}

fn wm_labels(&self) -> Option<Vec<String>> {
T::wm_labels(&self)
}
}

impl ValidableJson for serde_json::Value {
    /// Any already-parsed `Value` is valid JSON by construction.
    fn is_valid_json(&self) -> bool {
        true
    }

    /// Extracts `wm_labels` when the value is an object whose `wm_labels`
    /// field is an array of strings; `None` otherwise.
    fn wm_labels(&self) -> Option<Vec<String>> {
        // Avoid cloning the entire (potentially large) result value just to
        // deserialize one field: read `wm_labels` directly, requiring every
        // element to be a string — the same shape `ResultLabels` accepts.
        self.as_object()?
            .get("wm_labels")?
            .as_array()?
            .iter()
            .map(|v| v.as_str().map(str::to_owned))
            .collect()
    }
}

// `Json` is a transparent wrapper: delegate both checks to the inner value.
impl<T: ValidableJson> ValidableJson for Json<T> {
    fn is_valid_json(&self) -> bool {
        self.0.is_valid_json()
    }

    fn wm_labels(&self) -> Option<Vec<String>> {
        self.0.wm_labels()
    }
}

pub async fn register_metric<T, F, F2, R>(
Expand Down Expand Up @@ -547,6 +577,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
serde_json::to_string(&result).unwrap_or_else(|_| "".to_string())
);

let labels = result.wm_labels();
let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
// add_time!(bench, "add_completed_job query START");

Expand All @@ -562,12 +593,13 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
, flow_status
, memory_peak
, status
, labels
)
VALUES ($1, $2, $3, COALESCE($12::bigint, (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($3, now()))))*1000), $5, $7, $8, $9,\
$11, CASE WHEN $6::BOOL THEN 'canceled'::job_status
WHEN $10::BOOL THEN 'skipped'::job_status
WHEN $4::BOOL THEN 'success'::job_status
ELSE 'failure'::job_status END)
ELSE 'failure'::job_status END, $13)
ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status, result = $5 RETURNING duration_ms AS \"duration_ms!\"",
/* $1 */ queued_job.workspace_id,
/* $2 */ queued_job.id,
Expand All @@ -581,6 +613,7 @@ pub async fn add_completed_job<T: Serialize + Send + Sync + ValidableJson>(
/* $10 */ skipped,
/* $11 */ if mem_peak > 0 { Some(mem_peak) } else { None },
/* $12 */ duration,
/* $13 */ labels.as_ref().map(Vec::as_slice),
)
.fetch_one(&mut *tx)
.await
Expand Down

0 comments on commit 6660e8c

Please sign in to comment.