Skip to content

Commit

Permalink
Feat: React streams support (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek authored Nov 16, 2023
1 parent d966bc4 commit d07e791
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 12 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Table of Contents
- [Chaining extractors / transformers / loaders](doc/advanced_usage.md#chaining-extractors--transformers--loaders)
- [Reading from STDIN / Writing to STDOUT](doc/advanced_usage.md#reading-from-stdin--writing-to-stdout)
- [Instantiators](doc/advanced_usage.md#instantiators)
- [Using React Streams (ReactPHP support)](doc/advanced_usage.md#using-react-streams-experimental)
- [Recipes](doc/recipes.md)
- [Contributing](#contribute)
- [License](#license)
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"pestphp/pest": "^2.24",
"phpstan/phpstan": "^1.10",
"phpstan/phpstan-mockery": "^1.1",
"react/stream": "^1.3",
"symfony/var-dumper": "*"
},
"license": "MIT",
Expand Down
38 changes: 38 additions & 0 deletions doc/advanced_usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,41 @@ $report = withRecipe(new LoggerRecipe($logger))
->transformWith(fn ($value) => strtoupper($value))
->process(['foo', 'bar']);
```

Using React streams (experimental)
----------------------------------

You can plug your ETL dataflows to any [React Stream](https://github.com/reactphp/stream).

Example with a TCP server:

```php
use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\InitEvent;
use React\Socket\ConnectionInterface;
use React\Socket\SocketServer;

use function BenTools\ETL\stdOut;
use function BenTools\ETL\useReact;

$socket = new SocketServer('127.0.0.1:7000');

$etl = useReact() // or (new EtlExecutor())->withRecipe(new ReactStreamProcessor());
->loadInto(stdOut())
->onInit(function (InitEvent $event) {
/** @var ConnectionInterface $stream */
$stream = $event->state->source;
$stream->on('close', function () use ($event) {
$event->state->stop(); // Will flush all pending items and gracefully stop the ETL for that connection
});
})
->withOptions(new EtlConfiguration(flushEvery: 1)) // Optionally, flush on each data event
->onExtract(function (ExtractEvent $event) {
if (!preg_match('//u', $event->item)) {
$event->state->skip(); // Ignore binary data
}
});

$socket->on('connection', $etl->process(...));
```
6 changes: 3 additions & 3 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Exception\TransformException;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Extractor\IterableExtractor;
use BenTools\ETL\Extractor\IterableExtractorProcessor;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\ConditionalLoaderTrait;
use BenTools\ETL\Internal\EtlBuilderTrait;
use BenTools\ETL\Internal\TransformResult;
use BenTools\ETL\Loader\InMemoryLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Processor\IterableProcessor;
use BenTools\ETL\Processor\ProcessorInterface;
use BenTools\ETL\Transformer\NullTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
Expand Down Expand Up @@ -57,7 +57,7 @@ public function __construct(
public readonly TransformerInterface $transformer = new NullTransformer(),
public readonly LoaderInterface $loader = new InMemoryLoader(),
public readonly EtlConfiguration $options = new EtlConfiguration(),
public readonly ExtractorProcessorInterface $processor = new IterableExtractorProcessor(),
public readonly ProcessorInterface $processor = new IterableProcessor(),
) {
$this->listenerProvider = new PrioritizedListenerProvider();
$this->eventDispatcher = new EventDispatcher($this->listenerProvider);
Expand Down
21 changes: 21 additions & 0 deletions src/Extractor/ReactStreamExtractor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Extractor;

use BenTools\ETL\EtlState;
use React\Stream\ReadableStreamInterface;

final readonly class ReactStreamExtractor implements ExtractorInterface
{
public function __construct(
public ?ReadableStreamInterface $stream = null,
) {
}

public function extract(EtlState $state): ReadableStreamInterface
{
return $state->source ?? $this->stream;
}
}
4 changes: 2 additions & 2 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
use BenTools\ETL\Extractor\CallableExtractor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Loader\CallableLoader;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Processor\ProcessorInterface;
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\CallableTransformer;
use BenTools\ETL\Transformer\ChainTransformer;
Expand Down Expand Up @@ -103,7 +103,7 @@ public function withRecipe(Recipe|callable $recipe, Recipe|callable ...$recipes)
return $executor;
}

public function withProcessor(ExtractorProcessorInterface $processor): self
public function withProcessor(ProcessorInterface $processor): self
{
return $this->cloneWith(['processor' => $processor]);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Iterator/CSVIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
private array $options;

/**
* @param Traversable<string> $text
* @param array{delimiter?: string, enclosure?: string, escapeString?: string, columns?: 'auto'|string[]|null, normalizers?: ValueNormalizerInterface[]} $options
*/
public function __construct(
private PregSplitIterator|StrTokIterator|FileIterator $text,
private Traversable $text,
array $options = [],
) {
$resolver = (new OptionsResolver())->setIgnoreUndefined();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace BenTools\ETL\Extractor;
namespace BenTools\ETL\Processor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
Expand All @@ -16,7 +16,7 @@
/**
* @internal
*/
final readonly class IterableExtractorProcessor implements ExtractorProcessorInterface
final readonly class IterableProcessor implements ProcessorInterface
{
public function supports(mixed $extracted): bool
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

declare(strict_types=1);

namespace BenTools\ETL\Extractor;
namespace BenTools\ETL\Processor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;

interface ExtractorProcessorInterface
interface ProcessorInterface
{
public function supports(mixed $extracted): bool;

Expand Down
61 changes: 61 additions & 0 deletions src/Processor/ReactStreamProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Processor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Extractor\ReactStreamExtractor;
use BenTools\ETL\Recipe\Recipe;
use React\EventLoop\Loop;
use React\Stream\ReadableStreamInterface;
use Throwable;

use function is_string;
use function trim;

/**
* @experimental
*/
final class ReactStreamProcessor extends Recipe implements ProcessorInterface
{
public function supports(mixed $extracted): bool
{
return $extracted instanceof ReadableStreamInterface;
}

/**
* @param ReadableStreamInterface $stream
*/
public function process(EtlExecutor $executor, EtlState $state, mixed $stream): EtlState
{
$key = -1;
$stream->on('data', function (mixed $item) use ($executor, &$key, $state, $stream) {
if (is_string($item)) {
$item = trim($item);
}
try {
$executor->processItem($item, ++$key, $state);
} catch (SkipRequest) {
} catch (StopRequest) {
$stream->close();
} catch (Throwable $e) {
$stream->close();
ExtractException::emit($executor, $e, $state);
}
});

Loop::run();

return $state->getLastVersion();
}

public function decorate(EtlExecutor $executor): EtlExecutor
{
return $executor->extractFrom(new ReactStreamExtractor())->withProcessor($this);
}
}
6 changes: 6 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\LoaderInterface;
use BenTools\ETL\Loader\STDOUTLoader;
use BenTools\ETL\Processor\ReactStreamProcessor;
use BenTools\ETL\Recipe\Recipe;
use BenTools\ETL\Transformer\ChainTransformer;
use BenTools\ETL\Transformer\TransformerInterface;
Expand Down Expand Up @@ -76,3 +77,8 @@ function stdOut(): STDOUTLoader
{
return new STDOUTLoader();
}

function useReact(): EtlExecutor
{
return withRecipe(new ReactStreamProcessor());
}
63 changes: 63 additions & 0 deletions tests/Behavior/ReactStreamProcessorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Behavior;

use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\Exception\ExtractException;
use React\EventLoop\Loop;
use React\Stream\ReadableResourceStream;
use RuntimeException;

use function BenTools\ETL\useReact;
use function expect;
use function fopen;

it('processes React streams', function () {
// Given
$stream = new ReadableResourceStream(fopen('php://temp', 'rb'));
Loop::futureTick(fn () => $stream->emit('data', ['hello']));
Loop::futureTick(fn () => $stream->emit('data', ['world']));
$executor = useReact();

// When
$state = $executor->process($stream);

// Then
expect($state->output)->toBe(['hello', 'world']);
});

it('can skip items and stop the workflow', function () {
// Given
$stream = new ReadableResourceStream(fopen('php://temp', 'rb'));
$fruits = ['banana', 'apple', 'strawberry', 'raspberry', 'peach'];
foreach ($fruits as $fruit) {
Loop::futureTick(fn () => $stream->emit('data', [$fruit]));
}
$executor = useReact()
->onExtract(function (ExtractEvent $event) {
match ($event->item) {
'apple' => $event->state->skip(),
'peach' => $event->state->stop(),
default => null,
};
})
;

// When
$state = $executor->process($stream);

// Then
expect($state->output)->toBe(['banana', 'strawberry', 'raspberry']);
});

it('throws ExtractExceptions', function () {
// Given
$stream = new ReadableResourceStream(fopen('php://temp', 'rb'));
Loop::futureTick(fn () => $stream->emit('data', ['hello']));
$executor = useReact()->onExtract(fn () => throw new RuntimeException());

// When
$executor->process($stream);
})->throws(ExtractException::class);
4 changes: 2 additions & 2 deletions tests/Unit/EtlExecutorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\FlushEvent;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Loader\ConditionalLoaderInterface;
use BenTools\ETL\Processor\ProcessorInterface;
use LogicException;
use Pest\Exceptions\ShouldNotHappen;

Expand Down Expand Up @@ -98,7 +98,7 @@ public function flush(bool $isPartial, EtlState $state): mixed
it('yells if it cannot process extracted data', function () {
// Given
$executor = (new EtlExecutor())->withProcessor(
new class() implements ExtractorProcessorInterface {
new class() implements ProcessorInterface {
public function supports(mixed $extracted): bool
{
return false;
Expand Down
21 changes: 21 additions & 0 deletions tests/Unit/Extractor/ReactStreamExtractorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Tests\Unit\Extractor;

use BenTools\ETL\EtlState;
use BenTools\ETL\Extractor\ReactStreamExtractor;
use Mockery;
use React\Stream\ReadableStreamInterface;

use function expect;

it('supports any readable stream', function () {
$a = Mockery::mock(ReadableStreamInterface::class);
$b = Mockery::mock(ReadableStreamInterface::class);

$extractor = new ReactStreamExtractor($a);
expect($extractor->extract(new EtlState(source: $b)))->toBe($b)
->and($extractor->extract(new EtlState()))->toBe($a);
});

0 comments on commit d07e791

Please sign in to comment.