Skip to content

Commit

Permalink
TASK: Change that hooks are not executed in the same savepoint and do…
Browse files Browse the repository at this point in the history
… not put the projection in ERROR state

... instead a full rollback is done (which might be still changed)

- calls "internals" of `ProjectionSubscriber` directly from the subscription engine
- inline `handleEvent` method
  • Loading branch information
mhsdesign committed Dec 3, 2024
1 parent 582c5e6 commit bfb4655
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ final protected function commitExampleContentStreamEvent(): void
);
}

final protected function expectOkayStatus($subscriptionId, SubscriptionStatus $status, SequenceNumber $sequenceNumber): void
final protected function expectOkayStatus(string $subscriptionId, SubscriptionStatus $status, SequenceNumber $sequenceNumber): void
{
$actual = $this->subscriptionStatus($subscriptionId);
self::assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,31 @@ public function error_onBeforeEvent_projectionIsNotRun()
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
// commit two events, we expect neither to be catchupd correctly because handing on the first fails
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::ACTIVE);
// Todo test that onBeforeEvent|onAfterEvent are in the same transaction and that a rollback will also revert their state
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException(
$exception = new \RuntimeException('This catchup hook is kaputt.')
);
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

$this->secondFakeProjection->injectSaboteur(fn () => self::fail('Projection apply is not expected to be called!'));

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
setupStatus: ProjectionStatus::ok(),
);
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp');

self::assertEmpty(
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertSame($result->errors?->first()->message, 'This catchup hook is kaputt.');
$actualException = null;
try {
$this->subscriptionEngine->catchUpActive();
} catch (\Throwable $e) {
$actualException = $e;
}
self::assertSame($exception, $actualException);
self::assertSame('This catchup hook is kaputt.', $actualException->getMessage());

self::assertEquals(
$expectedFailure,
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());

// must be still empty because apply was never called
self::assertEmpty(
Expand All @@ -72,38 +65,32 @@ public function error_onAfterEvent_projectionIsRolledBack()
$this->subscriptionEngine->setup();
$this->subscriptionEngine->boot();

// commit an event
// commit an events, we expect neither to be catchupd correctly because handing on the first fails
$this->commitExampleContentStreamEvent();
$this->commitExampleContentStreamEvent();

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp')->with(SubscriptionStatus::ACTIVE);
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class));
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent')->with(self::isInstanceOf(ContentStreamWasCreated::class))->willThrowException(
$exception = new \RuntimeException('This catchup hook is kaputt.')
);
// TODO pass the error subscription status to onAfterCatchUp, so that in case of an error it can be prevented that mails f.x. will be sent?
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp');

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
subscriptionStatus: SubscriptionStatus::ERROR,
subscriptionPosition: SequenceNumber::none(),
subscriptionError: SubscriptionError::fromPreviousStatusAndException(SubscriptionStatus::ACTIVE, $exception),
setupStatus: ProjectionStatus::ok(),
);
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterCatchUp');

self::assertEmpty(
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertSame($result->errors?->first()->message, 'This catchup hook is kaputt.');

self::assertEquals(
$expectedFailure,
$this->subscriptionStatus('Vendor.Package:SecondFakeProjection')
);
$actualException = null;
try {
$this->subscriptionEngine->catchUpActive();
} catch (\Throwable $e) {
$actualException = $e;
}
self::assertSame($exception, $actualException);
self::assertSame('This catchup hook is kaputt.', $actualException->getMessage());

// should be empty as we need an exact once delivery
// will be empty again because the full transaction was rolled back
$this->expectOkayStatus('Vendor.Package:SecondFakeProjection', SubscriptionStatus::ACTIVE, SequenceNumber::none());
self::assertEmpty(
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
Expand Down Expand Up @@ -172,6 +159,7 @@ public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted()
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp');
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent');
// todo test that other catchup hooks are still run and all errors are collected!
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp')->willThrowException(
new \RuntimeException('This catchup hook is kaputt.')
);
Expand Down Expand Up @@ -214,13 +202,14 @@ public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted_withPro

$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeCatchUp');
$this->catchupHookForFakeProjection->expects(self::once())->method('onBeforeEvent');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterEvent')->willThrowException(
$innerException = new \RuntimeException('Inner event handling is kaputt.')
);;
$this->catchupHookForFakeProjection->expects(self::never())->method('onAfterEvent');
$this->catchupHookForFakeProjection->expects(self::once())->method('onAfterCatchUp')->willThrowException(
new \RuntimeException('This catchup hook is kaputt.')
);

$innerException = new \RuntimeException('Projection is kaputt.');
$this->secondFakeProjection->injectSaboteur(fn () => throw $innerException);

