Skip to content

Commit

Permalink
remove EventPersister and much more :)
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Nov 14, 2024
1 parent 7fea53e commit ec08d64
Show file tree
Hide file tree
Showing 37 changed files with 262 additions and 211 deletions.
19 changes: 12 additions & 7 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use Neos\ContentRepository\Core\CommandHandler\CommandInterface;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\InitiatingEventMetadata;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
Expand All @@ -36,7 +36,10 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspace;
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Events;
use Psr\Clock\ClockInterface;

/**
Expand All @@ -57,7 +60,9 @@
public function __construct(
public readonly ContentRepositoryId $id,
private readonly CommandBus $commandBus,
private readonly EventPersister $eventPersister,
private readonly EventStoreInterface $eventStore,
private readonly EventNormalizer $eventNormalizer,
private readonly SubscriptionEngine $subscriptionEngine,
private readonly NodeTypeManager $nodeTypeManager,
private readonly InterDimensionalVariationGraph $variationGraph,
private readonly ContentDimensionSourceInterface $contentDimensionSource,
Expand All @@ -83,8 +88,8 @@ public function handle(CommandInterface $command): void
// simple case
if ($toPublish instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
$this->eventPersister->publishWithoutCatchup($eventsToPublish);
// TODO how to solve this with a decoupled subscription engine? $this->catchupProjections();
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish), $eventsToPublish->expectedVersion);
$this->subscriptionEngine->catchUpActive();
return;
}

Expand All @@ -93,7 +98,7 @@ public function handle(CommandInterface $command): void
foreach ($toPublish as $yieldedEventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($yieldedEventsToPublish);
try {
$this->eventPersister->publishWithoutCatchup($eventsToPublish);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -105,15 +110,15 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventPersister->publishWithoutCatchup($yieldedErrorStrategy);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($yieldedErrorStrategy), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
}
} finally {
// We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled.
// Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted.
// TODO how to solve with the decoupled subscription engine? $this->catchupProjections();
$this->subscriptionEngine->catchUpActive();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
use Neos\EventStore\Model\Event\EventData;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Events;

/**
* Central authority to convert Content Repository domain events to Event Store EventData and EventType, vice versa.
Expand Down Expand Up @@ -147,6 +148,11 @@ public function normalize(EventInterface|DecoratedEvent $event): Event
);
}

public function normalizeEvents(EventsToPublish $eventsToPublish): Events
{
return Events::fromArray($eventsToPublish->events->map($this->normalize(...)));
}

public function denormalize(Event $event): EventInterface
{
$eventClassName = $this->getEventClassName($event);
Expand Down
52 changes: 0 additions & 52 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use Neos\ContentRepository\Core\DimensionSpace\ContentDimensionZookeeper;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\DimensionSpaceCommandHandler;
use Neos\ContentRepository\Core\Feature\NodeAggregateCommandHandler;
use Neos\ContentRepository\Core\Feature\NodeDuplication\NodeDuplicationCommandHandler;
Expand Down Expand Up @@ -70,7 +69,6 @@ final class ContentRepositoryFactory

// The following properties store "singleton" references of objects for this content repository
private ?ContentRepository $contentRepositoryRuntimeCache = null;
private ?EventPersister $eventPersisterRuntimeCache = null;

/**
* @param CatchUpHookFactoryInterface<ContentGraphReadModelInterface> $contentGraphCatchUpHookFactory
Expand Down Expand Up @@ -202,7 +200,9 @@ public function getOrBuild(): ContentRepository
$this->contentRepositoryRuntimeCache = new ContentRepository(
$this->contentRepositoryId,
$publicCommandBus,
$this->buildEventPersister(),
$this->eventStore,
$this->subscriberFactoryDependencies->eventNormalizer,
$this->subscriptionEngine,
$this->subscriberFactoryDependencies->nodeTypeManager,
$this->subscriberFactoryDependencies->interDimensionalVariationGraph,
$this->subscriberFactoryDependencies->contentDimensionSource,
Expand Down Expand Up @@ -235,20 +235,8 @@ public function buildService(
$this->subscriberFactoryDependencies,
$this->eventStore,
$this->getOrBuild(),
$this->buildEventPersister(),
$this->subscriptionEngine,
);
return $serviceFactory->build($serviceFactoryDependencies);
}

private function buildEventPersister(): EventPersister
{
if (!$this->eventPersisterRuntimeCache) {
$this->eventPersisterRuntimeCache = new EventPersister(
$this->eventStore,
$this->subscriberFactoryDependencies->eventNormalizer,
);
}
return $this->eventPersisterRuntimeCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use Neos\ContentRepository\Core\DimensionSpace\ContentDimensionZookeeper;
use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
Expand All @@ -45,7 +44,6 @@ private function __construct(
public PropertyConverter $propertyConverter,
public ContentRepository $contentRepository,
// we don't need CommandBus, because this is included in ContentRepository->handle()
public EventPersister $eventPersister,
public SubscriptionEngine $subscriptionEngine,
) {
}
Expand All @@ -57,7 +55,6 @@ public static function create(
SubscriberFactoryDependencies $projectionFactoryDependencies,
EventStoreInterface $eventStore,
ContentRepository $contentRepository,
EventPersister $eventPersister,
SubscriptionEngine $subscriptionEngine,
): self {
return new self(
Expand All @@ -70,7 +67,6 @@ public static function create(
$projectionFactoryDependencies->interDimensionalVariationGraph,
$projectionFactoryDependencies->propertyConverter,
$contentRepository,
$eventPersister,
$subscriptionEngine,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ interface CatchUpHookInterface
{
/**
* This hook is called at the beginning of a catch-up run;
* AFTER the Database Lock is acquired ({@see SubscriptionEngine::run()}).
* AFTER the Database Lock is acquired ({@see SubscriptionEngine::catchUpActive()}).
*/
public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void;

