Skip to content

Commit

Permalink
fixup! backend: migrate job tables to v2 schema (v2 phase 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
uael committed Feb 4, 2025
1 parent 55ea09a commit a16f275
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 14 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/migrations/20250201124431_v2_job.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ ALTER TABLE v2_job
DROP COLUMN cache_ttl CASCADE,
DROP COLUMN timeout CASCADE,
DROP COLUMN priority CASCADE,
DROP COLUMN preprocessed CASCADE,
DROP COLUMN args CASCADE,
DROP COLUMN labels CASCADE,
DROP COLUMN pre_run_error CASCADE;
Expand Down
1 change: 1 addition & 0 deletions backend/migrations/20250201124431_v2_job.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ALTER TABLE v2_job
ADD COLUMN IF NOT EXISTS cache_ttl INTEGER,
ADD COLUMN IF NOT EXISTS timeout INTEGER,
ADD COLUMN IF NOT EXISTS priority SMALLINT,
ADD COLUMN IF NOT EXISTS preprocessed BOOLEAN,
ADD COLUMN IF NOT EXISTS args JSONB,
ADD COLUMN IF NOT EXISTS labels TEXT[],
ADD COLUMN IF NOT EXISTS pre_run_error TEXT;
Expand Down
13 changes: 9 additions & 4 deletions backend/migrations/20250201124743_v2_job_queue_sync.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
flow_step, flow_step_id, flow_innermost_root_job,
trigger, trigger_kind,
tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority,
args, pre_run_error,
preprocessed, args, pre_run_error,
raw_code, raw_lock, raw_flow
) VALUES (
NEW.id, NEW.workspace_id, NEW.created_at, NEW.__created_by, NEW.__permissioned_as, NEW.__email,
Expand All @@ -86,7 +86,10 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
NEW.__schedule_path, CASE WHEN NEW.__schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
NEW.tag, NEW.__same_worker, NEW.__visible_to_owner, NEW.__concurrent_limit, NEW.__concurrency_time_window_s,
NEW.__cache_ttl, NEW.__timeout, NEW.priority,
NEW.__args, NEW.__pre_run_error,
CASE WHEN (
NEW.__args->>'_ENTRYPOINT_OVERRIDE' IN ('__WM_PREPROCESSOR', 'preprocessor')
OR NEW.__flow_status->'preprocessor_module' IS NOT NULL
) THEN FALSE END, NEW.__args, NEW.__pre_run_error,
NEW.__raw_code, NEW.__raw_lock, NEW.__raw_flow
) ON CONFLICT (id) DO UPDATE SET
workspace_id = EXCLUDED.workspace_id,
Expand All @@ -113,6 +116,7 @@ CREATE OR REPLACE FUNCTION v2_job_queue_after_insert() RETURNS TRIGGER AS $$ BEG
cache_ttl = EXCLUDED.cache_ttl,
timeout = EXCLUDED.timeout,
priority = EXCLUDED.priority,
preprocessed = EXCLUDED.preprocessed,
args = EXCLUDED.args,
pre_run_error = EXCLUDED.pre_run_error,
raw_code = COALESCE(v2_job.raw_code, EXCLUDED.raw_code),
Expand Down Expand Up @@ -151,8 +155,9 @@ CREATE OR REPLACE FUNCTION v2_job_queue_before_update() RETURNS TRIGGER AS $$ BE
END IF;
-- `v2_job`: Only `args` are updated
IF NEW.__args::TEXT IS DISTINCT FROM OLD.__args::TEXT THEN
UPDATE v2_job
SET args = NEW.__args
UPDATE v2_job SET
args = NEW.__args,
preprocessed = CASE WHEN preprocessed = FALSE THEN TRUE END
WHERE id = NEW.id;
END IF;
-- `v2_job_runtime`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ BEGIN
);
END IF;
-- 3. v2 -> flow_status._metadata.preprocessed_args`
IF job.script_entrypoint_override = '__WM_PREPROCESSOR' AND job.args->'wm_trigger' IS NULL
IF job.script_entrypoint_override = '__WM_PREPROCESSOR' AND job.preprocessed = TRUE
AND (NEW.flow_status IS NULL OR jsonb_typeof(NEW.flow_status) = 'object')
THEN
NEW.flow_status := jsonb_set(
Expand Down Expand Up @@ -121,8 +121,9 @@ BEGIN
-- 3. v2 <- flow_status._metadata.preprocessed_args`
IF NEW.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN
UPDATE v2_job SET
args = NEW.__args
WHERE id = NEW.id;
args = NEW.__args,
preprocessed = TRUE
WHERE id = NEW.id AND preprocessed = FALSE;
END IF;
END IF;
RETURN NEW;
Expand Down
16 changes: 12 additions & 4 deletions backend/migrations/20250201124748_v2_migrate_from_v1.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ INSERT INTO v2_job (
flow_step_id, flow_innermost_root_job,
trigger, trigger_kind,
tag, same_worker, visible_to_owner, concurrent_limit, concurrency_time_window_s, cache_ttl, timeout, priority,
args, pre_run_error,
preprocessed, args, pre_run_error,
raw_code, raw_lock, raw_flow
) SELECT
id, workspace_id, created_at, __created_by, __permissioned_as, __email,
Expand All @@ -35,7 +35,10 @@ INSERT INTO v2_job (
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
tag, __same_worker, __visible_to_owner, __concurrent_limit, __concurrency_time_window_s,
__cache_ttl, __timeout, priority,
__args, __pre_run_error,
CASE
WHEN __args->>'_ENTRYPOINT_OVERRIDE' IN ('__WM_PREPROCESSOR', 'preprocessor') THEN FALSE
WHEN __flow_status->'preprocessor_module' IS NOT NULL THEN __args->'wm_trigger' IS NULL
END, __args, __pre_run_error,
__raw_code, __raw_lock, __raw_flow
FROM v2_job_queue
WHERE NOT EXISTS (SELECT 1 FROM v2_job WHERE v2_job.id = v2_job_queue.id);
Expand All @@ -47,7 +50,7 @@ INSERT INTO v2_job (
script_lang, script_entrypoint_override,
trigger, trigger_kind,
tag, visible_to_owner, priority,
args,
preprocessed, args,
raw_code, raw_lock, raw_flow,
labels
) SELECT
Expand All @@ -56,7 +59,7 @@ INSERT INTO v2_job (
__language, __args->>'_ENTRYPOINT_OVERRIDE',
__schedule_path, CASE WHEN __schedule_path IS NOT NULL THEN 'schedule'::job_trigger_kind END,
__tag, __visible_to_owner, __priority,
__args,
CASE WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE END, __args,
__raw_code, __raw_lock, __raw_flow,
CASE
WHEN jsonb_typeof(result->'wm_labels') = 'array' AND (
Expand Down Expand Up @@ -96,6 +99,10 @@ UPDATE v2_job SET
cache_ttl = v2_job_queue.__cache_ttl,
timeout = v2_job_queue.__timeout,
priority = v2_job_queue.priority,
preprocessed = CASE
WHEN __args->>'_ENTRYPOINT_OVERRIDE' IN ('__WM_PREPROCESSOR', 'preprocessor') THEN FALSE
WHEN __flow_status->'preprocessor_module' IS NOT NULL THEN __args->'wm_trigger' IS NULL
END,
args = v2_job_queue.__args,
pre_run_error = v2_job_queue.__pre_run_error,
raw_code = COALESCE(v2_job.raw_code, v2_job_queue.__raw_code),
Expand All @@ -121,6 +128,7 @@ UPDATE v2_job SET
tag = v2_job_completed.__tag,
visible_to_owner = v2_job_completed.__visible_to_owner,
priority = v2_job_completed.__priority,
preprocessed = CASE WHEN v2_job_completed.flow_status->'_metadata'->'preprocessed_args' = 'true'::JSONB THEN TRUE END,
args = v2_job_completed.__args,
raw_code = COALESCE(v2_job.raw_code, v2_job_completed.__raw_code),
raw_lock = COALESCE(v2_job.raw_lock, v2_job_completed.__raw_lock),
Expand Down
19 changes: 16 additions & 3 deletions backend/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3903,12 +3903,13 @@ mod job_payload {
})
.run_until_complete_with(db, port, |id| async move {
let job = sqlx::query!(
"SELECT script_entrypoint_override FROM v2_job WHERE id = $1",
"SELECT preprocessed, script_entrypoint_override FROM v2_job WHERE id = $1",
id
)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(false));
assert_eq!(
job.script_entrypoint_override.as_deref(),
Some("__WM_PREPROCESSOR")
Expand All @@ -3925,6 +3926,11 @@ mod job_payload {
Some(&json!({"preprocessed_args": true}))
);
assert_eq!(job.json_result().unwrap(), json!("Hello bar baz"));
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", job.id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(true));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
}
Expand Down Expand Up @@ -4256,7 +4262,13 @@ mod job_payload {
dedicated_worker: None,
apply_preprocessor: true,
})
.run_until_complete(db, port)
.run_until_complete_with(db, port, |id| async move {
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(false));
})
.await;

let args = job.args.as_ref().unwrap();
Expand All @@ -4271,12 +4283,13 @@ mod job_payload {
};
let pp_id = job;
let job = sqlx::query!(
"SELECT script_entrypoint_override FROM v2_job WHERE id = $1",
"SELECT preprocessed, script_entrypoint_override FROM v2_job WHERE id = $1",
pp_id
)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(true));
assert_eq!(
job.script_entrypoint_override.as_deref(),
Some("preprocessor")
Expand Down

0 comments on commit a16f275

Please sign in to comment.