From 3f1f481a6a4deb7a332de14be1c57651e11f4acc Mon Sep 17 00:00:00 2001 From: RJ Garcia Date: Fri, 10 Apr 2020 00:58:28 -0400 Subject: [PATCH] Initial Version - Built out initial worker pool/supervisor, auto scale system, and alerts - Setup repo, tests, docs - Enabled auto scale debouncing Signed-off-by: RJ Garcia --- .gitignore | 4 + Dockerfile | 8 + LICENSE | 20 ++ README.md | 100 ++++++++- composer.json | 47 ++++ docker-compose.yml | 14 ++ phpunit.xml.dist | 28 +++ src/AggregatingReceiverMessageCount.php | 29 +++ src/AutoScale.php | 12 + src/AutoScale/AutoScaleRequest.php | 42 ++++ src/AutoScale/AutoScaleResponse.php | 41 ++++ src/AutoScale/DebouncingAutoScale.php | 48 ++++ src/AutoScale/MinMaxClipAutoScale.php | 21 ++ .../QueueSizeMessageRateAutoScale.php | 13 ++ src/Command/AlertCommand.php | 36 +++ src/Command/ConsumeCommand.php | 28 +++ src/Command/Pool/PauseCommand.php | 24 ++ src/Command/Pool/PoolCommand.php | 33 +++ src/Command/Pool/RestartCommand.php | 24 ++ src/Command/Pool/ResumeCommand.php | 24 ++ src/Command/Pool/StatusCommand.php | 55 +++++ .../BuildSupervisorPoolConfigCompilerPass.php | 79 +++++++ src/Event/PoolBackedUpAlert.php | 28 +++ src/EventLogger.php | 31 +++ src/Internal/Glob.php | 100 +++++++++ src/MessengerAutoScaleBundle.php | 78 +++++++ src/PoolConfig.php | 89 ++++++++ src/PoolControl.php | 12 + src/PoolControl/ActorPoolControl.php | 18 ++ src/PoolControl/InMemoryPoolControl.php | 70 ++++++ .../InMemoryPoolControlFactory.php | 18 ++ src/PoolControl/PsrSimpleCachePoolControl.php | 73 ++++++ .../PsrSimpleCachePoolControlFactory.php | 25 +++ src/PoolControl/WorkerPoolControl.php | 16 ++ src/PoolControlFactory.php | 12 + src/PoolStatus.php | 36 +++ src/ProcessManager.php | 14 ++ src/ProcessManager/MockProcessManager.php | 39 ++++ .../SymfonyMessengerProcessManagerFactory.php | 29 +++ .../SymfonyProcessProcessManager.php | 38 ++++ src/ProcessManagerFactory.php | 8 + src/RaiseAlerts.php | 10 + src/RaiseAlerts/ChainRaiseAlerts.php | 22 ++ src/RaiseAlerts/PoolBackedUpRaiseAlerts.php | 32 +++ src/Resources/config/services.xml | 41 ++++ src/Supervisor.php | 94 ++++++++ src/SupervisorPoolConfig.php | 35 +++ src/WorkerPool.php | 147 +++++++++++++ .../AggregatingReceiverMessageCountTest.php | 18 ++ tests/Feature/ArrayContainer.php | 24 ++ tests/Feature/AutoScaleTest.php | 208 ++++++++++++++++++ tests/Feature/BundleTest.php | 107 +++++++++ tests/Feature/Fixtures/FixtureKernel.php | 37 ++++ .../Fixtures/Message/CatalogMessage.php | 12 + .../Fixtures/Message/HandleCatalogMessage.php | 10 + .../Fixtures/Message/HandleSalesMessage.php | 10 + .../Message/HandleSalesOrderMessage.php | 10 + .../Feature/Fixtures/Message/SalesMessage.php | 12 + .../RequiresSupervisorPoolConfigs.php | 15 ++ tests/Feature/Fixtures/SfRedisMessage.php | 8 + tests/Feature/Fixtures/TestFixtureBundle.php | 25 +++ tests/Feature/Fixtures/auto-scale-config.yaml | 14 ++ tests/Feature/Fixtures/console | 33 +++ tests/Feature/Fixtures/messenger-config.yaml | 13 ++ tests/Feature/Fixtures/services.php | 22 ++ .../PoolControl/InMemoryPoolControlTest.php | 14 ++ .../PsrSimpleCachePoolControlTest.php | 15 ++ tests/Feature/PoolControlTestOutline.php | 191 ++++++++++++++++ .../ProcessManager/Fixtures/run-proc.php | 14 ++ .../SymfonyProcessProcessManagerTest.php | 13 ++ tests/Feature/ProcessManagerTestOutline.php | 47 ++++ tests/Feature/StaticGetMessageCount.php | 32 +++ tests/Feature/WorkerPoolTest.php | 177 +++++++++++++++ tests/Unit/Internal/GlobTest.php | 26 +++ 74 files changed, 2949 insertions(+), 3 deletions(-) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 LICENSE create mode 100644 composer.json create mode 100644 docker-compose.yml create mode 100644 phpunit.xml.dist create mode 100644 src/AggregatingReceiverMessageCount.php create mode 100644 src/AutoScale.php create mode 100644 src/AutoScale/AutoScaleRequest.php create mode 100644 src/AutoScale/AutoScaleResponse.php create mode 100644 src/AutoScale/DebouncingAutoScale.php create mode 100644 src/AutoScale/MinMaxClipAutoScale.php create mode 100644 src/AutoScale/QueueSizeMessageRateAutoScale.php create mode 100644 src/Command/AlertCommand.php create mode 100644 src/Command/ConsumeCommand.php create mode 100644 src/Command/Pool/PauseCommand.php create mode 100644 src/Command/Pool/PoolCommand.php create mode 100644 src/Command/Pool/RestartCommand.php create mode 100644 src/Command/Pool/ResumeCommand.php create mode 100644 src/Command/Pool/StatusCommand.php create mode 100644 src/DependencyInjection/BuildSupervisorPoolConfigCompilerPass.php create mode 100644 src/Event/PoolBackedUpAlert.php create mode 100644 src/EventLogger.php create mode 100644 src/Internal/Glob.php create mode 100644 src/MessengerAutoScaleBundle.php create mode 100644 src/PoolConfig.php create mode 100644 src/PoolControl.php create mode 100644 src/PoolControl/ActorPoolControl.php create mode 100644 src/PoolControl/InMemoryPoolControl.php create mode 100644 src/PoolControl/InMemoryPoolControlFactory.php create mode 100644 src/PoolControl/PsrSimpleCachePoolControl.php create mode 100644 src/PoolControl/PsrSimpleCachePoolControlFactory.php create mode 100644 src/PoolControl/WorkerPoolControl.php create mode 100644 src/PoolControlFactory.php create mode 100644 src/PoolStatus.php create mode 100644 src/ProcessManager.php create mode 100644 src/ProcessManager/MockProcessManager.php create mode 100644 src/ProcessManager/SymfonyMessengerProcessManagerFactory.php create mode 100644 src/ProcessManager/SymfonyProcessProcessManager.php create mode 100644 src/ProcessManagerFactory.php create mode 100644 src/RaiseAlerts.php create mode 100644 src/RaiseAlerts/ChainRaiseAlerts.php create mode 100644 src/RaiseAlerts/PoolBackedUpRaiseAlerts.php create mode 100644 src/Resources/config/services.xml create mode 100644 src/Supervisor.php create mode 100644 src/SupervisorPoolConfig.php create mode 100644 src/WorkerPool.php create mode 100644 tests/Feature/AggregatingReceiverMessageCountTest.php create mode 100644 tests/Feature/ArrayContainer.php create mode 100644 tests/Feature/AutoScaleTest.php create mode 100644 tests/Feature/BundleTest.php create mode 100644 tests/Feature/Fixtures/FixtureKernel.php create mode 100644 tests/Feature/Fixtures/Message/CatalogMessage.php create mode 100644 tests/Feature/Fixtures/Message/HandleCatalogMessage.php create mode 100644 tests/Feature/Fixtures/Message/HandleSalesMessage.php create mode 100644 tests/Feature/Fixtures/Message/HandleSalesOrderMessage.php create mode 100644 tests/Feature/Fixtures/Message/SalesMessage.php create mode 100644 tests/Feature/Fixtures/RequiresSupervisorPoolConfigs.php create mode 100644 tests/Feature/Fixtures/SfRedisMessage.php create mode 100644 tests/Feature/Fixtures/TestFixtureBundle.php create mode 100644 tests/Feature/Fixtures/auto-scale-config.yaml create mode 100755 tests/Feature/Fixtures/console create mode 100644 tests/Feature/Fixtures/messenger-config.yaml create mode 100644 tests/Feature/Fixtures/services.php create mode 100644 tests/Feature/PoolControl/InMemoryPoolControlTest.php create mode 100644 tests/Feature/PoolControl/PsrSimpleCachePoolControlTest.php create mode 100644 tests/Feature/PoolControlTestOutline.php create mode 100644 tests/Feature/ProcessManager/Fixtures/run-proc.php create mode 100644 tests/Feature/ProcessManager/SymfonyProcessProcessManagerTest.php create mode 100644 tests/Feature/ProcessManagerTestOutline.php create mode 100644 tests/Feature/StaticGetMessageCount.php create mode 100644 tests/Feature/WorkerPoolTest.php create mode 100644 tests/Unit/Internal/GlobTest.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7ad4daf --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/vendor +/composer.lock +/var/ +/tests/Feature/Fixtures/_message-info.txt diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..52b2f1b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,8 @@ +FROM php:7.2-cli + +RUN apt-get update && apt-get install -y git zip + +COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/bin/ +RUN install-php-extensions redis pcntl + +COPY --from=composer:1.9.1 /usr/bin/composer /usr/bin/composer diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5ec19cb --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2020 RJ Garcia + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md index 2e8c8c8..120afee 100644 --- a/README.md +++ b/README.md @@ -21,17 +21,111 @@ return [ ## Usage -### Standalone +After the bundle is loaded, you need to configure worker pools which will manage procs for a set of messenger receivers. -### Within Symfony Framework +```yaml +messenger_auto_scale: + console_path: '%kernel.project_dir%/tests/Feature/Fixtures/console' + pools: + sales: + min_procs: 0 + max_procs: 5 + receivers: "sales*" + heartbeat_interval: 5 + default: + min_procs: 0 + max_procs: 5 + backed_up_alert_threshold: 100 + receivers: "*" + heartbeat_interval: 10 +``` + +Once configured, you can start the consumer with the `krak:auto-scale:consume` command which will start up and manage the worker pools. + +## Matching Receivers + +Each pool config must have a `receivers` property which is a simple Glob that will match any of the current transport names setup in the messenger config. + +It's important to note, that a receiver can ONLY be apart of one pool. So if two pools have receiver patterns that match the same receiver, then the first defined pool would own that receiver. + +## Configuring Heartbeats + +By default, each worker pool will log a heartbeat event every 60 seconds. If you want to change the frequency of that, you use the pool `heartbeat_interval` to define the number of seconds between subsequent heartbeats. + +## Monitoring + +You can access the PoolControl from your own services if you want to build out custom monitoring, or you can just use the `krak:auto-scale:pool:*` commands that are registered. + +## Auto Scaling + +Auto scaling is managed with the AutoScale interface which is responsible for taking the current state of a worker pool captured in the `AutoScaleRequest` and returning the expected num workers for that worker pool captured in `AutoScaleResponse`. + +The default auto scale is setup to work off of the current queue size and the configured message rate and then will clip to the min/max procs configured. There also is some logic included to debounce the auto scaling requests to ensure that the system is judicious about when to create new procs and isn't fluctuating too often. + +Here is some example config and we'll go over some scenarios: ```yaml messenger_auto_scale: pools: + catalog: + max_procs: 5 + message_rate: 100 + scale_up_threshold_seconds: 5 + scale_down_threshold_seconds: 20 + receivers: "catalog" + sales: + min_procs: 5 + message_rate: 10 + scale_up_threshold_seconds: 5 + scale_down_threshold_seconds: 20 + receivers: "sales" +``` + +| Seconds from Start | Catalog Pool Queue Size | Catalog Pool Num Workers | Sales Pool Queue Size | Sales Pool Num Workers | Notes | +| -------------------|-------------------------|--------------------------|-----------------------|------------------------|-------| +| n/a | 0 | 0 | 0 | 0 | Initial State | +| 0 | 0 | 0 | 0 | 5 | First Run, scaled up to 5 because of min procs | +| 2 | 1 | 1 | 60 | 5 | Scale up to 1 on catalog immediately, but wait until scale up threshold for sales | +| 5 | 0 | 1 | 50 | 5 | Wait to scale down on for catalog, reset counter for sales for scale up because now a scale up isn't needed | +| 6 | 0 | 1 | 60 | 5 | Wait to scale up on sales again, timer started, needs 5 seconds before scale up | +| 11 | 0 | 1 | 60 | 6 | Size of queue maintained over 60 for 5 seconds, so now we can scale up. | +| 22 | 0 | 0 | 60 | 6 | Catalog now goes back to zero after waiting 20 seconds since needing to scale down | + +### Defining your own Auto Scale algorithm + +If you want to augment or perform your own auto-scaling algorithm, you can implement the AutoScale interface and then update the `Krak\SymfonyMessengerAutoScale\AutoScale` to point to your new auto scale service. The default service is defined like: + +```php +use Krak\SymfonyMessengerAutoScale\AutoScale; + +$autoScale = new AutoScale\MinMaxClipAutoScale(new AutoScale\DebouncingAutoScale(new AutoScale\QueueSizeMessageRateAutoScale())); ``` -## Dashboards +## Alerts + +The alerting system is designed to be flexible and allow each user define alerts as they see. Alerts are simply just events that get dispatched when a certain metric is reached as determined by the services that implement `RaiseAlerts`. + +To actually trigger the alerts, you need to run the `krak:auto-scale:alert` command which will check the state of the pools and raise alerts. Put this command on a cron at whatever interval you want alerts monitored at. + +### Subscribing to Alerts + +You simply just can create a basic symfony event listener/subscriber for that event and you should be able to perform any action on those events. + +#### PoolBackedUpAlert + +This alert will fire if the there are too many messages for the given queue. To enable this on a pool, you need to define the `backed_up_alert_threshold` config value. + +```yaml +# ... + sales: + backed_up_alert_threshold: 100 +``` + +If there are over 100 messages in the sales pool, then the PoolBackedUpAlert will fire on the next check. + +### Creating Your Own Alerts +To create an alert, you need to subscribe to the RaiseAlerts interface, then register that service, and if you enable auto configuration, it should automatically get tagged with `messenger_auto_scale.raise_alerts`. ## Testing diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..41bb09a --- /dev/null +++ b/composer.json @@ -0,0 +1,47 @@ +{ + "name": "krak/symfony-messenger-auto-scale", + "description": "Symfony Messenger Auto Scaling", + "type": "symfony-bundle", + "authors": [ + { + "name": "RJ Garcia", + "email": "ragboyjr@icloud.com" + } + ], + "license": "MIT", + "require": { + "php": "^7.1", + "ext-pcntl": "*", + "krak/schema": "^0.2.0", + "psr/event-dispatcher": "^1.0", + "symfony/messenger": "^4.4" + }, + "autoload": { + "psr-4": { + "Krak\\SymfonyMessengerAutoScale\\": "src" + } + }, + "autoload-dev": { + "psr-4": { + "Krak\\SymfonyMessengerAutoScale\\Tests\\": "tests" + } + }, + "require-dev": { + "ext-redis": "*", + "krak/symfony-messenger-redis": "^0.1.0", + "nyholm/symfony-bundle-test": "^1.6", + "phpunit/phpunit": "^7.3", + "psr/simple-cache": "^1.0", + "symfony/cache": "^5.0", + "symfony/console": "^4.4", + "symfony/dependency-injection": "^4.1", + "symfony/http-kernel": "^4.1", + "symfony/process": "^5.0", + "symfony/property-access": "^4.1", + "symfony/serializer": "^4.1" + }, + "scripts": { + "test": "phpunit --testdox --colors=always", + "flush-redis": "docker-compose exec -T redis redis-cli flushall" + } +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4687c9f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3' + +services: + php: + build: . + command: "tail -f /dev/null" + working_dir: /var/www/html + volumes: + - ./:/var/www/html + redis: + image: redis + environment: { TERM: xterm } + ports: ["6379:6379"] + restart: unless-stopped diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..b8954c1 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,28 @@ + + + + + + + + + + + + tests/Feature + + + tests/Unit + + + + + + ./src/ + + + diff --git a/src/AggregatingReceiverMessageCount.php b/src/AggregatingReceiverMessageCount.php new file mode 100644 index 0000000..9d6195a --- /dev/null +++ b/src/AggregatingReceiverMessageCount.php @@ -0,0 +1,29 @@ +receivers = $receivers; + } + + public static function createFromReceiverIds(array $receiverIds, ContainerInterface $receiversById) { + return new self(...array_map(function(string $receverId) use ($receiversById) { + return $receiversById->get($receverId); + }, $receiverIds)); + } + + public function getMessageCount(): int { + return array_reduce($this->receivers, function(int $sum, ReceiverInterface $receiver) { + return $receiver instanceof MessageCountAwareInterface ? $receiver->getMessageCount() + $sum : $sum; + }, 0); + } +} diff --git a/src/AutoScale.php b/src/AutoScale.php new file mode 100644 index 0000000..275f09c --- /dev/null +++ b/src/AutoScale.php @@ -0,0 +1,12 @@ +state = $state; + $this->timeSinceLastCall = $timeSinceLastCall; + $this->numProcs = $numProcs; + $this->sizeOfQueue = $sizeOfQueue; + $this->poolConfig = $poolConfig; + } + + public function state(): ?array { + return $this->state; + } + + public function timeSinceLastCall(): ?int { + return $this->timeSinceLastCall; + } + + public function numProcs(): int { + return $this->numProcs; + } + + public function sizeOfQueue(): int { + return $this->sizeOfQueue; + } + + public function poolConfig(): PoolConfig { + return $this->poolConfig; + } +} diff --git a/src/AutoScale/AutoScaleResponse.php b/src/AutoScale/AutoScaleResponse.php new file mode 100644 index 0000000..d9f75fc --- /dev/null +++ b/src/AutoScale/AutoScaleResponse.php @@ -0,0 +1,41 @@ +state = $state; + $this->expectedNumProcs = $expectedNumProcs; + } + + public function state(): ?array { + return $this->state; + } + + public function withState(?array $state): self { + $self = clone $this; + $self->state = $state; + return $self; + } + + public function withAddedState(array $stateToMerge): self { + return $this->withState(array_merge($this->state ?? [], $stateToMerge)); + } + + public function expectedNumProcs(): int { + return $this->expectedNumProcs; + } + + public function withExpectedNumProcs(int $expectedNumProcs): self { + $self = clone $this; + $self->expectedNumProcs = $expectedNumProcs; + return $self; + } +} diff --git a/src/AutoScale/DebouncingAutoScale.php b/src/AutoScale/DebouncingAutoScale.php new file mode 100644 index 0000000..b3e0531 --- /dev/null +++ b/src/AutoScale/DebouncingAutoScale.php @@ -0,0 +1,48 @@ +autoScale = $autoScale; + } + + public function __invoke(AutoScaleRequest $req): AutoScaleResponse { + $resp = ($this->autoScale)($req); + if ($req->timeSinceLastCall() === null || $resp->expectedNumProcs() === $req->numProcs() || $req->numProcs() === 0) { + return $this->respWithDebounceSinceNeededScale($resp, null, null); + } + + $scaleDirection = $resp->expectedNumProcs() > $req->numProcs() ? self::SCALE_UP : self::SCALE_DOWN; + + // number of seconds for a scale event to be active before allowing scale event + $scaleThreshold = $scaleDirection === self::SCALE_UP + ? $req->poolConfig()->scaleUpThresholdSeconds() + : $req->poolConfig()->scaleDownThresholdSeconds(); + + [$timeSinceNeededScale, $scaleDirectionSinceNeededScale] = $req->state()['debounce_since_needed_scale'] ?? [null, null]; + + $debouncedResp = $resp->withExpectedNumProcs($req->numProcs()); + if ($timeSinceNeededScale === null || $scaleDirection !== $scaleDirectionSinceNeededScale) { + return $this->respWithDebounceSinceNeededScale($debouncedResp, 0, $scaleDirection); + } + $updatedTimeSinceNeededScale = $timeSinceNeededScale + $req->timeSinceLastCall(); + if ($updatedTimeSinceNeededScale < $scaleThreshold) { + return $this->respWithDebounceSinceNeededScale($debouncedResp, $updatedTimeSinceNeededScale, $scaleDirection); + } + + return $this->respWithDebounceSinceNeededScale($resp, null, null); + } + + private function respWithDebounceSinceNeededScale(AutoScaleResponse $resp, ?int $timeSinceNeededScale, ?string $scaleDirection): AutoScaleResponse { + return $resp->withAddedState(['debounce_since_needed_scale' => [$timeSinceNeededScale, $scaleDirection]]); + } +} diff --git a/src/AutoScale/MinMaxClipAutoScale.php b/src/AutoScale/MinMaxClipAutoScale.php new file mode 100644 index 0000000..a3fd237 --- /dev/null +++ b/src/AutoScale/MinMaxClipAutoScale.php @@ -0,0 +1,21 @@ +autoScale = $autoScale; + } + + public function __invoke(AutoScaleRequest $req): AutoScaleResponse { + $resp = ($this->autoScale)($req); + $poolConfig = $req->poolConfig(); + $expectedNumProcs = min($poolConfig->maxProcs(), max($resp->expectedNumProcs(), $poolConfig->minProcs())); + return $resp->withExpectedNumProcs($expectedNumProcs); + } +} diff --git a/src/AutoScale/QueueSizeMessageRateAutoScale.php b/src/AutoScale/QueueSizeMessageRateAutoScale.php new file mode 100644 index 0000000..788a8d3 --- /dev/null +++ b/src/AutoScale/QueueSizeMessageRateAutoScale.php @@ -0,0 +1,13 @@ +sizeOfQueue() / $req->poolConfig()->messageRate()); + return new AutoScaleResponse($req->state(), $expectedNumProcs); + } +} diff --git a/src/Command/AlertCommand.php b/src/Command/AlertCommand.php new file mode 100644 index 0000000..d719a9a --- /dev/null +++ b/src/Command/AlertCommand.php @@ -0,0 +1,36 @@ +raiseAlerts = $raiseAlerts; + $this->eventDispatcher = $eventDispatcher; + $this->supervisorPoolConfigs = $supervisorPoolConfigs; + } + + protected function configure() { + $this->setName('krak:auto-scale:alert') + ->setDescription('Raise any of the configured alerts.'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + foreach ($this->supervisorPoolConfigs as $poolConfig) { + foreach (($this->raiseAlerts)($poolConfig) as $event) { + $this->eventDispatcher->dispatch($event); + } + } + } +} diff --git a/src/Command/ConsumeCommand.php b/src/Command/ConsumeCommand.php new file mode 100644 index 0000000..ad903f6 --- /dev/null +++ b/src/Command/ConsumeCommand.php @@ -0,0 +1,28 @@ +supervisor = $supervisor; + } + + protected function configure() { + $this->setName('krak:auto-scale:consume') + ->setDescription('Start the supervisor to manage the worker consumers.'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + $output->writeln('Starting Supervisor.'); + $this->supervisor->run(); + } +} diff --git a/src/Command/Pool/PauseCommand.php b/src/Command/Pool/PauseCommand.php new file mode 100644 index 0000000..87989f9 --- /dev/null +++ b/src/Command/Pool/PauseCommand.php @@ -0,0 +1,24 @@ +setName('krak:auto-scale:pool:pause') + ->setDescription('Request a pause for the selected pools') + ->addPoolArgument('The names of the pools to perform a pause'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + $poolNames = $this->getPoolNames($input); + foreach ($poolNames as $poolName) { + $control = $this->poolControlFactory->createForActor($poolName); + $output->writeln("Pausing Pool: {$poolName}"); + $control->pause(); + } + } +} diff --git a/src/Command/Pool/PoolCommand.php b/src/Command/Pool/PoolCommand.php new file mode 100644 index 0000000..ae30b4e --- /dev/null +++ b/src/Command/Pool/PoolCommand.php @@ -0,0 +1,33 @@ +poolControlFactory = $poolControlFactory; + $this->poolNames = array_map(function(SupervisorPoolConfig $config) { + return $config->name(); + }, $supervisorPoolConfigs); + + parent::__construct(); + } + + protected function getPoolNames(InputInterface $input) { + return $input->getArgument('names') ?: $this->poolNames; + } + + /** @return $this */ + protected function addPoolArgument(string $description) { + return $this->addArgument('names', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, $description . ', available options: ' . implode(', ', $this->poolNames)); + } +} diff --git a/src/Command/Pool/RestartCommand.php b/src/Command/Pool/RestartCommand.php new file mode 100644 index 0000000..8a89615 --- /dev/null +++ b/src/Command/Pool/RestartCommand.php @@ -0,0 +1,24 @@ +setName('krak:auto-scale:pool:restart') + ->setDescription('Request a restart for the selected pools') + ->addPoolArgument('The names of the pools to perform a restart'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + $poolNames = $this->getPoolNames($input); + foreach ($poolNames as $poolName) { + $control = $this->poolControlFactory->createForActor($poolName); + $output->writeln("Restarting Pool: {$poolName}"); + $control->restart(); + } + } +} diff --git a/src/Command/Pool/ResumeCommand.php b/src/Command/Pool/ResumeCommand.php new file mode 100644 index 0000000..ee27b3c --- /dev/null +++ b/src/Command/Pool/ResumeCommand.php @@ -0,0 +1,24 @@ +setName('krak:auto-scale:pool:resume') + ->setDescription('Request a resume for the selected pools') + ->addPoolArgument('The names of the pools to perform a resume'); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + $poolNames = $this->getPoolNames($input); + foreach ($poolNames as $poolName) { + $control = $this->poolControlFactory->createForActor($poolName); + $output->writeln("Resuming Pool: {$poolName}"); + $control->resume(); + } + } +} diff --git a/src/Command/Pool/StatusCommand.php b/src/Command/Pool/StatusCommand.php new file mode 100644 index 0000000..f2ca195 --- /dev/null +++ b/src/Command/Pool/StatusCommand.php @@ -0,0 +1,55 @@ +setName('krak:auto-scale:pool:status') + ->setDescription('Show the status of the selected pool (or all if no pool name is given).') + ->addPoolArgument('The names of the pools to display the status') + ->addOption('poll', 'p', InputOption::VALUE_NONE, 'Poll the pool control indefinitely at an interval') + ->addOption('poll-interval', 'i', InputOption::VALUE_REQUIRED, 'The interval to poll at, defaults to 5 seconds', 5); + } + + protected function execute(InputInterface $input, OutputInterface $output) { + $poolNames = $this->getPoolNames($input); + $shouldPoll = $input->getOption('poll'); + $pollInterval = $input->getOption('poll-interval'); + + if (!$shouldPoll) { + $this->printPools($output, $poolNames); + return; + } + + while (true) { + $this->printPools($output, $poolNames); + sleep($pollInterval); + } + } + + private function printPools(OutputInterface $output, array $poolNames) { + $table = new Table($output); + $table->setHeaders(['Pool', 'Size of Queues', 'Num Workers', 'Status', 'Should Stop', 'Config']); + foreach ($poolNames as $poolName) { + $poolControl = $this->poolControlFactory->createForActor($poolName); + $table->addRow([$poolName, $poolControl->getSizeOfQueues(), $poolControl->getNumWorkers(), (string) $poolControl->getStatus(), $poolControl->shouldStop(), $this->encodePoolConfig($poolControl->getPoolConfig())]); + } + $table->render(); + } + + private function encodePoolConfig(?PoolConfig $poolConfig): string { + if (\function_exists('json_encode')) { + return \json_encode($poolConfig); + } + + return print_r($poolConfig->jsonSerialize(), true); + } +} diff --git a/src/DependencyInjection/BuildSupervisorPoolConfigCompilerPass.php b/src/DependencyInjection/BuildSupervisorPoolConfigCompilerPass.php new file mode 100644 index 0000000..4d1d558 --- /dev/null +++ b/src/DependencyInjection/BuildSupervisorPoolConfigCompilerPass.php @@ -0,0 +1,79 @@ +findReceiverNames($container); + + $this->registerSupervisorPoolConfigsToTaggedServices($container, $receiverNames); + $container->setParameter('messenger_auto_scale.receiver_names', $receiverNames); + } + + /** + * Any service that uses the SUPERVISOR_POOL_CONFIGS tag will be automatically injected with a configured array of + * SupervisorPoolConfig objects. + * We need to use a service factory to support this because you can't have objects as parameters in SF DI. + */ + private function registerSupervisorPoolConfigsToTaggedServices(ContainerBuilder $container, array $receiverNames): void { + $rawPoolConfig = $container->getParameter('krak.messenger_auto_scale.config.pools'); + $supervisorPoolConfigs = $this->buildSupervisorPoolConfigs($rawPoolConfig, $receiverNames); + $container->findDefinition('krak.messenger_auto_scale.supervisor_pool_configs')->addArgument(iterator_to_array($supervisorPoolConfigs)); + } + + /** @return SupervisorPoolConfig[] */ + private function buildSupervisorPoolConfigs(array $rawPoolConfig, array $receiverNames): iterable { + foreach ($rawPoolConfig['pools'] as $poolName => $rawPool) { + if (!count($receiverNames)) { + throw new \LogicException('No receivers/transports are left to match pool config - ' . $poolName); + } + + [$matchedReceiverNames, $receiverNames] = $this->matchReceiverNameFromRawPool($rawPool, $receiverNames); + yield ['name' => $poolName, 'poolConfig' => $rawPool, 'receiverIds' => $matchedReceiverNames]; + } + } + + /** return the matched receiver names and unmatched recevier names as a two tuple. */ + private function matchReceiverNameFromRawPool(array $rawPool, array $receiverNames): array { + $matched = []; + $unmatched = []; + $glob = new Glob($rawPool['receivers']); + foreach ($receiverNames as $receiverName) { + if ($glob->matches($receiverName)) { + $matched[] = $receiverName; + } else { + $unmatched[] = $receiverName; + } + } + + return [$matched, $unmatched]; + } + + private function findReceiverNames(ContainerBuilder $container): array { + $receiverMapping = []; + foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) { + foreach ($tags as $tag) { + if (isset($tag['alias'])) { + $receiverMapping[$tag['alias']] = null; + } + } + } + + return \array_unique(\array_keys($receiverMapping)); + } + + public static function createSupervisorPoolConfigsFromArray(array $poolConfigs): array { + return \array_map(function(array $pool) { + return new SupervisorPoolConfig($pool['name'], PoolConfig::createFromOptionsArray($pool['poolConfig']), $pool['receiverIds']); + }, $poolConfigs); + } +} diff --git a/src/Event/PoolBackedUpAlert.php b/src/Event/PoolBackedUpAlert.php new file mode 100644 index 0000000..462fc23 --- /dev/null +++ b/src/Event/PoolBackedUpAlert.php @@ -0,0 +1,28 @@ +poolName = $poolName; + $this->thresholdLimit = $thresholdLimit; + $this->currentNumberOfMessages = $currentNumberOfMessages; + } + + public function poolName(): string { + return $this->poolName; + } + + public function thresholdLimit(): int { + return $this->thresholdLimit; + } + + public function currentNumberOfMessages(): int { + return $this->currentNumberOfMessages; + } +} diff --git a/src/EventLogger.php b/src/EventLogger.php new file mode 100644 index 0000000..269fb84 --- /dev/null +++ b/src/EventLogger.php @@ -0,0 +1,31 @@ +logger = $logger; + } + + public function logEvent(string $message, string $event, array $context = []): void { + $this->log(LogLevel::INFO, $message, [ + 'messenger_auto_scale_event' => $event, + 'messenger_auto_scale_event_' . $event . '_context' => $context, + ]); + } + + public function log($level, $message, array $context = array()) { + $this->logger->log($level, $message, $context); + } +} diff --git a/src/Internal/Glob.php b/src/Internal/Glob.php new file mode 100644 index 0000000..283c1b9 --- /dev/null +++ b/src/Internal/Glob.php @@ -0,0 +1,100 @@ +glob = $glob; + $this->strictLeadingDot = $strictLeadingDot; + $this->strictWildcardSlash = $strictWildcardSlash; + $this->delimiter = $delimiter; + } + + public function matches(string $stringToMatch): bool { + $regex = $this->cachedRegex + ? $this->cachedRegex + : $this->cachedRegex = self::patternToRegex($this->glob, $this->strictLeadingDot, $this->strictWildcardSlash, $this->delimiter); + + return preg_match($regex, $stringToMatch) === 1; + } + + private static function patternToRegex(string $glob, bool $strictLeadingDot = true, bool $strictWildcardSlash = true, string $delimiter = '#'): string { + $firstByte = true; + $escaping = false; + $inCurlies = 0; + $regex = ''; + $sizeGlob = \strlen($glob); + for ($i = 0; $i < $sizeGlob; ++$i) { + $car = $glob[$i]; + if ($firstByte && $strictLeadingDot && '.' !== $car) { + $regex .= '(?=[^\.])'; + } + + $firstByte = '/' === $car; + + if ($firstByte && $strictWildcardSlash && isset($glob[$i + 2]) && '**' === $glob[$i + 1].$glob[$i + 2] && (!isset($glob[$i + 3]) || '/' === $glob[$i + 3])) { + $car = '[^/]++/'; + if (!isset($glob[$i + 3])) { + $car .= '?'; + } + + if ($strictLeadingDot) { + $car = '(?=[^\.])'.$car; + } + + $car = '/(?:'.$car.')*'; + $i += 2 + isset($glob[$i + 3]); + + if ('/' === $delimiter) { + $car = str_replace('/', '\\/', $car); + } + } + + if ($delimiter === $car || '.' === $car || '(' === $car || ')' === $car || '|' === $car || '+' === $car || '^' === $car || '$' === $car) { + $regex .= "\\$car"; + } elseif ('*' === $car) { + $regex .= $escaping ? '\\*' : ($strictWildcardSlash ? '[^/]*' : '.*'); + } elseif ('?' === $car) { + $regex .= $escaping ? '\\?' : ($strictWildcardSlash ? '[^/]' : '.'); + } elseif ('{' === $car) { + $regex .= $escaping ? '\\{' : '('; + if (!$escaping) { + ++$inCurlies; + } + } elseif ('}' === $car && $inCurlies) { + $regex .= $escaping ? '}' : ')'; + if (!$escaping) { + --$inCurlies; + } + } elseif (',' === $car && $inCurlies) { + $regex .= $escaping ? ',' : '|'; + } elseif ('\\' === $car) { + if ($escaping) { + $regex .= '\\\\'; + $escaping = false; + } else { + $escaping = true; + } + + continue; + } else { + $regex .= $car; + } + $escaping = false; + } + + return $delimiter.'^'.$regex.'$'.$delimiter; + } +} diff --git a/src/MessengerAutoScaleBundle.php b/src/MessengerAutoScaleBundle.php new file mode 100644 index 0000000..13f74fd --- /dev/null +++ b/src/MessengerAutoScaleBundle.php @@ -0,0 +1,78 @@ +addCompilerPass(new DependencyInjection\BuildSupervisorPoolConfigCompilerPass()); + $container->registerForAutoconfiguration(RaiseAlerts::class)->addTag(self::TAG_RAISE_ALERTS); + } + + public function getContainerExtension(): ExtensionInterface { + return new class() extends Extension { + public function getAlias(): string { + return 'messenger_auto_scale'; + } + + /** @param mixed[] $configs */ + public function load(array $configs, ContainerBuilder $container): void { + $processedConfig = $this->createProcessedConfiguration($configs); + $this->loadServices($container); + + // processed pool config to be accessible as a parameter. + $container->setParameter('krak.messenger_auto_scale.config.pools', $processedConfig); + $container->findDefinition(ProcessManager\SymfonyMessengerProcessManagerFactory::class) + ->addArgument($processedConfig['console_path']); + } + + private function createProcessedConfiguration(array $configs): array { + return $this->processConfiguration(new class() implements ConfigurationInterface { + public function getConfigTreeBuilder() { + return configTree('messenger_auto_scale', struct([ + 'console_path' => string(['configure' => function(ScalarNodeDefinition $def) { + $def->defaultValue('%kernel.project_dir%/bin/console'); + }]), + 'pools' => dict(struct([ + 'min_procs' => int(), + 'max_procs' => int(), + 'message_rate' => int(), + 'scale_up_threshold_seconds' => int(), + 'scale_down_threshold_seconds' => int(), + 'worker_command' => string(), + 'worker_command_options' => listOf(string()), + 'backed_up_alert_threshold' => int(), + 'receivers' => string(), + ], ['allowExtraKeys' => true])) + ])); + } + }, $configs); + } + + private function loadServices(ContainerBuilder $container): void { + $loader = new XmlFileLoader($container, new FileLocator(__DIR__ . '/Resources/config')); + $loader->load('services.xml'); + } + }; + } +} diff --git a/src/PoolConfig.php b/src/PoolConfig.php new file mode 100644 index 0000000..473163d --- /dev/null +++ b/src/PoolConfig.php @@ -0,0 +1,89 @@ +minProcs = $minProcs; + $this->maxProcs = $maxProcs; + $this->messageRate = $messageRate; + $this->scaleUpThresholdSeconds = $scaleUpThresholdSeconds; + $this->scaleDownThresholdSeconds = $scaleDownThresholdSeconds; + $this->attributes = $attributes; + } + + public static function createFromOptionsArray(array $poolConfig): self { + return new self( + $poolConfig['min_procs'] ?? null, + $poolConfig['max_procs'] ?? null, + $poolConfig['message_rate'] ?? 100, + $poolConfig['scale_up_threshold_seconds'] ?? 5, + $poolConfig['scale_down_threshold_seconds'] ?? 60, + $poolConfig + ); + } + + public function minProcs(): ?int { + return $this->minProcs; + } + + public function maxProcs(): ?int { + return $this->maxProcs; + } + + public function withMinMax(?int $minProcs, ?int $maxProcs): self { + $self = clone $this; + $self->minProcs = $minProcs; + $self->maxProcs = $maxProcs; + return $self; + } + + public function messageRate(): int { + return $this->messageRate; + } + + public function withMessageRate(int $messageRate): self { + $self = clone $this; + $self->messageRate = $messageRate; + return $self; + } + + public function scaleUpThresholdSeconds(): int { + return $this->scaleUpThresholdSeconds; + } + + public function withScaleUpThresholdSeconds(int $scaleUpThresholdSeconds): self { + $self = clone $this; + $self->scaleUpThresholdSeconds = $scaleUpThresholdSeconds; + return $self; + } + + public function scaleDownThresholdSeconds(): int { + return $this->scaleDownThresholdSeconds; + } + + public function withScaleDownThresholdSeconds( $scaleDownThresholdSeconds): self { + $self = clone $this; + $self->scaleDownThresholdSeconds = $scaleDownThresholdSeconds; + return $self; + } + + public function attributes(): array { + return $this->attributes; + } + + public function jsonSerialize() { + return get_object_vars($this); + } +} diff --git a/src/PoolControl.php b/src/PoolControl.php new file mode 100644 index 0000000..12d2e35 --- /dev/null +++ b/src/PoolControl.php @@ -0,0 +1,12 @@ +numWorkers = 0; + $this->poolConfig = $poolConfig; + $this->status = PoolStatus::stopped(); + $this->shouldStop = false; + $this->shouldPause = false; + } + + public function getStatus(): PoolStatus { + return $this->status; + } + + public function getNumWorkers(): int { + return $this->numWorkers; + } + + public function getSizeOfQueues(): ?int { + return $this->sizeOfQueues; + } + + public function getPoolConfig(): ?PoolConfig { + return $this->poolConfig; + } + + public function shouldStop(): bool { + return $this->shouldStop || $this->shouldPause; + } + + public function scaleWorkers(int $numWorkers): void { + $this->numWorkers = $numWorkers; + } + + public function updatePoolConfig(?PoolConfig $poolConfig): void { + $this->poolConfig = $poolConfig; + } + + public function restart(): void { + $this->shouldStop = true; + } + + public function pause(): void { + $this->shouldPause = true; + } + + public function resume(): void { + $this->shouldPause = false; + } + + public function updateStatus(PoolStatus $poolStatus, ?int $sizeOfQueues = null): void { + $this->status = $poolStatus; + $this->shouldStop = false; + $this->sizeOfQueues = $sizeOfQueues; + } +} diff --git a/src/PoolControl/InMemoryPoolControlFactory.php b/src/PoolControl/InMemoryPoolControlFactory.php new file mode 100644 index 0000000..7f0dd96 --- /dev/null +++ b/src/PoolControl/InMemoryPoolControlFactory.php @@ -0,0 +1,18 @@ +poolControl ? $this->poolControl : $this->poolControl = new InMemoryPoolControl(); + } + + public function createForActor(string $poolName): ActorPoolControl { + return $this->poolControl ? $this->poolControl : $this->poolControl = new InMemoryPoolControl(); + } +} diff --git a/src/PoolControl/PsrSimpleCachePoolControl.php b/src/PoolControl/PsrSimpleCachePoolControl.php new file mode 100644 index 0000000..cbd6d26 --- /dev/null +++ b/src/PoolControl/PsrSimpleCachePoolControl.php @@ -0,0 +1,73 @@ +cache = $cache; + $this->keyScope = $keyScope; + } + + private function key(string $key): string { + return 'messenger_auto_scale_consumer_control_' . $this->keyScope . '_' . $key; + } + + public function getStatus(): PoolStatus { + return $this->cache->get($this->key('status'), PoolStatus::stopped()); + } + + public function getNumWorkers(): int { + return $this->cache->get($this->key('num_workers'), 0); + } + + public function getSizeOfQueues(): ?int { + return $this->cache->get($this->key('size_of_queues'), null); + } + + public function getPoolConfig(): ?PoolConfig { + return $this->cache->get($this->key('pool_config'), null); + } + + public function shouldStop(): bool { + return + $this->cache->get($this->key('should_stop'), false) + || $this->cache->get($this->key('should_pause'), false); + } + + public function scaleWorkers(int $numWorkers): void { + $this->cache->set($this->key('num_workers'), $numWorkers); + } + + public function updatePoolConfig(?PoolConfig $poolConfig): void { + $this->cache->set($this->key('pool_config'), $poolConfig); + } + + public function restart(): void { + $this->cache->set($this->key('should_stop'), true); + } + + public function pause(): void { + $this->cache->set($this->key('should_pause'), true); + } + + public function resume(): void { + $this->cache->set($this->key('should_pause'), false); + } + + public function updateStatus(PoolStatus $poolStatus, ?int $sizeOfQueues = null): void { + $this->cache->setMultiple([ + $this->key('status') => $poolStatus, + $this->key('should_stop') => false, + $this->key('size_of_queues') => $sizeOfQueues, + ]); + } +} diff --git a/src/PoolControl/PsrSimpleCachePoolControlFactory.php b/src/PoolControl/PsrSimpleCachePoolControlFactory.php new file mode 100644 index 0000000..6d49d8e --- /dev/null +++ b/src/PoolControl/PsrSimpleCachePoolControlFactory.php @@ -0,0 +1,25 @@ +cache = $cache; + $this->additionalKeyPrefix = $additionalKeyPrefix; + } + + public function createForWorker(string $poolName): WorkerPoolControl { + return new PsrSimpleCachePoolControl($this->cache, $this->additionalKeyPrefix . $poolName); + } + + public function createForActor(string $poolName): ActorPoolControl { + return new PsrSimpleCachePoolControl($this->cache, $this->additionalKeyPrefix . $poolName); + } +} diff --git a/src/PoolControl/WorkerPoolControl.php b/src/PoolControl/WorkerPoolControl.php new file mode 100644 index 0000000..00bf192 --- /dev/null +++ b/src/PoolControl/WorkerPoolControl.php @@ -0,0 +1,16 @@ +value = $value; + } + + public static function running(): self { + return new self('running'); + } + + public static function stopped(): self { + return new self('stopped'); + } + + public static function paused(): self { + return new self('paused'); + } + + public static function stopping(): self { + return new self('stopping'); + } + + public function value(): string { + return $this->value; + } + + public function __toString() { + return $this->value(); + } +} diff --git a/src/ProcessManager.php b/src/ProcessManager.php new file mode 100644 index 0000000..458b062 --- /dev/null +++ b/src/ProcessManager.php @@ -0,0 +1,14 @@ +idCounter; + $this->idCounter += 1; + $proc = ['isRunning' => true, 'id' => $procId]; + $this->procsById[$procId] = $proc; + return $procId; + } + + public function killProcess($processRef) { + unset($this->procsById[$processRef]); + } + + public function isProcessRunning($processRef): bool { + return $this->procsById[$processRef]['isRunning']; + } + + public function getPid($processRef): ?int { + return $processRef; + } + + public function stopProcess(int $processId): void { + $this->procsById[$processId]['isRunning'] = false; + } + + public function getProcs(): array { + return array_values($this->procsById); + } +} diff --git a/src/ProcessManager/SymfonyMessengerProcessManagerFactory.php b/src/ProcessManager/SymfonyMessengerProcessManagerFactory.php new file mode 100644 index 0000000..9dea7d0 --- /dev/null +++ b/src/ProcessManager/SymfonyMessengerProcessManagerFactory.php @@ -0,0 +1,29 @@ +pathToConsole = $pathToConsole; + $this->command = $command; + $this->defaultOpts = $defaultOpts; + } + + public function createFromSupervisorPoolConfig(SupervisorPoolConfig $config): ProcessManager { + $command = $config->poolConfig()->attributes()['worker_command'] ?? $this->command; + $options = $config->poolConfig()->attributes()['worker_command_options'] ?? $this->defaultOpts; + return new SymfonyProcessProcessManager(array_merge([$this->pathToConsole, $command], $options, $config->receiverIds())); + } +} diff --git a/src/ProcessManager/SymfonyProcessProcessManager.php b/src/ProcessManager/SymfonyProcessProcessManager.php new file mode 100644 index 0000000..243913c --- /dev/null +++ b/src/ProcessManager/SymfonyProcessProcessManager.php @@ -0,0 +1,38 @@ +cmd = $cmd; + } + + public function createProcess() { + $proc = new Process($this->cmd); + $proc->setTimeout(null) + ->disableOutput() + ->start(); + return $proc; + } + + public function killProcess($processRef) { + /** @var Process $processRef */ + $processRef->stop(); + } + + public function isProcessRunning($processRef): bool { + /** @var Process $processRef */ + return $processRef->isRunning(); + } + + public function getPid($processRef): ?int { + /** @var Process $processRef */ + return $processRef->getPid(); + } +} diff --git a/src/ProcessManagerFactory.php b/src/ProcessManagerFactory.php new file mode 100644 index 0000000..e739221 --- /dev/null +++ b/src/ProcessManagerFactory.php @@ -0,0 +1,8 @@ +raiseAlerts = $raiseAlerts; + } + + public function __invoke(SupervisorPoolConfig $poolConfig): iterable { + foreach ($this->raiseAlerts as $raiseAlert) { + yield from $raiseAlert($poolConfig); + } + } +} diff --git a/src/RaiseAlerts/PoolBackedUpRaiseAlerts.php b/src/RaiseAlerts/PoolBackedUpRaiseAlerts.php new file mode 100644 index 0000000..24c0be1 --- /dev/null +++ b/src/RaiseAlerts/PoolBackedUpRaiseAlerts.php @@ -0,0 +1,32 @@ +receiversById = $receiversById; + } + + public function __invoke(SupervisorPoolConfig $poolConfig): iterable { + $backedUpAlertThreshold = $poolConfig->poolConfig()->attributes()['backed_up_alert_threshold'] ?? null; + if (!$backedUpAlertThreshold) { + return []; + } + + $getMessageCount = AggregatingReceiverMessageCount::createFromReceiverIds($poolConfig->receiverIds(), $this->receiversById); + $total = $getMessageCount->getMessageCount(); + + return $total > intval($backedUpAlertThreshold) + ? [new PoolBackedUpAlert($poolConfig->name(), $backedUpAlertThreshold, $total)] + : []; + } +} diff --git a/src/Resources/config/services.xml b/src/Resources/config/services.xml new file mode 100644 index 0000000..3d8fee8 --- /dev/null +++ b/src/Resources/config/services.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Supervisor.php b/src/Supervisor.php new file mode 100644 index 0000000..2607e2c --- /dev/null +++ b/src/Supervisor.php @@ -0,0 +1,94 @@ +processManagerFactory = $processManagerFactory; + $this->poolControlFactory = $poolControlFactory; + $this->receiversById = $receiversById; + $this->supervisorPoolConfigs = $this->assertUniquePoolNames($supervisorPoolConfigs); + $this->autoScale = $autoScale ?: self::defaultAutoScale(); + $this->logger = new EventLogger($logger ?: new NullLogger()); + } + + public static function defaultAutoScale(): AutoScale { + return new AutoScale\MinMaxClipAutoScale(new AutoScale\DebouncingAutoScale(new AutoScale\QueueSizeMessageRateAutoScale())); + } + + public function run(): void { + $this->registerPcntlSignalHandlers(); + + $workerPools = $this->createWorkersFromPoolConfigs($this->supervisorPoolConfigs); + $timeSinceLastCall = null; + while (!$this->shouldShutdown) { + foreach ($workerPools as $pool) { + $pool->manage($timeSinceLastCall); + } + sleep(self::SLEEP_TIME); + $timeSinceLastCall = self::SLEEP_TIME; + } + + foreach ($workerPools as $pool) { + $pool->stop(); + } + } + + /** + * @param SupervisorPoolConfig[] $supervisorPoolConfigs + * @return SupervisorPoolConfig[] + */ + private function assertUniquePoolNames(array $supervisorPoolConfigs): array { + $poolNames = array_map(function(SupervisorPoolConfig $config) { + return $config->name(); + }, $supervisorPoolConfigs); + + if (\count($poolNames) === count(\array_unique($poolNames))) { + return $supervisorPoolConfigs; + } + + throw new \RuntimeException('The pool names must be unique across all pool configurations.'); + } + + /** @return WorkerPool[] */ + private function createWorkersFromPoolConfigs(array $supervisorPoolConfigs): array { + return array_map(function(SupervisorPoolConfig $config) { + return new WorkerPool( + $config->name(), + AggregatingReceiverMessageCount::createFromReceiverIds($config->receiverIds(), $this->receiversById), + $this->poolControlFactory->createForWorker($config->name()), + $this->processManagerFactory->createFromSupervisorPoolConfig($config), + $this->autoScale, + $this->logger, + $config->poolConfig() + ); + }, $supervisorPoolConfigs); + } + + private function registerPcntlSignalHandlers(): void { + pcntl_async_signals(true); + foreach ([SIGTERM, SIGINT] as $signal) { + pcntl_signal($signal, function() { + $this->shouldShutdown = true; + }); + } + } +} diff --git a/src/SupervisorPoolConfig.php b/src/SupervisorPoolConfig.php new file mode 100644 index 0000000..d202ba1 --- /dev/null +++ b/src/SupervisorPoolConfig.php @@ -0,0 +1,35 @@ +name = $name; + $this->poolConfig = $poolConfig; + $this->receiverIds = $receiverIds; + } + + public function name(): string { + return $this->name; + } + + public function poolConfig(): PoolConfig { + return $this->poolConfig; + } + + /** @rerturn string[] */ + public function receiverIds(): array { + return $this->receiverIds; + } +} diff --git a/src/WorkerPool.php b/src/WorkerPool.php new file mode 100644 index 0000000..2309dd6 --- /dev/null +++ b/src/WorkerPool.php @@ -0,0 +1,147 @@ +name = $name; + $this->getMessageCount = $getMessageCount; + $this->poolControl = $poolControl; + $this->processManager = $processManager; + $this->autoScale = $autoScale; + $this->logger = $logger; + $this->poolConfig = $poolConfig; + $this->procs = []; + } + + public function manage(?int $timeSinceLastCallInSeconds): void { + $poolConfig = $this->poolControl->getPoolConfig() ?: $this->poolConfig; + $sizeOfQueues = $this->getMessageCount->getMessageCount(); + + if ($this->poolControl->shouldStop()) { + $this->stop(); + return; + } + + $this->beatHeart($poolConfig, $sizeOfQueues, $timeSinceLastCallInSeconds); + $this->refreshDeadProcs(); + + $resp = ($this->autoScale)(new AutoScaleRequest($this->autoScaleState, $timeSinceLastCallInSeconds, $this->numProcs(), $sizeOfQueues, $poolConfig)); + $this->scaleTo($resp->expectedNumProcs()); + $this->autoScaleState = $resp->state(); + } + + public function stop(): void { + if ($this->poolControl->getStatus() == PoolStatus::stopped() && $this->numProcs() == 0) { + return; + } + + $this->logEvent('Stopping Pool', 'stopping'); + $this->poolControl->updateStatus(PoolStatus::stopping()); + + $this->scaleTo(0); + + $this->logger->logEvent('Pool stopped', 'stopped'); + $this->poolControl->updateStatus(PoolStatus::stopped()); + } + + private function beatHeart(PoolConfig $poolConfig, int $sizeOfQueues, ?int $timeSinceLastCallInSeconds): void { + $heartBeatInterval = $poolConfig->attributes()['heartbeat_interval'] ?? self::DEFAULT_HEARTBEAT_INTERVAL; + $this->timeSinceLastHeartBeat += $timeSinceLastCallInSeconds ?: 0; + + if ($this->timeSinceLastHeartBeat >= $heartBeatInterval) { + $this->timeSinceLastHeartBeat = 0; + } + + if ($this->timeSinceLastHeartBeat !== 0) { + return; + } + + $this->poolControl->updateStatus(PoolStatus::running(), $sizeOfQueues); + $this->logEvent('Running', 'running', ['sizeOfQueues' => $sizeOfQueues]); + } + + /** Scales up or down to the expected num procs */ + private function scaleTo(int $expectedNumProcs): void { + while ($expectedNumProcs > $this->numProcs()) { + $this->scaleUp(); + } + while ($expectedNumProcs < $this->numProcs()) { + $this->scaleDown(); + } + } + + private function scaleDown() { + $procRef = array_pop($this->procs); + $this->processManager->killProcess($procRef); + $this->logEvent("Scaling down worker pool", 'scale', ['direction' => 'down']); + $this->poolControl->scaleWorkers($this->numProcs()); + } + + private function scaleUp() { + $proc = $this->processManager->createProcess(); + $this->procs[] = $proc; + $this->logEvent("Scaling up worker pool", 'scale', ['direction' => 'up']); + $this->poolControl->scaleWorkers($this->numProcs()); + } + + private function logEvent(string $message, string $event, array $context = []): void { + $this->logger->logEvent($message, 'pool_'.$event, array_merge([ + 'num_procs' => $this->numProcs(), + 'pool' => $this->name, + ], $context)); + } + + private function numProcs(): int { + return count($this->procs); + } + + /** if any of our procs got killed for some reason, we'll need to start up a replacement proc */ + private function refreshDeadProcs() { + $this->procs = \iterator_to_array((function(array $procs) { + foreach ($procs as $proc) { + if ($this->processManager->isProcessRunning($proc)) { + yield $proc; + continue; + } + + $this->logEvent('Restarting Process', 'restart_proc', [ + 'pid' => $this->processManager->getPid($proc), + ]); + $this->processManager->killProcess($proc); + yield $this->processManager->createProcess(); + } + })($this->procs)); + } +} diff --git a/tests/Feature/AggregatingReceiverMessageCountTest.php b/tests/Feature/AggregatingReceiverMessageCountTest.php new file mode 100644 index 0000000..106c1a9 --- /dev/null +++ b/tests/Feature/AggregatingReceiverMessageCountTest.php @@ -0,0 +1,18 @@ + new StaticGetMessageCount(5), + 'b' => new StaticGetMessageCount(4), + ])); + $this->assertEquals(9, $getMessageCount->getMessageCount()); + } +} diff --git a/tests/Feature/ArrayContainer.php b/tests/Feature/ArrayContainer.php new file mode 100644 index 0000000..82ab1e9 --- /dev/null +++ b/tests/Feature/ArrayContainer.php @@ -0,0 +1,24 @@ +services = $services; + } + + public function get($id) { + return $this->services[$id]; + } + + public function has($id) { + return isset($this->services[$id]); + } +} diff --git a/tests/Feature/AutoScaleTest.php b/tests/Feature/AutoScaleTest.php new file mode 100644 index 0000000..b1e192e --- /dev/null +++ b/tests/Feature/AutoScaleTest.php @@ -0,0 +1,208 @@ +poolConfig = new PoolConfig(); + } + + /** + * @test + * @dataProvider provide_for_min_max_boundaries + */ + public function can_clip_to_min_max_boundaries(int $numProcs, int $expectedNumProcs) { + $this->given_there_is_a_static_auto_scale_at($numProcs); + $this->given_there_is_a_wrapping_min_max_auto_scale(); + $this->given_the_pool_config_has_min_max(1, 5); + $this->when_auto_scale_occurs(); + $this->then_expected_num_procs_is($expectedNumProcs); + } + + public function provide_for_min_max_boundaries() { + yield 'below min' => [0, 1]; + yield 'ok' => [3, 3]; + yield 'above max' => [6, 5]; + } + + /** + * @test + * @dataProvider provide_for_queue_size_and_message_rate + */ + public function can_determine_num_procs_on_queue_size_and_message_rate(int $queueSize, int $messageRate, int $expectedNumProcs) { + $this->given_there_is_a_queue_size_threshold_auto_scale(); + $this->given_the_pool_config_has_message_rate($messageRate); + $this->when_auto_scale_occurs($queueSize); + $this->then_expected_num_procs_is($expectedNumProcs); + } + + public function provide_for_queue_size_and_message_rate() { + yield 'queue@0,100' => [0, 100, 0]; + yield 'queue@50,100' => [50, 100, 1]; + yield 'queue@100,100' => [100, 100, 1]; + yield 'queue@101,100' => [101, 100, 2]; + yield 'queue@200,100' => [200, 100, 2]; + yield 'queue@201,100' => [201, 100, 3]; + yield 'queue@53,5' => [53, 5, 11]; + yield 'queue@57,5' => [57, 5, 12]; + } + + /** + * @test + * @dataProvider provide_for_debouncing + * + * @param array> $runs array of tuples where first element is queueSize and next element is current num procs + */ + public function debounces_auto_scaling(array $runs, int $expectedProcs) { + $this->given_there_is_a_queue_size_threshold_auto_scale(); + $this->given_there_is_a_wrapping_debouncing_auto_scale(); + $this->given_the_pool_config_has_message_rate(1); + $this->given_the_time_since_last_call_is(1); + $this->given_the_pool_config_has_scale_up_threshold_of(2); + $this->given_the_pool_config_has_scale_down_threshold_of(4); + $this->when_auto_scale_occurs_n_times($runs); + $this->then_expected_num_procs_is($expectedProcs); + } + + public function provide_for_debouncing() { + yield 'no debouncing on first auto scale up' => [[ + [2, 0] + ], 2]; + yield 'debounces scale up before threshold is met' => [[ + [1, 0], + [1, 1], + [2, 1], + [2, 1], + ], 1]; + yield 'debounces scale up until threshold is met' => [[ + [1, 0], + [1, 1], + [2, 1], + [3, 1], + [2, 1], + ], 2]; + yield 'resets debounce state if expected num procs matches current procs' => [[ + [1, 0], + [1, 1], + [2, 1], + [2, 1], + [1, 1], + [2, 1], + ], 1]; + yield 'resets debounce state if needed scale direction changes' => [[ + [1, 0], + [1, 1], + [2, 1], + [2, 1], + [0, 1], + [2, 1], + ], 1]; + yield 'prevents scale up debounce if scaling from 0' => [[ + [0, 0], + [0, 0], + [4, 0] + ], 4]; + yield 'resets threshold after first scale event' => [[ + [1, 0], + [2, 1], + [2, 1], + [2, 1], + [3, 2], + [3, 2], + ], 2]; + yield 'multiple scale up events' => [[ + [1, 0], + [2, 1], + [2, 1], + [2, 1], + [3, 2], + [3, 2], + [3, 2], + ], 3]; + yield 'debounces on scale down' => [[ + [2, 0], + [0, 2], + ], 2]; + yield 'finishes debounces on scale down' => [[ + [2, 0], + [0, 2], + [0, 2], + [0, 2], + [0, 2], + [0, 2], + ], 0]; + } + + private function given_there_is_a_static_auto_scale_at(int $numProcs): void { + $this->autoScale = new class($numProcs) implements AutoScale { + private $expectedNumProcs; + + public function __construct(int $expectedNumProcs) { + $this->expectedNumProcs = $expectedNumProcs; + } + + public function __invoke(AutoScale\AutoScaleRequest $req): AutoScale\AutoScaleResponse { + return new AutoScale\AutoScaleResponse($req->state(), $this->expectedNumProcs); + } + }; + } + + private function given_there_is_a_queue_size_threshold_auto_scale(): void { + $this->autoScale = new AutoScale\QueueSizeMessageRateAutoScale(); + } + + private function given_there_is_a_wrapping_min_max_auto_scale(): void { + $this->autoScale = new AutoScale\MinMaxClipAutoScale($this->autoScale); + } + + private function given_there_is_a_wrapping_debouncing_auto_scale(): void { + $this->autoScale = new AutoScale\DebouncingAutoScale($this->autoScale); + } + + private function given_the_pool_config_has_min_max(?int $min, ?int $max): void { + $this->poolConfig = $this->poolConfig->withMinMax($min, $max); + } + + private function given_the_pool_config_has_message_rate(int $messageRate): void { + $this->poolConfig = $this->poolConfig->withMessageRate($messageRate); + } + + private function given_the_pool_config_has_scale_up_threshold_of(int $threshold) { + $this->poolConfig = $this->poolConfig->withScaleUpThresholdSeconds($threshold); + } + + private function given_the_pool_config_has_scale_down_threshold_of(int $threshold) { + $this->poolConfig = $this->poolConfig->withScaleDownThresholdSeconds($threshold); + } + + private function given_the_time_since_last_call_is(?int $timeSinceLastCall) { + $this->timeSinceLastCall = $timeSinceLastCall; + } + + private function when_auto_scale_occurs(int $queueSize = 1, int $numProcs = 1) { + $this->autoScaleResp = ($this->autoScale)(new AutoScale\AutoScaleRequest($this->autoScaleState, $this->timeSinceLastCall, $numProcs, $queueSize, $this->poolConfig)); + $this->autoScaleState = $this->autoScaleResp->state(); + } + + private function when_auto_scale_occurs_n_times(array $args) { + foreach ($args as [$queueSize, $numProcs]) { + $this->when_auto_scale_occurs($queueSize, $numProcs); + } + } + + private function then_expected_num_procs_is(int $expected) { + $this->assertEquals($expected, $this->autoScaleResp->expectedNumProcs()); + } +} diff --git a/tests/Feature/BundleTest.php b/tests/Feature/BundleTest.php new file mode 100644 index 0000000..7f3e858 --- /dev/null +++ b/tests/Feature/BundleTest.php @@ -0,0 +1,107 @@ +addCompilerPass(new PublicServicePass('/(Krak.*|krak\..*|messenger.default_serializer|message_bus)/')); + } + + protected function tearDown() { + if ($this->proc) { + $this->proc->stop(); + } + } + + protected function getBundleClass() { + return MessengerAutoScaleBundle::class; + } + + /** @test */ + public function supervisor_pool_config_is_built_from_sf_configuration() { + $this->given_the_kernel_is_booted_with_messenger_and_auto_scale_config(); + $this->when_the_requires_supervisor_pool_configs_is_created(); + $this->then_the_pool_configs_match_the_auto_scale_config(); + } + + /** @test */ + public function consuming_messages_with_a_running_supervisor() { + $this->given_the_message_info_file_is_reset(); + $this->given_the_kernel_is_booted_with_messenger_and_auto_scale_config(); + $this->given_the_supervisor_is_started(); + $this->when_the_messages_are_dispatched(); + $this->then_the_message_info_file_matches_the_messages_sent(); + } + + public function alerts_system() { + // setup a queue that's overflowing + // run the queue command + } + + private function given_the_message_info_file_is_reset() { + @unlink(__DIR__ . '/Fixtures/_message-info.txt'); + } + + private function given_the_kernel_is_booted_with_messenger_and_auto_scale_config() { + $kernel = $this->createKernel(); + $kernel->addBundle(Fixtures\TestFixtureBundle::class); + $kernel->addBundle(MessengerRedisBundle::class); + $kernel->addConfigFile(__DIR__ . '/Fixtures/messenger-config.yaml'); + $kernel->addConfigFile(__DIR__ . '/Fixtures/auto-scale-config.yaml'); + $this->bootKernel(); + } + + private function given_the_supervisor_is_started() { + $this->proc = new Process([__DIR__ . '/Fixtures/console', 'krak:auto-scale:consume']); + $this->proc + ->setTimeout(null) + ->disableOutput() + ->start(); + } + + private function when_the_requires_supervisor_pool_configs_is_created(): void { + $this->requiresPoolConfigs = $this->getContainer()->get(RequiresSupervisorPoolConfigs::class); + } + + private function when_the_messages_are_dispatched() { + /** @var MessageBusInterface $bus */ + $bus = $this->getContainer()->get('message_bus'); + $bus->dispatch(new Fixtures\Message\CatalogMessage(1)); + $bus->dispatch(new Fixtures\Message\SalesMessage(2)); + usleep(2000 * 1000); // 2000ms + } + + private function then_the_pool_configs_match_the_auto_scale_config() { + $res = $this->requiresPoolConfigs; + $this->assertEquals('sales', $res->poolConfigs[0]->name()); + $this->assertEquals(['sales', 'sales_order'], $res->poolConfigs[0]->receiverIds()); + $this->assertEquals('default', $res->poolConfigs[1]->name()); + $this->assertEquals(['catalog'], $res->poolConfigs[1]->receiverIds()); + } + + private function then_the_message_info_file_matches_the_messages_sent() { + $res = array_map('trim', file(__DIR__ . '/Fixtures/_message-info.txt')); + sort($res); + $this->assertEquals([ + 'catalog: 1', + 'sales-order: 2', + 'sales-order: 2', + 'sales: 2', + 'sales: 2', + ], $res); + } +} diff --git a/tests/Feature/Fixtures/FixtureKernel.php b/tests/Feature/Fixtures/FixtureKernel.php new file mode 100644 index 0000000..444177a --- /dev/null +++ b/tests/Feature/Fixtures/FixtureKernel.php @@ -0,0 +1,37 @@ +load(__DIR__ . '/messenger-config.yaml'); + $loader->load(__DIR__ . '/auto-scale-config.yaml'); + $loader->load(__DIR__ . '/services.php'); + } +} diff --git a/tests/Feature/Fixtures/Message/CatalogMessage.php b/tests/Feature/Fixtures/Message/CatalogMessage.php new file mode 100644 index 0000000..dac5e4e --- /dev/null +++ b/tests/Feature/Fixtures/Message/CatalogMessage.php @@ -0,0 +1,12 @@ +id = $id; + } +} diff --git a/tests/Feature/Fixtures/Message/HandleCatalogMessage.php b/tests/Feature/Fixtures/Message/HandleCatalogMessage.php new file mode 100644 index 0000000..9e10d2f --- /dev/null +++ b/tests/Feature/Fixtures/Message/HandleCatalogMessage.php @@ -0,0 +1,10 @@ +id . "\n", FILE_APPEND); + } +} diff --git a/tests/Feature/Fixtures/Message/HandleSalesMessage.php b/tests/Feature/Fixtures/Message/HandleSalesMessage.php new file mode 100644 index 0000000..e5fa5ab --- /dev/null +++ b/tests/Feature/Fixtures/Message/HandleSalesMessage.php @@ -0,0 +1,10 @@ +id . "\n", FILE_APPEND); + } +} diff --git a/tests/Feature/Fixtures/Message/HandleSalesOrderMessage.php b/tests/Feature/Fixtures/Message/HandleSalesOrderMessage.php new file mode 100644 index 0000000..4c0a7c4 --- /dev/null +++ b/tests/Feature/Fixtures/Message/HandleSalesOrderMessage.php @@ -0,0 +1,10 @@ +id . "\n", FILE_APPEND); + } +} diff --git a/tests/Feature/Fixtures/Message/SalesMessage.php b/tests/Feature/Fixtures/Message/SalesMessage.php new file mode 100644 index 0000000..8a50a84 --- /dev/null +++ b/tests/Feature/Fixtures/Message/SalesMessage.php @@ -0,0 +1,12 @@ +id = $id; + } +} diff --git a/tests/Feature/Fixtures/RequiresSupervisorPoolConfigs.php b/tests/Feature/Fixtures/RequiresSupervisorPoolConfigs.php new file mode 100644 index 0000000..e4b96d1 --- /dev/null +++ b/tests/Feature/Fixtures/RequiresSupervisorPoolConfigs.php @@ -0,0 +1,15 @@ +poolConfigs = $poolConfigs; + } +} diff --git a/tests/Feature/Fixtures/SfRedisMessage.php b/tests/Feature/Fixtures/SfRedisMessage.php new file mode 100644 index 0000000..2cbf722 --- /dev/null +++ b/tests/Feature/Fixtures/SfRedisMessage.php @@ -0,0 +1,8 @@ +load('services.php'); + } + }; + } +} diff --git a/tests/Feature/Fixtures/auto-scale-config.yaml b/tests/Feature/Fixtures/auto-scale-config.yaml new file mode 100644 index 0000000..c97ce75 --- /dev/null +++ b/tests/Feature/Fixtures/auto-scale-config.yaml @@ -0,0 +1,14 @@ +messenger_auto_scale: + console_path: '%kernel.project_dir%/tests/Feature/Fixtures/console' + pools: + sales: + min_procs: 0 + max_procs: 5 + receivers: "sales*" + heartbeat_interval: 5 + default: + min_procs: 0 + max_procs: 5 + backed_up_alert_threshold: 100 + receivers: "*" + heartbeat_interval: 10 diff --git a/tests/Feature/Fixtures/console b/tests/Feature/Fixtures/console new file mode 100755 index 0000000..751397a --- /dev/null +++ b/tests/Feature/Fixtures/console @@ -0,0 +1,33 @@ +#!/usr/bin/env php +getParameterOption(['--env', '-e'], isset($_SERVER['APP_ENV']) ? $_SERVER['APP_ENV'] : 'dev'); +$debug = (isset($_SERVER['APP_DEBUG']) ? $_SERVER['APP_DEBUG'] : ('prod' !== $env)) && !$input->hasParameterOption(['--no-debug', '']); + +if ($debug) { + umask(0000); + + if (class_exists(Debug::class)) { + Debug::enable(); + } +} + +$kernel = new FixtureKernel($env, $debug); +$application = new Application($kernel); +$application->run($input); diff --git a/tests/Feature/Fixtures/messenger-config.yaml b/tests/Feature/Fixtures/messenger-config.yaml new file mode 100644 index 0000000..6ab7c27 --- /dev/null +++ b/tests/Feature/Fixtures/messenger-config.yaml @@ -0,0 +1,13 @@ +framework: + serializer: + enabled: true + cache: + default_redis_provider: 'redis://redis:6379' + messenger: + transports: + catalog: 'redis://redis:6379?queue=catalog' + sales: 'redis://redis:6379?queue=sales' + sales_order: 'redis://redis:6379?queue=sales_order' + routing: + 'Krak\SymfonyMessengerAutoScale\Tests\Feature\Fixtures\Message\CatalogMessage': catalog + 'Krak\SymfonyMessengerAutoScale\Tests\Feature\Fixtures\Message\SalesMessage': [sales, sales_order] diff --git a/tests/Feature/Fixtures/services.php b/tests/Feature/Fixtures/services.php new file mode 100644 index 0000000..101bc7c --- /dev/null +++ b/tests/Feature/Fixtures/services.php @@ -0,0 +1,22 @@ +services() + ->defaults() + ->public() + ->set(RequiresSupervisorPoolConfigs::class) + ->args([ref('krak.messenger_auto_scale.supervisor_pool_configs')]) + ->set(HandleCatalogMessage::class) + ->tag('messenger.message_handler') + ->set(HandleSalesMessage::class) + ->tag('messenger.message_handler') + ->set(HandleSalesOrderMessage::class) + ->tag('messenger.message_handler') + ; +}; diff --git a/tests/Feature/PoolControl/InMemoryPoolControlTest.php b/tests/Feature/PoolControl/InMemoryPoolControlTest.php new file mode 100644 index 0000000..80a68c6 --- /dev/null +++ b/tests/Feature/PoolControl/InMemoryPoolControlTest.php @@ -0,0 +1,14 @@ +createPoolControls(); + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::running()); + $workerPoolControl->scaleWorkers(0); + + $this->assertEquals(0, $workerPoolControl->getNumWorkers()); + $this->assertEquals(PoolStatus::running(), $workerPoolControl->getStatus()); + $this->assertEquals(false, $workerPoolControl->shouldStop()); + $this->assertEquals(null, $workerPoolControl->getPoolConfig()); + return $tup; + } + + /** + * @test + * @depends can_start_pool + */ + public function can_scale_workers(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->scaleWorkers(5); + + $this->assertEquals(5, $actorPoolControl->getNumWorkers()); + return $tup; + } + + /** + * @test + * @depends can_scale_workers + */ + public function can_update_pool_config(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $actorPoolControl->updatePoolConfig(new PoolConfig(5, 10, 50)); + + $this->assertEquals(5, $workerPoolControl->getPoolConfig()->minProcs()); + $this->assertEquals(10, $workerPoolControl->getPoolConfig()->maxProcs()); + $this->assertEquals(50, $workerPoolControl->getPoolConfig()->messageRate()); + return $tup; + } + + /** + * @test + * @depends can_update_pool_config + */ + public function can_request_a_restart(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $actorPoolControl->restart(); + + $this->assertEquals(true, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::running(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_request_a_restart + */ + public function can_mark_consumer_stopping(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::stopping()); + + $this->assertEquals(false, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::stopping(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_mark_consumer_stopping + */ + public function can_finish_restart(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::running()); + + $this->assertEquals(false, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::running(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_finish_restart + */ + public function can_request_a_pause(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $actorPoolControl->pause(); + + $this->assertEquals(true, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::running(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_request_a_pause + */ + public function can_start_pausing_pool(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::stopping()); + + $this->assertEquals(true, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::stopping(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_start_pausing_pool + */ + public function can_pause_a_pool(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::stopped()); + + $this->assertEquals(true, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::stopped(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_pause_a_pool + */ + public function can_request_resume_a_pool(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $actorPoolControl->resume(); + + $this->assertEquals(false, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::stopped(), $workerPoolControl->getStatus()); + return $tup; + } + + /** + * @test + * @depends can_request_resume_a_pool + */ + public function can_resume_a_pool(array $tup) { + /** @var PoolControl\WorkerPoolControl */ + /** @var PoolControl\ActorPoolControl */ + [$workerPoolControl, $actorPoolControl] = $tup; + + $workerPoolControl->updateStatus(PoolStatus::running()); + + $this->assertEquals(false, $workerPoolControl->shouldStop()); + $this->assertEquals(PoolStatus::running(), $workerPoolControl->getStatus()); + return $tup; + } +} diff --git a/tests/Feature/ProcessManager/Fixtures/run-proc.php b/tests/Feature/ProcessManager/Fixtures/run-proc.php new file mode 100644 index 0000000..06058a0 --- /dev/null +++ b/tests/Feature/ProcessManager/Fixtures/run-proc.php @@ -0,0 +1,14 @@ +createProcessManager(); + + $proc1 = $procManager->createProcess(); + $proc2 = $procManager->createProcess(); + + $this->assertEquals(true, $procManager->isProcessRunning($proc1)); + $this->assertEquals(true, $procManager->isProcessRunning($proc2)); + + $proc1Pid = $procManager->getPid($proc1); + $proc2Pid = $procManager->getPid($proc2); + + usleep(100000); + + $procManager->killProcess($proc1); + $procManager->killProcess($proc2); + $this->assertEquals(false, $procManager->isProcessRunning($proc1)); + $this->assertEquals(false, $procManager->isProcessRunning($proc2)); + + $rows = array_unique(file(__DIR__ . '/ProcessManager/Fixtures/run-proc.log')); + sort($rows); + + $this->assertEquals([ + "log: $proc1Pid\n", + "log: $proc2Pid\n", + ], $rows); + } +} diff --git a/tests/Feature/StaticGetMessageCount.php b/tests/Feature/StaticGetMessageCount.php new file mode 100644 index 0000000..d1ec035 --- /dev/null +++ b/tests/Feature/StaticGetMessageCount.php @@ -0,0 +1,32 @@ +messageCount = $messageCount; + } + + public function getMessageCount(): int { + return $this->messageCount; + } + + public function get(): iterable { + return []; + } + + public function ack(Envelope $envelope): void { + // TODO: Implement ack() method. + } + + public function reject(Envelope $envelope): void { + // TODO: Implement reject() method. + } +} diff --git a/tests/Feature/WorkerPoolTest.php b/tests/Feature/WorkerPoolTest.php new file mode 100644 index 0000000..aab7760 --- /dev/null +++ b/tests/Feature/WorkerPoolTest.php @@ -0,0 +1,177 @@ +getMessageCount = new StaticGetMessageCount(); + $this->procManager = new MockProcessManager(); + $this->testLogger = new TestLogger(); + $this->logger = new EventLogger($this->testLogger); + $this->poolControl = new InMemoryPoolControl(); + } + + /** @test */ + public function can_start_a_worker_pool() { + $this->given_the_unprocessed_message_count_is(5); + $this->given_there_is_a_queue_size_auto_scale(); + $this->given_the_worker_pool_is_created(); + $this->when_the_worker_pool_is_managed(); + $this->then_the_total_running_procs_is(5); + $this->then_the_pool_control_status_is(PoolStatus::running()); + } + + /** @test */ + public function restarting_a_worker_pool_stops_on_first_manage() { + $this->given_there_is_a_running_worker_pool_with_num_procs(5); + $this->given_the_pool_control_requests_a_restart(); + $this->when_the_worker_pool_is_managed(); + $this->then_the_total_running_procs_is(0); + $this->then_the_pool_control_status_is(PoolStatus::stopped()); + } + + /** @test */ + public function restarting_a_worker_pool_restarts_on_second_manage() { + $this->given_there_is_a_running_worker_pool_with_num_procs(5); + $this->given_the_pool_control_requests_a_restart(); + $this->when_the_worker_pool_is_managed_n_times([null, null]); + $this->then_the_total_running_procs_is(5); + $this->then_the_pool_control_status_is(PoolStatus::running()); + } + + /** @test */ + public function stopping_a_worker_pool() { + $this->given_there_is_a_running_worker_pool_with_num_procs(5); + $this->when_the_worker_pool_is_stopped(); + $this->then_the_total_running_procs_is(0); + $this->then_the_pool_control_status_is(PoolStatus::stopped()); + } + + /** @test */ + public function refreshing_dead_procs() { + $this->given_there_is_a_running_worker_pool_with_num_procs(5); + $this->given_a_proc_is_killed(3); + $this->when_the_worker_pool_is_managed(); + $this->then_the_total_running_procs_is(5); + } + + /** + * @test + * @dataProvider provide_for_scaling_procs + */ + public function can_scale_procs_to_meet_autoscale_expectations(int $expectedNumProcs) { + $this->given_there_is_a_running_worker_pool_with_num_procs(5); + $this->given_the_unprocessed_message_count_is($expectedNumProcs); + $this->when_the_worker_pool_is_managed(); + $this->then_the_total_running_procs_is($expectedNumProcs); + } + + public function provide_for_scaling_procs() { + yield 'scale up' => [6]; + yield 'scale down' => [5]; + } + + /** @test */ + public function can_maintain_state_to_auto_scale() { + $this->given_there_is_a_running_worker_pool_with_num_procs(5, function() { + $this->given_there_is_a_queue_size_auto_scale(); + $this->given_there_is_a_wrapping_debouncing_auto_scale(); + }); + $this->given_the_unprocessed_message_count_is(6); + $this->when_the_worker_pool_is_managed_n_times([1, 1]); + $this->then_the_total_running_procs_is(5); + } + + private function given_the_unprocessed_message_count_is(int $messageCount) { + $this->getMessageCount->messageCount = $messageCount; + } + + private function given_the_worker_pool_is_created(): void { + $this->workerPool = new WorkerPool( + 'test', + $this->getMessageCount, + $this->poolControl, + $this->procManager, + $this->autoScale, + $this->logger, + (new PoolConfig())->withMessageRate(1)->withScaleUpThresholdSeconds(5) + ); + } + + private function given_there_is_a_running_worker_pool_with_num_procs(int $numProcs = 5, ?callable $initAutoScale = null) { + $this->given_the_unprocessed_message_count_is($numProcs); + ($initAutoScale ?? function() { + $this->given_there_is_a_queue_size_auto_scale(); + })(); + $this->given_the_worker_pool_is_created(); + $this->when_the_worker_pool_is_managed(); + } + + private function given_there_is_a_queue_size_auto_scale() { + $this->autoScale = new QueueSizeMessageRateAutoScale(); + } + + private function given_there_is_a_wrapping_debouncing_auto_scale(): void { + $this->autoScale = new DebouncingAutoScale($this->autoScale); + } + + private function given_the_pool_control_requests_a_restart() { + $this->poolControl->restart(); + } + + private function given_a_proc_is_killed(int $procId) { + $this->procManager->stopProcess($procId); + } + + private function given_the_worker_pool_has_been_managed_n_times(array $args): void { + $this->when_the_worker_pool_is_managed_n_times($args); + } + + private function when_the_worker_pool_is_stopped() { + $this->workerPool->stop(); + } + + private function when_the_worker_pool_is_managed(?int $timeSinceLastCallSeconds = null) { + $this->workerPool->manage($timeSinceLastCallSeconds); + } + + private function when_the_worker_pool_is_managed_n_times(array $args): void { + foreach ($args as $timeSinceLastCallSeconds) { + $this->workerPool->manage($timeSinceLastCallSeconds); + } + } + + private function then_the_total_running_procs_is(int $numProcs): void { + $this->assertCount( + $numProcs, + array_filter($this->procManager->getProcs(), function(array $proc) { return $proc['isRunning']; }) + ); + $this->assertEquals($numProcs, $this->poolControl->getNumWorkers()); + } + + private function then_the_pool_control_status_is(PoolStatus $status): void { + $this->assertEquals($status, $this->poolControl->getStatus()); + } +} diff --git a/tests/Unit/Internal/GlobTest.php b/tests/Unit/Internal/GlobTest.php new file mode 100644 index 0000000..9511862 --- /dev/null +++ b/tests/Unit/Internal/GlobTest.php @@ -0,0 +1,26 @@ +assertEquals($expectedMatch, $glob->matches($toMatch)); + } + + public function provide_strings_for_matching() { + yield 'abc => abc' => ['abc', 'abc', true]; + yield 'abc !=> def' => ['abc', 'def', false]; + yield 'abc !=> a' => ['abc', 'a', false]; + yield 'abc !=> ab' => ['abc', 'ab', false]; + yield 'abc !=> abcd' => ['abc', 'abcd', false]; + yield 'abc* => abcd' => ['abc*', 'abcd', true]; + yield '*abc* => xabc' => ['*abc*', 'xabc', true]; + yield '*abc* => xabcx' => ['*abc*', 'xabcx', true]; + } +}