Skip to content

Commit

Permalink
TASK: Implement that onAfterCatchUp called _after_ everything is pe…
Browse files Browse the repository at this point in the history
…rsisted

Test provided by 54b24b8

This partially reverts moving `$this->subscriptionStore->transactional` outside again from eb0d792, as we have to do a task afterwards still
  • Loading branch information
mhsdesign committed Dec 3, 2024
1 parent c47d181 commit fd9faa4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public function error_onAfterCatchUp_crashesAfterProjectionsArePersisted_withPro
}
self::assertInstanceOf(CatchUpFailed::class, $expectedFailure);

self::assertSame($expectedFailure->getMessage(), 'Subscriber "Vendor.Package:SecondFakeProjection" had an error and also failed onAfterCatchUp: This catchup hook is kaputt.');
self::assertSame($expectedFailure->getMessage(), 'Subscriber "Vendor.Package:SecondFakeProjection" failed onAfterCatchUp: This catchup hook is kaputt.');

$expectedFailure = ProjectionSubscriptionStatus::create(
subscriptionId: SubscriptionId::fromString('Vendor.Package:SecondFakeProjection'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,46 +80,25 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null): Result
public function boot(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "BOOTING".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::BOOTING)
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
}
));
return $this->processExclusively(
fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::BOOTING]), $progressCallback)
);
}

public function catchUpActive(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatus::ACTIVE)
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
}
));
return $this->processExclusively(
fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ACTIVE]), $progressCallback)
);
}

public function reactivate(SubscriptionEngineCriteria|null $criteria = null, \Closure $progressCallback = null): ProcessedResult
{
$criteria ??= SubscriptionEngineCriteria::noConstraints();
return $this->processExclusively(fn () => $this->subscriptionStore->transactional(
function () use ($criteria, $progressCallback) {
$this->logger?->info('Subscription Engine: Start catching up subscriptions in state "ACTIVE".');
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate(
SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, SubscriptionStatusFilter::fromArray([
SubscriptionStatus::ERROR,
SubscriptionStatus::DETACHED,
]))
);
return $this->catchUpSubscriptions($subscriptionsToCatchup, $progressCallback);
}
));
return $this->processExclusively(
fn () => $this->catchUpSubscriptions($criteria, SubscriptionStatusFilter::fromArray([SubscriptionStatus::ERROR, SubscriptionStatus::DETACHED]), $progressCallback)
);
}

public function reset(SubscriptionEngineCriteria|null $criteria = null): Result
Expand Down Expand Up @@ -294,95 +273,112 @@ private function resetSubscription(Subscription $subscription): ?Error
return null;
}

private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Closure $progressClosure = null): ProcessedResult
private function catchUpSubscriptions(SubscriptionEngineCriteria $criteria, SubscriptionStatusFilter $status, \Closure $progressClosure = null): ProcessedResult
{
foreach ($subscriptionsToCatchup as $subscription) {
if (!$this->subscribers->contain($subscription->id)) {
// mark detached subscriptions as we cannot handle them and exclude them from catchup
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::DETACHED,
position: $subscription->position,
subscriptionError: null,
);
$this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value));
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
}
}
$this->logger?->info(sprintf('Subscription Engine: Start catching up subscriptions in states %s.', join(',', $status->toStringArray())));
$subscriptionsToInvokeBeforeAndAfterCatchUpHooks = Subscriptions::none();

if ($subscriptionsToCatchup->isEmpty()) {
$this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.');
return ProcessedResult::success(0);
}
$subscriptionCriteria = SubscriptionCriteria::forEngineCriteriaAndStatus($criteria, $status);

foreach ($subscriptionsToCatchup as $subscription) {
try {
$this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status);
} catch (\Throwable $e) {
// analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e);
$result = $this->subscriptionStore->transactional(function () use ($subscriptionCriteria, $progressClosure, &$subscriptionsToInvokeBeforeAndAfterCatchUpHooks) {
$subscriptionsToCatchup = $this->subscriptionStore->findByCriteriaForUpdate($subscriptionCriteria);
foreach ($subscriptionsToCatchup as $subscription) {
if (!$this->subscribers->contain($subscription->id)) {
// mark detached subscriptions as we cannot handle them and exclude them from catchup
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::DETACHED,
position: $subscription->position,
subscriptionError: null,
);
$this->logger?->info(sprintf('Subscription Engine: Subscriber for "%s" not found and has been marked as detached.', $subscription->id->value));
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
}
}
}
$startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none();
$this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value));

/** @var array<Error> $errors */
$errors = [];
$numberOfProcessedEvents = 0;
/** @var array<string,SequenceNumber> $highestSequenceNumberForSubscriber */
$highestSequenceNumberForSubscriber = [];

$eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber);
foreach ($eventStream as $eventEnvelope) {
$sequenceNumber = $eventEnvelope->sequenceNumber;
if ($numberOfProcessedEvents > 0) {
$this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value));
if ($subscriptionsToCatchup->isEmpty()) {
$this->logger?->info('Subscription Engine: No subscriptions matched criteria. Finishing catch up.');
return ProcessedResult::success(0);
}
if ($progressClosure !== null) {
$progressClosure($eventEnvelope);

$subscriptionsToInvokeBeforeAndAfterCatchUpHooks = $subscriptionsToCatchup;
foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) {
try {
$this->subscribers->get($subscription->id)->onBeforeCatchUp($subscription->status);
} catch (\Throwable $e) {
// analog to onAfterCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" failed onBeforeCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e); // todo throw here or in delegating hook?
}
}
$domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event);
foreach ($subscriptionsToCatchup as $subscription) {
if ($subscription->position->value >= $sequenceNumber->value) {
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;

$startSequenceNumber = $subscriptionsToCatchup->lowestPosition()?->next() ?? SequenceNumber::none();
$this->logger?->debug(sprintf('Subscription Engine: Event stream is processed from position %s.', $startSequenceNumber->value));

/** @var array<Error> $errors */
$errors = [];
$numberOfProcessedEvents = 0;
/** @var array<string,SequenceNumber> $highestSequenceNumberForSubscriber */
$highestSequenceNumberForSubscriber = [];

$eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($startSequenceNumber);
foreach ($eventStream as $eventEnvelope) {
$sequenceNumber = $eventEnvelope->sequenceNumber;
if ($numberOfProcessedEvents > 0) {
$this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $sequenceNumber->value));
}
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: SubscriptionError::fromPreviousStatusAndException(
$subscription->status,
$error->throwable
),
);
// 3.) invoke onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent we want to "shutdown" this catchup iteration event though we know it failed
// todo put the ERROR $subscriptionStatus into the after hook, so it can properly be reacted upon
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
} catch (\Throwable $e) {
// analog to onBeforeCatchUp, we tolerate no exceptions here and consider it a critical developer error.
$message = sprintf('Subscriber "%s" had an error and also failed onAfterCatchUp: %s', $subscription->id->value, $e->getMessage());
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732733740, $e);
if ($progressClosure !== null) {
$progressClosure($eventEnvelope);
}
$domainEvent = $this->eventNormalizer->denormalize($eventEnvelope->event);
foreach ($subscriptionsToCatchup as $subscription) {
if ($subscription->position->value >= $sequenceNumber->value) {
$this->logger?->debug(sprintf('Subscription Engine: Subscription "%s" is farther than the current position (%d >= %d), continue catch up.', $subscription->id->value, $subscription->position->value, $sequenceNumber->value));
continue;
}
$error = $this->handleEvent($eventEnvelope, $domainEvent, $subscription->id);
if ($error !== null) {
// ERROR Case:
// 1.) for the leftover events we are not including this failed subscription for catchup
$subscriptionsToCatchup = $subscriptionsToCatchup->without($subscription->id);
// 2.) update the subscription error state on either its unchanged or new position (if some events worked)
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ERROR,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: SubscriptionError::fromPreviousStatusAndException(
$subscription->status,
$error->throwable
),
);
$errors[] = $error;
continue;
}
$errors[] = $error;
continue;
// HAPPY Case:
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;
}
// HAPPY Case:
$highestSequenceNumberForSubscriber[$subscription->id->value] = $eventEnvelope->sequenceNumber;
$numberOfProcessedEvents++;
}
$numberOfProcessedEvents++;
}
foreach ($subscriptionsToCatchup as $subscription) {
foreach ($subscriptionsToCatchup as $subscription) {
// after catchup mark all subscriptions as active, so they are triggered automatically now.
// The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ACTIVE,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: null,
);
if ($subscription->status !== SubscriptionStatus::ACTIVE) {
$this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value));
}
}
$this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors)));
return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors));
});

// todo do we need want to invoke for failed projections onAfterCatchUp, as onBeforeCatchUp was invoked already and to be consistent to "shutdown" this catchup iteration?
foreach ($subscriptionsToInvokeBeforeAndAfterCatchUpHooks as $subscription) {
try {
$this->subscribers->get($subscription->id)->onAfterCatchUp();
} catch (\Throwable $e) {
Expand All @@ -391,20 +387,9 @@ private function catchUpSubscriptions(Subscriptions $subscriptionsToCatchup, \Cl
$this->logger?->critical($message);
throw new CatchUpFailed($message, 1732374000, $e);
}
// after catchup mark all subscriptions as active, so they are triggered automatically now.
// The position will be set to the one the subscriber handled last, or if no events were in the stream, and we booted we keep the persisted position
$this->subscriptionStore->update(
$subscription->id,
status: SubscriptionStatus::ACTIVE,
position: $highestSequenceNumberForSubscriber[$subscription->id->value] ?? $subscription->position,
subscriptionError: null,
);
if ($subscription->status !== SubscriptionStatus::ACTIVE) {
$this->logger?->info(sprintf('Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id->value));
}
}
$this->logger?->info(sprintf('Subscription Engine: Finish catch up. %d processed events %d errors.', $numberOfProcessedEvents, count($errors)));
return $errors === [] ? ProcessedResult::success($numberOfProcessedEvents) : ProcessedResult::failed($numberOfProcessedEvents, Errors::fromArray($errors));

return $result;
}

/**
Expand Down

0 comments on commit fd9faa4

Please sign in to comment.