Skip to content

Commit

Permalink
TASK: Prevent catchup hooks from halting the projections
Browse files Browse the repository at this point in the history
updates bfb4655 to not do a full rollback if catchup hooks fail but continue and collect their errors, which will be rethrown later with the advantage that the catchup worked!
  • Loading branch information
mhsdesign committed Dec 3, 2024
1 parent bfb4655 commit aa2e7b1
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 159 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function projectionWithError()
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:FakeProjection'), $exception)])), $result);
self::assertEquals(ProcessedResult::failed(1, Errors::fromArray([Error::forSubscription(SubscriptionId::fromString('Vendor.Package:FakeProjection'), $exception)])), $result);

self::assertEquals(
$expectedStatusForFailedProjection,
Expand Down Expand Up @@ -183,7 +183,7 @@ public function irreparableProjection()
$result = $this->subscriptionEngine->reactivate();
self::assertEquals(
ProcessedResult::failed(1, Errors::fromArray([
Error::fromSubscriptionIdAndException(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception)
Error::forSubscription(SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'), $exception)
])),
$result
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Projection\CatchUpHook;

/**
* Thrown if a delegated catchup hook fails
*
* @implements \IteratorAggregate<\Throwable>
* @api
*/
final class CatchUpHookFailed extends \RuntimeException implements \IteratorAggregate
{
/**
* @internal
* @param array<\Throwable> $additionalExceptions
*/
public function __construct(
string $message,
int $code,
\Throwable $exception,
private readonly array $additionalExceptions
) {
parent::__construct($message, $code, $exception);
}

public function getIterator(): \Traversable
{
$previous = $this->getPrevious();
if ($previous !== null) {
yield $previous;
}
yield from $this->additionalExceptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Neos\EventStore\Model\EventEnvelope;

/**
* This is an internal API with which you can hook into the catch-up process of a Projection.
* This is an api with which you can hook into the catch-up process of a projection.
*
* To register such a CatchUpHook, create a corresponding {@see CatchUpHookFactoryInterface}
* and pass it to {@see ProjectionFactoryInterface::build()}.
Expand All @@ -23,35 +23,45 @@ interface CatchUpHookInterface
* This hook is called at the beginning of a catch-up run;
* AFTER the Database Lock is acquired, BEFORE any projection was called.
*
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated.
* Note that any errors thrown will be ignored and the catchup will start as normal.
* The collect errors will be returned and rethrown by the content repository.
*
* @throws CatchUpHookFailed
*/
public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void;

/**
* This hook is called for every event during the catchup process, **before** the projection
* is updated but in the same transaction: {@see ProjectionInterface::transactional()}.
*
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated, as the transaction is rolled back.
* Note that any errors thrown will be ignored and the catchup will continue as normal.
* The collect errors will be returned and rethrown by the content repository.
*
* @throws CatchUpHookFailed
*/
public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void;

/**
* This hook is called for every event during the catchup process, **after** the projection
* is updated but in the same transaction: {@see ProjectionInterface::transactional()}.
*
* Note that any errors thrown will cause the catchup to directly halt,
* and no projections or their subscriber state are updated, as the transaction is rolled back.
* Note that any errors thrown will be ignored and the catchup will continue as normal.
* The collect errors will be returned and rethrown by the content repository.
*
* @throws CatchUpHookFailed
*/
public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void;

/**
* This hook is called at the END of a catch-up run
* BEFORE the Database Lock is released, but AFTER the transaction is commited.
*
* Note that any errors thrown will bubble up and do not implicate the projection.
* The projection and their new status and position will already be persisted without rollback.
* The projection and their new status and position are already persisted.
*
* Note that any errors thrown will be ignored and the catchup will finish as normal.
* The collect errors will be returned and rethrown by the content repository.
*
* @throws CatchUpHookFailed
*/
public function onAfterCatchUp(): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*
* @internal
*/
final class DelegatingCatchUpHook implements CatchUpHookInterface
final readonly class DelegatingCatchUpHook implements CatchUpHookInterface
{
/**
* @var CatchUpHookInterface[]
Expand All @@ -29,29 +29,62 @@ public function __construct(

public function onBeforeCatchUp(SubscriptionStatus $subscriptionStatus): void
{
foreach ($this->catchUpHooks as $catchUpHook) {
$catchUpHook->onBeforeCatchUp($subscriptionStatus);
}
$this->delegateHooks(
fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onBeforeCatchUp($subscriptionStatus),
'onBeforeCatchUp'
);
}

public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void
{
foreach ($this->catchUpHooks as $catchUpHook) {
$catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope);
}
$this->delegateHooks(
fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onBeforeEvent($eventInstance, $eventEnvelope),
'onBeforeEvent'
);
}

public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void
{
foreach ($this->catchUpHooks as $catchUpHook) {
$catchUpHook->onAfterEvent($eventInstance, $eventEnvelope);
}
$this->delegateHooks(
fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onAfterEvent($eventInstance, $eventEnvelope),
'onAfterEvent'
);
}

public function onAfterCatchUp(): void
{
$this->delegateHooks(
fn (CatchUpHookInterface $catchUpHook) => $catchUpHook->onAfterCatchUp(),
'onAfterCatchUp'
);
}

/**
* @param \Closure(CatchUpHookInterface): void $closure
* @return void
*/
private function delegateHooks(\Closure $closure, string $hookName): void
{
/** @var array<\Throwable> $errors */
$errors = [];
$firstFailedCatchupHook = null;
foreach ($this->catchUpHooks as $catchUpHook) {
$catchUpHook->onAfterCatchUp();
try {
$closure($catchUpHook);
} catch (\Throwable $e) {
$firstFailedCatchupHook ??= substr(strrchr($catchUpHook::class, '\\') ?: '', 1);
$errors[] = $e;
}
}
if ($errors !== []) {
$firstError = array_shift($errors);
$additionalMessage = $errors !== [] ? sprintf(' (and %d other)', count($errors)) : '';
throw new CatchUpHookFailed(
sprintf('Hook "%s"%s failed "%s": %s', $firstFailedCatchupHook, $additionalMessage, $hookName, $firstError->getMessage()),
1733243960,
$firstError,
$errors
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
/**
* @internal implementation detail of the catchup
*/
final class Error
final readonly class Error
{
private function __construct(
public readonly SubscriptionId $subscriptionId,
public readonly string $message,
public readonly \Throwable $throwable,
public SubscriptionId $subscriptionId,
public string $message,
public \Throwable $throwable,
) {
}

public static function fromSubscriptionIdAndException(SubscriptionId $subscriptionId, \Throwable $exception): self
public static function forSubscription(SubscriptionId $subscriptionId, \Throwable $exception): self
{
return new self(
$subscriptionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,22 @@

use Doctrine\DBAL\Exception\TableNotFoundException;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Service\ContentRepositoryMaintainer;
use Neos\ContentRepository\Core\Subscription\DetachedSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpFailed;
use Neos\ContentRepository\Core\Subscription\Exception\SubscriptionEngineAlreadyProcessingException;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionCriteria;
use Neos\ContentRepository\Core\Subscription\Store\SubscriptionStoreInterface;
use Neos\ContentRepository\Core\Subscription\Subscriber\Subscribers;
use Neos\ContentRepository\Core\Subscription\Subscription;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
use Neos\ContentRepository\Core\Subscription\Subscriptions;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatusCollection;
use Neos\ContentRepository\Core\Subscription\SubscriptionStatusFilter;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Psr\Log\LoggerInterface;

Expand Down Expand Up @@ -223,7 +219,7 @@ private function setupSubscription(Subscription $subscription): ?Error
$subscription->position,
SubscriptionError::fromPreviousStatusAndException($subscription->status, $e)
);
return Error::fromSubscriptionIdAndException($subscription->id, $e);
return Error::forSubscription($subscription->id, $e);
}

if ($subscription->status === SubscriptionStatus::ACTIVE) {
Expand All @@ -248,7 +244,7 @@ private function resetSubscription(Subscription $subscription): ?Error
$subscriber->projection->resetState();
} catch (\Throwable $e) {
$this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" has an error in the resetState method: %s', $subscriber::class, $subscription->id->value, $e->getMessage()));
return Error::fromSubscriptionIdAndException($subscription->id, $e);
return Error::forSubscription($subscription->id, $e);
}
$this->subscriptionStore->update(
$subscription->id,
Expand All @@ -267,7 +263,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs

$subscriptionCriteria = SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $status);

$result = $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks) {
$numberOfProcessedEvents = 0;
/** @var array<Error> $errors */
$errors = [];

$this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks, &$numberOfProcessedEvents, &$errors) {
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionCriteria);
foreach ($subscriptionsToCatchup as $subscription) {
if (!$this->subscribers->contain($subscription->id)) {
Expand All @@ -285,27 +285,21 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs

if ($subscriptionsToCatchup->isEmpty()) {
$this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.');
return ProcessedResult::success(0);
return;
}

$subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup;
foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) {
try {
$this->subscribers->get($subscription->id)->catchUpHook?->onBeforeCatchUp($subscription->status);
} catch (\Throwable $e) {
// analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e); // todo throw here or in delegating hook?
$errors[] = Error::forSubscription($subscription->id, $e);
}
}

$startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none();
$this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value));

/** @var array<Error> $errors */
$errors = [];
$numberOfProcessedEvents = 0;
/** @var array<string,SequenceNumber> $highestSequenceNumberForSubscriber */
$highestSequenceNumberForSubscriber = [];

Expand All @@ -326,14 +320,19 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
}
$subscriber = $this->subscribers->get($subscription->id);

$subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope);
try {
$subscriber->catchUpHook?->onBeforeEvent($domainEvent, $eventEnvelope);
} catch (\Throwable $e) {
$errors[] = Error::forSubscription($subscription->id, $e);
}

$this->subscriptionStore->createSavepoint();
try {
$subscriber->projection->apply($domainEvent, $eventEnvelope);
} catch (\Throwable $e) {
// ERROR Case:
$this->logger?->error(sprintf('Subscription Engine: Subscriber "%s" for "%s" could not process the event "%s" (sequence number: %d): %s', $subscriber::class, $subscription->id->value, $eventEnvelope->event->type->value, $eventEnvelope->sequenceNumber->value, $e->getMessage()));
$error = Error::fromSubscriptionIdAndException($subscription->id, $e);
$error = Error::forSubscription($subscription->id, $e);

// 1.) roll back the partially applied event on the subscriber
$this->subscriptionStore->rollbackSavepoint();
Expand All @@ -357,7 +356,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
$this->subscriptionStore->releaseSavepoint();
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;

$subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope);
try {
$subscriber->catchUpHook?->onAfterEvent($domainEvent, $eventEnvelope);
} catch (\Throwable $e) {
$errors[] = Error::forSubscription($subscription->id, $e);
}
}
$numberOfProcessedEvents++;
}
Expand All @@ -375,7 +378,6 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
}
}
$this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors)));
return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors));
});

// todo do we need want to invoke for failed projections onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent to "shutdown" this catchup iteration?
Expand All @@ -384,14 +386,11 @@ private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, Subs
try {
$this->subscribers->get($subscription->id)->catchUpHook?->onAfterCatchUp();
} catch (\Throwable $e) {
// analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e);
$errors[] = Error::forSubscription($subscription->id, $e);
}
}

return $result;
return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors));
}

/**
Expand Down

This file was deleted.

0 comments on commit aa2e7b1

Please sign in to comment.