Skip to content

Commit

Permalink
Refactor: Improve performance (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 17, 2023
1 parent 443e0ac commit 80296e6
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 39 deletions.
54 changes: 20 additions & 34 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

namespace BenTools\ETL;

use BenTools\ETL\EventDispatcher\Event\BeforeLoadEvent;
use BenTools\ETL\EventDispatcher\Event\EndEvent;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\FlushEvent;
use BenTools\ETL\EventDispatcher\Event\InitEvent;
use BenTools\ETL\EventDispatcher\Event\LoadEvent;
use BenTools\ETL\EventDispatcher\Event\StartEvent;
use BenTools\ETL\EventDispatcher\Event\TransformEvent;
use BenTools\ETL\EventDispatcher\EventDispatcher;
use BenTools\ETL\EventDispatcher\PrioritizedListenerProvider;
use BenTools\ETL\Exception\ExtractException;
Expand All @@ -24,6 +21,7 @@
use BenTools\ETL\Extractor\IterableExtractor;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\ConditionalLoaderTrait;
use BenTools\ETL\Internal\DispatchEventsTrait;
use BenTools\ETL\Internal\EtlBuilderTrait;
use BenTools\ETL\Internal\TransformResult;
use BenTools\ETL\Loader\InMemoryLoader;
Expand All @@ -49,6 +47,11 @@ final class EtlExecutor implements EventDispatcherInterface
*/
use EtlBuilderTrait;

/**
* @use DispatchEventsTrait<self>
*/
use DispatchEventsTrait;

use ConditionalLoaderTrait;

private EventDispatcher $eventDispatcher;
Expand All @@ -74,14 +77,14 @@ public function process(mixed $source = null, mixed $destination = null, array $
$state = new EtlState(options: $this->options, source: $source, destination: $destination, context: $context);

try {
$this->dispatch(new InitEvent($state));
$this->emit(InitEvent::class, $state);
$items = $this->extractor->extract($state);

$state = $state->getLastVersion();
if (is_countable($items)) {
$state = $state->update($state->withNbTotalItems(count($items)));
}
$this->dispatch(new StartEvent($state));
$this->emit(StartEvent::class, $state);

if (!$this->processor->supports($items)) {
throw new ExtractException(sprintf('Current processor %s cannot process data of type: %s.', $this->processor::class, get_debug_type($items)));
Expand All @@ -100,8 +103,7 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void
if ($state->currentItemIndex > 0) {
$this->consumeNextTick($state);
}
$event = $this->dispatch(new ExtractEvent($state, $item));
$item = $event->item;
$item = $this->emitExtractEvent($state, $item);
$itemsToLoad = $this->transform($item, $state);
$this->load($itemsToLoad, $state);
}
Expand All @@ -123,10 +125,10 @@ private function consumeNextTick(EtlState $state): void
private function transform(mixed $item, EtlState $state): array
{
try {
$transformResult = TransformResult::create($this->transformer->transform($item, $state));

$event = $this->dispatch(new TransformEvent($state, $transformResult));
$transformResult = TransformResult::create($event->transformResult);
$transformResult = $this->emitTransformEvent(
$state,
TransformResult::create($this->transformer->transform($item, $state)),
);

return [...$transformResult];
} catch (SkipRequest|StopRequest $e) {
Expand All @@ -151,15 +153,15 @@ private function load(array $items, EtlState $state): void
continue;
}
try {
$item = $this->dispatch(new BeforeLoadEvent($state, $item))->item;
$item = $this->emitBeforeLoadEvent($state, $item);
} catch (SkipRequest) {
continue;
} catch (StopRequest) {
break;
}
$this->loader->load($item, $state);
$state = $state->update($state->getLastVersion()->withIncrementedNbLoadedItems());
$this->dispatch(new LoadEvent($state, $item));
$this->emit(LoadEvent::class, $state, $item);
}
} catch (SkipRequest|StopRequest $e) {
throw $e;
Expand All @@ -173,9 +175,9 @@ private function load(array $items, EtlState $state): void
/**
* @internal
*/
private function flush(EtlState $state, bool $isPartial): mixed
private function flush(EtlState $state, bool $early): mixed
{
if ($isPartial && !$state->shouldFlush()) {
if ($early && !$state->shouldFlush()) {
return null;
}

Expand All @@ -186,11 +188,11 @@ private function flush(EtlState $state, bool $isPartial): mixed
$output = null;
$state->flush();
try {
$output = $this->loader->flush($isPartial, $state);
$output = $this->loader->flush($early, $state);
} catch (Throwable $e) {
FlushException::emit($this->eventDispatcher, $e, $state);
}
$this->dispatch(new FlushEvent($state, $isPartial, $output));
$this->emit(FlushEvent::class, $state, $early, $output);
$state->update($state->withClearedFlush());

return $output;
Expand All @@ -210,26 +212,10 @@ private function terminate(EtlState $state): EtlState
}

$state = $state->update($state->withOutput($output));
$this->dispatch(new EndEvent($state));
$this->emit(EndEvent::class, $state);

gc_collect_cycles();

return $state;
}

/**
* @internal
*
* @param T $event
*
* @return T
*
* @template T of object
*/
public function dispatch(object $event): object
{
$this->eventDispatcher->dispatch($event);

return $event;
}
}
2 changes: 1 addition & 1 deletion src/EventDispatcher/Event/FlushEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ final class FlushEvent extends Event implements StoppableEventInterface
public function __construct(
public readonly EtlState $state,
public readonly bool $early,
public mixed $output,
public readonly mixed $output,
) {
}
}
5 changes: 5 additions & 0 deletions src/EventDispatcher/PrioritizedListenerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public function listenTo(string $eventClass, callable $callback, int $priority =
$this->flattenedListeners[$eventClass] = array_merge(...$this->prioritizedListeners[$eventClass]);
}

public function hasListeners(string $eventClass): bool
{
return isset($this->flattenedListeners[$eventClass]);
}

/**
* @return iterable<callable>
*/
Expand Down
3 changes: 3 additions & 0 deletions src/Internal/ConditionalLoaderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use BenTools\ETL\Loader\ConditionalLoaderInterface;
use BenTools\ETL\Loader\LoaderInterface;

/**
* @internal
*/
trait ConditionalLoaderTrait
{
private static function shouldLoad(LoaderInterface $loader, mixed $item, EtlState $state): bool
Expand Down
72 changes: 72 additions & 0 deletions src/Internal/DispatchEventsTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Internal;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\BeforeLoadEvent;
use BenTools\ETL\EventDispatcher\Event\Event;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\TransformEvent;

/**
* @internal
*
* @template EtlExecutor
*/
trait DispatchEventsTrait
{
/**
* @internal
*
* @template T of object
*
* @param T $event
*
* @return T
*/
public function dispatch(object $event): object
{
$this->eventDispatcher->dispatch($event);

return $event;
}

/**
* @template E of Event
*
* @param class-string<E> $eventClass
*
* @return E|null
*/
private function emit(string $eventClass, EtlState $state, mixed ...$args): ?Event
{
if (!$this->listenerProvider->hasListeners($eventClass)) {
return null;
}

return $this->dispatch(new $eventClass($state, ...$args));
}

private function emitExtractEvent(EtlState $state, mixed $item): mixed
{
$event = $this->emit(ExtractEvent::class, $state, $item);

return $event?->item ?? $item;
}

private function emitTransformEvent(EtlState $state, TransformResult $transformResult): TransformResult
{
$event = $this->emit(TransformEvent::class, $state, $transformResult);

return TransformResult::create($event?->transformResult ?? $transformResult);
}

private function emitBeforeLoadEvent(EtlState $state, mixed $item): mixed
{
$event = $this->emit(BeforeLoadEvent::class, $state, $item);

return $event?->item ?? $item;
}
}
5 changes: 3 additions & 2 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace BenTools\ETL\Internal;

use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\Extractor\CallableExtractor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
Expand All @@ -22,12 +23,12 @@
/**
* @internal
*
* @template T
* @template EtlExecutor
*/
trait EtlBuilderTrait
{
/**
* @use EtlEventListenersTrait<T>
* @use EtlEventListenersTrait<EtlExecutor>
*/
use EtlEventListenersTrait;

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/EtlEventListenersTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* @internal
*
* @template T
* @template EtlExecutor
*/
trait EtlEventListenersTrait
{
Expand Down
3 changes: 3 additions & 0 deletions src/Internal/StateHolder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use BenTools\ETL\EtlState;

/**
* @internal
*/
final class StateHolder
{
public function __construct(
Expand Down
6 changes: 5 additions & 1 deletion src/Internal/TransformResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ public static function create(mixed $value): self
static $prototype;
$prototype ??= new self();

if ($value instanceof self) {
return $value;
}

$that = clone $prototype;
if ($value instanceof Generator || $value instanceof self) {
if ($value instanceof Generator) {
$that->value = [...$value];
$that->iterable = true;
} else {
Expand Down

0 comments on commit 80296e6

Please sign in to comment.