Skip to content

Commit

Permalink
Merge pull request #1 from DACSoftware/master
Browse files Browse the repository at this point in the history
Merge pool processing feature
gtgt authored Jan 30, 2019
2 parents d2d77dd + e2a71a1 commit c7e8b2d
Showing 6 changed files with 171 additions and 37 deletions.
62 changes: 62 additions & 0 deletions src/Spork/AbstractJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace Spork;

use Spork\Exception\UnexpectedTypeException;

abstract class AbstractJob
{
protected $manager;
protected $data;
protected $name;
protected $callback;

public function __construct(ProcessManager $manager, $data = null)
{
$this->manager = $manager;
$this->data = $data;
$this->name = '<anonymous>';
}

public function setName($name)
{
$this->name = $name;

return $this;
}

public function setData($data)
{
$this->data = $data;

return $this;
}

public function setCallback($callback)
{
if (!is_callable($callback)) {
throw new UnexpectedTypeException($callback, 'callable');
}

$this->callback = $callback;

return $this;
}

public function execute($callback = null)
{
if (null !== $callback) {
$this->setCallback($callback);
}

return $this->manager->fork($this)->setName($this->name);
}

/**
* Runs in a child process.
*
* @see execute()
*/
abstract public function __invoke();

}
43 changes: 6 additions & 37 deletions src/Spork/Batch/BatchJob.php
Original file line number Diff line number Diff line change
@@ -14,29 +14,17 @@
use Spork\Batch\Strategy\ChunkStrategy;
use Spork\Batch\Strategy\StrategyInterface;
use Spork\Exception\UnexpectedTypeException;
use Spork\AbstractJob;
use Spork\ProcessManager;

class BatchJob
class BatchJob extends AbstractJob
{
private $manager;
private $data;
private $strategy;
private $name;
private $callback;
protected $strategy;

public function __construct(ProcessManager $manager, $data = null, StrategyInterface $strategy = null)
{
$this->manager = $manager;
$this->data = $data;
parent::__construct($manager, $data);
$this->strategy = $strategy ?: new ChunkStrategy();
$this->name = '<anonymous>';
}

public function setName($name)
{
$this->name = $name;

return $this;
}

public function setStrategy(StrategyInterface $strategy)
@@ -46,31 +34,13 @@ public function setStrategy(StrategyInterface $strategy)
return $this;
}

public function setData($data)
{
$this->data = $data;

return $this;
}

public function setCallback($callback)
{
if (!is_callable($callback)) {
throw new UnexpectedTypeException($callback, 'callable');
}

$this->callback = $callback;

return $this;
}

public function execute($callback = null)
{
if (null !== $callback) {
$this->setCallback($callback);
}

return $this->manager->fork($this)->setName($this->name.' batch');
return $this->manager->fork($this)->setName($this->name . ' batch');
}

/**
@@ -84,8 +54,7 @@ public function __invoke()
foreach ($this->strategy->createBatches($this->data) as $index => $batch) {
$forks[] = $this->manager
->fork($this->strategy->createRunner($batch, $this->callback))
->setName(sprintf('%s batch #%d', $this->name, $index))
;
->setName(sprintf('%s batch #%d', $this->name, $index));
}

// block until all forks have exited
13 changes: 13 additions & 0 deletions src/Spork/Factory.php
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@

use Spork\Batch\BatchJob;
use Spork\Batch\Strategy\StrategyInterface;
use Spork\Pool\PoolJob;

class Factory
{
@@ -30,6 +31,18 @@ public function createBatchJob(ProcessManager $manager, $data = null, StrategyIn
return new BatchJob($manager, $data, $strategy);
}

/**
* @param ProcessManager $manager
* @param null $data
* @param int $poolSize
* @return AbstractJob
*
*/
public function createPoolJob(ProcessManager $manager, $data = null, $poolSize = 3)
{
return new PoolJob($manager, $data, $poolSize);
}

/**
* Creates a new shared memory instance.
*
58 changes: 58 additions & 0 deletions src/Spork/Pool/PoolJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace Spork\Pool;

use Spork\AbstractJob;
use Spork\Batch\BatchRunner;
use Spork\ProcessManager;

class PoolJob extends AbstractJob
{
protected $poolSize;

public function __construct(ProcessManager $manager, $data, $pollSize = 3)
{
parent::__construct($manager, $data);

$this->poolSize = $pollSize;
}

/**
* Runs in a child process.
*
* @see execute()
*/
public function __invoke()
{
$forks = array();
$results = array();
$batches = $this->data;
$index = 0;
while (count($batches) > 0) {
while (count($forks) < $this->poolSize) {
$batch = array_splice($batches, 0, 1);
$fork = $this->manager->fork(new BatchRunner($batch, $this->callback))
->setName(sprintf('%s part #%d', $this->name, $index));
$forks[$fork->getPid()] = $fork;
$index++;
}
do {
$endedFork = $this->manager->waitForNext();
} while (!isset($endedFork));

$results = array_merge($results, $endedFork->getResult());
unset($forks[$endedFork->getPid()]);

$endedFork = null;
}
// block until all forks have exited
$this->manager->wait();


foreach ($forks as $fork) {
$results = array_merge($results, $fork->getResult());
}

return $results;
}
}
19 changes: 19 additions & 0 deletions src/Spork/ProcessManager.php
Original file line number Diff line number Diff line change
@@ -76,11 +76,30 @@ public function createBatchJob($data = null, StrategyInterface $strategy = null)
return $this->factory->createBatchJob($this, $data, $strategy);
}

/**
* @param $data
* @param $callable
* @param StrategyInterface|null $strategy
*
* @return Fork
*/
public function process($data, $callable, StrategyInterface $strategy = null)
{
return $this->createBatchJob($data, $strategy)->execute($callable);
}

/**
* @param $data
* @param $callable
* @param int $poolSize
* @return Fork
*
*/
public function parallel($data, $callable, $poolSize = 3)
{
return $this->factory->createPoolJob($this, $data, $poolSize)->execute($callable);
}

/**
* Forks something into another process and returns a deferred object.
*/
13 changes: 13 additions & 0 deletions tests/Spork/Test/ProcessManagerTest.php
Original file line number Diff line number Diff line change
@@ -97,6 +97,19 @@ public function testBatchProcessing()
$this->assertEquals($expected, $fork->getResult());
}

public function testPoolProcessing()
{
$expected = range(50, 59);
$fork = $this->manager->parallel($expected, function($item) {
return $item;
});

$this->manager->wait();

$actual = $fork->getResult();
$this->assertEquals(sort($expected), sort($actual));
}

/**
* Test batch processing with return values containing a newline character
*/

0 comments on commit c7e8b2d

Please sign in to comment.