diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php index 2b98d6289f..bcf4be11ab 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php @@ -14,6 +14,7 @@ use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; use Neos\ContentRepository\Core\Subscription\Engine\Result; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria; +use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; @@ -107,7 +108,7 @@ public function fixFailedProjectionViaReset() ); $result = $this->subscriptionEngine->catchUpActive(); - self::assertTrue($result->hasFailed()); + self::assertTrue($result->hadErrors()); self::assertEquals( $expectedFailure, @@ -161,7 +162,7 @@ public function irreparableProjection() // catchup active tries to apply the commited event $result = $this->subscriptionEngine->catchUpActive(); // but fails - self::assertTrue($result->hasFailed()); + self::assertTrue($result->hadErrors()); self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection')); // a second catchup active does not change anything @@ -207,7 +208,7 @@ public function irreparableProjection() // but booting will rethrow that error :D $result = $this->subscriptionEngine->boot(); - self::assertTrue($result->hasFailed()); + self::assertTrue($result->hadErrors()); self::assertEquals( ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), @@ -309,7 +310,7 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents() ); $result = $this->subscriptionEngine->catchUpActive(); - self::assertTrue($result->hasFailed()); + self::assertTrue($result->hadErrors()); $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), @@ -367,7 +368,7 @@ public function projectionErrorWithMultipleProjectionsInContentRepositoryHandle( } catch (\RuntimeException $exception) { $handleException = $exception; } - self::assertNotNull($handleException); + self::assertInstanceOf(CatchUpHadErrors::class, $exception); self::assertEquals('Exception in subscriber "Vendor.Package:FakeProjection" while catching up: This projection is kaputt.', $handleException->getMessage()); self::assertSame($originalException, $handleException->getPrevious()); diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 460cc96989..6c5b4f239d 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -41,6 +41,7 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces; use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine; +use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Exception\ConcurrencyException; use Psr\Clock\ClockInterface; @@ -98,7 +99,9 @@ public function handle(CommandInterface $command): void $eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish); $this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion); $catchUpResult = $this->subscriptionEngine->catchUpActive(); - $catchUpResult->throwOnFailure(); + if ($catchUpResult->hadErrors()) { + throw CatchUpHadErrors::createFromErrors($catchUpResult->errors); + } return; } @@ -128,7 +131,9 @@ public function handle(CommandInterface $command): void // We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled. // Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted. $catchUpResult = $this->subscriptionEngine->catchUpActive(); - $catchUpResult->throwOnFailure(); + if ($catchUpResult->hadErrors()) { + throw CatchUpHadErrors::createFromErrors($catchUpResult->errors); + } } } diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php index 5f1991c0a7..50ca89b5a8 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php @@ -161,7 +161,7 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta if ($danglingContentStreamsPresent) { $result = $this->subscriptionEngine->catchUpActive(); - if ($result->hasFailed()) { + if ($result->hadErrors()) { $outputFn('Catchup after removing unused content streams led to errors. You might need to use ./flow contentstream:pruneremovedfromeventstream and replay.'); } } else { diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php index e90e6975fa..eabe5c5270 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/ProcessedResult.php @@ -26,26 +26,8 @@ public static function failed(int $numberOfProcessedEvents, Errors $errors): sel } /** @phpstan-assert-if-true !null $this->errors */ - public function hasFailed(): bool + public function hadErrors(): bool { return $this->errors !== null; } - - public function throwOnFailure(): void - { - /** @var Error[] $errors */ - $errors = iterator_to_array($this->errors ?? []); - if ($errors === []) { - return; - } - $firstError = array_shift($errors); - - $additionalFailedSubscribers = array_map(fn (Error $error) => $error->subscriptionId->value, $errors); - - $additionalErrors = $additionalFailedSubscribers === [] ? '' : sprintf(' | And subscribers %s with additional errors.', join(', ', $additionalFailedSubscribers)); - $exceptionMessage = sprintf('Exception in subscriber "%s" while catching up: %s%s', $firstError->subscriptionId->value, $firstError->message, $additionalErrors); - - // todo custom exception! - throw new \RuntimeException($exceptionMessage, 1732132930, $firstError->throwable); - } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpHadErrors.php b/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpHadErrors.php new file mode 100644 index 0000000000..7545c74706 --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpHadErrors.php @@ -0,0 +1,68 @@ + + * @api + */ +final class CatchUpHadErrors extends \RuntimeException implements \IteratorAggregate +{ + /** + * @internal + * @param array<\Throwable> $additionalExceptions + */ + private function __construct( + string $message, + int $code, + \Throwable $exception, + private readonly array $additionalExceptions + ) { + parent::__construct($message, $code, $exception); + } + + /** + * @internal + */ + public static function createFromErrors(Errors $errors): self + { + /** @var non-empty-array $errors */ + $errors = iterator_to_array($errors); + $firstError = array_shift($errors); + + $additionalFailedSubscribers = array_map(fn (Error $error) => $error->subscriptionId->value, $errors); + + $additionalErrors = $additionalFailedSubscribers === [] ? '' : sprintf(' | And subscribers %s with additional errors.', join(', ', $additionalFailedSubscribers)); + $exceptionMessage = sprintf('Exception in subscriber "%s" while catching up: %s%s', $firstError->subscriptionId->value, $firstError->message, $additionalErrors); + + throw new self($exceptionMessage, 1732132930, $firstError->throwable, array_map(fn (Error $error) => $error->throwable, $errors)); + } + + public function getIterator(): \Traversable + { + $previous = $this->getPrevious(); + if ($previous !== null) { + yield $previous; + } + yield from $this->additionalExceptions; + } +}