From b85f7baca5ebc1b09a06348ac1f52dca965facfa Mon Sep 17 00:00:00 2001 From: Beno!t POLASZEK Date: Thu, 9 Nov 2023 15:48:18 +0100 Subject: [PATCH] feat: next tick --- README.md | 17 ++++++++++++ src/EtlExecutor.php | 16 ++++++++++- src/EtlState.php | 49 +++++++++++++++++++++++++-------- tests/Behavior/NextTickTest.php | 37 +++++++++++++++++++++++++ 4 files changed, 106 insertions(+), 13 deletions(-) create mode 100644 tests/Behavior/NextTickTest.php diff --git a/README.md b/README.md index a19253b..b00e152 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,23 @@ $etl = (new EtlExecutor(options: new EtlConfiguration(flushFrequency: 10))) ); ``` +Next tick +--------- + +You can also access the `EtlState` instance of the next item to be processed, for example to trigger +an early flush on the next item, or to stop the whole process once the current item will be loaded. + +Example: +```php +$etl = $etl->onExtract(function (ExtractEvent $event) { + if (/* some reason */) { + $event->state->flush(); + // $event->state->stop(); // This would actually stop immediately + $event->state->nextTick(fn (EtlState $state) => $state->stop()); // Stop at the next iteration instead + } +}); +``` + Recipes ------- diff --git a/src/EtlExecutor.php b/src/EtlExecutor.php index 6ac1f6a..744a88d 100644 --- a/src/EtlExecutor.php +++ b/src/EtlExecutor.php @@ -76,7 +76,11 @@ public function process(mixed $source = null, mixed $destination = null, array $ try { $this->dispatch(new InitEvent($state)); - foreach ($this->extract($stateHolder) as $extractedItem) { + foreach ($this->extract($stateHolder) as $e => $extractedItem) { + $state = unref($stateHolder); + if (0 !== $e) { + $this->consumeNextTick($state); + } try { $transformedItems = $this->transform($extractedItem, $state); $this->load($transformedItems, $stateHolder); @@ -86,6 +90,7 @@ public function process(mixed $source = null, mixed $destination = null, array $ } catch (StopRequest) { } + $this->consumeNextTick($state); $output = $this->flush($stateHolder, false); $state = unref($stateHolder); @@ -103,6 +108,15 @@ public function process(mixed $source = null, mixed $destination = null, array $ return $state; } + private function consumeNextTick(EtlState $state): void + { + if (null === $state->nextTickCallback) { + return; + } + + ($state->nextTickCallback)($state); + } + /** * @param Ref $stateHolder */ diff --git a/src/EtlState.php b/src/EtlState.php index 447afad..515e571 100644 --- a/src/EtlState.php +++ b/src/EtlState.php @@ -7,12 +7,21 @@ use BenTools\ETL\Exception\SkipRequest; use BenTools\ETL\Exception\StopRequest; use BenTools\ETL\Internal\ClonableTrait; +use Closure; use DateTimeImmutable; final class EtlState { use ClonableTrait; + /** + * @internal + */ + public ?Closure $nextTickCallback = null; + + private int $nbLoadedItemsSinceLastFlush = 0; + private bool $earlyFlush = false; + /** * @param array $context */ @@ -29,11 +38,23 @@ public function __construct( public readonly DateTimeImmutable $startedAt = new DateTimeImmutable(), public readonly ?DateTimeImmutable $endedAt = null, public readonly mixed $output = null, - private readonly int $nbLoadedItemsSinceLastFlush = 0, - private bool $earlyFlush = false, ) { } + public function nextTick(?callable $callback): void + { + if (null === $callback) { + $this->nextTickCallback = null; + + return; + } + + $this->nextTickCallback = static function (EtlState $state) use ($callback) { + $callback($state); + $state->nextTick(null); + }; + } + /** * Flush after current item. */ @@ -58,16 +79,6 @@ public function stop(): void throw new StopRequest(); } - public function shouldFlush(): bool - { - if (INF === $this->options->flushFrequency) { - return false; - } - - return $this->earlyFlush - || (0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency)); - } - public function getDuration(): float { $endedAt = $this->endedAt ?? new DateTimeImmutable(); @@ -75,6 +86,20 @@ public function getDuration(): float return (float) ($endedAt->format('U.u') - $this->startedAt->format('U.u')); } + /** + * @internal + */ + public function shouldFlush(): bool + { + return match (true) { + INF === $this->options->flushFrequency => false, + 0 === $this->nbLoadedItemsSinceLastFlush => false, + 0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency) => true, + $this->earlyFlush => true, + default => false, + }; + } + /** * @internal */ diff --git a/tests/Behavior/NextTickTest.php b/tests/Behavior/NextTickTest.php new file mode 100644 index 0000000..64b43be --- /dev/null +++ b/tests/Behavior/NextTickTest.php @@ -0,0 +1,37 @@ + 3]; + $etl = (new EtlExecutor(loader: $loader, options: new EtlConfiguration(...$options))) + ->onExtract(function (ExtractEvent $event) { + // Let's trigger an early flush after the NEXT item (apple) + if ('banana' === $event->item) { + $event->state->nextTick(fn (EtlState $state) => $state->flush()); + } + }) + ; + + // When + $report = $etl->process(['banana', 'apple', 'strawberry', 'raspberry', 'peach']); + + // Then + expect($report->output)->toBeArray() + ->and($report->output)->toHaveCount(2) + ->and($report->output[0])->toBe(['banana', 'apple']) + ->and($report->output[1])->toBe(['strawberry', 'raspberry', 'peach']); +});