Skip to content

Commit

Permalink
feat: next tick
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed Nov 9, 2023
1 parent 954c2fa commit b85f7ba
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 13 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,23 @@ $etl = (new EtlExecutor(options: new EtlConfiguration(flushFrequency: 10)))
);
```

Next tick
---------

You can also access the `EtlState` instance of the next item to be processed, for example to trigger
an early flush on the next item, or to stop the whole process once the current item will be loaded.

Example:
```php
$etl = $etl->onExtract(function (ExtractEvent $event) {
if (/* some reason */) {
$event->state->flush();
// $event->state->stop(); // This would actually stop immediately
$event->state->nextTick(fn (EtlState $state) => $state->stop()); // Stop at the next iteration instead
}
});
```

Recipes
-------

Expand Down
16 changes: 15 additions & 1 deletion src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ public function process(mixed $source = null, mixed $destination = null, array $
try {
$this->dispatch(new InitEvent($state));

foreach ($this->extract($stateHolder) as $extractedItem) {
foreach ($this->extract($stateHolder) as $e => $extractedItem) {
$state = unref($stateHolder);
if (0 !== $e) {
$this->consumeNextTick($state);
}
try {
$transformedItems = $this->transform($extractedItem, $state);
$this->load($transformedItems, $stateHolder);
Expand All @@ -86,6 +90,7 @@ public function process(mixed $source = null, mixed $destination = null, array $
} catch (StopRequest) {
}

$this->consumeNextTick($state);
$output = $this->flush($stateHolder, false);

$state = unref($stateHolder);
Expand All @@ -103,6 +108,15 @@ public function process(mixed $source = null, mixed $destination = null, array $
return $state;
}

private function consumeNextTick(EtlState $state): void
{
if (null === $state->nextTickCallback) {
return;
}

($state->nextTickCallback)($state);
}

/**
* @param Ref<EtlState> $stateHolder
*/
Expand Down
49 changes: 37 additions & 12 deletions src/EtlState.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Internal\ClonableTrait;
use Closure;
use DateTimeImmutable;

final class EtlState
{
use ClonableTrait;

/**
* @internal
*/
public ?Closure $nextTickCallback = null;

private int $nbLoadedItemsSinceLastFlush = 0;
private bool $earlyFlush = false;

/**
* @param array<string, mixed> $context
*/
Expand All @@ -29,11 +38,23 @@ public function __construct(
public readonly DateTimeImmutable $startedAt = new DateTimeImmutable(),
public readonly ?DateTimeImmutable $endedAt = null,
public readonly mixed $output = null,
private readonly int $nbLoadedItemsSinceLastFlush = 0,
private bool $earlyFlush = false,
) {
}

public function nextTick(?callable $callback): void
{
if (null === $callback) {
$this->nextTickCallback = null;

return;
}

$this->nextTickCallback = static function (EtlState $state) use ($callback) {
$callback($state);
$state->nextTick(null);
};
}

/**
* Flush after current item.
*/
Expand All @@ -58,23 +79,27 @@ public function stop(): void
throw new StopRequest();
}

public function shouldFlush(): bool
{
if (INF === $this->options->flushFrequency) {
return false;
}

return $this->earlyFlush
|| (0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency));
}

public function getDuration(): float
{
$endedAt = $this->endedAt ?? new DateTimeImmutable();

return (float) ($endedAt->format('U.u') - $this->startedAt->format('U.u'));
}

/**
* @internal
*/
public function shouldFlush(): bool
{
return match (true) {
INF === $this->options->flushFrequency => false,
0 === $this->nbLoadedItemsSinceLastFlush => false,
0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency) => true,
$this->earlyFlush => true,
default => false,
};
}

/**
* @internal
*/
Expand Down
37 changes: 37 additions & 0 deletions tests/Behavior/NextTickTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Behavior;

use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\Tests\InMemoryLoader;

use function expect;

it('does arbitrary stuff on next tick', function () {
$loader = new InMemoryLoader();

// Given
$options = ['flushEvery' => 3];
$etl = (new EtlExecutor(loader: $loader, options: new EtlConfiguration(...$options)))
->onExtract(function (ExtractEvent $event) {
// Let's trigger an early flush after the NEXT item (apple)
if ('banana' === $event->item) {
$event->state->nextTick(fn (EtlState $state) => $state->flush());
}
})
;

// When
$report = $etl->process(['banana', 'apple', 'strawberry', 'raspberry', 'peach']);

// Then
expect($report->output)->toBeArray()
->and($report->output)->toHaveCount(2)
->and($report->output[0])->toBe(['banana', 'apple'])
->and($report->output[1])->toBe(['strawberry', 'raspberry', 'peach']);
});

0 comments on commit b85f7ba

Please sign in to comment.