Skip to content

Commit

Permalink
phase 4: finalize v2 migration
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Jan 17, 2025
1 parent 09c6044 commit 8eeffa6
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 14 deletions.
149 changes: 136 additions & 13 deletions backend/windmill-api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
* LICENSE-AGPL for a copy of the license.
*/

use futures::FutureExt;
use sqlx::Executor;
use std::sync::atomic::Ordering;
use std::time::Duration;

use futures::FutureExt;
use sqlx::{
migrate::{Migrate, MigrateError},
pool::PoolConnection,
PgConnection, Pool, Postgres,
Executor, PgConnection, Pool, Postgres,
};

use windmill_audit::audit_ee::{AuditAuthor, AuditAuthorable};
use windmill_common::utils::generate_lock_id;
use windmill_common::{
Expand Down Expand Up @@ -199,6 +201,24 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
}
});

let db2 = db.clone();
let _ = tokio::task::spawn(async move {
use windmill_common::worker::MIN_VERSION_IS_LATEST;
loop {
if !MIN_VERSION_IS_LATEST.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
if let Err(err) = v2_finalize(&db2).await {
tracing::error!("{err:#}: Could not apply v2 finalize migration, retry in 30s..");
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
}
tracing::info!("v2 finalization step successfully applied.");
break;
}
});

Ok(())
}

Expand Down Expand Up @@ -250,7 +270,7 @@ async fn fix_flow_versioning_migration(
}

