Skip to content

Commit

Permalink
feat: 延迟队列处理
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Feb 28, 2024
1 parent ee47937 commit b7fdc27
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
6 changes: 3 additions & 3 deletions Test/Config/component/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

'delayQueue' => function() {
$redis = Application::getApp()->get('redis')->getObject();
//return new \Common\Library\Queues\RedisDelayQueue($redis,\Test\Process\QueueProcess\Queue::queue_order_list);
return new \Common\Library\Queues\RedisDelayQueue($redis,\Test\Process\QueueProcess\Queue::queue_order_list);

$predis = Application::getApp()->get('predis')->getObject();
return new \Common\Library\Queues\PredisDelayQueue($predis,\Test\Process\QueueProcess\Queue::queue_order_list);
// $predis = Application::getApp()->get('predis')->getObject();
// return new \Common\Library\Queues\PredisDelayQueue($predis,\Test\Process\QueueProcess\Queue::queue_order_list);
}

];
4 changes: 2 additions & 2 deletions Test/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public function onInit() {
//var_dump('pid='.$pid);

// redis的队列消费
// ProcessManager::getInstance()->addProcess('redis_list_test', \Test\Process\ListProcess\RedisList::class,true, [], null, true);
ProcessManager::getInstance()->addProcess('redis_list_test', \Test\Process\ListProcess\RedisList::class,true, [], null, true);

// redis的延迟队列消费
ProcessManager::getInstance()->addProcess('redis_delay_list_test', \Test\Process\QueueProcess\Queue::class,true, [], null, true);
// ProcessManager::getInstance()->addProcess('redis_delay_list_test', \Test\Process\QueueProcess\Queue::class,true, [], null, true);


// amqp-direct 生产队
Expand Down
3 changes: 2 additions & 1 deletion Test/Process/ListProcess/RedisList.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class RedisList extends AbstractProcess {
*/
public function run()
{
goAfter(2000, function () {
goTick(2000, function () {
$queue = Factory::getQueue();
$queue->push(['name'=> 'bingcool','num' => rand(1,10000)]);
});
Expand All @@ -36,6 +36,7 @@ public function run()
$list->doHandle();
});

//$queue->retry($data);
//var_dump('This is Redis List Queue process, pop item='.$data);
}

Expand Down
8 changes: 5 additions & 3 deletions Test/Process/QueueProcess/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ public function run()
{
goAfter(2000, function () {
Factory::getDelayQueue()
->addItem(["order_id" => 1111], 10)
->addItem(["order_id" => 2222], 10)
->addItem(["order_id" => 1111], 2)
->addItem(["order_id" => 2222], 2)
->push();
});

while (true) {
$items = Factory::getDelayQueue()->pop();
// var_dump($items);
foreach ($items as $item) {
var_dump(date("Y-m-d H:i:s"));
var_dump($item);
//Factory::getDelayQueue()->retry($item, 5);
}

sleep(1);
}
}
Expand Down

0 comments on commit b7fdc27

Please sign in to comment.