Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.7 #27

Open
wants to merge 71 commits into
base: test
Choose a base branch
from
Open

0.7 #27

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
4f6e227
added jsontype
johanliefers Jan 7, 2016
06d1bc4
renamed to relation_json
johanliefers Jan 11, 2016
b06c2da
removed count $i
johanliefers Jan 11, 2016
fa6a7ce
Merge pull request #2 from integratedfordevelopers/INTEGRATED-656-jso…
Jan 11, 2016
fd12e08
added solr property type
johanliefers Jan 19, 2016
e04dcbe
Merge pull request #4 from integratedfordevelopers/INTEGRATED-668-sol…
marijnotte Jan 19, 2016
6a48a52
[INTEGRATED-802] Added JSON Solr mapping type
May 6, 2016
1869c9e
Merge pull request #5 from integratedfordevelopers/INTEGRATED-802-add…
johanliefers May 6, 2016
8fb1c2b
INTEGRATED-669 errors while converting document will no longer halt t…
jansanne May 18, 2016
50927d0
Merge pull request #6 from integratedfordevelopers/INTEGRATED-669
joenivl May 18, 2016
4774312
Merge branch '0.4' into 0.5
Jun 10, 2016
f54dca1
Implemented the solr worker services.
jansanne Jun 16, 2016
fc5e539
Updated the readme with extra instructions.
jansanne Jun 27, 2016
d0b741a
Merge pull request #8 from integratedfordevelopers/INTEGRATED-829
jansanne Jun 27, 2016
0a6dd66
[INTEGRATED-869] Use FilterContainer by default
Aug 30, 2016
d4bc0c1
Merge pull request #9 from integratedfordevelopers/INTEGRATED-869-use…
johanliefers Aug 31, 2016
e3d0fb6
[INTEGRATED-744] Moved PropertyType and RelationJsonType from SolrBun…
Sep 8, 2016
d6fe956
Merge pull request #10 from integratedfordevelopers/INTEGRATED-744-mo…
johanliefers Sep 15, 2016
3239d4f
The mongo db provider now also implements the content type provider a…
jansanne Sep 15, 2016
42b7525
Merge pull request #11 from integratedfordevelopers/INTEGRATED-333
joenivl Sep 16, 2016
40741a8
Update composer versions
marijnotte Oct 30, 2016
0126bcb
Allow tags trough the DI.
Oct 24, 2016
7654f59
Fixed the unit tests.
Nov 9, 2016
f00fc95
Merge pull request #15 from integratedfordevelopers/INTEGRATED-907-so…
koenprins Nov 10, 2016
483d9d8
Merge remote-tracking branch 'origin/0.5' into 0.6
Nov 14, 2016
8279530
Added Query expander
Jan 2, 2017
ce7820b
Added compiler pass for registrering query expansions
Jan 2, 2017
df08bf8
Load the Solr xml
Jan 2, 2017
6259b9f
Revert "Added Query expander"
Jan 4, 2017
0363cde
Revert "Added compiler pass for registrering query expansions"
Jan 4, 2017
585fc0a
Revert "Load the Solr xml"
Jan 4, 2017
df4fcaa
Inject the event dispatcher into the solr client
Jan 4, 2017
b469005
[INTEGRATED-947] Don't hydrate to improve indexer performance
Dec 14, 2016
fe2c58b
[INTEGRATED-947] ContentType can be optional
Dec 14, 2016
160aaa1
Merge pull request #19 from integratedfordevelopers/INTEGRATED-948-im…
koenprins Jan 6, 2017
0baa5aa
Removed the compiler because the event dispatcher is injected into th…
Jan 6, 2017
7717ea5
Merge pull request #18 from integratedfordevelopers/INTEGRATED-952-do…
jansanne Jan 9, 2017
5e19083
the process based indexer
Dec 15, 2016
e88ecef
Added environment to the command.
johnnyborg Dec 16, 2016
5b59798
add environment
johnnyborg Dec 16, 2016
dc85696
Typo
johnnyborg Dec 16, 2016
9f64a62
removed dots
Dec 19, 2016
2808fb8
Added memory saving strategy.
Dec 19, 2016
f46a3a1
Fix the number of starting threads.
Dec 19, 2016
db15111
Should output when running in blocked mode.
Dec 20, 2016
df52809
This should keep the group processes running a bit longer.
Dec 20, 2016
bb10dd3
Keep running until the queue is really empty and pass along any outpu…
Dec 21, 2016
d6d5d46
Removed debug statements.
Dec 21, 2016
8a08011
Clear should always be done.
Dec 21, 2016
a1bcfaf
Added comment for -b mode.
Dec 21, 2016
2086cda
[INTEGRATED-946] Removed unused statements and fixed merge issue
Jan 10, 2017
2b5ac2b
[INTEGRATED-946] Restored output
Jan 10, 2017
3c3cead
Merge pull request #21 from integratedfordevelopers/INTEGRATED-946-im…
johnnyborg Jan 11, 2017
bb6c5e1
Upgrade to symfony 2.8
jansanne Jan 17, 2017
8e5db68
long to short array conversion
jansanne Jan 20, 2017
2e780db
Merge pull request #22 from integratedfordevelopers/INTEGRATED-955
joenivl Jan 23, 2017
8750dac
Added process locking.
jansanne Jan 26, 2017
5fc6f6f
Merge pull request #24 from integratedfordevelopers/INTEGRATED-977
pappi1987 Jan 31, 2017
da34125
Allow username, password and scheme for solr
marijnotte Feb 8, 2017
e37e123
Merge pull request #25 from integratedfordevelopers/INTEGRATED-1023
integratedforpublishers Feb 10, 2017
3aa1a39
Upgrade the bundle from psr-0 to psr-4.
jansanne Mar 2, 2017
cefce88
Merge pull request #28 from integratedfordevelopers/INTEGRATED-965
joenivl Mar 2, 2017
ce7384a
Merge branch 'master' into 0.6
marijnotte Mar 7, 2017
aa0dcd7
Merge branch '0.6' into 0.7
marijnotte Mar 7, 2017
97449ee
added travis file
Mar 27, 2017
4c27562
corrected directory
Mar 30, 2017
de9824b
Merge pull request #29 from integratedfordevelopers/INTEGRATED-1025-a…
Apr 10, 2017
eae7f33
Updated symfony and integrated versions.
jansanne Apr 11, 2017
32bb893
Merge pull request #30 from integratedfordevelopers/INTEGRATED-1002
integratedforpublishers Apr 12, 2017
8f60462
Fixed PHP error: Allowed memory size ... exhausted...
psat01 Jul 25, 2017
9d9bb40
Merge pull request #32 from integratedfordevelopers/INTEGRATED-1185-s…
integratedforpublishers Jul 27, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
install:
- curl -sS https://getcomposer.org/installer | php
before_script:
- php composer.phar update -n
script:
- vendor/bin/phpcs . --ignore=*/vendor/* --standard=PSR2 --extensions=php
- vendor/bin/phpunit -c phpunit.xml
36 changes: 13 additions & 23 deletions Command/IndexerQueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
use DateTime;
use DateTimeZone;

use Integrated\Common\Content\ContentInterface;
use Integrated\Common\ContentType\ResolverInterface;
use Integrated\Common\Solr\Indexer\Job;
use Integrated\Common\Queue\QueueInterface;

use Integrated\Bundle\ContentBundle\Document\Content\Content;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;

use Symfony\Component\Console\Helper\ProgressHelper;
Expand Down Expand Up @@ -175,32 +176,21 @@ protected function executeDelete(InputInterface $input, OutputInterface $output)
*/
protected function executeIndex(InputInterface $input, OutputInterface $output)
{
$result = null;
// Don't hydrate for performance reasons
$builder = $this->getDocumentManager()->createQueryBuilder(Content::class);
$builder->select('id', 'contentType', 'class')->hydrate(false);

if ($input->getOption('full')) {

//use createQueryBuilder for performance reasons
$qb = $this->getDocumentManager()->createQueryBuilder(
'Integrated\Bundle\ContentBundle\Document\Content\Content'
)
->select('id', 'contentType', 'class');
$result = $qb->getQuery()->execute();
$result = $builder->getQuery()->execute();

// The entire site is going to be reindex so everything that is now in the queue
// will be redone so just clear it so content is not double indexed.

$this->getQueue()->clear();
} else {
$criteria['$or'] = [];
$builder->field('contentType')->in($input->getArgument('id'));

foreach ($input->getArgument('id') as $id) {
$criteria['$or'][] = ['contentType' => $id];
}

$result = $this->getDocumentManager()
->getUnitOfWork()
->getDocumentPersister('Integrated\Bundle\ContentBundle\Document\Content\Content')
->loadAll($criteria);
$result = $builder->getQuery()->execute();
}

if ($count = $result->count()) {
Expand Down Expand Up @@ -251,17 +241,17 @@ protected function doIndex(Cursor $cursor, ProgressHelper $progress)
$count = 0;
$manager = $this->getDocumentManager();

/** @var ContentInterface $document */

foreach ($cursor as $document) {
$progress->advance();

$job = new Job('ADD');

$job->setOption('document.id', $document->getContentType() . '-' . $document->getId());
$contentType = isset($document['contentType']) ? $document['contentType'] : '';

$job->setOption('document.id', $contentType . '-' . $document['_id']);

$job->setOption('document.data', $this->getSerializer()->serialize($document, 'json'));
$job->setOption('document.class', get_class($document));
$job->setOption('document.data', json_encode(['id' => $document['_id']]));
$job->setOption('document.class', $document['class']);
$job->setOption('document.format', 'json');

$queue->push($job);
Expand Down
186 changes: 159 additions & 27 deletions Command/IndexerRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,72 @@

namespace Integrated\Bundle\SolrBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Exception;

use Integrated\Bundle\SolrBundle\EventListener\DoctrineClearEventSubscriber;
use Integrated\Bundle\SolrBundle\Process\ArgumentProcess;
use Integrated\Bundle\SolrBundle\Process\ProcessPoolGenerator;

use Integrated\Common\Queue\Provider\DBAL\QueueProvider;
use Integrated\Common\Solr\Indexer\Indexer;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Process;
use Symfony\Component\Filesystem\LockHandler;

use Exception;

use Integrated\Common\Solr\Indexer\IndexerInterface;

/**
* @author Jan Sanne Mulder <[email protected]>
*/
class IndexerRunCommand extends ContainerAwareCommand
class IndexerRunCommand extends Command
{
/**
* @var Indexer
*/
protected $indexer;

/**
* @var QueueProvider
*/
protected $queueProvider;

/**
* @var string
*/
protected $workingDirectory;

/**
* @var DoctrineClearEventSubscriber
*/
protected $clearEventSubscriber;

/**
* @var KernelInterface
*/
protected $kernel;

/**
* @param Indexer $indexer
* @param QueueProvider $queueProvider
* @param KernelInterface $kernel
* @param DoctrineClearEventSubscriber $clearEventSubscriber
* @param string $workingDirectory
*/
public function __construct(Indexer $indexer, QueueProvider $queueProvider, DoctrineClearEventSubscriber $clearEventSubscriber, KernelInterface $kernel, $workingDirectory)
{
parent::__construct();

$this->indexer = $indexer;
$this->queueProvider = $queueProvider;
$this->workingDirectory = $workingDirectory;
$this->clearEventSubscriber = $clearEventSubscriber;
$this->kernel = $kernel;
}

/**
* @see Command
*/
Expand All @@ -48,7 +98,19 @@ protected function configure()
'Time in milliseconds to wait between runs (in combination with --full or --daemon)',
0
)
->setDescription('Execute a sol indexer run')
->addArgument(
'processes',
InputArgument::OPTIONAL,
'Creates a number of proccess that run the queue',
0
)
->addOption(
'blocking',
'b',
InputOption::VALUE_NONE,
'Block the current command until all sub-processes are done'
)
->setDescription('Execute a solr indexer run')
->setHelp('
The <info>%command.name%</info> command starts a indexer run.

Expand All @@ -58,39 +120,44 @@ protected function configure()
}

/**
* @see Command::execute()
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if ($input->getOption('full') || $input->getOption('daemon')) {
if ($argument = $input->getArgument('processes')) {
return $this->runProcess(new ArgumentProcess($argument), $input, $output);
} else if ($input->getOption('full') || $input->getOption('daemon')) {
return $this->runExternal($input, $output);
}

return $this->runInternal($input, $output);
return $this->runInternal(self::class, $output);
}

/**
* @param InputInterface $input
* @param string $lock
* @param OutputInterface $output
* @return int
*/
private function runInternal(InputInterface $input, OutputInterface $output)
private function runInternal($lock, OutputInterface $output)
{
try {
$lock = new LockHandler('Integrated\Bundle\SolrBundle\Command\IndexerRunCommand');
$attemps = 0;
$lock = new LockHandler($lock);
$attempts = 0;
while (!$lock->lock()) {
//retry for almost a minute, otherwise don't throw an error (after all another indexer is running)
if ($attemps++ >= 10) {
// Retry for almost a minute, otherwise don't throw an error (after all another indexer is running)
if ($attempts++ >= 10) {
return 0;
}
sleep(5);
}

/** @var IndexerInterface $indexer */
$indexer = $this->getContainer()->get('integrated_solr.indexer');
$indexer->execute();
if ($output->isDebug() && method_exists($this->indexer, 'setDebug')) {
$this->indexer->setDebug();
}

$this->indexer->execute();
} catch (Exception $e) {
$output->writeln("Aborting: " . $e->getMessage());

Expand All @@ -107,24 +174,27 @@ private function runInternal(InputInterface $input, OutputInterface $output)
*/
private function runExternal(InputInterface $input, OutputInterface $output)
{
$wait = (int)$input->getOption('wait');
$wait = (int) $input->getOption('wait');
$wait = $wait * 1000; // convert from milli to micro

while (true) {
// Run a external process
$process = new Process(
'php app/console solr:indexer:run -e ' . $input->getOption('env'),
$this->getRootDir()
sprintf('php app/console solr:indexer:run -e %s', $this->kernel->getEnvironment()),
$this->workingDirectory
);

$process->setTimeout(0);
$process->run();
$process->run(function ($type, $buffer) use ($output) {
$output->write($buffer, false, $type);
});

if (!$process->isSuccessful()) {
break; // terminate when there is a error
}

if (!$input->getOption('daemon')) {
if (!$this->getContainer()->get('integrated_solr.indexer')->getQueue()->count()) {
if (!$this->indexer->getQueue()->count()) {
break;
}
}
Expand All @@ -136,10 +206,72 @@ private function runExternal(InputInterface $input, OutputInterface $output)
}

/**
* @return string
* @param ArgumentProcess $argument
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function getRootDir()
private function runProcess(ArgumentProcess $argument, InputInterface $input, OutputInterface $output)
{
return realpath($this->getContainer()->get('kernel')->getRootDir() . '/..');
if ($argument->isParentProcess()) {
// Create pool generator to generate the processes
$generator = new ProcessPoolGenerator($input, $this->kernel);
$pool = $generator->getProcessesPool($argument, $this->workingDirectory);

// Start them accordingly
foreach ($pool as $i => $process) {
// Run it
$process->start();

// Tell somebody
$output->writeln(sprintf('Started process %d with pid %d to run the queue', ($i+1), $process->getPid()));
}

if ($input->getOption('blocking')) {
$output->writeln('Running in blocking mode, waiting until all started processes are done');

// While the pool contains processes we're running
while ($pool->count()) {
foreach ($pool as $i => $process) {
// Read stout for anything to pass thru
if ($processOutput = $process->getIncrementalOutput()) {
$output->writeln(sprintf('Prcocess %d: %s', $i, $processOutput));
}
// Read sterr for anything to pass thru
if ($processOutput = $process->getIncrementalErrorOutput()) {
$output->writeln(sprintf('Prcocess %d: %s', $i, $processOutput));
}

if (!$process->isRunning()) {
// Tell the user
$output->writeln(sprintf('Process %d finished', ($i+1)));

// This one is important
$pool->removeElement($process);
}
}

// Don't create a cpu load, check periodically
sleep(1);
}
}
} else {
// Set the modulo to run over the data set with x processes, creating a unique list per thread
$this->queueProvider->setOption('where', sprintf('(id %% %d) = %d', $argument->getProcessMax(), $argument->getProcessNumber()));

// Add the clear event listener only for the thread
$this->indexer->getEventDispatcher()->addSubscriber($this->clearEventSubscriber);

// Seems to be a sub-process, ran it with a the number appended to the class
while ($this->indexer->getQueue()->count()) {
$this->runInternal(sprintf('%s:%d', self::class, $argument->getProcessNumber()), $output);

// Give them cores some relaxation
usleep(5000);
}
}

// Good to go
return 0;
}
}
Loading