From d8dc0877b151461bc1b21ee481b818fa85e4cfda Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Sat, 16 Nov 2024 14:53:28 -0800 Subject: [PATCH] provide queue name in dequeue update While this is not technically required, since the CTE has already filtered with the name, it's nonetheless more clear regarding the intent and underlying data model at play. --- ...7c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json} | 4 ++-- src/queue.rs | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) rename .sqlx/{query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json => query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json} (87%) diff --git a/.sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json b/.sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json similarity index 87% rename from .sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json rename to .sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json index 9d76891..7523a23 100644 --- a/.sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json +++ b/.sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n with available_task as (\n select id\n from underway.task\n where task_queue_name = $1\n and (\n -- Find pending tasks...\n state = $2\n -- ...Or look for stalled tasks.\n or (\n state = $3\n -- Has heartbeat stalled?\n and last_heartbeat_at < now() - heartbeat\n -- Are there remaining retries?\n and (retry_policy).max_attempts > (\n select count(*)\n from underway.task_attempt\n where task_id = id\n and task_queue_name = $1\n )\n )\n )\n and created_at + delay <= now()\n order by\n priority desc,\n created_at,\n id\n limit 1\n for update skip locked\n )\n update underway.task t\n set state = $3,\n last_attempt_at = now(),\n last_heartbeat_at = now()\n from available_task\n where t.id = available_task.id\n returning\n t.id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n timeout,\n heartbeat,\n retry_policy as \"retry_policy: RetryPolicy\",\n concurrency_key\n ", + "query": "\n with available_task as (\n select id\n from underway.task\n where task_queue_name = $1\n and (\n -- Find pending tasks...\n state = $2\n -- ...Or look for stalled tasks.\n or (\n state = $3\n -- Has heartbeat stalled?\n and last_heartbeat_at < now() - heartbeat\n -- Are there remaining retries?\n and (retry_policy).max_attempts > (\n select count(*)\n from underway.task_attempt\n where task_id = id\n and task_queue_name = $1\n )\n )\n )\n and created_at + delay <= now()\n order by\n priority desc,\n created_at,\n id\n limit 1\n for update skip locked\n )\n update underway.task t\n set state = $3,\n last_attempt_at = now(),\n last_heartbeat_at = now()\n from available_task\n where t.task_queue_name = $1\n and t.id = available_task.id\n returning\n t.id as \"id: TaskId\",\n t.task_queue_name as \"queue_name\",\n t.input,\n t.timeout,\n t.heartbeat,\n t.retry_policy as \"retry_policy: RetryPolicy\",\n t.concurrency_key\n ", "describe": { "columns": [ { @@ -106,5 +106,5 @@ true ] }, - "hash": "27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6" + "hash": "6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748" } diff --git a/src/queue.rs b/src/queue.rs index 0f99f53..e09cf4f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -694,15 +694,16 @@ impl Queue { last_attempt_at = now(), last_heartbeat_at = now() from available_task - where t.id = available_task.id + where t.task_queue_name = $1 + and t.id = available_task.id returning t.id as "id: TaskId", - task_queue_name as "queue_name", - input, - timeout, - heartbeat, - retry_policy as "retry_policy: RetryPolicy", - concurrency_key + t.task_queue_name as "queue_name", + t.input, + t.timeout, + t.heartbeat, + t.retry_policy as "retry_policy: RetryPolicy", + t.concurrency_key "#, self.name, TaskState::Pending as TaskState,