Skip to content

Commit

Permalink
Initial Version
Browse files Browse the repository at this point in the history
- Built out initial worker pool/supervisor, auto scale system, and alerts
- Setup repo, tests, docs
- Enabled auto scale debouncing

Signed-off-by: RJ Garcia <[email protected]>
  • Loading branch information
ragboyjr committed Apr 21, 2020
1 parent 8f7c291 commit 3f1f481
Show file tree
Hide file tree
Showing 74 changed files with 2,949 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/vendor
/composer.lock
/var/
/tests/Feature/Fixtures/_message-info.txt
8 changes: 8 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM php:7.2-cli

RUN apt-get update && apt-get install -y git zip

COPY --from=mlocati/php-extension-installer /usr/bin/install-php-extensions /usr/bin/
RUN install-php-extensions redis pcntl

COPY --from=composer:1.9.1 /usr/bin/composer /usr/bin/composer
20 changes: 20 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
The MIT License (MIT)

Copyright (c) 2020 RJ Garcia

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
100 changes: 97 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,111 @@ return [

## Usage

### Standalone
After the bundle is loaded, you need to configure worker pools which will manage procs for a set of messenger receivers.

### Within Symfony Framework
```yaml
messenger_auto_scale:
console_path: '%kernel.project_dir%/tests/Feature/Fixtures/console'
pools:
sales:
min_procs: 0
max_procs: 5
receivers: "sales*"
heartbeat_interval: 5
default:
min_procs: 0
max_procs: 5
backed_up_alert_threshold: 100
receivers: "*"
heartbeat_interval: 10
```
Once configured, you can start the consumer with the `krak:auto-scale:consume` command which will start up and manage the worker pools.

## Matching Receivers

Each pool config must have a `receivers` property which is a simple Glob that will match any of the current transport names setup in the messenger config.

It's important to note, that a receiver can ONLY be apart of one pool. So if two pools have receiver patterns that match the same receiver, then the first defined pool would own that receiver.

## Configuring Heartbeats

By default, each worker pool will log a heartbeat event every 60 seconds. If you want to change the frequency of that, you use the pool `heartbeat_interval` to define the number of seconds between subsequent heartbeats.

## Monitoring

You can access the PoolControl from your own services if you want to build out custom monitoring, or you can just use the `krak:auto-scale:pool:*` commands that are registered.

## Auto Scaling

Auto scaling is managed with the AutoScale interface which is responsible for taking the current state of a worker pool captured in the `AutoScaleRequest` and returning the expected num workers for that worker pool captured in `AutoScaleResponse`.

The default auto scale is setup to work off of the current queue size and the configured message rate and then will clip to the min/max procs configured. There also is some logic included to debounce the auto scaling requests to ensure that the system is judicious about when to create new procs and isn't fluctuating too often.

Here is some example config and we'll go over some scenarios:

```yaml
messenger_auto_scale:
pools:
catalog:
max_procs: 5
message_rate: 100
scale_up_threshold_seconds: 5
scale_down_threshold_seconds: 20
receivers: "catalog"
sales:
min_procs: 5
message_rate: 10
scale_up_threshold_seconds: 5
scale_down_threshold_seconds: 20
receivers: "sales"
```

| Seconds from Start | Catalog Pool Queue Size | Catalog Pool Num Workers | Sales Pool Queue Size | Sales Pool Num Workers | Notes |
| -------------------|-------------------------|--------------------------|-----------------------|------------------------|-------|
| n/a | 0 | 0 | 0 | 0 | Initial State |
| 0 | 0 | 0 | 0 | 5 | First Run, scaled up to 5 because of min procs |
| 2 | 1 | 1 | 60 | 5 | Scale up to 1 on catalog immediately, but wait until scale up threshold for sales |
| 5 | 0 | 1 | 50 | 5 | Wait to scale down on for catalog, reset counter for sales for scale up because now a scale up isn't needed |
| 6 | 0 | 1 | 60 | 5 | Wait to scale up on sales again, timer started, needs 5 seconds before scale up |
| 11 | 0 | 1 | 60 | 6 | Size of queue maintained over 60 for 5 seconds, so now we can scale up. |
| 22 | 0 | 0 | 60 | 6 | Catalog now goes back to zero after waiting 20 seconds since needing to scale down |

### Defining your own Auto Scale algorithm

If you want to augment or perform your own auto-scaling algorithm, you can implement the AutoScale interface and then update the `Krak\SymfonyMessengerAutoScale\AutoScale` to point to your new auto scale service. The default service is defined like:

```php
use Krak\SymfonyMessengerAutoScale\AutoScale;
$autoScale = new AutoScale\MinMaxClipAutoScale(new AutoScale\DebouncingAutoScale(new AutoScale\QueueSizeMessageRateAutoScale()));
```

## Dashboards
## Alerts

The alerting system is designed to be flexible and allow each user define alerts as they see. Alerts are simply just events that get dispatched when a certain metric is reached as determined by the services that implement `RaiseAlerts`.

To actually trigger the alerts, you need to run the `krak:auto-scale:alert` command which will check the state of the pools and raise alerts. Put this command on a cron at whatever interval you want alerts monitored at.

### Subscribing to Alerts

You simply just can create a basic symfony event listener/subscriber for that event and you should be able to perform any action on those events.

#### PoolBackedUpAlert

This alert will fire if the there are too many messages for the given queue. To enable this on a pool, you need to define the `backed_up_alert_threshold` config value.

```yaml
# ...
sales:
backed_up_alert_threshold: 100
```

If there are over 100 messages in the sales pool, then the PoolBackedUpAlert will fire on the next check.

### Creating Your Own Alerts

To create an alert, you need to subscribe to the RaiseAlerts interface, then register that service, and if you enable auto configuration, it should automatically get tagged with `messenger_auto_scale.raise_alerts`.

## Testing

Expand Down
47 changes: 47 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"name": "krak/symfony-messenger-auto-scale",
"description": "Symfony Messenger Auto Scaling",
"type": "symfony-bundle",
"authors": [
{
"name": "RJ Garcia",
"email": "[email protected]"
}
],
"license": "MIT",
"require": {
"php": "^7.1",
"ext-pcntl": "*",
"krak/schema": "^0.2.0",
"psr/event-dispatcher": "^1.0",
"symfony/messenger": "^4.4"
},
"autoload": {
"psr-4": {
"Krak\\SymfonyMessengerAutoScale\\": "src"
}
},
"autoload-dev": {
"psr-4": {
"Krak\\SymfonyMessengerAutoScale\\Tests\\": "tests"
}
},
"require-dev": {
"ext-redis": "*",
"krak/symfony-messenger-redis": "^0.1.0",
"nyholm/symfony-bundle-test": "^1.6",
"phpunit/phpunit": "^7.3",
"psr/simple-cache": "^1.0",
"symfony/cache": "^5.0",
"symfony/console": "^4.4",
"symfony/dependency-injection": "^4.1",
"symfony/http-kernel": "^4.1",
"symfony/process": "^5.0",
"symfony/property-access": "^4.1",
"symfony/serializer": "^4.1"
},
"scripts": {
"test": "phpunit --testdox --colors=always",
"flush-redis": "docker-compose exec -T redis redis-cli flushall"
}
}
14 changes: 14 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: '3'

