diff --git a/README.md b/README.md index 8aa81a5..13ef6d0 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ Table of Contents - [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders) - [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout) - [Instantiators](doc/advanced_usage.md#instantiators) + - [Using React Streams (ReactPHP support)](doc/advanced_usage.md#using-react-streams-experimental) - [Recipes](doc/recipes.md) - [Contributing](#contribute) - [License](#license) diff --git a/composer.json b/composer.json index 1084edd..60f26fc 100644 --- a/composer.json +++ b/composer.json @@ -18,6 +18,7 @@ "pestphp/pest": "^2.24", "phpstan/phpstan": "^1.10", "phpstan/phpstan-mockery": "^1.1", + "react/stream": "^1.3", "symfony/var-dumper": "*" }, "license": "MIT", diff --git a/doc/advanced_usage.md b/doc/advanced_usage.md index 11775da..a780787 100644 --- a/doc/advanced_usage.md +++ b/doc/advanced_usage.md @@ -158,3 +158,41 @@ $report = withRecipe(new LoggerRecipe($logger)) ->transformWith(fn ($value) => strtoupper($value)) ->process(['foo', 'bar']); ``` + +Using React streams (experimental) +---------------------------------- + +You can plug your ETL dataflows into any [React Stream](https://github.com/reactphp/stream). 
+ +Example with a TCP server: + +```php +use BenTools\ETL\EtlConfiguration; +use BenTools\ETL\EventDispatcher\Event\ExtractEvent; +use BenTools\ETL\EventDispatcher\Event\InitEvent; +use React\Socket\ConnectionInterface; +use React\Socket\SocketServer; + +use function BenTools\ETL\stdOut; +use function BenTools\ETL\useReact; + +$socket = new SocketServer('127.0.0.1:7000'); + +$etl = useReact() // or (new EtlExecutor())->withRecipe(new ReactStreamProcessor()); + ->loadInto(stdOut()) + ->onInit(function (InitEvent $event) { + /** @var ConnectionInterface $stream */ + $stream = $event->state->source; + $stream->on('close', function () use ($event) { + $event->state->stop(); // Will flush all pending items and gracefully stop the ETL for that connection + }); + }) + ->withOptions(new EtlConfiguration(flushEvery: 1)) // Optionally, flush on each data event + ->onExtract(function (ExtractEvent $event) { + if (!preg_match('//u', $event->item)) { + $event->state->skip(); // Ignore binary data + } + }); + +$socket->on('connection', $etl->process(...)); +``` diff --git a/src/EtlExecutor.php b/src/EtlExecutor.php index e388530..e33e3c6 100644 --- a/src/EtlExecutor.php +++ b/src/EtlExecutor.php @@ -20,15 +20,15 @@ use BenTools\ETL\Exception\StopRequest; use BenTools\ETL\Exception\TransformException; use BenTools\ETL\Extractor\ExtractorInterface; -use BenTools\ETL\Extractor\ExtractorProcessorInterface; use BenTools\ETL\Extractor\IterableExtractor; -use BenTools\ETL\Extractor\IterableExtractorProcessor; use BenTools\ETL\Internal\ClonableTrait; use BenTools\ETL\Internal\ConditionalLoaderTrait; use BenTools\ETL\Internal\EtlBuilderTrait; use BenTools\ETL\Internal\TransformResult; use BenTools\ETL\Loader\InMemoryLoader; use BenTools\ETL\Loader\LoaderInterface; +use BenTools\ETL\Processor\IterableProcessor; +use BenTools\ETL\Processor\ProcessorInterface; use BenTools\ETL\Transformer\NullTransformer; use BenTools\ETL\Transformer\TransformerInterface; use 
Psr\EventDispatcher\EventDispatcherInterface; @@ -57,7 +57,7 @@ public function __construct( public readonly TransformerInterface $transformer = new NullTransformer(), public readonly LoaderInterface $loader = new InMemoryLoader(), public readonly EtlConfiguration $options = new EtlConfiguration(), - public readonly ExtractorProcessorInterface $processor = new IterableExtractorProcessor(), + public readonly ProcessorInterface $processor = new IterableProcessor(), ) { $this->listenerProvider = new PrioritizedListenerProvider(); $this->eventDispatcher = new EventDispatcher($this->listenerProvider); diff --git a/src/Extractor/ReactStreamExtractor.php b/src/Extractor/ReactStreamExtractor.php new file mode 100644 index 0000000..c8700f3 --- /dev/null +++ b/src/Extractor/ReactStreamExtractor.php @@ -0,0 +1,21 @@ +source ?? $this->stream; + } +} diff --git a/src/Internal/EtlBuilderTrait.php b/src/Internal/EtlBuilderTrait.php index a7d8fa6..af50b16 100644 --- a/src/Internal/EtlBuilderTrait.php +++ b/src/Internal/EtlBuilderTrait.php @@ -8,10 +8,10 @@ use BenTools\ETL\Extractor\CallableExtractor; use BenTools\ETL\Extractor\ChainExtractor; use BenTools\ETL\Extractor\ExtractorInterface; -use BenTools\ETL\Extractor\ExtractorProcessorInterface; use BenTools\ETL\Loader\CallableLoader; use BenTools\ETL\Loader\ChainLoader; use BenTools\ETL\Loader\LoaderInterface; +use BenTools\ETL\Processor\ProcessorInterface; use BenTools\ETL\Recipe\Recipe; use BenTools\ETL\Transformer\CallableTransformer; use BenTools\ETL\Transformer\ChainTransformer; @@ -103,7 +103,7 @@ public function withRecipe(Recipe|callable $recipe, Recipe|callable ...$recipes) return $executor; } - public function withProcessor(ExtractorProcessorInterface $processor): self + public function withProcessor(ProcessorInterface $processor): self { return $this->cloneWith(['processor' => $processor]); } diff --git a/src/Iterator/CSVIterator.php b/src/Iterator/CSVIterator.php index 1ed3534..6e6cae8 100644 --- 
a/src/Iterator/CSVIterator.php +++ b/src/Iterator/CSVIterator.php @@ -32,10 +32,11 @@ private array $options; /** + * @param Traversable $text * @param array{delimiter?: string, enclosure?: string, escapeString?: string, columns?: 'auto'|string[]|null, normalizers?: ValueNormalizerInterface[]} $options */ public function __construct( - private PregSplitIterator|StrTokIterator|FileIterator $text, + private Traversable $text, array $options = [], ) { $resolver = (new OptionsResolver())->setIgnoreUndefined(); diff --git a/src/Extractor/IterableExtractorProcessor.php b/src/Processor/IterableProcessor.php similarity index 90% rename from src/Extractor/IterableExtractorProcessor.php rename to src/Processor/IterableProcessor.php index fef13fc..f19b70f 100644 --- a/src/Extractor/IterableExtractorProcessor.php +++ b/src/Processor/IterableProcessor.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace BenTools\ETL\Extractor; +namespace BenTools\ETL\Processor; use BenTools\ETL\EtlExecutor; use BenTools\ETL\EtlState; @@ -16,7 +16,7 @@ /** * @internal */ -final readonly class IterableExtractorProcessor implements ExtractorProcessorInterface +final readonly class IterableProcessor implements ProcessorInterface { public function supports(mixed $extracted): bool { diff --git a/src/Extractor/ExtractorProcessorInterface.php b/src/Processor/ProcessorInterface.php similarity index 77% rename from src/Extractor/ExtractorProcessorInterface.php rename to src/Processor/ProcessorInterface.php index b14f01b..6bd5b2b 100644 --- a/src/Extractor/ExtractorProcessorInterface.php +++ b/src/Processor/ProcessorInterface.php @@ -2,12 +2,12 @@ declare(strict_types=1); -namespace BenTools\ETL\Extractor; +namespace BenTools\ETL\Processor; use BenTools\ETL\EtlExecutor; use BenTools\ETL\EtlState; -interface ExtractorProcessorInterface +interface ProcessorInterface { public function supports(mixed $extracted): bool; diff --git a/src/Processor/ReactStreamProcessor.php 
b/src/Processor/ReactStreamProcessor.php new file mode 100644 index 0000000..5ca4074 --- /dev/null +++ b/src/Processor/ReactStreamProcessor.php @@ -0,0 +1,61 @@ +on('data', function (mixed $item) use ($executor, &$key, $state, $stream) { + if (is_string($item)) { + $item = trim($item); + } + try { + $executor->processItem($item, ++$key, $state); + } catch (SkipRequest) { + } catch (StopRequest) { + $stream->close(); + } catch (Throwable $e) { + $stream->close(); + ExtractException::emit($executor, $e, $state); + } + }); + + Loop::run(); + + return $state->getLastVersion(); + } + + public function decorate(EtlExecutor $executor): EtlExecutor + { + return $executor->extractFrom(new ReactStreamExtractor())->withProcessor($this); + } +} diff --git a/src/functions.php b/src/functions.php index d772cf3..e57eb24 100644 --- a/src/functions.php +++ b/src/functions.php @@ -10,6 +10,7 @@ use BenTools\ETL\Loader\ChainLoader; use BenTools\ETL\Loader\LoaderInterface; use BenTools\ETL\Loader\STDOUTLoader; +use BenTools\ETL\Processor\ReactStreamProcessor; use BenTools\ETL\Recipe\Recipe; use BenTools\ETL\Transformer\ChainTransformer; use BenTools\ETL\Transformer\TransformerInterface; @@ -76,3 +77,8 @@ function stdOut(): STDOUTLoader { return new STDOUTLoader(); } + +function useReact(): EtlExecutor +{ + return withRecipe(new ReactStreamProcessor()); +} diff --git a/tests/Behavior/ReactStreamProcessorTest.php b/tests/Behavior/ReactStreamProcessorTest.php new file mode 100644 index 0000000..808e1f0 --- /dev/null +++ b/tests/Behavior/ReactStreamProcessorTest.php @@ -0,0 +1,63 @@ + $stream->emit('data', ['hello'])); + Loop::futureTick(fn () => $stream->emit('data', ['world'])); + $executor = useReact(); + + // When + $state = $executor->process($stream); + + // Then + expect($state->output)->toBe(['hello', 'world']); +}); + +it('can skip items and stop the workflow', function () { + // Given + $stream = new ReadableResourceStream(fopen('php://temp', 'rb')); + $fruits = ['banana', 
'apple', 'strawberry', 'raspberry', 'peach']; + foreach ($fruits as $fruit) { + Loop::futureTick(fn () => $stream->emit('data', [$fruit])); + } + $executor = useReact() + ->onExtract(function (ExtractEvent $event) { + match ($event->item) { + 'apple' => $event->state->skip(), + 'peach' => $event->state->stop(), + default => null, + }; + }) + ; + + // When + $state = $executor->process($stream); + + // Then + expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']); +}); + +it('throws ExtractExceptions', function () { + // Given + $stream = new ReadableResourceStream(fopen('php://temp', 'rb')); + Loop::futureTick(fn () => $stream->emit('data', ['hello'])); + $executor = useReact()->onExtract(fn () => throw new RuntimeException()); + + // When + $executor->process($stream); +})->throws(ExtractException::class); diff --git a/tests/Unit/EtlExecutorTest.php b/tests/Unit/EtlExecutorTest.php index 1b8a682..3dbe0c9 100644 --- a/tests/Unit/EtlExecutorTest.php +++ b/tests/Unit/EtlExecutorTest.php @@ -9,8 +9,8 @@ use BenTools\ETL\EtlState; use BenTools\ETL\EventDispatcher\Event\FlushEvent; use BenTools\ETL\Exception\ExtractException; -use BenTools\ETL\Extractor\ExtractorProcessorInterface; use BenTools\ETL\Loader\ConditionalLoaderInterface; +use BenTools\ETL\Processor\ProcessorInterface; use LogicException; use Pest\Exceptions\ShouldNotHappen; @@ -98,7 +98,7 @@ public function flush(bool $isPartial, EtlState $state): mixed it('yells if it cannot process extracted data', function () { // Given $executor = (new EtlExecutor())->withProcessor( - new class() implements ExtractorProcessorInterface { + new class() implements ProcessorInterface { public function supports(mixed $extracted): bool { return false; diff --git a/tests/Unit/Extractor/ReactStreamExtractorTest.php b/tests/Unit/Extractor/ReactStreamExtractorTest.php new file mode 100644 index 0000000..aff21ad --- /dev/null +++ b/tests/Unit/Extractor/ReactStreamExtractorTest.php @@ -0,0 +1,21 @@ +extract(new 
EtlState(source: $b)))->toBe($b) + ->and($extractor->extract(new EtlState()))->toBe($a); +});