Skip to content

Commit

Permalink
TASK: Introduce dedicated CatchUpHadErrors exception
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Dec 3, 2024
1 parent aa2e7b1 commit bccea53
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Neos\ContentRepository\Core\Subscription\Engine\ProcessedResult;
use Neos\ContentRepository\Core\Subscription\Engine\Result;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngineCriteria;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors;
use Neos\ContentRepository\Core\Subscription\ProjectionSubscriptionStatus;
use Neos\ContentRepository\Core\Subscription\SubscriptionError;
use Neos\ContentRepository\Core\Subscription\SubscriptionId;
Expand Down Expand Up @@ -107,7 +108,7 @@ public function fixFailedProjectionViaReset()
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertTrue($result->hasFailed());
self::assertTrue($result->hadErrors());

self::assertEquals(
$expectedFailure,
Expand Down Expand Up @@ -161,7 +162,7 @@ public function irreparableProjection()
// catchup active tries to apply the commited event
$result = $this->subscriptionEngine->catchUpActive();
// but fails
self::assertTrue($result->hasFailed());
self::assertTrue($result->hadErrors());
self::assertEquals($expectedFailure, $this->subscriptionStatus('Vendor.Package:SecondFakeProjection'));

// a second catchup active does not change anything
Expand Down Expand Up @@ -207,7 +208,7 @@ public function irreparableProjection()

// but booting will rethrow that error :D
$result = $this->subscriptionEngine->boot();
self::assertTrue($result->hasFailed());
self::assertTrue($result->hadErrors());
self::assertEquals(
ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
Expand Down Expand Up @@ -309,7 +310,7 @@ public function projectionIsRolledBackAfterErrorButKeepsSuccessFullEvents()
);

$result = $this->subscriptionEngine->catchUpActive();
self::assertTrue($result->hasFailed());
self::assertTrue($result->hadErrors());

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
Expand Down Expand Up @@ -367,7 +368,7 @@ public function projectionErrorWithMultipleProjectionsInContentRepositoryHandle(
} catch (\RuntimeException $exception) {
$handleException = $exception;
}
self::assertNotNull($handleException);
self::assertInstanceOf(CatchUpHadErrors::class, $exception);
self::assertEquals('Exception in subscriber "Vendor.Package:FakeProjection" while catching up: This projection is kaputt.', $handleException->getMessage());
self::assertSame($originalException, $handleException->getPrevious());

Expand Down
9 changes: 7 additions & 2 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\ContentRepository\Core\Subscription\Exception\CatchUpHadErrors;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Psr\Clock\ClockInterface;
Expand Down Expand Up @@ -98,7 +99,9 @@ public function handle(CommandInterface $command): void
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($toPublish);
$this->eventStore->commit($eventsToPublish->streamName, $this->eventNormalizer->normalizeEvents($eventsToPublish->events), $eventsToPublish->expectedVersion);
$catchUpResult = $this->subscriptionEngine->catchUpActive();
$catchUpResult->throwOnFailure();
if ($catchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($catchUpResult->errors);
}
return;
}

Expand Down Expand Up @@ -128,7 +131,9 @@ public function handle(CommandInterface $command): void
// We always NEED to catchup even if there was an unexpected ConcurrencyException to make sure previous commits are handled.
// Technically it would be acceptable for the catchup to fail here (due to hook errors) because all the events are already persisted.
$catchUpResult = $this->subscriptionEngine->catchUpActive();
$catchUpResult->throwOnFailure();
if ($catchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($catchUpResult->errors);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta

if ($danglingContentStreamsPresent) {
$result = $this->subscriptionEngine->catchUpActive();
if ($result->hasFailed()) {
if ($result->hadErrors()) {
$outputFn('Catchup after removing unused content streams led to errors. You might need to use ./flow contentstream:pruneremovedfromeventstream and replay.');
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,8 @@ public static function failed(int $numberOfProcessedEvents, Errors $errors): sel
}

/** @phpstan-assert-if-true !null $this->errors */
public function hasFailed(): bool
public function hadErrors(): bool
{
return $this->errors !== null;
}

public function throwOnFailure(): void
{
/** @var Error[] $errors */
$errors = iterator_to_array($this->errors ?? []);
if ($errors === []) {
return;
}
$firstError = array_shift($errors);

$additionalFailedSubscribers = array_map(fn (Error $error) => $error->subscriptionId->value, $errors);

$additionalErrors = $additionalFailedSubscribers === [] ? '' : sprintf(' | And subscribers %s with additional errors.', join(', ', $additionalFailedSubscribers));
$exceptionMessage = sprintf('Exception in subscriber "%s" while catching up: %s%s', $firstError->subscriptionId->value, $firstError->message, $additionalErrors);

// todo custom exception!
throw new \RuntimeException($exceptionMessage, 1732132930, $firstError->throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Neos\ContentRepository\Core\Subscription\Exception;

use Neos\ContentRepository\Core\Projection\CatchUpHook\CatchUpHookFailed;
use Neos\ContentRepository\Core\Subscription\Engine\Error;
use Neos\ContentRepository\Core\Subscription\Engine\Errors;

/**
* Thrown if the subscribers could not be catchup without encountering errors.
*
* Still, as we collect the errors and don't halt the process the system will be still up-to-date as far as possible.
*
* Following reasons would trigger this error:
*
* - A projection has a failure in its code. Then the projection is rolled back to the last event and put into ERROR state.
* An exception will be part of this collection indicating this change. Further catchup's will not attempt to update that
* projection again, as it has to be fixed and reactivated first.
*
* - A catchup hook contains an error. In this case the projections is further updated and also all further catchup errors
* collected. This results in a {@see CatchUpHookFailed} exception in this exception collection.
*
* @implements \IteratorAggregate<\Throwable>
* @api
*/
final class CatchUpHadErrors extends \RuntimeException implements \IteratorAggregate
{
/**
* @internal
* @param array<\Throwable> $additionalExceptions
*/
private function __construct(
string $message,
int $code,
\Throwable $exception,
private readonly array $additionalExceptions
) {
parent::__construct($message, $code, $exception);
}

/**
* @internal
*/
public static function createFromErrors(Errors $errors): self
{
/** @var non-empty-array<Error> $errors */
$errors = iterator_to_array($errors);
$firstError = array_shift($errors);

$additionalFailedSubscribers = array_map(fn (Error $error) => $error->subscriptionId->value, $errors);

$additionalErrors = $additionalFailedSubscribers === [] ? '' : sprintf(' | And subscribers %s with additional errors.', join(', ', $additionalFailedSubscribers));
$exceptionMessage = sprintf('Exception in subscriber "%s" while catching up: %s%s', $firstError->subscriptionId->value, $firstError->message, $additionalErrors);

throw new self($exceptionMessage, 1732132930, $firstError->throwable, array_map(fn (Error $error) => $error->throwable, $errors));
}

public function getIterator(): \Traversable
{
$previous = $this->getPrevious();
if ($previous !== null) {
yield $previous;
}
yield from $this->additionalExceptions;
}
}

0 comments on commit bccea53

Please sign in to comment.