Skip to content

Commit

Permalink
refactor: batch (#123)
Browse files Browse the repository at this point in the history
* refactor: batch

* namespace reorg

* add timing

* restore metrics

* cosmetic

* revert composer.json
  • Loading branch information
priyadi authored Jul 16, 2024
1 parent 2e2d412 commit f637baf
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 176 deletions.
22 changes: 12 additions & 10 deletions packages/rekapager-core/src/Batch/BatchProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,37 @@ final public function process(
?string $resume = null,
?int $pageSize = null
): void {
// determine start page identifier

if ($resume !== null) {
$startPageIdentifier = $this->pageableIdentifierResolver
->decode($this->pageable, $resume);
} else {
$startPageIdentifier = null;
}

// prepare pages

$itemsPerPage = $pageSize ?? $this->batchProcessor->getItemsPerPage();
$pageable = $this->pageable->withItemsPerPage($itemsPerPage);
$pages = $pageable->getPages($startPageIdentifier);

// emit event

$beforeProcessEvent = new BeforeProcessEvent(
pageable: $pageable,
startPageIdentifier: $resume,
);

$this->batchProcessor->beforeProcess($beforeProcessEvent);

$pages = $pageable->getPages($startPageIdentifier);

$numOfPages = 0;
$numOfItems = 0;

foreach ($pages as $page) {
$numOfPages++;

$pageIdentifier = $page->getPageIdentifier();
$pageIdentifierString = $this->pageableIdentifierResolver->encode($pageIdentifier);

if ($this->stopFlag) {
$interruptEvent = new InterruptEvent(
pageable: $pageable,
nextPageIdentifier: $pageIdentifierString,
);

Expand All @@ -105,8 +107,6 @@ final public function process(
$this->batchProcessor->beforePage($beforePageEvent);

foreach ($page as $key => $item) {
$numOfItems++;

$itemEvent = new ItemEvent(
key: $key,
item: $item,
Expand All @@ -122,7 +122,9 @@ final public function process(
$this->batchProcessor->afterPage($afterPageEvent);
}

$afterProcessEvent = new AfterProcessEvent();
$afterProcessEvent = new AfterProcessEvent(
pageable: $pageable,
);

$this->batchProcessor->afterProcess($afterProcessEvent);
}
Expand Down
20 changes: 11 additions & 9 deletions packages/rekapager-core/src/Batch/BatchProcessorDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,44 @@
abstract class BatchProcessorDecorator implements BatchProcessorInterface
{
/**
* @return BatchProcessorInterface<TKey,T>
* @param BatchProcessorInterface<TKey,T> $decorated
*/
abstract protected function getDecorated(): BatchProcessorInterface;
public function __construct(private BatchProcessorInterface $decorated)
{
}

public function processItem(ItemEvent $itemEvent): void
{
$this->getDecorated()->processItem($itemEvent);
$this->decorated->processItem($itemEvent);
}

public function getItemsPerPage(): int
{
return $this->getDecorated()->getItemsPerPage();
return $this->decorated->getItemsPerPage();
}

public function beforeProcess(BeforeProcessEvent $event): void
{
$this->getDecorated()->beforeProcess($event);
$this->decorated->beforeProcess($event);
}

public function afterProcess(AfterProcessEvent $event): void
{
$this->getDecorated()->afterProcess($event);
$this->decorated->afterProcess($event);
}

public function beforePage(BeforePageEvent $event): void
{
$this->getDecorated()->beforePage($event);
$this->decorated->beforePage($event);
}

public function afterPage(AfterPageEvent $event): void
{
$this->getDecorated()->afterPage($event);
$this->decorated->afterPage($event);
}

public function onInterrupt(InterruptEvent $event): void
{
$this->getDecorated()->onInterrupt($event);
$this->decorated->onInterrupt($event);
}
}
10 changes: 10 additions & 0 deletions packages/rekapager-core/src/Batch/BatchProcessorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,14 @@ public function processItem(ItemEvent $itemEvent): void;
*/
public function getItemsPerPage(): int;

/**
* @param BeforeProcessEvent<TKey,T> $event
*/
public function beforeProcess(BeforeProcessEvent $event): void;

/**
* @param AfterProcessEvent<TKey,T> $event
*/
public function afterProcess(AfterProcessEvent $event): void;

/**
Expand All @@ -49,5 +56,8 @@ public function beforePage(BeforePageEvent $event): void;
*/
public function afterPage(AfterPageEvent $event): void;

/**
* @param InterruptEvent<TKey,T> $event
*/
public function onInterrupt(InterruptEvent $event): void;
}
20 changes: 19 additions & 1 deletion packages/rekapager-core/src/Batch/Event/AfterProcessEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,27 @@

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;

/**
* @template TKey of array-key
* @template T
*/
final class AfterProcessEvent
{
public function __construct()
/**
* @param PageableInterface<TKey,T> $pageable
*/
public function __construct(
private readonly PageableInterface $pageable,
) {
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->pageable;
}
}
18 changes: 18 additions & 0 deletions packages/rekapager-core/src/Batch/Event/BeforeProcessEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;

/**
* @template TKey of array-key
* @template T
*/
final class BeforeProcessEvent
{
/**
* @param PageableInterface<TKey,T> $pageable
*/
public function __construct(
private readonly PageableInterface $pageable,
private readonly ?string $startPageIdentifier,
) {
}
Expand All @@ -24,4 +34,12 @@ public function getStartPageIdentifier(): ?string
{
return $this->startPageIdentifier;
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->pageable;
}
}
18 changes: 18 additions & 0 deletions packages/rekapager-core/src/Batch/Event/InterruptEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,31 @@

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;

/**
* @template TKey of array-key
* @template T
*/
final class InterruptEvent
{
/**
* @param PageableInterface<TKey,T> $pageable
*/
public function __construct(
private readonly PageableInterface $pageable,
private readonly ?string $nextPageIdentifier,
) {
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->pageable;
}

public function getNextPageIdentifier(): ?string
{
return $this->nextPageIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* that was distributed with this source code.
*/

namespace Rekalogika\Rekapager\Symfony;
namespace Rekalogika\Rekapager\Symfony\Batch;

use Rekalogika\Contracts\Rekapager\Exception\InvalidArgumentException;
use Rekalogika\Contracts\Rekapager\Exception\LogicException;
Expand All @@ -20,6 +20,7 @@
use Rekalogika\Rekapager\Batch\BatchProcess;
use Rekalogika\Rekapager\Batch\BatchProcessFactoryInterface;
use Rekalogika\Rekapager\Batch\BatchProcessorInterface;
use Rekalogika\Rekapager\Symfony\Batch\Internal\CommandBatchProcessorDecorator;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\SignalableCommandInterface;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -112,7 +113,7 @@ final protected function execute(InputInterface $input, OutputInterface $output)
$pageable = $this->getPageable($input, $output);
$this->io = new SymfonyStyle($input, $output);

$batchProcessor = new BatchProcessorDecorator(
$batchProcessor = new CommandBatchProcessorDecorator(
decorated: $this->getBatchProcessor(),
io: $this->io,
progressFile: $progressFile,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

/*
* This file is part of rekalogika/rekapager package.
*
* (c) Priyadi Iman Nurcahyo <https://rekalogika.dev>
*
* For the full copyright and license information, please view the LICENSE file
* that was distributed with this source code.
*/

namespace Rekalogika\Rekapager\Symfony\Batch\Internal;

use Rekalogika\Contracts\Rekapager\Exception\LogicException;

/**
* @internal
*/
class BatchTimer
{
public const TIMER_PROCESS = 'process';
public const TIMER_PAGE = 'page';
public const TIMER_ITEM = 'item';
public const TIMER_DISPLAY = 'display';

/**
* @var array<BatchTimer::TIMER_*,int|float>
*/
private array $timers = [];

/**
* @param BatchTimer::TIMER_* $timer
*/
public function start(string $timer): void
{
if (isset($this->timers[$timer])) {
throw new LogicException(sprintf('Timer "%s" is already started.', $timer));
}

$this->timers[$timer] = hrtime(true);
}

/**
* @param BatchTimer::TIMER_* $timer
*/
public function stop(string $timer): float
{
$result = $this->getDuration($timer);

if ($result === null) {
throw new LogicException(sprintf('Timer "%s" has not been started yet.', $timer));
}

$this->reset($timer);

return $result;
}

/**
* @param BatchTimer::TIMER_* $timer
*/
public function restart(string $timer): void
{
$this->reset($timer);
$this->start($timer);
}

/**
* @param BatchTimer::TIMER_* $timer
*/
public function reset(string $timer): void
{
unset($this->timers[$timer]);
}

/**
* @param BatchTimer::TIMER_* $timer
*/
public function getDuration(string $timer): ?float
{
if (!isset($this->timers[$timer])) {
return null;
}

return (hrtime(true) - $this->timers[$timer]) / 1e9;
}
}
Loading

0 comments on commit f637baf

Please sign in to comment.