macro_rules! run_windmill_migration {
($migration_job_name:expr, $db:expr, $code:block) => {
($migration_job_name:expr, $db:expr, |$tx:ident| $code:block) => {
{
let migration_job_name = $migration_job_name;
let db: &Pool<Postgres> = $db;
Expand All @@ -264,11 +284,11 @@ macro_rules! run_windmill_migration {
.unwrap_or(false);
if !has_done_migration {
tracing::info!("Applying {migration_job_name} migration");
let mut tx = db.begin().await?;
let mut $tx = db.begin().await?;
let mut r = false;
while !r {
r = sqlx::query_scalar!("SELECT pg_try_advisory_lock(4242)")
.fetch_one(&mut *tx)
.fetch_one(&mut *$tx)
.await
.map_err(|e| {
tracing::error!("Error acquiring {migration_job_name} lock: {e:#}");
Expand Down Expand Up @@ -298,17 +318,17 @@ macro_rules! run_windmill_migration {
"INSERT INTO windmill_migrations (name) VALUES ($1) ON CONFLICT DO NOTHING",
migration_job_name
)
.execute(&mut *tx)
.execute(&mut *$tx)
.await?;
tracing::info!("Finished applying {migration_job_name} migration");
} else {
tracing::debug!("migration {migration_job_name} already done");
}

let _ = sqlx::query("SELECT pg_advisory_unlock(4242)")
.execute(&mut *tx)
.execute(&mut *$tx)
.await?;
tx.commit().await?;
$tx.commit().await?;
tracing::info!("released lock for {migration_job_name}");
} else {
tracing::debug!("migration {migration_job_name} already done");
Expand All @@ -318,6 +338,109 @@ macro_rules! run_windmill_migration {
};
}

async fn v2_finalize(db: &DB) -> Result<(), Error> {
run_windmill_migration!("v2_finalize_disable_sync", db, |tx| {
tx.execute(
r#"
CREATE OR REPLACE TRIGGER v2_queue_instead_of_update_trigger
INSTEAD OF UPDATE ON v2_queue
FOR EACH ROW
EXECUTE PROCEDURE v2_queue_instead_of_update();
CREATE OR REPLACE TRIGGER v2_completed_job_instead_of_update_trigger
INSTEAD OF UPDATE ON v2_completed_job
FOR EACH ROW
EXECUTE PROCEDURE v2_completed_job_instead_of_update();
DROP FUNCTION v2_queue_instead_of_update_overlay() CASCADE;
DROP FUNCTION v2_completed_job_instead_of_update_overlay() CASCADE;
DROP FUNCTION v2_job_completed_before_insert() CASCADE;
DROP FUNCTION v2_job_flow_runtime_before_insert() CASCADE;
DROP FUNCTION v2_job_flow_runtime_before_update() CASCADE;
DROP FUNCTION v2_job_queue_after_insert() CASCADE;
DROP FUNCTION v2_job_queue_before_insert() CASCADE;
DROP FUNCTION v2_job_queue_before_update() CASCADE;
DROP FUNCTION v2_job_runtime_before_insert() CASCADE;
DROP FUNCTION v2_job_runtime_before_update() CASCADE;
DROP VIEW completed_job, completed_job_view, job, queue, queue_view CASCADE;
"#,
)
.await?;
});
run_windmill_migration!("v2_finalize_job_queue", db, |tx| {
tx.execute(
r#"
ALTER TABLE v2_job_queue
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __last_ping CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __flow_status CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __same_worker CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __pre_run_error CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __mem_peak CASCADE,
DROP COLUMN __root_job CASCADE,
DROP COLUMN __leaf_jobs CASCADE,
DROP COLUMN __concurrent_limit CASCADE,
DROP COLUMN __concurrency_time_window_s CASCADE,
DROP COLUMN __timeout CASCADE,
DROP COLUMN __flow_step_id CASCADE,
DROP COLUMN __cache_ttl CASCADE;
"#,
)
.await?;
});
run_windmill_migration!("v2_finalize_job_completed", db, |tx| {
tx.execute(
r#"
ALTER TABLE v2_job_completed
DROP COLUMN __parent_job CASCADE,
DROP COLUMN __created_by CASCADE,
DROP COLUMN __created_at CASCADE,
DROP COLUMN __success CASCADE,
DROP COLUMN __script_hash CASCADE,
DROP COLUMN __script_path CASCADE,
DROP COLUMN __args CASCADE,
DROP COLUMN __logs CASCADE,
DROP COLUMN __deleted CASCADE,
DROP COLUMN __raw_code CASCADE,
DROP COLUMN __canceled CASCADE,
DROP COLUMN __job_kind CASCADE,
DROP COLUMN __env_id CASCADE,
DROP COLUMN __schedule_path CASCADE,
DROP COLUMN __permissioned_as CASCADE,
DROP COLUMN __raw_flow CASCADE,
DROP COLUMN __is_flow_step CASCADE,
DROP COLUMN __language CASCADE,
DROP COLUMN __is_skipped CASCADE,
DROP COLUMN __raw_lock CASCADE,
DROP COLUMN __email CASCADE,
DROP COLUMN __visible_to_owner CASCADE,
DROP COLUMN __tag CASCADE,
DROP COLUMN __priority CASCADE;
"#,
)
.await?;
});
Ok(())
}

async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
// let has_done_migration = sqlx::query_scalar!(
// "SELECT EXISTS(SELECT name FROM windmill_migrations WHERE name = 'fix_job_completed_index')"
Expand Down Expand Up @@ -360,7 +483,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
// tx.commit().await?;
// }

run_windmill_migration!("fix_job_completed_index_2", &db, {
run_windmill_migration!("fix_job_completed_index_2", &db, |tx| {
// sqlx::query(
// "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_completed_job_workspace_id_created_at_new_2 ON completed_job (workspace_id, job_kind, success, is_skipped, is_flow_step, created_at DESC)"
// ).execute(db).await?;
Expand All @@ -380,7 +503,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_job_completed_index_3", &db, {
run_windmill_migration!("fix_job_completed_index_3", &db, |tx| {
sqlx::query("DROP INDEX CONCURRENTLY IF EXISTS index_completed_job_on_schedule_path")
.execute(db)
.await?;
Expand All @@ -398,7 +521,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_job_index_1", &db, {
run_windmill_migration!("fix_job_index_1", &db, |tx| {
let migration_job_name = "fix_job_completed_index_4";
let mut i = 1;
tracing::info!("step {i} of {migration_job_name} migration");
Expand Down Expand Up @@ -479,7 +602,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
.await?;
});

run_windmill_migration!("fix_labeled_jobs_index", &db, {
run_windmill_migration!("fix_labeled_jobs_index", &db, |tx| {
tracing::info!("Special migration to add index concurrently on job labels 2");
sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS labeled_jobs_on_jobs")
.execute(db)
Expand Down
9 changes: 8 additions & 1 deletion backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
io::Write,
path::{Component, Path, PathBuf},
str::FromStr,
sync::{atomic::AtomicBool, Arc},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::RwLock;
use windmill_macros::annotations;
Expand Down Expand Up @@ -103,6 +106,8 @@ lazy_static::lazy_static! {
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
}

pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);

pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
if wc.worker_tags.len() == 0 {
tracing::error!("Empty tags in worker tags, skipping");
Expand Down Expand Up @@ -609,6 +614,8 @@ pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postg
tracing::info!("Minimal worker version: {min_version}");
}

MIN_VERSION_IS_LATEST.store(min_version == cur_version, Ordering::Relaxed);

*MIN_VERSION_IS_AT_LEAST_1_427.write().await = min_version >= Version::new(1, 427, 0);
*MIN_VERSION_IS_AT_LEAST_1_432.write().await = min_version >= Version::new(1, 432, 0);
*MIN_VERSION_IS_AT_LEAST_1_440.write().await = min_version >= Version::new(1, 440, 0);
Expand Down

0 comments on commit 8eeffa6

Please sign in to comment.