diff --git a/lantern_hnsw/sql/lantern.sql b/lantern_hnsw/sql/lantern.sql index 180702bc..0673bba4 100644 --- a/lantern_hnsw/sql/lantern.sql +++ b/lantern_hnsw/sql/lantern.sql @@ -464,6 +464,7 @@ BEGIN CREATE TABLE lantern.tasks ( jobid bigserial primary key, + pg_cron_jobid bigint default null, -- lantern.tasks jobid is independent from pg_cron jobid. We sometimes need pg_cron jobid to update status query text not null, pg_cron_job_name text default null, -- initially null, because it will be ready after job insertion job_name text default null, @@ -493,12 +494,12 @@ BEGIN RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status; END IF; - -- Get the job name from the jobid - -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND - -- active cron jobs + -- Update pg_cron_jobid on lantern.tasks table before the job is unscheduled + -- This is necessary because under some circumstances jobs continue changing status even after they no longer + -- appear in cron.job. The easiest way to trigger this case is to schedule a multi-statement job + -- where the second statement causes a failure, e.g. async_task('select 1; select haha;') UPDATE lantern.tasks t SET - (duration, status, error_message) = (run.end_time - t.started_at, NEW.status, - CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END) + pg_cron_jobid = c.jobid FROM cron.job c LEFT JOIN cron.job_run_details run ON c.jobid = run.jobid @@ -507,8 +508,22 @@ BEGIN c.jobid = NEW.jobid -- using returning as a trick to run the unschedule function as a side effect -- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320 + -- Note2: unscheduling happens here since the update below may run multiple times for the same async task + -- and unscheduling same job multiple times is not allowed + -- At least experimentally so far, this update runs once per async task RETURNING cron.unschedule(NEW.jobid) INTO res; + -- Get the job name from the jobid + -- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND + -- active cron jobs + UPDATE lantern.tasks t SET + (duration, status, error_message) = (run.end_time - t.started_at, NEW.status, + CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END) + FROM cron.job_run_details run + WHERE + t.pg_cron_jobid = NEW.jobid + AND t.pg_cron_jobid = run.jobid; + RETURN NEW; EXCEPTION diff --git a/lantern_hnsw/test/expected/async_tasks.out b/lantern_hnsw/test/expected/async_tasks.out index d2485872..7d86eebb 100644 --- a/lantern_hnsw/test/expected/async_tasks.out +++ b/lantern_hnsw/test/expected/async_tasks.out @@ -272,7 +272,10 @@ SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done 9 | SELECT 42 | async_task_9 | Life | t | succeeded | 10 | CREATE TABLE async_created_table(c text); +| async_task_10 | Multi-stmt job | t | succeeded | | INSERT INTO async_created_table(c) VALUES ('async_inserted_value'); | | | | | - 11 | SELECT 1; SELECT pg_sleep(haha); | async_task_11 | Status issue | t | succeeded | + 11 | SELECT 1; SELECT pg_sleep(haha); | async_task_11 | Status issue | t | failed | ERROR: column "haha" does not exist + + | | | | | | LINE 1: SELECT 1; SELECT pg_sleep(haha); + + | | | | | | ^ + + | | | | | | SET ROLE postgres; DROP OWNED BY test_user_async;