From aa2e7b13632fbe4f50cd75ce70a85990213eda45 Mon Sep 17 00:00:00 2001 From: mhsdesign <85400359+mhsdesign@users.noreply.github.com> Date: Tue, 3 Dec 2024 20:08:29 +0100 Subject: [PATCH] TASK: Prevent catchup hooks from halting the projections updates bfb4655173ef7c437aa33f007a4ffd0160705e9b to not do a full rollback if catchup hooks fail but continue and collect their errors, which will be rethrown later with the advantage that the catchup worked! --- .../Subscription/CatchUpHookErrorTest.php | 250 +++++++++++------- .../Subscription/ProjectionErrorTest.php | 4 +- .../CatchUpHook/CatchUpHookFailed.php | 36 +++ .../CatchUpHook/CatchUpHookInterface.php | 28 +- .../CatchUpHook/DelegatingCatchUpHook.php | 55 +++- .../Classes/Subscription/Engine/Error.php | 10 +- .../Engine/SubscriptionEngine.php | 47 ++-- .../Subscription/Exception/CatchUpFailed.php | 16 -- 8 files changed, 287 insertions(+), 159 deletions(-) create mode 100644 Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookFailed.php delete mode 100644 Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php index 0ad8b30ac33..f0f481002c3 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/CatchUpHookErrorTest.php @@ -5,206 +5,261 @@ namespace Neos\ContentRepository\BehavioralTests\Tests\Functional\Subscription; use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated; +use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookFailed; use Neos\ContentRepository\Core\Projection\ProjectionStatus; -use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed; +use Neos\ContentRepository\Core\Subscription\Engine\Error; +use Neos\ContentRepository\Core\Subscription\Engine\Errors; +use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionError; use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\EventStore\Model\Event\SequenceNumber; +use Neos\EventStore\Model\EventEnvelope; final class CatchUpHookErrorTest extends AbstractSubscriptionEngineTestCase { /** @test */ - public function error_onBeforeEvent_projectionIsNotRun() + public function error_onBeforeEvent_isIgnoredAndCollected() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->fakeProjection->expects(self::once())->method('apply'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); - // commit two events, we expect neither to be catchupd correctly because handing on the first fails + // commit two events. we expect that the hook will throw for both events but the catchup is NOT halted $this->commitExampleContentStreamEvent(); $this->commitExampleContentStreamEvent(); $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::ACTIVE); - $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException( - $exception = new \RuntimeException('This catchup hook is kaputt.') - ); - $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent'); - $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp'); + + $exception = new \RuntimeException('This catchup hook is kaputt.'); + $this->catchupHookForFakeProjection->expects($invokedCount = self::exactly(2))->method('onBeforeEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($invokedCount, $exception) { + match ($invokedCount->getInvocationCount()) { + 1 => self::assertSame(1, $eventEnvelope->sequenceNumber->value), + 2 => self::assertSame(2, $eventEnvelope->sequenceNumber->value), + }; + throw $exception; + }); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onAfterEvent'); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $actualException = null; - try { - $this->subscriptionEngine->catchUpActive(); - } catch (\Throwable $e) { - $actualException = $e; - } - self::assertSame($exception, $actualException); - self::assertSame('This catchup hook is kaputt.', $actualException->getMessage()); + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onBeforeEvent": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + // two errors for both of the events + $result = $this->subscriptionEngine->catchUpActive(); + self::assertEquals( + ProcessedResult::failed( + 2, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); - // must be still empty because apply was never called - self::assertEmpty( + // both events are applied still + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + self::assertEquals( + [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); } /** @test */ - public function error_onAfterEvent_projectionIsRolledBack() + public function error_onAfterEvent_isIgnoredAndCollected() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->fakeProjection->expects(self::once())->method('apply'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); - // commit an events, we expect neither to be catchupd correctly because handing on the first fails + // commit two events. we expect that the hook will throw for both events but the catchup is NOT halted $this->commitExampleContentStreamEvent(); $this->commitExampleContentStreamEvent(); $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::ACTIVE); - $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class)); - $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException( - $exception = new \RuntimeException('This catchup hook is kaputt.') - ); - $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp'); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class)); + $exception = new \RuntimeException('This catchup hook is kaputt.'); + $this->catchupHookForFakeProjection->expects($invokedCount = self::exactly(2))->method('onAfterEvent')->willReturnCallback(function ($_, EventEnvelope $eventEnvelope) use ($invokedCount, $exception) { + match ($invokedCount->getInvocationCount()) { + 1 => self::assertSame(1, $eventEnvelope->sequenceNumber->value), + 2 => self::assertSame(2, $eventEnvelope->sequenceNumber->value), + }; + throw $exception; + }); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); // todo asset no parameters! self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $actualException = null; - try { - $this->subscriptionEngine->catchUpActive(); - } catch (\Throwable $e) { - $actualException = $e; - } - self::assertSame($exception, $actualException); - self::assertSame('This catchup hook is kaputt.', $actualException->getMessage()); + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onAfterEvent": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); - // will be empty again because the full transaction was rolled back - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); - self::assertEmpty( + // two errors for both of the events + $result = $this->subscriptionEngine->catchUpActive(); + self::assertEquals( + ProcessedResult::failed( + 2, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); + + // both events are applied still + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + self::assertEquals( + [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); } /** @test */ - public function error_onBeforeCatchUp_abortsCatchup() + public function error_onBeforeCatchUp_isIgnoredAndCollected() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->fakeProjection->expects(self::never())->method('apply'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); - // commit an event + // commit two events. we expect that the hook will throw for both events but the catchup is NOT halted + $this->commitExampleContentStreamEvent(); $this->commitExampleContentStreamEvent(); $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::ACTIVE)->willThrowException( - new \RuntimeException('This catchup hook is kaputt.') + $exception = new \RuntimeException('This catchup hook is kaputt.') ); - $this->catchupHookForFakeProjection->expects(self::never())->method('onBeforeEvent'); - $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent'); - $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp'); - - $this->secondFakeProjection->injectSaboteur(fn () => self::fail('Projection apply is not expected to be called!')); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onBeforeEvent'); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onAfterEvent'); + $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp'); self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $expectedFailure = null; - try { - $this->subscriptionEngine->catchUpActive(); - } catch (\Throwable $e) { - $expectedFailure = $e; - } - self::assertInstanceOf(CatchUpFailed::class, $expectedFailure); - - self::assertSame($expectedFailure->getMessage(), 'Subscriber "Vendor.Package:SecondFakeProjection" failed onBeforeCatchUp: This catchup hook is kaputt.'); + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onBeforeCatchUp": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); - // still the initial status - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); + $result = $this->subscriptionEngine->catchUpActive(); + self::assertEquals( + ProcessedResult::failed( + 2, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); - // must be still empty because apply was never called - self::assertEmpty( + // both events are applied still + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); + self::assertEquals( + [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); } /** @test */ - public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted() + public function error_onAfterCatchUp_isIgnoredAndCollected() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->fakeProjection->expects(self::once())->method('apply'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); - // commit an event + // commit two events. we expect that the hook will throw for both events but the catchup is NOT halted + $this->commitExampleContentStreamEvent(); $this->commitExampleContentStreamEvent(); $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp'); - $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent'); - $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent'); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onBeforeEvent'); + $this->catchupHookForFakeProjection->expects(self::exactly(2))->method('onAfterEvent'); // todo test that other catchup hooks are still run and all errors are collected! $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp')->willThrowException( - new \RuntimeException('This catchup hook is kaputt.') + $exception = new \RuntimeException('This catchup hook is kaputt.') ); self::assertEmpty( $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $expectedFailure = null; - try { - $this->subscriptionEngine->catchUpActive(); - } catch (\Throwable $e) { - $expectedFailure = $e; - } - self::assertInstanceOf(CatchUpFailed::class, $expectedFailure); + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onAfterCatchUp": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); - self::assertSame($expectedFailure->getMessage(), 'Subscriber "Vendor.Package:SecondFakeProjection" failed onAfterCatchUp: This catchup hook is kaputt.'); + $result = $this->subscriptionEngine->catchUpActive(); + self::assertEquals( + ProcessedResult::failed( + 2, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); - // one event is applied! - $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(1)); + // both events are applied still + $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::fromInteger(2)); self::assertEquals( - [SequenceNumber::fromInteger(1)], + [SequenceNumber::fromInteger(1), SequenceNumber::fromInteger(2)], $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); } /** @test */ - public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted_withProjectionError() + public function error_onAfterCatchUp_isIgnoredAndCollected_withProjectionError() { $this->eventStore->setup(); $this->fakeProjection->expects(self::once())->method('setUp'); - $this->fakeProjection->expects(self::once())->method('apply'); + $this->fakeProjection->expects(self::exactly(2))->method('apply'); $this->subscriptionEngine->setup(); $this->subscriptionEngine->boot(); $this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none()); - // commit an event + // commit two events. we expect that the hook will throw for both events but the catchup is NOT halted + $this->commitExampleContentStreamEvent(); $this->commitExampleContentStreamEvent(); $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp'); - $this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent'); + // only the onBeforeEvent hook will be invoked as afterward the projection errored + $this->catchupHookForFakeProjection->expects(self::exactly(1))->method('onBeforeEvent'); $this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent'); $this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp')->willThrowException( - new \RuntimeException('This catchup hook is kaputt.') + $exception = new \RuntimeException('This catchup hook is kaputt.') ); $innerException = new \RuntimeException('Projection is kaputt.'); @@ -214,15 +269,26 @@ public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted_withPro $this->secondFakeProjection->getState()->findAppliedSequenceNumbers() ); - $expectedFailure = null; - try { - $this->subscriptionEngine->catchUpActive(); - } catch (\Throwable $e) { - $expectedFailure = $e; - } - self::assertInstanceOf(CatchUpFailed::class, $expectedFailure); + $expectedWrappedException = new CatchUpHookFailed( + 'Hook "" failed "onAfterCatchUp": This catchup hook is kaputt.', + 1733243960, + $exception, + [] + ); + + // two errors for both of the events + $result = $this->subscriptionEngine->catchUpActive(); - self::assertSame($expectedFailure->getMessage(), 'Subscriber "Vendor.Package:SecondFakeProjection" failed onAfterCatchUp: This catchup hook is kaputt.'); + self::assertEquals( + ProcessedResult::failed( + 2, + Errors::fromArray([ + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $innerException), + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $expectedWrappedException), + ]) + ), + $result + ); $expectedFailure = ProjectionSubscriptionStatus::create( subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), @@ -232,7 +298,7 @@ public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted_withPro setupStatus: ProjectionStatus::ok(), ); - // projection is still marked as error + // projection is marked as error self::assertEquals( $expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection') diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php index 78d3054c6b1..2b98d6289f2 100644 --- a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php +++ b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Subscription/ProjectionErrorTest.php @@ -51,7 +51,7 @@ public function projectionWithError() ); $result = $this->subscriptionEngine->catchUpActive(); - self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:FakeProjection'), $exception)])), $result); + self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::forSubscription(SubscriptionId::fromString('Vendor.Package:FakeProjection'), $exception)])), $result); self::assertEquals( $expectedStatusForFailedProjection, @@ -183,7 +183,7 @@ public function irreparableProjection() $result = $this->subscriptionEngine->reactivate(); self::assertEquals( ProcessedResult::failed(1, Errors::fromArray([ - Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception) + Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception) ])), $result ); diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookFailed.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookFailed.php new file mode 100644 index 00000000000..ccdaa36a79d --- /dev/null +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookFailed.php @@ -0,0 +1,36 @@ + + * @api + */ +final class CatchUpHookFailed extends \RuntimeException implements \IteratorAggregate +{ + /** + * @internal + * @param array<\Throwable> $additionalExceptions + */ + public function __construct( + string $message, + int $code, + \Throwable $exception, + private readonly array $additionalExceptions + ) { + parent::__construct($message, $code, $exception); + } + + public function getIterator(): \Traversable + { + $previous = $this->getPrevious(); + if ($previous !== null) { + yield $previous; + } + yield from $this->additionalExceptions; + } +} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookInterface.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookInterface.php index 83e9e392c41..b95c9e26d7e 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookInterface.php +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/CatchUpHookInterface.php @@ -10,7 +10,7 @@ use Neos\EventStore\Model\EventEnvelope; /** - * This is an internal API with which you can hook into the catch-up process of a Projection. + * This is an api with which you can hook into the catch-up process of a projection. * * To register such a CatchUpHook, create a corresponding {@see CatchUpHookFactoryInterface} * and pass it to {@see ProjectionFactoryInterface::build()}. @@ -23,8 +23,10 @@ interface CatchUpHookInterface * This hook is called at the beginning of a catch-up run; * AFTER the Database Lock is acquired, BEFORE any projection was called. * - * Note that any errors thrown will cause the catchup to directly halt, - * and no projections or their subscriber state are updated. + * Note that any errors thrown will be ignored and the catchup will start as normal. + * The collect errors will be returned and rethrown by the content repository. + * + * @throws CatchUpHookFailed */ public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void; @@ -32,8 +34,10 @@ public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void; * This hook is called for every event during the catchup process, **before** the projection * is updated but in the same transaction: {@see ProjectionInterface::transactional()}. * - * Note that any errors thrown will cause the catchup to directly halt, - * and no projections or their subscriber state are updated, as the transaction is rolled back. + * Note that any errors thrown will be ignored and the catchup will continue as normal. + * The collect errors will be returned and rethrown by the content repository. + * + * @throws CatchUpHookFailed */ public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void; @@ -41,8 +45,10 @@ public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $even * This hook is called for every event during the catchup process, **after** the projection * is updated but in the same transaction: {@see ProjectionInterface::transactional()}. * - * Note that any errors thrown will cause the catchup to directly halt, - * and no projections or their subscriber state are updated, as the transaction is rolled back. + * Note that any errors thrown will be ignored and the catchup will continue as normal. + * The collect errors will be returned and rethrown by the content repository. + * + * @throws CatchUpHookFailed */ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void; @@ -50,8 +56,12 @@ public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $event * This hook is called at the END of a catch-up run * BEFORE the Database Lock is released, but AFTER the transaction is commited. * - * Note that any errors thrown will bubble up and do not implicate the projection. - * The projection and their new status and position will already be persisted without rollback. + * The projection and their new status and position are already persisted. + * + * Note that any errors thrown will be ignored and the catchup will finish as normal. + * The collect errors will be returned and rethrown by the content repository. + * + * @throws CatchUpHookFailed */ public function onAfterCatchUp(): void; } diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/DelegatingCatchUpHook.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/DelegatingCatchUpHook.php index 12a2e73c9b8..a7e0d109e22 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/DelegatingCatchUpHook.php +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHook/DelegatingCatchUpHook.php @@ -14,7 +14,7 @@ * * @internal */ -final class DelegatingCatchUpHook implements CatchUpHookInterface +final readonly class DelegatingCatchUpHook implements CatchUpHookInterface { /** * @var CatchUpHookInterface[] @@ -29,29 +29,62 @@ public function __construct( public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void { - foreach ($this->catchUpHooks as $catchUpHook) { - $catchUpHook->onBeforeCatchUp($subscriptionStatus); - } + $this->delegateHooks( + fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onBeforeCatchUp($subscriptionStatus), + 'onBeforeCatchUp' + ); } public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void { - foreach ($this->catchUpHooks as $catchUpHook) { - $catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope); - } + $this->delegateHooks( + fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope), + 'onBeforeEvent' + ); } public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void { - foreach ($this->catchUpHooks as $catchUpHook) { - $catchUpHook->onAfterEvent($eventInstance, $eventEnvelope); - } + $this->delegateHooks( + fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onAfterEvent($eventInstance, $eventEnvelope), + 'onAfterEvent' + ); } public function onAfterCatchUp(): void { + $this->delegateHooks( + fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onAfterCatchUp(), + 'onAfterCatchUp' + ); + } + + /** + * @param \Closure(CatchUpHookInterface): void $closure + * @return void + */ + private function delegateHooks(\Closure $closure, string $hookName): void + { + /** @var array<\Throwable> $errors */ + $errors = []; + $firstFailedCatchupHook = null; foreach ($this->catchUpHooks as $catchUpHook) { - $catchUpHook->onAfterCatchUp(); + try { + $closure($catchUpHook); + } catch (\Throwable $e) { + $firstFailedCatchupHook ??= substr(strrchr($catchUpHook::class, '\\') ?: '', 1); + $errors[] = $e; + } + } + if ($errors !== []) { + $firstError = array_shift($errors); + $additionalMessage = $errors !== [] ? sprintf(' (and %d other)', count($errors)) : ''; + throw new CatchUpHookFailed( + sprintf('Hook "%s"%s failed "%s": %s', $firstFailedCatchupHook, $additionalMessage, $hookName, $firstError->getMessage()), + 1733243960, + $firstError, + $errors + ); } } } diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Error.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Error.php index 546f82c5d3c..98ae41bd1d1 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/Error.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/Error.php @@ -9,16 +9,16 @@ /** * @internal implementation detail of the catchup */ -final class Error +final readonly class Error { private function __construct( - public readonly SubscriptionId $subscriptionId, - public readonly string $message, - public readonly \Throwable $throwable, + public SubscriptionId $subscriptionId, + public string $message, + public \Throwable $throwable, ) { } - public static function fromSubscriptionIdAndException(SubscriptionId $subscriptionId, \Throwable $exception): self + public static function forSubscription(SubscriptionId $subscriptionId, \Throwable $exception): self { return new self( $subscriptionId, diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php index cab0ec97b4b..7cbddd43cbb 100644 --- a/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php +++ b/Neos.ContentRepository.Core/Classes/Subscription/Engine/SubscriptionEngine.php @@ -6,11 +6,9 @@ use Doctrine\DBAL\Exception\TableNotFoundException; use Neos\ContentRepository\Core\ContentRepository; -use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\EventStore\EventNormalizer; use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer; use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus; -use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed; use Neos\ContentRepository\Core\Subscription\Exception\SubscriptionEngineAlreadyProcessingException; use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus; use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria; @@ -18,14 +16,12 @@ use Neos\ContentRepository\Core\Subscription\Subscriber\Subscribers; use Neos\ContentRepository\Core\Subscription\Subscription; use Neos\ContentRepository\Core\Subscription\SubscriptionError; -use Neos\ContentRepository\Core\Subscription\SubscriptionId; use Neos\ContentRepository\Core\Subscription\Subscriptions; use Neos\ContentRepository\Core\Subscription\SubscriptionStatus; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection; use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\SequenceNumber; -use Neos\EventStore\Model\EventEnvelope; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Psr\Log\LoggerInterface; @@ -223,7 +219,7 @@ private function setupSubscription(Subscription $subscription): ?Error $subscription->position, SubscriptionError::fromPreviousStatusAndException($subscription->status, $e) ); - return Error::fromSubscriptionIdAndException($subscription->id, $e); + return Error::forSubscription($subscription->id, $e); } if ($subscription->status === SubscriptionStatus::ACTIVE) { @@ -248,7 +244,7 @@ private function resetSubscription(Subscription $subscription): ?Error $subscriber->projection->resetState(); } catch (\Throwable $e) { $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" has an error in the resetState method: %s', $subscriber::class, $subscription->id->value, $e->getMessage())); - return Error::fromSubscriptionIdAndException($subscription->id, $e); + return Error::forSubscription($subscription->id, $e); } $this->subscriptionStore->update( $subscription->id, @@ -267,7 +263,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs $subscriptionCriteria = SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $status); - $result = $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks) { + $numberOfProcessedEvents = 0; + /** @var array $errors */ + $errors = []; + + $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks, &$numberOfProcessedEvents, &$errors) { $subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionCriteria); foreach ($subscriptionsToCatchup as $subscription) { if (!$this->subscribers->contain($subscription->id)) { @@ -285,7 +285,7 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs if ($subscriptionsToCatchup->isEmpty()) { $this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.'); - return ProcessedResult::success(0); + return; } $subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup; @@ -293,19 +293,13 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs try { $this->subscribers->get($subscription->id)->catchUpHook?->onBeforeCatchUp($subscription->status); } catch (\Throwable $e) { - // analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); // todo throw here or in delegating hook? + $errors[] = Error::forSubscription($subscription->id, $e); } } $startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none(); $this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value)); - /** @var array $errors */ - $errors = []; - $numberOfProcessedEvents = 0; /** @var array $highestSequenceNumberForSubscriber */ $highestSequenceNumberForSubscriber = []; @@ -326,14 +320,19 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs } $subscriber = $this->subscribers->get($subscription->id); - $subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope); + try { + $subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope); + } catch (\Throwable $e) { + $errors[] = Error::forSubscription($subscription->id, $e); + } + $this->subscriptionStore->createSavepoint(); try { $subscriber->projection->apply($domainEvent, $eventEnvelope); } catch (\Throwable $e) { // ERROR Case: $this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage())); - $error = Error::fromSubscriptionIdAndException($subscription->id, $e); + $error = Error::forSubscription($subscription->id, $e); // 1.) roll back the partially applied event on the subscriber $this->subscriptionStore->rollbackSavepoint(); @@ -357,7 +356,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs $this->subscriptionStore->releaseSavepoint(); $highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber; - $subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope); + try { + $subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope); + } catch (\Throwable $e) { + $errors[] = Error::forSubscription($subscription->id, $e); + } } $numberOfProcessedEvents++; } @@ -375,7 +378,6 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs } } $this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors))); - return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); }); // todo do we need want to invoke for failed projections onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent to "shutdown" this catchup iteration? @@ -384,14 +386,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs try { $this->subscribers->get($subscription->id)->catchUpHook?->onAfterCatchUp(); } catch (\Throwable $e) { - // analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error. - $message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage()); - $this->logger?->critical($message); - throw new CatchUpFailed($message, 1732374000, $e); + $errors[] = Error::forSubscription($subscription->id, $e); } } - return $result; + return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors)); } /** diff --git a/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php b/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php deleted file mode 100644 index 68cce1e2ed1..00000000000 --- a/Neos.ContentRepository.Core/Classes/Subscription/Exception/CatchUpFailed.php +++ /dev/null @@ -1,16 +0,0 @@ -