services:
php:
build: .
command: "tail -f /dev/null"
working_dir: /var/www/html
volumes:
- ./:/var/www/html
redis:
image: redis
environment: { TERM: xterm }
ports: ["6379:6379"]
restart: unless-stopped
28 changes: 28 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- https://phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="./vendor/phpunit/phpunit/phpunit.xsd"
backupGlobals="false"
colors="true"
bootstrap="vendor/autoload.php">
<php>
<ini name="error_reporting" value="-1" />
<env name="SHELL_VERBOSITY" value="-1" />
<env name="REDIS_DSN" value="redis://localhost:6379?queue=messenger"/>
</php>
<testsuites>
<testsuite name="feature">
<directory>tests/Feature</directory>
</testsuite>
<testsuite name="unit">
<directory>tests/Unit</directory>
</testsuite>
</testsuites>

<filter>
<whitelist>
<directory>./src/</directory>
</whitelist>
</filter>
</phpunit>
29 changes: 29 additions & 0 deletions src/AggregatingReceiverMessageCount.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Krak\SymfonyMessengerAutoScale;

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

/** take a collection of receivers and return the total count of pending messages */
final class AggregatingReceiverMessageCount implements MessageCountAwareInterface
{
private $receivers;

public function __construct(ReceiverInterface ...$receivers) {
$this->receivers = $receivers;
}

public static function createFromReceiverIds(array $receiverIds, ContainerInterface $receiversById) {
return new self(...array_map(function(string $receverId) use ($receiversById) {
return $receiversById->get($receverId);
}, $receiverIds));
}

public function getMessageCount(): int {
return array_reduce($this->receivers, function(int $sum, ReceiverInterface $receiver) {
return $receiver instanceof MessageCountAwareInterface ? $receiver->getMessageCount() + $sum : $sum;
}, 0);
}
}
12 changes: 12 additions & 0 deletions src/AutoScale.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Krak\SymfonyMessengerAutoScale;

