diff --git a/.sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json b/.sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json similarity index 87% rename from .sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json rename to .sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json index 9d76891..7523a23 100644 --- a/.sqlx/query-27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6.json +++ b/.sqlx/query-6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n with available_task as (\n select id\n from underway.task\n where task_queue_name = $1\n and (\n -- Find pending tasks...\n state = $2\n -- ...Or look for stalled tasks.\n or (\n state = $3\n -- Has heartbeat stalled?\n and last_heartbeat_at < now() - heartbeat\n -- Are there remaining retries?\n and (retry_policy).max_attempts > (\n select count(*)\n from underway.task_attempt\n where task_id = id\n and task_queue_name = $1\n )\n )\n )\n and created_at + delay <= now()\n order by\n priority desc,\n created_at,\n id\n limit 1\n for update skip locked\n )\n update underway.task t\n set state = $3,\n last_attempt_at = now(),\n last_heartbeat_at = now()\n from available_task\n where t.id = available_task.id\n returning\n t.id as \"id: TaskId\",\n task_queue_name as \"queue_name\",\n input,\n timeout,\n heartbeat,\n retry_policy as \"retry_policy: RetryPolicy\",\n concurrency_key\n ", + "query": "\n with available_task as (\n select id\n from underway.task\n where task_queue_name = $1\n and (\n -- Find pending tasks...\n state = $2\n -- ...Or look for stalled tasks.\n or (\n state = $3\n -- Has heartbeat stalled?\n and last_heartbeat_at < now() - heartbeat\n -- Are there remaining retries?\n and (retry_policy).max_attempts > (\n select count(*)\n from underway.task_attempt\n where task_id = id\n and task_queue_name = $1\n )\n )\n )\n and created_at + delay <= now()\n order by\n priority desc,\n created_at,\n id\n limit 1\n for update skip locked\n )\n update underway.task t\n set state = $3,\n last_attempt_at = now(),\n last_heartbeat_at = now()\n from available_task\n where t.task_queue_name = $1\n and t.id = available_task.id\n returning\n t.id as \"id: TaskId\",\n t.task_queue_name as \"queue_name\",\n t.input,\n t.timeout,\n t.heartbeat,\n t.retry_policy as \"retry_policy: RetryPolicy\",\n t.concurrency_key\n ", "describe": { "columns": [ { @@ -106,5 +106,5 @@ true ] }, - "hash": "27910109973ed06a1a0f324bbb52fbe76e5070e0f613ca500fc3a38df8f323a6" + "hash": "6dc188d70555d7492ca115057c3e5f076c68ade8c96b4f5bbbbdfc83f5f60748" } diff --git a/src/queue.rs b/src/queue.rs index 0f99f53..e09cf4f 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -694,15 +694,16 @@ impl Queue { last_attempt_at = now(), last_heartbeat_at = now() from available_task - where t.id = available_task.id + where t.task_queue_name = $1 + and t.id = available_task.id returning t.id as "id: TaskId", - task_queue_name as "queue_name", - input, - timeout, - heartbeat, - retry_policy as "retry_policy: RetryPolicy", - concurrency_key + t.task_queue_name as "queue_name", + t.input, + t.timeout, + t.heartbeat, + t.retry_policy as "retry_policy: RetryPolicy", + t.concurrency_key "#, self.name, TaskState::Pending as TaskState,