Skip to content

Commit

Permalink
Merge pull request #247 from patchlevel/command-bus-config
Browse files Browse the repository at this point in the history
add command bus service
  • Loading branch information
DavidBadura authored Feb 17, 2025
2 parents 8a0778d + 478ef55 commit 6af37a0
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 24 deletions.
32 changes: 11 additions & 21 deletions docs/pages/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,20 +417,8 @@ patchlevel_event_sourcing:

## Command Bus

You can also enable and register our handlers in symfony messenger.
These handlers allow you to use your aggregates as command handlers.

```yaml
patchlevel_event_sourcing:
aggregate_handlers: ~
```
!!! note

You can find out more about the command bus integration [here](https://event-sourcing.patchlevel.io/latest/command_bus/).

Default the handlers will be registered for all buses.
You can also specify a specific bus.
Before you have to define the bus in the messenger configuration.
You can enable the command bus integration to use your aggregates as command handlers.
For this bundle we provide only a symfony messenger integration, so you have to define the bus in the messenger configuration.

```yaml
framework:
Expand All @@ -439,17 +427,19 @@ framework:
buses:
command.bus: ~
```
!!! tip

This is also useful if you have a event bus and a command bus.

And then you can specify the bus in the configuration.
After this, you need to define the command bus in the event sourcing configuration.
This will automatically register the aggregate handlers for the symfony messenger,
so you can handle commands with your aggregates.

```yaml
patchlevel_event_sourcing:
aggregate_handlers:
bus: command.bus
command_bus:
service: command.bus
```
!!! note

You can find out more about the command bus and the aggregate handlers [here](https://event-sourcing.patchlevel.io/latest/command_bus/).

## Event Bus

You can enable the event bus to listen for events and messages synchronously.
Expand Down
21 changes: 21 additions & 0 deletions src/CommandBus/SymfonyCommandBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcingBundle\CommandBus;

use Patchlevel\EventSourcing\CommandBus\CommandBus;
use Symfony\Component\Messenger\MessageBusInterface;

final class SymfonyCommandBus implements CommandBus
{
public function __construct(
private readonly MessageBusInterface $messageBus,
) {
}

public function dispatch(object $command): void
{
$this->messageBus->dispatch($command);
}
}
15 changes: 15 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
/**
* @psalm-type Config = array{
* event_bus: array{enabled: bool, type: string, service: string},
* command_bus: array{enabled: bool, service: string},
* subscription: array{
* store: array{type: string, service: string|null},
* retry_strategy: array{base_delay: int, delay_factor: int, max_attempts: int},
Expand Down Expand Up @@ -238,12 +239,26 @@ public function getConfigTreeBuilder(): TreeBuilder
->end()
->end()

->arrayNode('command_bus')
->canBeEnabled()
->addDefaultsIfNotSet()
->children()
->scalarNode('service')->isRequired()->end()
->booleanNode('register_aggregate_handlers')->defaultTrue()->end()
->end()
->end()

->arrayNode('aggregate_handlers')
->canBeEnabled()
->addDefaultsIfNotSet()
->children()
->scalarNode('bus')->defaultNull()->end()
->end()
->setDeprecated(
'patchlevel/event-sourcing-bundle',
'3.9',
'The "%node%" option is deprecated and will be removed in 4.0. Use "patchlevel_event_sourcing.command_bus" instead.'
)
->end()

->end();
Expand Down
31 changes: 28 additions & 3 deletions src/DependencyInjection/PatchlevelEventSourcingExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Patchlevel\EventSourcing\Attribute\Subscriber;
use Patchlevel\EventSourcing\Clock\FrozenClock;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\CommandBus\CommandBus;
use Patchlevel\EventSourcing\Console\Command\DatabaseCreateCommand;
use Patchlevel\EventSourcing\Console\Command\DatabaseDropCommand;
use Patchlevel\EventSourcing\Console\Command\DebugCommand;
Expand Down Expand Up @@ -107,6 +108,7 @@
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberHelper;
use Patchlevel\EventSourcingBundle\Attribute\AsListener;
use Patchlevel\EventSourcingBundle\Command\StoreMigrateCommand;
use Patchlevel\EventSourcingBundle\CommandBus\SymfonyCommandBus;
use Patchlevel\EventSourcingBundle\DataCollector\EventSourcingCollector;
use Patchlevel\EventSourcingBundle\DataCollector\MessageCollectorEventBus;
use Patchlevel\EventSourcingBundle\Doctrine\DbalConnectionFactory;
Expand Down Expand Up @@ -158,7 +160,7 @@ public function load(array $configs, ContainerBuilder $container): void
$this->configureUpcaster($container);
$this->configureSerializer($config, $container);
$this->configureMessageDecorator($container);
$this->configureAggregateHandlers($config, $container);
$this->configureCommandBus($config, $container);
$this->configureEventBus($config, $container);
$this->configureConnection($config, $container);
$this->configureStore($config, $container);
Expand Down Expand Up @@ -218,13 +220,36 @@ private function configureSerializer(array $config, ContainerBuilder $container)
}

/** @param Config $config */
private function configureAggregateHandlers(array $config, ContainerBuilder $container): void
private function configureCommandBus(array $config, ContainerBuilder $container): void
{
if ($config['command_bus']['enabled'] && $config['aggregate_handlers']['enabled']) {
throw new InvalidArgumentException('Remove legacy aggregate_handlers configuration when using command_bus');
}

if ($config['command_bus']['enabled']) {
$container->register(SymfonyCommandBus::class)
->setArguments([
new Reference($config['command_bus']['service']),
]);

$container->setAlias(CommandBus::class, SymfonyCommandBus::class);

$container->setParameter(
'patchlevel_event_sourcing.aggregate_handlers.bus',
$config['command_bus']['service'],
);

return;
}

if (!$config['aggregate_handlers']['enabled']) {
return;
}

$container->setParameter('patchlevel_event_sourcing.aggregate_handlers.bus', $config['aggregate_handlers']['bus']);
$container->setParameter(
'patchlevel_event_sourcing.aggregate_handlers.bus',
$config['aggregate_handlers']['bus'],
);
}

/** @param Config $config */
Expand Down
66 changes: 66 additions & 0 deletions tests/Unit/PatchlevelEventSourcingBundleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Patchlevel\EventSourcing\Aggregate\CustomId;
use Patchlevel\EventSourcing\Clock\FrozenClock;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\CommandBus\CommandBus;
use Patchlevel\EventSourcing\CommandBus\Handler\CreateAggregateHandler;
use Patchlevel\EventSourcing\Console\Command\DatabaseCreateCommand;
use Patchlevel\EventSourcing\Console\Command\DatabaseDropCommand;
Expand Down Expand Up @@ -70,6 +71,7 @@
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
use Patchlevel\EventSourcingBundle\Command\StoreMigrateCommand;
use Patchlevel\EventSourcingBundle\CommandBus\SymfonyCommandBus;
use Patchlevel\EventSourcingBundle\DependencyInjection\PatchlevelEventSourcingExtension;
use Patchlevel\EventSourcingBundle\EventBus\SymfonyEventBus;
use Patchlevel\EventSourcingBundle\PatchlevelEventSourcingBundle;
Expand Down Expand Up @@ -547,6 +549,69 @@ public function testCommandHandler(): void
self::assertEquals('command.bus', $tag['bus']);
}

public function testCommandBusAndLegacyConfigurationNotAllowed(): void
{
$container = new ContainerBuilder();

$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Remove legacy aggregate_handlers configuration when using command_bus');

$this->compileContainer(
$container,
[
'patchlevel_event_sourcing' => [
'connection' => [
'service' => 'doctrine.dbal.eventstore_connection',
],
'aggregates' => [__DIR__ . '/../Fixtures'],
'aggregate_handlers' => [
'bus' => 'command.bus',
],
'command_bus' => [
'service' => 'command.bus',
],
],
]
);
}

public function testCommandBus(): void
{
$container = new ContainerBuilder();

$this->compileContainer(
$container,
[
'patchlevel_event_sourcing' => [
'connection' => [
'service' => 'doctrine.dbal.eventstore_connection',
],
'aggregates' => [__DIR__ . '/../Fixtures'],
'command_bus' => [
'service' => 'command.bus',
],
],
]
);

$handler = $container->get('event_sourcing.handler.profile.create');

self::assertInstanceOf(CreateAggregateHandler::class, $handler);

$handler(new CreateProfile(CustomId::fromString('1')));

$definition = $container->getDefinition('event_sourcing.handler.profile.create');
$tags = $definition->getTag('messenger.message_handler');

self::assertCount(1, $tags);

$tag = $tags[0];

self::assertEquals(CreateProfile::class, $tag['handles']);
self::assertEquals('command.bus', $tag['bus']);
self::assertInstanceOf(SymfonyCommandBus::class, $container->get(CommandBus::class));
}

public function testSnapshotStore(): void
{
$container = new ContainerBuilder();
Expand Down Expand Up @@ -1222,6 +1287,7 @@ private function compileContainer(ContainerBuilder $container, array $config): v

$container->set('doctrine.dbal.eventstore_connection', $this->prophesize(Connection::class)->reveal());
$container->set('event.bus', $this->prophesize(MessageBusInterface::class)->reveal());
$container->set('command.bus', $this->prophesize(MessageBusInterface::class)->reveal());
$container->set('cache.default', $this->prophesize(CacheItemPoolInterface::class)->reveal());
$container->set('event_dispatcher', $this->prophesize(EventDispatcherInterface::class)->reveal());
$container->set('services_resetter', $this->prophesize(ServicesResetter::class)->reveal());
Expand Down

0 comments on commit 6af37a0

Please sign in to comment.