use Krak\SymfonyMessengerAutoScale\AutoScale\AutoScaleRequest;
use Krak\SymfonyMessengerAutoScale\AutoScale\AutoScaleResponse;

/** Service responsible for determining the appropriate size of the pool based off of current state of pool and config */
interface AutoScale
{
public function __invoke(AutoScaleRequest $req): AutoScaleResponse;
}
42 changes: 42 additions & 0 deletions src/AutoScale/AutoScaleRequest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

namespace Krak\SymfonyMessengerAutoScale\AutoScale;

use Krak\SymfonyMessengerAutoScale\PoolConfig;

final class AutoScaleRequest
{
private $state;
private $timeSinceLastCall;
private $numProcs;
private $sizeOfQueue;
private $poolConfig;

public function __construct(?array $state, ?int $timeSinceLastCall, int $numProcs, int $sizeOfQueue, PoolConfig $poolConfig) {
$this->state = $state;
$this->timeSinceLastCall = $timeSinceLastCall;
$this->numProcs = $numProcs;
$this->sizeOfQueue = $sizeOfQueue;
$this->poolConfig = $poolConfig;
}

public function state(): ?array {
return $this->state;
}

public function timeSinceLastCall(): ?int {
return $this->timeSinceLastCall;
}

public function numProcs(): int {
return $this->numProcs;
}

public function sizeOfQueue(): int {
return $this->sizeOfQueue;
}

public function poolConfig(): PoolConfig {
return $this->poolConfig;
}
}
41 changes: 41 additions & 0 deletions src/AutoScale/AutoScaleResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Krak\SymfonyMessengerAutoScale\AutoScale;

final class AutoScaleResponse
{
private $state;
private $expectedNumProcs;

public function __construct(?array $state, int $expectedNumProcs) {
if ($expectedNumProcs < 0) {
throw new \RuntimeException('Expected number of procs must be zero or greater.');
}
$this->state = $state;
$this->expectedNumProcs = $expectedNumProcs;
}

public function state(): ?array {
return $this->state;
}

public function withState(?array $state): self {
$self = clone $this;
$self->state = $state;
return $self;
}

public function withAddedState(array $stateToMerge): self {
return $this->withState(array_merge($this->state ?? [], $stateToMerge));
}

public function expectedNumProcs(): int {
return $this->expectedNumProcs;
}

public function withExpectedNumProcs(int $expectedNumProcs): self {
$self = clone $this;
$self->expectedNumProcs = $expectedNumProcs;
return $self;
}
}
Loading

0 comments on commit 3f1f481

Please sign in to comment.