diff --git a/src/Spork/AbstractJob.php b/src/Spork/AbstractJob.php new file mode 100644 index 0000000..fbdab88 --- /dev/null +++ b/src/Spork/AbstractJob.php @@ -0,0 +1,62 @@ +manager = $manager; + $this->data = $data; + $this->name = ''; + } + + public function setName($name) + { + $this->name = $name; + + return $this; + } + + public function setData($data) + { + $this->data = $data; + + return $this; + } + + public function setCallback($callback) + { + if (!is_callable($callback)) { + throw new UnexpectedTypeException($callback, 'callable'); + } + + $this->callback = $callback; + + return $this; + } + + public function execute($callback = null) + { + if (null !== $callback) { + $this->setCallback($callback); + } + + return $this->manager->fork($this)->setName($this->name); + } + + /** + * Runs in a child process. + * + * @see execute() + */ + abstract public function __invoke(); + +} diff --git a/src/Spork/Batch/BatchJob.php b/src/Spork/Batch/BatchJob.php index 3e05f02..5629247 100644 --- a/src/Spork/Batch/BatchJob.php +++ b/src/Spork/Batch/BatchJob.php @@ -14,29 +14,17 @@ use Spork\Batch\Strategy\ChunkStrategy; use Spork\Batch\Strategy\StrategyInterface; use Spork\Exception\UnexpectedTypeException; +use Spork\AbstractJob; use Spork\ProcessManager; -class BatchJob +class BatchJob extends AbstractJob { - private $manager; - private $data; - private $strategy; - private $name; - private $callback; + protected $strategy; public function __construct(ProcessManager $manager, $data = null, StrategyInterface $strategy = null) { - $this->manager = $manager; - $this->data = $data; + parent::__construct($manager, $data); $this->strategy = $strategy ?: new ChunkStrategy(); - $this->name = ''; - } - - public function setName($name) - { - $this->name = $name; - - return $this; } public function setStrategy(StrategyInterface $strategy) @@ -46,31 +34,13 @@ public function setStrategy(StrategyInterface $strategy) return $this; } - public function setData($data) - { - $this->data = $data; - - return $this; - } - - public function setCallback($callback) - { - if (!is_callable($callback)) { - throw new UnexpectedTypeException($callback, 'callable'); - } - - $this->callback = $callback; - - return $this; - } - public function execute($callback = null) { if (null !== $callback) { $this->setCallback($callback); } - return $this->manager->fork($this)->setName($this->name.' batch'); + return $this->manager->fork($this)->setName($this->name . ' batch'); } /** @@ -84,8 +54,7 @@ public function __invoke() foreach ($this->strategy->createBatches($this->data) as $index => $batch) { $forks[] = $this->manager ->fork($this->strategy->createRunner($batch, $this->callback)) - ->setName(sprintf('%s batch #%d', $this->name, $index)) - ; + ->setName(sprintf('%s batch #%d', $this->name, $index)); } // block until all forks have exited diff --git a/src/Spork/Factory.php b/src/Spork/Factory.php index f951022..ae29824 100644 --- a/src/Spork/Factory.php +++ b/src/Spork/Factory.php @@ -13,6 +13,7 @@ use Spork\Batch\BatchJob; use Spork\Batch\Strategy\StrategyInterface; +use Spork\Pool\PoolJob; class Factory { @@ -30,6 +31,18 @@ public function createBatchJob(ProcessManager $manager, $data = null, StrategyIn return new BatchJob($manager, $data, $strategy); } + /** + * @param ProcessManager $manager + * @param null $data + * @param int $poolSize + * @return AbstractJob + * + */ + public function createPoolJob(ProcessManager $manager, $data = null, $poolSize = 3) + { + return new PoolJob($manager, $data, $poolSize); + } + /** * Creates a new shared memory instance. * diff --git a/src/Spork/Pool/PoolJob.php b/src/Spork/Pool/PoolJob.php new file mode 100644 index 0000000..7a92e3e --- /dev/null +++ b/src/Spork/Pool/PoolJob.php @@ -0,0 +1,58 @@ +poolSize = $pollSize; + } + + /** + * Runs in a child process. + * + * @see execute() + */ + public function __invoke() + { + $forks = array(); + $results = array(); + $batches = $this->data; + $index = 0; + while (count($batches) > 0) { + while (count($forks) < $this->poolSize) { + $batch = array_splice($batches, 0, 1); + $fork = $this->manager->fork(new BatchRunner($batch, $this->callback)) + ->setName(sprintf('%s part #%d', $this->name, $index)); + $forks[$fork->getPid()] = $fork; + $index++; + } + do { + $endedFork = $this->manager->waitForNext(); + } while (!isset($endedFork)); + + $results = array_merge($results, $endedFork->getResult()); + unset($forks[$endedFork->getPid()]); + + $endedFork = null; + } + // block until all forks have exited + $this->manager->wait(); + + + foreach ($forks as $fork) { + $results = array_merge($results, $fork->getResult()); + } + + return $results; + } +} diff --git a/src/Spork/ProcessManager.php b/src/Spork/ProcessManager.php index 663c5a8..10c6ccb 100644 --- a/src/Spork/ProcessManager.php +++ b/src/Spork/ProcessManager.php @@ -76,11 +76,30 @@ public function createBatchJob($data = null, StrategyInterface $strategy = null) return $this->factory->createBatchJob($this, $data, $strategy); } + /** + * @param $data + * @param $callable + * @param StrategyInterface|null $strategy + * + * @return Fork + */ public function process($data, $callable, StrategyInterface $strategy = null) { return $this->createBatchJob($data, $strategy)->execute($callable); } + /** + * @param $data + * @param $callable + * @param int $poolSize + * @return Fork + * + */ + public function parallel($data, $callable, $poolSize = 3) + { + return $this->factory->createPoolJob($this, $data, $poolSize)->execute($callable); + } + /** * Forks something into another process and returns a deferred object. */ diff --git a/tests/Spork/Test/ProcessManagerTest.php b/tests/Spork/Test/ProcessManagerTest.php index 6ee72cb..4dc0489 100644 --- a/tests/Spork/Test/ProcessManagerTest.php +++ b/tests/Spork/Test/ProcessManagerTest.php @@ -97,6 +97,19 @@ public function testBatchProcessing() $this->assertEquals($expected, $fork->getResult()); } + public function testPoolProcessing() + { + $expected = range(50, 59); + $fork = $this->manager->parallel($expected, function($item) { + return $item; + }); + + $this->manager->wait(); + + $actual = $fork->getResult(); + $this->assertEquals(sort($expected), sort($actual)); + } + /** * Test batch processing with return values containing a newline character */