Skip to content

Commit

Permalink
+ poolContext
Browse files Browse the repository at this point in the history
  • Loading branch information
EdmondDantes committed Oct 13, 2024
1 parent 31976d6 commit c13838d
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 8 deletions.
14 changes: 10 additions & 4 deletions src/Strategies/RunnerStrategy/DefaultRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public static function processEntryPoint(Channel $channel): void
}

try {
['id' => $id, 'group' => $group, 'groupsScheme' => $groupsScheme, 'workersStorage' => $workersStorage] = self::readWorkerMetadata($channel);
['id' => $id, 'group' => $group, 'groupsScheme' => $groupsScheme, 'workersStorage' => $workersStorage, 'context' => $context]
= self::readWorkerMetadata($channel);

} catch (\Throwable $exception) {
throw new FatalWorkerException('Could not connect to IPC socket', 0, $exception);
Expand All @@ -57,7 +58,7 @@ public static function processEntryPoint(Channel $channel): void
throw new FatalWorkerException('Entry point class must implement WorkerEntryPointI');
}

$worker = new Worker((int) $id, $channel, $group, $groupsScheme, $workersStorage);
$worker = new Worker((int) $id, $channel, $group, $groupsScheme, $workersStorage, null, $context);

$entryPoint->initialize($worker);
$worker->applyGlobalErrorHandler();
Expand Down Expand Up @@ -101,7 +102,7 @@ public function getScript(): string|array
* @throws SerializationException
* @throws ChannelException
*/
public function initiateWorkerContext(Context $processContext, int $workerId, WorkerGroupInterface $group): void
public function initiateWorkerContext(Context $processContext, int $workerId, WorkerGroupInterface $group, array $context = []): void
{
$workerPool = $this->getWorkerPool();

Expand All @@ -113,7 +114,8 @@ public function initiateWorkerContext(Context $processContext, int $workerId, Wo
'id' => $workerId,
'group' => $group,
'groupsScheme' => $workerPool->getGroupsScheme(),
'workersStorage' => $workerPool->getWorkersStorage()::class
'workersStorage' => $workerPool->getWorkersStorage()::class,
'context' => $context
]);
}

Expand Down Expand Up @@ -141,6 +143,10 @@ protected static function readWorkerMetadata(Channel $channel): array
if(!\is_array($data['groupsScheme'])) {
throw new \Error('Invalid groups scheme. Expected array');
}

if(false === array_key_exists('context', $data)) {
$data['context'] = [];
}

return $data;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Strategies/RunnerStrategy/RunnerStrategyInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ public function getScript(): string|array;
*
*
**/
public function initiateWorkerContext(Context $processContext, int $workerId, WorkerGroupInterface $group): void;
public function initiateWorkerContext(Context $processContext, int $workerId, WorkerGroupInterface $group, array $context = []): void;
}
12 changes: 11 additions & 1 deletion src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class Worker implements WorkerInterface

/** @var ConcurrentIterator<TReceive> */
protected readonly ConcurrentIterator $iterator;

protected readonly array $context;

private LoggerInterface $logger;
private WorkersStorageInterface $workersStorage;
Expand Down Expand Up @@ -81,13 +83,15 @@ public function __construct(
private readonly array $groupsScheme,
string $workersStorageClass,
?LoggerInterface $logger = null,
array $context = []
) {
$this->queue = new Queue();
$this->iterator = $this->queue->iterate();
$this->mainCancellation = new DeferredCancellation;
$this->workerFuture = new DeferredFuture;

$this->eventEmitter = new WorkerEventEmitter;
$this->context = $context;

if(\class_exists($workersStorageClass) === false) {
throw new \RuntimeException('Invalid storage class provided. Expected ' . WorkersStorageInterface::class . ' implementation');
Expand Down Expand Up @@ -173,7 +177,13 @@ public function getWorkerType(): WorkerTypeEnum
{
return $this->group->getWorkerType();
}


#[\Override]
public function getWorkerContext(): array
{
return $this->context;
}

public function getWorkerEventEmitter(): WorkerEventEmitterInterface
{
return $this->eventEmitter;
Expand Down
1 change: 1 addition & 0 deletions src/Worker/WorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public function getWorkerId(): int;
public function getWorkerGroup(): WorkerGroup;
public function getWorkerGroupId(): int;
public function getWorkerType(): WorkerTypeEnum;
public function getWorkerContext(): array;

public function getAbortCancellation(): Cancellation;

Expand Down
19 changes: 17 additions & 2 deletions src/WorkerPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ final class WorkerPool implements WorkerPoolInterface
* @var WorkerDescriptor[]
*/
protected array $workers = [];

protected array $poolContext = [];

protected readonly Queue $queue;
private readonly ConcurrentIterator $iterator;
Expand Down Expand Up @@ -277,7 +279,20 @@ public function validateGroupsScheme(): void

}
}


#[\Override]
public function getPoolsContext(): array
{
return $this->poolContext;
}

#[\Override]
public function setPoolContext(array $context): static
{
$this->poolContext = $context;
return $this;
}

public function run(): void
{
if ($this->running) {
Expand Down Expand Up @@ -656,7 +671,7 @@ private function startWorker(WorkerDescriptor $workerDescriptor): void

$workerDescriptor->setWorkerProcess($workerProcess);

$runnerStrategy->initiateWorkerContext($context, $workerDescriptor->id, $workerDescriptor->group);
$runnerStrategy->initiateWorkerContext($context, $workerDescriptor->id, $workerDescriptor->group, $this->poolContext);

$this->eventEmitter->emitWorkerEvent(
new WorkerProcessStarted($workerDescriptor->id, $workerDescriptor->group, $context),
Expand Down
4 changes: 4 additions & 0 deletions src/WorkerPoolInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ public function getGroupsScheme(): array;
public function findGroup(int|string $groupIdOrName): WorkerGroupInterface|null;

public function validateGroupsScheme(): void;

public function getPoolsContext(): array;

public function setPoolContext(array $context): static;

/**
* Run the worker pool.
* This method will block the current fiber and release it after the pool is stopped.
Expand Down

0 comments on commit c13838d

Please sign in to comment.