Skip to content

Commit

Permalink
Receiver Priority
Browse files Browse the repository at this point in the history
- Added support for receiver priority sorting when pools match
  more than one receiver

Signed-off-by: RJ Garcia <[email protected]>
  • Loading branch information
ragboyjr committed Nov 11, 2020
1 parent 9e633b8 commit ee154cd
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 25 deletions.
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,56 @@ Each pool config must have a `receivers` property which is a simple Glob that wi

It's important to note, that a receiver can ONLY be apart of one pool. So if two pools have receiver patterns that match the same receiver, then the first defined pool would own that receiver.

### Receiver Priority

By default, if a pool matches more than one receiver, the order in which the receivers are defined in the framework messenger configuration will be the order in which they are consumed.

Let's look at an example:

```yaml
# auto scale config
messenger_auto_scale:
pools:
default:
receivers: '*' # this will match all receivers defined
# messenger config
framework:
messenger:
transports:
transport1: ''
transport2: ''
transport3: ''
```

Every worker in the pool will first process messages in transport1, then once empty, they will look at transport2, and so on. Essentially, we're making a call to the messenger consume command like: `console messenger:consume transport1 transport2 transport3`

If you'd like to be a bit more explicit about receiver priority, then you can define the priority option on your transport which will ensure that receivers with the highest priority will get processed before receivers with lower priority. If two receivers have the same priority, then the order in which they are defined will take precedent.

Let's look at an example:

```yaml
# auto scale config
messenger_auto_scale:
pools:
default:
receivers: '*' # this will match all receivers defined
# messenger config
framework:
messenger:
transports:
transport3:
dsn: ''
options: { priority: -1 }
transport1:
dsn: ''
options: { priority: 1 }
transport2: '' # default priority is 0
```

This would have the same effect as the above configuration. Even though the transports are defined in a different order, the priority option ensures they are in the same order as above.

### Disabling Must Match All Receivers

By default, the bundle will throw an exception if any receivers are not matched by the pool config. This is to help prevent any unexpected bugs where you the receiver name is for some reason not matched by a pool when you expected it to.
Expand Down
35 changes: 23 additions & 12 deletions src/DependencyInjection/BuildSupervisorPoolConfigCompilerPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
use Krak\SymfonyMessengerAutoScale\Internal\Glob;
use Krak\SymfonyMessengerAutoScale\PoolConfig;
use Krak\SymfonyMessengerAutoScale\SupervisorPoolConfig;
use Symfony\Bundle\FrameworkBundle;
use Symfony\Component\Config\Definition\Processor;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;

