Skip to content

Commit

Permalink
TASK: Protect against possible race conditions (#5)
Browse files Browse the repository at this point in the history
Uses transactions (multi / exec) or command reordering to prevent some
possible race conditions.
  • Loading branch information
hlubek authored and Bastian Waidelich committed May 17, 2017
1 parent 0438af4 commit c5045ee
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions Classes/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public function submit($payload, array $options = [])
$messageId = Algorithms::generateUUID();
$idStored = $this->client->hSet("queue:{$this->name}:ids", $messageId, json_encode($payload));
if ($idStored === 0) {
return null;
throw new JobQueueException(sprintf('Duplicate message id: "%s"', $messageId), 1470656350);
}

$this->client->lPush("queue:{$this->name}:messages", $messageId);
Expand Down Expand Up @@ -141,10 +141,11 @@ public function waitAndReserve($timeout = null)
public function release($messageId, array $options = [])
{
$this->checkClientConnection();
$this->client->lRem("queue:{$this->name}:processing", $messageId, 0);
$numberOfReleases = (integer)$this->client->hGet("queue:{$this->name}:releases", $messageId);
$this->client->hSet("queue:{$this->name}:releases", $messageId, $numberOfReleases + 1);
$this->client->lPush("queue:{$this->name}:messages", $messageId);
$this->client->multi()
->lRem("queue:{$this->name}:processing", $messageId, 0)
->hIncrBy("queue:{$this->name}:releases", $messageId, 1)
->lPush("queue:{$this->name}:messages", $messageId)
->exec();
}

/**
Expand All @@ -165,9 +166,10 @@ public function abort($messageId)
public function finish($messageId)
{
$this->checkClientConnection();
$numberOfRemoved = $this->client->lRem("queue:{$this->name}:processing", $messageId, 0);
$this->client->hDel("queue:{$this->name}:ids", $messageId);
$this->client->hDel("queue:{$this->name}:releases", $messageId);
return $this->client->lRem("queue:{$this->name}:processing", $messageId, 0) > 0;
return $numberOfRemoved > 0;
}

/**
Expand Down

0 comments on commit c5045ee

Please sign in to comment.