Skip to content

Commit

Permalink
Fix workers problem (#6)
Browse files Browse the repository at this point in the history
* Feat: Change JobInitializer signature

* Feat+Refactor: Separate listener into a class

* Test: Update tests
  • Loading branch information
mpyw authored Jan 1, 2020
1 parent d588f9a commit 26e241d
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 83 deletions.
13 changes: 12 additions & 1 deletion src/JobInitializers/AlwaysFreshInitializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Illuminate\Database\DatabaseManager;
use Illuminate\Queue\Events\JobProcessing;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;
use Mpyw\LaravelCachedDatabaseStickiness\JobInitializers\Concerns\DetectsInterfaces;
use Mpyw\LaravelCachedDatabaseStickiness\StickinessManager;

Expand Down Expand Up @@ -41,7 +42,7 @@ public function __construct(StickinessManager $stickiness, DatabaseManager $db)
/**
* {@inheritdoc}
*/
public function initializeStickinessState(JobProcessing $event): void
public function initializeOnResolvedConnections(JobProcessing $event): void
{
if ($this->shouldAssumeModified($event->job)) {
foreach ($this->db->getConnections() as $connection) {
Expand All @@ -54,4 +55,14 @@ public function initializeStickinessState(JobProcessing $event): void
$this->stickiness->setRecordsFresh($connection);
}
}

/**
* {@inheritdoc}
*/
public function initializeOnNewConnection(JobProcessing $jobProcessingEvent, ConnectionCreated $connectionCreatedEvent): void
{
$this->shouldAssumeModified($jobProcessingEvent->job)
? $this->stickiness->setRecordsModified($connectionCreatedEvent->connection)
: $this->stickiness->setRecordsFresh($connectionCreatedEvent->connection);
}
}
13 changes: 12 additions & 1 deletion src/JobInitializers/AlwaysModifiedInitializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Illuminate\Database\DatabaseManager;
use Illuminate\Queue\Events\JobProcessing;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;
use Mpyw\LaravelCachedDatabaseStickiness\JobInitializers\Concerns\DetectsInterfaces;
use Mpyw\LaravelCachedDatabaseStickiness\StickinessManager;

Expand Down Expand Up @@ -41,7 +42,7 @@ public function __construct(StickinessManager $stickiness, DatabaseManager $db)
/**
* {@inheritdoc}
*/
public function initializeStickinessState(JobProcessing $event): void
public function initializeOnResolvedConnections(JobProcessing $event): void
{
if ($this->shouldAssumeFresh($event->job)) {
foreach ($this->db->getConnections() as $connection) {
Expand All @@ -54,4 +55,14 @@ public function initializeStickinessState(JobProcessing $event): void
$this->stickiness->setRecordsModified($connection);
}
}

/**
* {@inheritdoc}
*/
public function initializeOnNewConnection(JobProcessing $jobProcessingEvent, ConnectionCreated $connectionCreatedEvent): void
{
$this->shouldAssumeFresh($jobProcessingEvent->job)
? $this->stickiness->setRecordsFresh($connectionCreatedEvent->connection)
: $this->stickiness->setRecordsModified($connectionCreatedEvent->connection);
}
}
13 changes: 11 additions & 2 deletions src/JobInitializers/JobInitializerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@
namespace Mpyw\LaravelCachedDatabaseStickiness\JobInitializers;

use Illuminate\Queue\Events\JobProcessing;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;

/**
* Interface JobInitializerInterface
*/
interface JobInitializerInterface
{
/**
* Initialize database stickiness state before processing each job.
* Initialize database stickiness state on already resolved connections before processing each job.
*
* @param \Illuminate\Queue\Events\JobProcessing $event
*/
public function initializeStickinessState(JobProcessing $event): void;
public function initializeOnResolvedConnections(JobProcessing $event): void;

/**
* Initialize database stickiness state on newly created connection before processing each job.
*
* @param \Illuminate\Queue\Events\JobProcessing $jobProcessingEvent
* @param \Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated $connectionCreatedEvent
*/
public function initializeOnNewConnection(JobProcessing $jobProcessingEvent, ConnectionCreated $connectionCreatedEvent): void;
}
97 changes: 97 additions & 0 deletions src/StickinessEventListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

namespace Mpyw\LaravelCachedDatabaseStickiness;

use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;
use Mpyw\LaravelCachedDatabaseStickiness\Events\RecordsHaveBeenModified;

class StickinessEventListener
{
/**
* @var \Mpyw\LaravelCachedDatabaseStickiness\StickinessManager
*/
protected $stickiness;

/**
* @var null|\Illuminate\Queue\Events\JobProcessing
*/
protected $currentJobProcessingEvent;

/**
* StickinessEventListener constructor.
*
* @param \Mpyw\LaravelCachedDatabaseStickiness\StickinessManager $stickiness
*/
public function __construct(StickinessManager $stickiness)
{
$this->stickiness = $stickiness;
}

/**
* Called when JobProcessing dispatched.
*
* @param \Illuminate\Queue\Events\JobProcessing $event
*/
public function onJobProcessing(JobProcessing $event): void
{
$this->stickiness->initializeStickinessState($this->currentJobProcessingEvent = $event);
}

/**
* Called when JobProcessed dispatched.
*
* @param \Illuminate\Queue\Events\JobProcessed $event
*/
public function onJobProcessed(JobProcessed $event): void
{
$this->currentJobProcessingEvent = null;
}

/**
* Called when JobExceptionOccurred dispatched.
*
* @param \Illuminate\Queue\Events\JobExceptionOccurred $event
*/
public function onJobExceptionOccurred(JobExceptionOccurred $event): void
{
$this->currentJobProcessingEvent = null;
}

/**
* Called when JobFailed dispatched.
*
* @param \Illuminate\Queue\Events\JobFailed $event
*/
public function onJobFailed(JobFailed $event): void
{
$this->currentJobProcessingEvent = null;
}

/**
* Called when ConnectionCreated dispatched.
*
* @param \Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated $event
*/
public function onConnectionCreated(ConnectionCreated $event): void
{
if ($this->currentJobProcessingEvent) {
$this->stickiness->initializeStickinessState($this->currentJobProcessingEvent, $event);
}

$this->stickiness->resolveRecordsModified($event->connection);
}

/**
* Called when RecordsHaveBeenModified dispatched.
*
* @param \Mpyw\LaravelCachedDatabaseStickiness\Events\RecordsHaveBeenModified $event
*/
public function onRecordsHaveBeenModified(RecordsHaveBeenModified $event): void
{
$this->stickiness->markAsModified($event->connection);
}
}
47 changes: 11 additions & 36 deletions src/StickinessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use Illuminate\Database\ConnectionInterface;
use Illuminate\Queue\Events\JobProcessing;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;
use Mpyw\LaravelCachedDatabaseStickiness\Events\RecordsHaveBeenModified;
use Mpyw\LaravelCachedDatabaseStickiness\JobInitializers\JobInitializerInterface;
use Mpyw\LaravelCachedDatabaseStickiness\StickinessResolvers\StickinessResolverInterface;
use ReflectionProperty;
Expand Down Expand Up @@ -36,36 +35,6 @@ public function __construct(Container $container)
$this->container = $container;
}