final class BuildSupervisorPoolConfigCompilerPass implements CompilerPassInterface
{
public function process(ContainerBuilder $container) {
$receiverNames = $this->findReceiverNames($container);

$receiverNames = $this->findReceiverNamesSortedByPriorityAndPosition($container);
$this->registerMappedPoolConfigData($container, $receiverNames);
$container->setParameter('messenger_auto_scale.receiver_names', $receiverNames);
}
Expand Down Expand Up @@ -63,17 +64,27 @@ private function matchReceiverNameFromRawPool(array $rawPool, array $receiverNam
return [$matched, $unmatched];
}

private function findReceiverNames(ContainerBuilder $container): array {
$receiverMapping = [];
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
foreach ($tags as $tag) {
if (isset($tag['alias'])) {
$receiverMapping[$tag['alias']] = null;
}
}
private function findReceiverNamesSortedByPriorityAndPosition(ContainerBuilder $container): array {
$frameworkConfig = (new Processor())->processConfiguration(
new FrameworkBundle\DependencyInjection\Configuration($container->getParameter('kernel.debug')),
$container->getExtensionConfig('framework')
);
$transports = $frameworkConfig['messenger']['transports'] ?? [];
$receiverNamesToSort = [];
$position = 0;
foreach ($transports as $transportName => $config) {
$receiverNamesToSort[] = [
'name' => $transportName,
'priority' => $config['options']['priority'] ?? 0,
'position' => $position
];
$position += 1;
}

return \array_unique(\array_keys($receiverMapping));
usort($receiverNamesToSort, function(array $a, array $b) {
// sort by priority desc, position asc
return ($b['priority'] <=> $a['priority']) ?: ($a['position'] <=> $b['position']);
});
return array_column($receiverNamesToSort, 'name');
}

public static function createSupervisorPoolConfigsFromArray(array $poolConfigs): array {
Expand Down
53 changes: 40 additions & 13 deletions tests/Feature/BundleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,26 @@ protected function getBundleClass() {

/** @test */
public function supervisor_pool_config_is_built_from_sf_configuration() {
$this->given_the_kernel_is_booted_with_messenger_and_auto_scale_config();
$this->given_the_kernel_is_booted_with_config($this->messengerAndAutoScaleConfig());
$this->when_the_requires_supervisor_pool_configs_is_created();
$this->then_the_pool_configs_match_the_auto_scale_config();
$this->then_the_supervisor_pool_configs_match([
'sales' => ['sales', 'sales_order'],
'default' => ['catalog']
]);
}

/** @test */
public function supervisor_pool_config_receiver_ids_are_sorted_off_of_transport_priority_option() {
$this->given_the_kernel_is_booted_with_config($this->messengerAndAutoScaleConfigWithPriority());
$this->when_the_requires_supervisor_pool_configs_is_created();
$this->then_the_supervisor_pool_configs_match([
'catalog' => ['catalog_highest', 'catalog_high', 'catalog', 'catalog_low'],
]);
}

/** @test */
public function receiver_to_pool_mapping_is_built_from_auto_scale_config() {
$this->given_the_kernel_is_booted_with_messenger_and_auto_scale_config();
$this->given_the_kernel_is_booted_with_config($this->messengerAndAutoScaleConfig());
$this->when_the_requires_supervisor_pool_configs_is_created();
$this->then_the_receiver_to_pools_mapping_matches([
'catalog' => 'default',
Expand All @@ -52,7 +64,7 @@ public function receiver_to_pool_mapping_is_built_from_auto_scale_config() {
/** @test */
public function consuming_messages_with_a_running_supervisor() {
$this->given_the_message_info_file_is_reset();
$this->given_the_kernel_is_booted_with_messenger_and_auto_scale_config();
$this->given_the_kernel_is_booted_with_config($this->messengerAndAutoScaleConfig());
$this->given_the_supervisor_is_started();
$this->when_the_messages_are_dispatched();
$this->then_the_message_info_file_matches_the_messages_sent();
Expand All @@ -67,15 +79,30 @@ private function given_the_message_info_file_is_reset() {
@unlink(__DIR__ . '/Fixtures/_message-info.txt');
}

private function given_the_kernel_is_booted_with_messenger_and_auto_scale_config() {
private function given_the_kernel_is_booted_with_config(array $configFiles) {
$kernel = $this->createKernel();
$kernel->addBundle(Fixtures\TestFixtureBundle::class);
$kernel->addBundle(MessengerRedisBundle::class);
$kernel->addConfigFile(__DIR__ . '/Fixtures/messenger-config.yaml');
$kernel->addConfigFile(__DIR__ . '/Fixtures/auto-scale-config.yaml');
foreach ($configFiles as $configFile) {
$kernel->addConfigFile($configFile);
}
$this->bootKernel();
}

private function messengerAndAutoScaleConfig(): array {
return [
__DIR__ . '/Fixtures/messenger-config.yaml',
__DIR__ . '/Fixtures/auto-scale-config.yaml',
];
}

private function messengerAndAutoScaleConfigWithPriority(): array {
return [
__DIR__ . '/Fixtures/messenger-config-with-priority.yaml',
__DIR__ . '/Fixtures/auto-scale-config-with-priority.yaml',
];
}

private function given_the_supervisor_is_started() {
$this->proc = new Process([__DIR__ . '/Fixtures/console', 'krak:auto-scale:consume']);
$this->proc
Expand All @@ -96,12 +123,12 @@ private function when_the_messages_are_dispatched() {
usleep(2000 * 1000); // 2000ms
}

private function then_the_pool_configs_match_the_auto_scale_config() {
$res = $this->requiresPoolConfigs;
$this->assertEquals('sales', $res->poolConfigs[0]->name());
$this->assertEquals(['sales', 'sales_order'], $res->poolConfigs[0]->receiverIds());
$this->assertEquals('default', $res->poolConfigs[1]->name());
$this->assertEquals(['catalog'], $res->poolConfigs[1]->receiverIds());
private function then_the_supervisor_pool_configs_match(array $expectedPoolNameToReceiverIds) {
$poolNameToReceiverIds = [];
foreach ($this->requiresPoolConfigs->poolConfigs as $poolConfig) {
$poolNameToReceiverIds[$poolConfig->name()] = $poolConfig->receiverIds();
}
$this->assertEquals($expectedPoolNameToReceiverIds, $poolNameToReceiverIds);
}

private function then_the_receiver_to_pools_mapping_matches(array $mapping) {
Expand Down
8 changes: 8 additions & 0 deletions tests/Feature/Fixtures/auto-scale-config-with-priority.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
messenger_auto_scale:
console_path: '%kernel.project_dir%/tests/Feature/Fixtures/console'
pools:
catalog:
min_procs: 0
max_procs: 5
receivers: "catalog*"
heartbeat_interval: 5
19 changes: 19 additions & 0 deletions tests/Feature/Fixtures/messenger-config-with-priority.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
framework:
serializer:
enabled: true
cache:
default_redis_provider: 'redis://redis:6379'
messenger:
transports:
catalog_low:
dsn: 'redis://redis:6379?queue=catalog'
options: { priority: -1 }
catalog: 'redis://redis:6379?queue=catalog'
catalog_highest:
dsn: 'redis://redis:6379?queue=catalog'
options: { priority: 2 }
catalog_high:
dsn: 'redis://redis:6379?queue=catalog'
options: { priority: 1 }
routing:
'Krak\SymfonyMessengerAutoScale\Tests\Feature\Fixtures\Message\CatalogMessage': [catalog_low, catalog, catalog_high, catalog_highest]

0 comments on commit ee154cd

Please sign in to comment.