diff --git a/.travis.yml b/.travis.yml index 680e571..c064a3c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,20 @@ sudo: required language: php -php: - - 7.2 - - 7.3 - - 7.4 + +jobs: + include: + # Combination PHP <7.4 and maglnet/composer-require-checker doesn't work with Composer 2 + - php: 7.2 + env: COMPOSER_VERSION=1.10.16 + - php: 7.3 + env: COMPOSER_VERSION=1.10.16 + - php: 7.4 + env: COMPOSER_VERSION=--stable env: - - ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200 + global: + - ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200 cache: directories: @@ -24,6 +31,8 @@ before_install: - echo -e '-Ddiscovery.type=single-node\n-XX:+DisableExplicitGC\n-Djdk.io.permissionsUseCanonicalPath=true\n-Dlog4j.skipJansi=true\n-server\n' | sudo tee -a /etc/elasticsearch/jvm.options - sudo chown -R elasticsearch:elasticsearch /etc/default/elasticsearch - sudo systemctl start elasticsearch + - composer --verbose self-update $COMPOSER_VERSION + - composer --version install: - sudo apt-get update @@ -31,7 +40,7 @@ install: before_script: - composer install - - composer global require maglnet/composer-require-checker && $HOME/.composer/vendor/bin/composer-require-checker --config-file=composer-require-checker.json; + - composer global require maglnet/composer-require-checker && $(composer config home)/vendor/bin/composer-require-checker --config-file=composer-require-checker.json; - vendor/bin/ecs check - vendor/bin/phpstan analyse --no-progress -l max -c phpstan.neon src/ diff --git a/composer.json b/composer.json index f4721fb..8ab1a28 100644 --- a/composer.json +++ b/composer.json @@ -56,8 +56,8 @@ "pda/pheanstalk": "^3.1", "mikey179/vfsstream": "^1.6", "amphp/artax": "^3.0", - "phpstan/phpstan": "^0.12", - "symplify/easy-coding-standard-prefixed": "^8.1" + "phpstan/phpstan": "^0.12 <=0.12.66", + "symplify/easy-coding-standard-prefixed": "^8.1 <8.3" }, "scripts": { "phpcs": "phpcs", diff --git a/src/Exception/ElasticSearch/JobNotFoundException.php b/src/Exception/ElasticSearch/JobNotFoundException.php index f1f16ff..7566146 100644 --- a/src/Exception/ElasticSearch/JobNotFoundException.php +++ b/src/Exception/ElasticSearch/JobNotFoundException.php @@ -8,7 +8,7 @@ class JobNotFoundException extends \RuntimeException { - public function __construct(string $jobUuid, $code = 0, Throwable $previous = null) + public function __construct(string $jobUuid, int $code = 0, Throwable $previous = null) { parent::__construct(sprintf('Job with UUID "%s" has not been found.', $jobUuid), $code, $previous); } diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index 505934e..b90ce29 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -223,9 +223,16 @@ private function jobExists(string $jobUuid): Promise private function processBatch(): \Generator { $this->logger->debug('Processing batch'); - yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); - foreach ($this->batch as $singleJob) { + // Store batch in a local variable and clear the shared property. Any other Producer instance may update and + // process $this->batch during a yield call within this method (emulating concurrency). This shouldn't cause + // duplicate inserts of the same job. + $batch = $this->batch; + $this->batch = []; + + yield $this->elasticSearch->bulkIndexJobs($batch, $this->flowConfig->getTube()); + + foreach ($batch as $singleJob) { yield $this->beanstalkClient->put( $singleJob->getUuid(), $singleJob->getTimeout(), @@ -233,8 +240,6 @@ private function processBatch(): \Generator $singleJob->getPriority() ); } - - $this->batch = []; } /**