Skip to content

Commit

Permalink
Refactor: Extraction (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 15, 2023
1 parent 6f30d0a commit 25c79d5
Show file tree
Hide file tree
Showing 26 changed files with 296 additions and 263 deletions.
154 changes: 73 additions & 81 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,33 @@
use BenTools\ETL\EventDispatcher\Event\TransformEvent;
use BenTools\ETL\EventDispatcher\EventDispatcher;
use BenTools\ETL\EventDispatcher\PrioritizedListenerProvider;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Exception\FlushException;
use BenTools\ETL\Exception\LoadException;
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Exception\TransformException;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Extractor\IterableExtractor;
use BenTools\ETL\Extractor\IterableExtractorProcessor;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\ConditionalLoaderTrait;
use BenTools\ETL\Internal\EtlBuilderTrait;
use BenTools\ETL\Internal\EtlExceptionsTrait;
use BenTools\ETL\Internal\Ref;
use BenTools\ETL\Internal\TransformResult;
use BenTools\ETL\Loader\InMemoryLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Transformer\NullTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
use Generator;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

use function count;
use function gc_collect_cycles;
use function is_countable;
use function get_debug_type;
use function sprintf;

final class EtlExecutor
final class EtlExecutor implements EventDispatcherInterface
{
use ClonableTrait;

Expand All @@ -44,20 +48,16 @@ final class EtlExecutor
*/
use EtlBuilderTrait;

/**
* @use EtlExceptionsTrait<self>
*/
use EtlExceptionsTrait;

use ConditionalLoaderTrait;

private EventDispatcherInterface $eventDispatcher;
private EventDispatcher $eventDispatcher;

public function __construct(
public readonly ExtractorInterface $extractor = new IterableExtractor(),
public readonly TransformerInterface $transformer = new NullTransformer(),
public readonly LoaderInterface $loader = new InMemoryLoader(),
public readonly EtlConfiguration $options = new EtlConfiguration(),
public readonly ExtractorProcessorInterface $processor = new IterableExtractorProcessor(),
) {
$this->listenerProvider = new PrioritizedListenerProvider();
$this->eventDispatcher = new EventDispatcher($this->listenerProvider);
Expand All @@ -71,77 +71,48 @@ public function __construct(
public function process(mixed $source = null, mixed $destination = null, array $context = []): EtlState
{
$state = new EtlState(options: $this->options, source: $source, destination: $destination, context: $context);
$stateHolder = ref($state);

try {
$this->dispatch(new InitEvent($state));
$items = $this->extractor->extract($state);

foreach ($this->extract($stateHolder) as $e => $extractedItem) {
$state = unref($stateHolder);
if (0 !== $e) {
$this->consumeNextTick($state);
}
try {
$transformedItems = $this->transform($extractedItem, $state);
$this->load($transformedItems, $stateHolder);
} catch (SkipRequest) {
}
$state = $state->getLastVersion();
if (is_countable($items)) {
$state = $state->update($state->withNbTotalItems(count($items)));
}
} catch (StopRequest) {
}
$this->dispatch(new StartEvent($state));

$this->consumeNextTick($state);
$output = $this->flush($stateHolder, false);
if (!$this->processor->supports($items)) {
throw new ExtractException(sprintf('Current processor %s cannot process data of type: %s.', $this->processor::class, get_debug_type($items)));
}

$state = unref($stateHolder);
if (!$state->nbTotalItems) {
$state = $state->withNbTotalItems($state->nbLoadedItems);
$stateHolder->update($state);
$this->processor->process($this, $state, $items);
} catch (StopRequest) {
}

$state = $state->withOutput($output);
$stateHolder->update($state);
$this->dispatch(new EndEvent($state));

gc_collect_cycles();

return $state;
return $this->terminate($state->getLastVersion());
}

private function consumeNextTick(EtlState $state): void
public function processItem(mixed $item, mixed $key, EtlState $state): void
{
foreach ($state->nextTickCallbacks as $callback) {
($callback)($state);
$state->nextTickCallbacks->detach($callback);
$state = $state->update($state->getLastVersion()->withUpdatedItemKey($key));
if ($state->currentItemIndex > 0) {
$this->consumeNextTick($state);
}
$event = $this->dispatch(new ExtractEvent($state, $item));
$item = $event->item;
$itemsToLoad = $this->transform($item, $state);
$this->load($itemsToLoad, $state);
}

/**
* @param Ref<EtlState> $stateHolder
* @internal
*/
private function extract(Ref $stateHolder): Generator
private function consumeNextTick(EtlState $state): void
{
$state = unref($stateHolder);
try {
$items = $this->extractor->extract($state);
if (is_countable($items)) {
$state = $state->withNbTotalItems(count($items));
$stateHolder->update($state);
}
$this->dispatch(new StartEvent($state));
foreach ($items as $key => $value) {
try {
$state = unref($stateHolder)->withUpdatedItemKey($key);
$stateHolder->update($state);
$event = $this->dispatch(new ExtractEvent($state, $value));
yield $event->item;
} catch (SkipRequest) {
}
}
} catch (StopRequest) {
return;
} catch (Throwable $exception) {
$this->throwExtractException($exception, unref($stateHolder));
foreach ($state->nextTickCallbacks as $callback) {
($callback)($state);
$state->nextTickCallbacks->detach($callback);
}
}

Expand All @@ -160,44 +131,42 @@ private function transform(mixed $item, EtlState $state): array
} catch (SkipRequest|StopRequest $e) {
throw $e;
} catch (Throwable $e) {
$this->throwTransformException($e, $state);
TransformException::emit($this->eventDispatcher, $e, $state);
}

return [];
}

/**
* @param list<mixed> $items
* @param Ref<EtlState> $stateHolder
* @param list<mixed> $items
*
* @internal
*/
private function load(array $items, Ref $stateHolder): void
private function load(array $items, EtlState $state): void
{
$state = unref($stateHolder);
try {
foreach ($items as $item) {
if (!self::shouldLoad($this->loader, $item, $state)) {
continue;
}
$this->loader->load($item, $state);
$state = $state->withIncrementedNbLoadedItems();
$stateHolder->update($state);
$state = $state->update($state->getLastVersion()->withIncrementedNbLoadedItems());
$this->dispatch(new LoadEvent($state, $item));
}
} catch (SkipRequest|StopRequest $e) {
throw $e;
} catch (Throwable $e) {
$this->throwLoadException($e, unref($stateHolder));
LoadException::emit($this->eventDispatcher, $e, $state);
}

$this->flush($stateHolder, true);
$this->flush($state->getLastVersion(), true);
}

/**
* @param Ref<EtlState> $stateHolder
* @internal
*/
private function flush(Ref $stateHolder, bool $isPartial): mixed
private function flush(EtlState $state, bool $isPartial): mixed
{
$state = unref($stateHolder);
if ($isPartial && !$state->shouldFlush()) {
return null;
}
Expand All @@ -211,22 +180,45 @@ private function flush(Ref $stateHolder, bool $isPartial): mixed
try {
$output = $this->loader->flush($isPartial, $state);
} catch (Throwable $e) {
$this->throwFlushException($e, $state);
FlushException::emit($this->eventDispatcher, $e, $state);
}
$this->dispatch(new FlushEvent($state, $isPartial, $output));
$stateHolder->update($state->withClearedFlush());
$state->update($state->withClearedFlush());

return $output;
}

/**
* @template T of object
*
* @internal
*/
private function terminate(EtlState $state): EtlState
{
    // Finalizes a run: pending next-tick callbacks, final flush, totals,
    // output, EndEvent. Returns the state that carries the flush output.

    // Run the callbacks still registered on the state before the last flush.
    $this->consumeNextTick($state);

    // Final flush ($isPartial = false), performed against the latest state version.
    $finalOutput = $this->flush($state->getLastVersion(), false);

    // Re-read the latest version: the steps above may have replaced it.
    $latest = $state->getLastVersion();
    if (!$latest->nbTotalItems) {
        // No total recorded: fall back to the number of loaded items.
        $latest = $latest->update($latest->withNbTotalItems($latest->nbLoadedItems));
    }

    $finalState = $latest->update($latest->withOutput($finalOutput));
    $this->dispatch(new EndEvent($finalState->getLastVersion()));

    gc_collect_cycles();

    return $finalState;
}

/**
* @param T $event
*
* @return T
*
* @internal
*
* @template T of object
*/
private function dispatch(object $event): object
public function dispatch(object $event): object
{
$this->eventDispatcher->dispatch($event);

Expand Down
21 changes: 21 additions & 0 deletions src/EtlState.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\StateHolder;
use Closure;
use DateTimeImmutable;
use SplObjectStorage;
Expand Down Expand Up @@ -41,8 +42,28 @@ public function __construct(
public readonly DateTimeImmutable $startedAt = new DateTimeImmutable(),
public readonly ?DateTimeImmutable $endedAt = null,
public readonly mixed $output = null,
public readonly StateHolder $stateHolder = new StateHolder(),
) {
$this->nextTickCallbacks ??= new SplObjectStorage();
$this->stateHolder->state ??= $this;
}

/**
 * Returns the state currently referenced by this instance's state holder.
 *
 * @internal
 */
public function getLastVersion(): self
{
    $holder = $this->stateHolder;

    return $holder->state;
}

/**
 * Records the given state in this instance's state holder and returns it,
 * so that getLastVersion() yields it afterwards.
 *
 * @internal
 */
public function update(self $state): self
{
    $holder = $this->stateHolder;
    $holder->state = $state;

    return $state;
}

public function nextTick(callable $callback): void
Expand Down
13 changes: 13 additions & 0 deletions src/Exception/ExtractException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

namespace BenTools\ETL\Exception;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\ExtractExceptionEvent;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

final class ExtractException extends EtlException
{
    /**
     * Dispatches an ExtractExceptionEvent and throws the exception the event carries.
     *
     * The given throwable is wrapped in an ExtractException (kept as `previous`)
     * unless it already is one. The event receives the latest version of the state.
     * This method never returns.
     */
    public static function emit(EventDispatcherInterface $bus, Throwable $exception, EtlState $state): never
    {
        $failure = $exception instanceof self
            ? $exception
            : new self('Error during extraction.', previous: $exception);

        $event = $bus->dispatch(new ExtractExceptionEvent($state->getLastVersion(), $failure));

        throw $event->exception;
    }
}
17 changes: 17 additions & 0 deletions src/Exception/FlushException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@

namespace BenTools\ETL\Exception;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\FlushExceptionEvent;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

final class FlushException extends EtlException
{
    /**
     * Dispatches a FlushExceptionEvent and rethrows the exception it carries, if any.
     *
     * The given throwable is wrapped in a FlushException (kept as `previous`)
     * unless it already is one. When the dispatched event no longer carries an
     * exception, this method returns normally.
     *
     * @throws Throwable the exception held by the event after dispatching
     */
    public static function emit(EventDispatcherInterface $bus, Throwable $exception, EtlState $state): void
    {
        if (!$exception instanceof self) {
            $exception = new self('Error during flush.', previous: $exception);
        }

        // Hand the latest state version to the event, as the extract/transform/load
        // emit() helpers do, so listeners do not receive a stale snapshot.
        $exception = $bus->dispatch(new FlushExceptionEvent($state->getLastVersion(), $exception))->exception;

        if ($exception) {
            throw $exception;
        }
    }
}
17 changes: 17 additions & 0 deletions src/Exception/LoadException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@

namespace BenTools\ETL\Exception;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\LoadExceptionEvent;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

final class LoadException extends EtlException
{
    /**
     * Dispatches a LoadExceptionEvent and rethrows the exception it carries, if any.
     *
     * The given throwable is wrapped in a LoadException (kept as `previous`)
     * unless it already is one. The event receives the latest version of the state.
     * When the dispatched event no longer carries an exception, nothing is thrown.
     */
    public static function emit(EventDispatcherInterface $bus, Throwable $exception, EtlState $state): void
    {
        $failure = $exception instanceof self
            ? $exception
            : new self('Error during loading.', previous: $exception);

        $event = $bus->dispatch(new LoadExceptionEvent($state->getLastVersion(), $failure));
        $remaining = $event->exception;

        if (!$remaining) {
            return;
        }

        throw $remaining;
    }
}
17 changes: 17 additions & 0 deletions src/Exception/TransformException.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@

namespace BenTools\ETL\Exception;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\TransformExceptionEvent;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

final class TransformException extends EtlException
{
    /**
     * Dispatches a TransformExceptionEvent and rethrows the exception it carries, if any.
     *
     * The given throwable is wrapped in a TransformException (kept as `previous`)
     * unless it already is one. The event receives the latest version of the state.
     * When the dispatched event no longer carries an exception, nothing is thrown.
     */
    public static function emit(EventDispatcherInterface $bus, Throwable $exception, EtlState $state): void
    {
        $failure = $exception instanceof self
            ? $exception
            : new self('Error during transformation.', previous: $exception);

        $event = $bus->dispatch(new TransformExceptionEvent($state->getLastVersion(), $failure));
        $remaining = $event->exception;

        if (!$remaining) {
            return;
        }

        throw $remaining;
    }
}
Loading

0 comments on commit 25c79d5

Please sign in to comment.