/**
* Called when JobProcessing dispatched.
*
* @param \Illuminate\Queue\Events\JobProcessing $event
*/
public function onJobProcessing(JobProcessing $event): void
{
$this->initializeStickinessState($event);
}

/**
* Called when RecordsHaveBeenModified dispatched.
*
* @param \Mpyw\LaravelCachedDatabaseStickiness\Events\RecordsHaveBeenModified $event
*/
public function onRecordsHaveBeenModified(RecordsHaveBeenModified $event): void
{
$this->markAsModified($event->connection);
}

/**
* Called when ConnectionCreated dispatched.
*
* @param \Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated $event
*/
public function onConnectionCreated(ConnectionCreated $event): void
{
$this->resolveRecordsModified($event->connection);
}

/** @noinspection PhpDocMissingThrowsInspection */

/**
Expand Down Expand Up @@ -144,17 +113,23 @@ public function isRecentlyModified(ConnectionInterface $connection): bool
/**
* Initialize database stickiness state before processing each job.
*
* @param JobProcessing $event
* @param \Illuminate\Queue\Events\JobProcessing $jobProcessingEvent
* @param null|\Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated $connectionCreatedEvent
*/
public function initializeStickinessState(JobProcessing $event): void
public function initializeStickinessState(JobProcessing $jobProcessingEvent, ?ConnectionCreated $connectionCreatedEvent = null): void
{
$this->jobInitializer()->initializeStickinessState($event);
$initializer = $this->jobInitializer();
$initializer->initializeOnResolvedConnections($jobProcessingEvent);

if ($connectionCreatedEvent) {
$initializer->initializeOnNewConnection($jobProcessingEvent, $connectionCreatedEvent);
}
}

