Skip to content

Commit

Permalink
Feat: Next tick (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 10, 2023
1 parent 643db21 commit b8df541
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 13 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,29 @@ $etl = (new EtlExecutor(options: new EtlConfiguration(flushFrequency: 10)))
);
```

Next tick
---------

You can also access the `EtlState` instance of the next item to be processed, for example to trigger
an early flush on the next item, or to stop the whole process once the current item will be loaded.

Example:

```php
use BenTools\ETL\EventDispatcher\Event\LoadEvent;

$etl = $etl->onLoad(function (LoadEvent $event) {
$item = $event->item;
if (/* some reason */) {
$event->state->flush(); // Request early flush after loading
$event->state->nextTick(function (EtlState $state) use ($item) {
// $item will be flushed, so we can do something with it
var_dump($item->id);
});
}
});
```

Recipes
-------

Expand Down
16 changes: 15 additions & 1 deletion src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ public function process(mixed $source = null, mixed $destination = null, array $
try {
$this->dispatch(new InitEvent($state));

foreach ($this->extract($stateHolder) as $extractedItem) {
foreach ($this->extract($stateHolder) as $e => $extractedItem) {
$state = unref($stateHolder);
if (0 !== $e) {
$this->consumeNextTick($state);
}
try {
$transformedItems = $this->transform($extractedItem, $state);
$this->load($transformedItems, $stateHolder);
Expand All @@ -86,6 +90,7 @@ public function process(mixed $source = null, mixed $destination = null, array $
} catch (StopRequest) {
}

$this->consumeNextTick($state);
$output = $this->flush($stateHolder, false);

$state = unref($stateHolder);
Expand All @@ -103,6 +108,15 @@ public function process(mixed $source = null, mixed $destination = null, array $
return $state;
}

private function consumeNextTick(EtlState $state): void
{
if (null === $state->nextTickCallback) {
return;
}

($state->nextTickCallback)($state);
}

/**
* @param Ref<EtlState> $stateHolder
*/
Expand Down
49 changes: 37 additions & 12 deletions src/EtlState.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Internal\ClonableTrait;
use Closure;
use DateTimeImmutable;

final class EtlState
{
use ClonableTrait;

/**
* @internal
*/
public ?Closure $nextTickCallback = null;

private int $nbLoadedItemsSinceLastFlush = 0;
private bool $earlyFlush = false;

/**
* @param array<string, mixed> $context
*/
Expand All @@ -29,11 +38,23 @@ public function __construct(
public readonly DateTimeImmutable $startedAt = new DateTimeImmutable(),
public readonly ?DateTimeImmutable $endedAt = null,
public readonly mixed $output = null,
private readonly int $nbLoadedItemsSinceLastFlush = 0,
private bool $earlyFlush = false,
) {
}

public function nextTick(?callable $callback): void
{
if (null === $callback) {
$this->nextTickCallback = null;

return;
}

$this->nextTickCallback = static function (EtlState $state) use ($callback) {
$callback($state);
$state->nextTick(null);
};
}

/**
* Flush after current item.
*/
Expand All @@ -58,23 +79,27 @@ public function stop(): void
throw new StopRequest();
}

public function shouldFlush(): bool
{
if (INF === $this->options->flushFrequency) {
return false;
}

return $this->earlyFlush
|| (0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency));
}

public function getDuration(): float
{
$endedAt = $this->endedAt ?? new DateTimeImmutable();

return (float) ($endedAt->format('U.u') - $this->startedAt->format('U.u'));
}

/**
* @internal
*/
public function shouldFlush(): bool
{
return match (true) {
INF === $this->options->flushFrequency => false,
0 === $this->nbLoadedItemsSinceLastFlush => false,
0 === ($this->nbLoadedItemsSinceLastFlush % $this->options->flushFrequency) => true,
$this->earlyFlush => true,
default => false,
};
}

/**
* @internal
*/
Expand Down
37 changes: 37 additions & 0 deletions tests/Behavior/NextTickTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Behavior;

use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\Tests\InMemoryLoader;

use function expect;

it('does arbitrary stuff on next tick', function () {
$loader = new InMemoryLoader();

// Given
$options = ['flushEvery' => 3];
$etl = (new EtlExecutor(loader: $loader, options: new EtlConfiguration(...$options)))
->onExtract(function (ExtractEvent $event) {
// Let's trigger an early flush after the NEXT item (apple)
if ('banana' === $event->item) {
$event->state->nextTick(fn (EtlState $state) => $state->flush());
}
})
;

// When
$report = $etl->process(['banana', 'apple', 'strawberry', 'raspberry', 'peach']);

// Then
expect($report->output)->toBeArray()
->and($report->output)->toHaveCount(2)
->and($report->output[0])->toBe(['banana', 'apple'])
->and($report->output[1])->toBe(['strawberry', 'raspberry', 'peach']);
});

0 comments on commit b8df541

Please sign in to comment.