Skip to content

Commit

Permalink
Fix async_task to not fail on some multi-statement tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngalstyan4 committed Nov 20, 2024
1 parent cd20305 commit 50fc513
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
25 changes: 20 additions & 5 deletions lantern_hnsw/sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ BEGIN

CREATE TABLE lantern.tasks (
jobid bigserial primary key,
pg_cron_jobid bigint default null, -- lantern.tasks jobid is independent from pg_cron jobid. We sometimes need pg_cron jobid to update status
query text not null,
pg_cron_job_name text default null, -- initially null, because it will be ready after job insertion
job_name text default null,
Expand Down Expand Up @@ -493,12 +494,12 @@ BEGIN
RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status;
END IF;

-- Get the job name from the jobid
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
-- active cron jobs
-- Update pg_cron_jobid on lantern.tasks table before the job is unscheduled
-- This is necessary because under some circumstances jobs continue changing status even after they no longer
-- appear in cron.job. The easiest way to trigger this case is to schedule a multi-statement job
-- where the second statement causes a failure, e.g. async_task('select 1; select haha;')
UPDATE lantern.tasks t SET
(duration, status, error_message) = (run.end_time - t.started_at, NEW.status,
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END)
pg_cron_jobid = c.jobid
FROM cron.job c
LEFT JOIN cron.job_run_details run
ON c.jobid = run.jobid
Expand All @@ -507,8 +508,22 @@ BEGIN
c.jobid = NEW.jobid
-- using returning as a trick to run the unschedule function as a side effect
-- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320
-- Note2: unscheduling happens here since the update below may run multiple times for the same async task
-- and unscheduling same job multiple times is not allowed
-- At least experimentally so far, this update runs once per async task
RETURNING cron.unschedule(NEW.jobid) INTO res;

-- Get the job name from the jobid
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
-- active cron jobs
UPDATE lantern.tasks t SET
(duration, status, error_message) = (run.end_time - t.started_at, NEW.status,
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END)
FROM cron.job_run_details run
WHERE
t.pg_cron_jobid = NEW.jobid
AND t.pg_cron_jobid = run.jobid;

RETURN NEW;

EXCEPTION
Expand Down
5 changes: 4 additions & 1 deletion lantern_hnsw/test/expected/async_tasks.out
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,10 @@ SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done
9 | SELECT 42 | async_task_9 | Life | t | succeeded |
10 | CREATE TABLE async_created_table(c text); +| async_task_10 | Multi-stmt job | t | succeeded |
| INSERT INTO async_created_table(c) VALUES ('async_inserted_value'); | | | | |
11 | SELECT 1; SELECT pg_sleep(haha); | async_task_11 | Status issue | t | succeeded |
11 | SELECT 1; SELECT pg_sleep(haha); | async_task_11 | Status issue | t | failed | ERROR: column "haha" does not exist +
| | | | | | LINE 1: SELECT 1; SELECT pg_sleep(haha); +
| | | | | | ^ +
| | | | | |

SET ROLE postgres;
DROP OWNED BY test_user_async;
Expand Down

0 comments on commit 50fc513

Please sign in to comment.