/** @noinspection PhpDocMissingThrowsInspection */

/**
* @return StickinessResolverInterface
* @return \Mpyw\LaravelCachedDatabaseStickiness\StickinessResolvers\StickinessResolverInterface
*/
protected function stickinessResolver(): StickinessResolverInterface
{
Expand All @@ -165,7 +140,7 @@ protected function stickinessResolver(): StickinessResolverInterface
/** @noinspection PhpDocMissingThrowsInspection */

/**
* @return JobInitializerInterface
* @return \Mpyw\LaravelCachedDatabaseStickiness\JobInitializers\JobInitializerInterface
*/
protected function jobInitializer(): JobInitializerInterface
{
Expand Down
19 changes: 13 additions & 6 deletions src/StickinessServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
namespace Mpyw\LaravelCachedDatabaseStickiness;

use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Queue\Events\JobExceptionOccurred;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\ServiceProvider;
use Mpyw\LaravelCachedDatabaseStickiness\Events\ConnectionCreated;
Expand All @@ -17,14 +20,17 @@ class StickinessServiceProvider extends ServiceProvider
/**
* {@inheritdoc}
*
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @param \Mpyw\LaravelCachedDatabaseStickiness\StickinessManager $stickiness
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @param \Mpyw\LaravelCachedDatabaseStickiness\StickinessEventListener $listener
*/
public function boot(Dispatcher $events, StickinessManager $stickiness): void
public function boot(Dispatcher $events, StickinessEventListener $listener): void
{
$events->listen(ConnectionCreated::class, [$stickiness, 'onConnectionCreated']);
$events->listen(JobProcessing::class, [$stickiness, 'onJobProcessing']);
$events->listen(RecordsHaveBeenModified::class, [$stickiness, 'onRecordsHaveBeenModified']);
$events->listen(JobProcessing::class, [$listener, 'onJobProcessing']);
$events->listen(JobProcessed::class, [$listener, 'onJobProcessed']);
$events->listen(JobExceptionOccurred::class, [$listener, 'onJobExceptionOccurred']);
$events->listen(JobFailed::class, [$listener, 'onJobFailed']);
$events->listen(ConnectionCreated::class, [$listener, 'onConnectionCreated']);
$events->listen(RecordsHaveBeenModified::class, [$listener, 'onRecordsHaveBeenModified']);
}

/**
Expand All @@ -33,6 +39,7 @@ public function boot(Dispatcher $events, StickinessManager $stickiness): void
public function register(): void
{
$this->app->singleton(StickinessManager::class);
$this->app->singleton(StickinessEventListener::class);
$this->app->singleton('db.factory', ConnectionFactory::class);

$this->app->bind(StickinessResolverInterface::class, IpBasedResolver::class);
Expand Down
8 changes: 8 additions & 0 deletions tests/Feature/InitializingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Illuminate\Support\Facades\Notification;
use Mpyw\LaravelCachedDatabaseStickiness\ConnectionServiceProvider;
use Mpyw\LaravelCachedDatabaseStickiness\StickinessServiceProvider;
use Mpyw\LaravelCachedDatabaseStickiness\Tests\Stubs\Jobs\ConnectionResolvableJob;
use Mpyw\LaravelCachedDatabaseStickiness\Tests\Stubs\Jobs\FreshJob;
use Mpyw\LaravelCachedDatabaseStickiness\Tests\Stubs\Jobs\GeneralJob;
use Mpyw\LaravelCachedDatabaseStickiness\Tests\Stubs\Jobs\ModifiedJob;
Expand Down Expand Up @@ -126,4 +127,11 @@ public function testInitializationForMailables(): void

$this->assertTrue($this->getRecordsModifiedViaReflection());
}

public function testUnresolvedConnectionShouldBeInitializedAfterJobProcessingDispatched(): void
{
Bus::dispatch(new ConnectionResolvableJob());

$this->assertTrue($this->getRecordsModifiedViaReflection());
}
}
14 changes: 14 additions & 0 deletions tests/Stubs/Jobs/ConnectionResolvableJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Mpyw\LaravelCachedDatabaseStickiness\Tests\Stubs\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\DB;

class ConnectionResolvableJob implements ShouldQueue
{
public function handle(): void
{
DB::connection();
}
}
Loading

0 comments on commit 26e241d

Please sign in to comment.