self::assertEmpty(
$this->secondFakeProjection->getState()->findAppliedSequenceNumbers()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\EventEnvelope;

Expand All @@ -22,34 +21,37 @@ interface CatchUpHookInterface
{
/**
* This hook is called at the beginning of a catch-up run;
* AFTER the Database Lock is acquired, BEFORE any projection was opened.
* AFTER the Database Lock is acquired, BEFORE any projection was called.
*
* Its important that no errors are thrown, as they will cause the catchup to directly halt with a {@see CatchUpFailed} exception.
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated.
*/
public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void;

/**
* This hook is called for every event during the catchup process, **before** the projection
* is updated but in the same transaction: {@see ProjectionInterface::transactional()}.
*
* Any errors will cause the transaction being rolled back, and the projection goes into {@see SubscriptionStatus::ERROR} state.
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated, as the transaction is rolled back.
*/
public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void;

/**
* This hook is called for every event during the catchup process, **after** the projection
* is updated but in the same transaction: {@see ProjectionInterface::transactional()}.
*
* Any errors will cause the transaction being rolled back, and the projection goes into {@see SubscriptionStatus::ERROR} state.
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated, as the transaction is rolled back.
*/
public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void;

/**
* This hook is called at the END of a catch-up run
* BEFORE the Database Lock is released, but AFTER the transaction is commited.
*
* Its important that no errors are thrown, as they will cause the catchup to directly halt with a {@see CatchUpFailed} exception.
* The projections and their new position will already be persisted and there is no rollback.
* Note that any errors thrown will bubble up and do not implicate the projection.
* The projection and their new status and position will already be persisted without rollback.
*/
public function onAfterCatchUp(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,6 @@ public function subscriptionStatus(SubscriptionEngineCriteria|null $criteria = n
return SubscriptionStatusCollection::fromArray($statuses);
}

private function handleEvent(EventEnvelope $eventEnvelope, EventInterface $domainEvent, SubscriptionId $subscriptionId): Error|null
{
$subscriber = $this->subscribers->get($subscriptionId);
try {
$subscriber->handle($domainEvent, $eventEnvelope);
} catch (\Throwable $e) {
$this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscriptionId->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage()));
return Error::fromSubscriptionIdAndException($subscriptionId, $e);
}
$this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscriptionId->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value));
return null;
}

/**
* Find all subscribers that don't have a corresponding subscription.
* For each match a subscription is added
Expand Down Expand Up @@ -304,7 +291,7 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
$subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup;
foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) {
try {
$this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status);
$this->subscribers->get($subscription->id)->catchUpHook?->onBeforeCatchUp($subscription->status);
} catch (\Throwable $e) {
// analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage());
Expand Down Expand Up @@ -337,11 +324,17 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;
}
$subscriber = $this->subscribers->get($subscription->id);

$subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope);
$this->subscriptionStore->createSavepoint();
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
try {
$subscriber->projection->apply($domainEvent, $eventEnvelope);
} catch (\Throwable $e) {
// ERROR Case:
$this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage()));
$error = Error::fromSubscriptionIdAndException($subscription->id, $e);

// 1.) roll back the partially applied event on the subscriber
$this->subscriptionStore->rollbackSavepoint();
// 2.) for the leftover events we are not including this failed subscription for catchup
Expand All @@ -360,8 +353,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
continue;
}
// HAPPY Case:
$this->logger?->debug(sprintf('Subscription Engine: Subscriber "%s" for "%s" processed the event "%s" (sequence number: %d).', substr(strrchr($subscriber::class, '\\') ?: '', 1), $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value));
$this->subscriptionStore->releaseSavepoint();
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;

$subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope);
}
$numberOfProcessedEvents++;
}
Expand All @@ -383,9 +379,10 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
});

// todo do we need want to invoke for failed projections onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent to "shutdown" this catchup iteration?
// note that a catchup error in onAfterEvent would bubble up directly and never invoke onAfterCatchUp
foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) {
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
$this->subscribers->get($subscription->id)->catchUpHook?->onAfterCatchUp();
} catch (\Throwable $e) {
// analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
namespace Neos\ContentRepository\Core\Subscription\Exception;

/**
* Only thrown if there is no way to recover the started catchup. The transaction will be rolled back.
* Only thrown if there is no way to recover the started catchup.
*
* Todo move to delegating hook!
*
* @api
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,23 @@

namespace Neos\ContentRepository\Core\Subscription\Subscriber;

use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\EventStore\Model\EventEnvelope;

/**
* @internal implementation detail of the catchup
*/
final class ProjectionSubscriber
final readonly class ProjectionSubscriber
{
/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
*/
public function __construct(
public readonly SubscriptionId $id,
public readonly ProjectionInterface $projection,
private readonly ?CatchUpHookInterface $catchUpHook
public SubscriptionId $id,
public ProjectionInterface $projection,
public ?CatchUpHookInterface $catchUpHook
) {
}

public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void
{
$this->catchUpHook?->onBeforeCatchUp($subscriptionStatus);
}

public function handle(EventInterface $event, EventEnvelope $eventEnvelope): void
{
$this->catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$this->projection->apply($event, $eventEnvelope);
$this->catchUpHook?->onAfterEvent($event, $eventEnvelope);
}

public function onAfterCatchUp(): void
{
$this->catchUpHook?->onAfterCatchUp();
}
}

0 comments on commit bfb4655

Please sign in to comment.