Expand All @@ -39,7 +39,7 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event

/**
* This hook is called at the END of a catch-up run
* BEFORE the Database Lock is released ({@see SubscriptionEngine::run()}).
* BEFORE the Database Lock is released ({@see SubscriptionEngine::catchUpActive()}).
*/
public function onAfterCatchUp(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventPersister;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;

/**
* Additional marker interface to add to a {@see ProjectionInterface}.
Expand All @@ -19,7 +18,7 @@ interface WithMarkStaleInterface
{
/**
* Triggered during catching up after applying events
* {@see ContentRepository::catchUpProjection()}
* {@see SubscriptionEngine::catchUpActive()}
*
* Can be f.e. used to flush caches inside the Projection State.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Neos\ContentRepository\Core\Service\ContentStreamPruner\ContentStreamForPruning;
use Neos\ContentRepository\Core\Service\ContentStreamPruner\ContentStreamStatus;
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\EventTypes;
Expand All @@ -40,7 +41,8 @@ class ContentStreamPruner implements ContentRepositoryServiceInterface
{
public function __construct(
private readonly EventStoreInterface $eventStore,
private readonly EventNormalizer $eventNormalizer
private readonly EventNormalizer $eventNormalizer,
private readonly SubscriptionEngine $subscriptionEngine,
) {
}

Expand Down Expand Up @@ -160,10 +162,9 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta
}

if ($danglingContentStreamsPresent) {
try {
//TODO $this->contentRepository->catchUpProjections();
} catch (\Throwable $e) {
$outputFn(sprintf('Could not catchup after removing unused content streams: %s. You might need to use ./flow contentstream:pruneremovedfromeventstream and replay.', $e->getMessage()));
$result = $this->subscriptionEngine->catchUpActive();
if ($result->hasErrors()) {
$outputFn('Catchup after removing unused content streams led to errors. You might need to use ./flow contentstream:pruneremovedfromeventstream and replay.');
}
} else {
$outputFn('Okay. No pruneable streams in the event stream');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public function build(ContentRepositoryServiceFactoryDependencies $serviceFactor
{
return new ContentStreamPruner(
$serviceFactoryDependencies->eventStore,
$serviceFactoryDependencies->eventNormalizer
$serviceFactoryDependencies->eventNormalizer,
$serviceFactoryDependencies->subscriptionEngine,
);
}
}
48 changes: 48 additions & 0 deletions Neos.ContentRepository.Core/Classes/Subscription/Engine/Errors.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Subscription\Engine;

/**
* @implements \IteratorAggregate<Error>
*/
final readonly class Errors implements \IteratorAggregate, \Countable
{
/**
* array<Error>
*/
private array $errors;

/**
* @param array<Error> $errors
*/
private function __construct(
Error ...$errors
) {
$this->errors = $errors;
}

/**
* @param array<Error> $errors
*/
public static function fromArray(array $errors): self
{
return new self(...$errors);
}

public function getIterator(): \Traversable
{
yield from $this->errors;
}

public function isEmpty(): bool
{
return $this->errors === [];
}

public function count(): int
{
return count($this->errors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@
namespace Neos\ContentRepository\Core\Subscription\Engine;

/**
* @internal
* @api
*/
final class ProcessedResult
final readonly class ProcessedResult
{
/** @param list<Error> $errors */
public function __construct(
private function __construct(
public readonly int $numberOfProcessedEvents,
public readonly bool $finished = false,
public readonly array $errors = [],
public readonly Errors|null $errors,
) {
}

public static function success(int $numberOfProcessedEvents): self
{
return new self($numberOfProcessedEvents, null);
}

public static function failed(int $numberOfProcessedEvents, Errors $errors): self
{
return new self($numberOfProcessedEvents, $errors);
}

public function hasErrors(): bool
{
return $this->errors !== null;
}
}
Loading

0 comments on commit ec08d64

Please sign in to comment.