Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutex #499

Merged
merged 12 commits into from
Oct 28, 2024
3 changes: 2 additions & 1 deletion .github/workflows/run-test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ jobs:
name: PHP${{ matrix.php }}${{ matrix.extensions-suffix }}, ${{ matrix.os }}, ${{ matrix.dependencies }} deps
runs-on: ${{ matrix.os }}
timeout-minutes: ${{ matrix.timeout-minutes }}
env: {GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}'}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
strategy:
fail-fast: ${{ inputs.fail-fast }}
matrix:
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"jetbrains/phpstorm-attributes": "dev-master@dev",
"laminas/laminas-code": "^4.0",
"phpunit/phpunit": "^10.5",
"spiral/code-style": "^2.1.2",
"spiral/code-style": "~2.1.2",
"spiral/core": "^3.13",
"symfony/var-dumper": "^6.0 || ^7.0",
"vimeo/psalm": "^4.30 || ^5.4"
Expand Down
5 changes: 5 additions & 0 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,11 @@
<code><![CDATA[$returnType]]></code>
</MissingParamType>
</file>
<file src="src/Workflow/Mutex.php">
<TooManyTemplateParams>
<code><![CDATA[PromiseInterface<self>]]></code>
</TooManyTemplateParams>
</file>
<file src="src/Workflow/QueryMethod.php">
<DeprecatedClass>
<code><![CDATA[NamedArgumentConstructor]]></code>
Expand Down
5 changes: 3 additions & 2 deletions src/Interceptor/WorkflowOutboundCalls/AwaitInput.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Temporal\Interceptor\WorkflowOutboundCalls;

use React\Promise\PromiseInterface;
use Temporal\Workflow\Mutex;

/**
* @psalm-immutable
Expand All @@ -15,14 +16,14 @@ final class AwaitInput
* @no-named-arguments
* @internal Don't use the constructor. Use {@see self::with()} instead.
*
* @param array<callable|PromiseInterface> $conditions
* @param array<callable|Mutex|PromiseInterface> $conditions
*/
public function __construct(
public readonly array $conditions,
) {}

/**
* @param array<callable|PromiseInterface> $conditions
* @param array<callable|Mutex|PromiseInterface> $conditions
*/
public function with(
?array $conditions = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Temporal\Interceptor\WorkflowOutboundCalls;

use React\Promise\PromiseInterface;
use Temporal\Workflow\Mutex;

/**
* @psalm-immutable
Expand All @@ -22,15 +23,15 @@ final class AwaitWithTimeoutInput
* @no-named-arguments
* @internal Don't use the constructor. Use {@see self::with()} instead.
*
* @param array<callable|PromiseInterface> $conditions
* @param array<callable|Mutex|PromiseInterface> $conditions
*/
public function __construct(
public readonly \DateInterval $interval,
public readonly array $conditions,
) {}

/**
* @param array<callable|PromiseInterface> $conditions
* @param array<callable|Mutex|PromiseInterface> $conditions
*/
public function with(
?\DateInterval $interval = null,
Expand Down
8 changes: 6 additions & 2 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public function onClose(callable $then): self
/**
* @param \Throwable|null $reason
*/
public function cancel(\Throwable $reason = null): void
public function cancel(?\Throwable $reason = null): void
{
if ($this->detached && !$reason instanceof DestructMemorizedInstanceException) {
// detaches scopes can be offload via memory flush
Expand Down Expand Up @@ -291,7 +291,7 @@ public function always(callable $onFulfilledOrRejected): PromiseInterface
*/
public function onAwait(Deferred $deferred): void
{
$this->onCancel[++$this->cancelID] = static function (\Throwable $e = null) use ($deferred): void {
$this->onCancel[++$this->cancelID] = static function (?\Throwable $e = null) use ($deferred): void {
$deferred->reject($e ?? new CanceledFailure(''));
};

Expand Down Expand Up @@ -429,6 +429,10 @@ protected function next(): void
$this->context->resolveConditions();

switch (true) {
case $current instanceof Workflow\Mutex:
$this->nextPromise($this->context->await($current));
break;

case $current instanceof PromiseInterface:
$this->nextPromise($current);
break;
Expand Down
13 changes: 6 additions & 7 deletions src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
use Temporal\Workflow\ChildWorkflowStubInterface;
use Temporal\Workflow\ContinueAsNewOptions;
use Temporal\Workflow\ExternalWorkflowStubInterface;
use Temporal\Workflow\Mutex;
use Temporal\Workflow\WorkflowContextInterface;
use Temporal\Workflow\WorkflowExecution;
use Temporal\Workflow\WorkflowInfo;
Expand Down Expand Up @@ -552,7 +553,7 @@ public function upsertSearchAttributes(array $searchAttributes): void
/**
* {@inheritDoc}
*/
public function await(...$conditions): PromiseInterface
public function await(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
return $this->callsInterceptor->with(
fn(AwaitInput $input): PromiseInterface => $this->awaitRequest(...$input->conditions),
Expand All @@ -564,7 +565,7 @@ public function await(...$conditions): PromiseInterface
/**
* {@inheritDoc}
*/
public function awaitWithTimeout($interval, ...$conditions): PromiseInterface
public function awaitWithTimeout($interval, callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
$intervalObject = DateInterval::parse($interval, DateInterval::FORMAT_SECONDS);

Expand Down Expand Up @@ -660,16 +661,14 @@ public function destroy(): void
unset($this->workflowInstance);
}

/**
* @param callable|PromiseInterface ...$conditions
*/
protected function awaitRequest(...$conditions): PromiseInterface
protected function awaitRequest(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
$result = [];
$conditionGroupId = Uuid::v4();

foreach ($conditions as $condition) {
\assert(\is_callable($condition) || $condition instanceof PromiseInterface);
// Wrap Mutex into callable
$condition instanceof Mutex and $condition = static fn(): bool => !$condition->isLocked();

if ($condition instanceof \Closure) {
$callableResult = $condition($conditionGroupId);
Expand Down
32 changes: 28 additions & 4 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use Temporal\Workflow\ChildWorkflowStubInterface;
use Temporal\Workflow\ContinueAsNewOptions;
use Temporal\Workflow\ExternalWorkflowStubInterface;
use Temporal\Workflow\Mutex;
use Temporal\Workflow\ScopedContextInterface;
use Temporal\Workflow\UpdateContext;
use Temporal\Workflow\WorkflowExecution;
Expand Down Expand Up @@ -280,10 +281,10 @@ public static function asyncDetached(callable $task): CancellationScopeInterface
* }
* ```
*
* @param callable|PromiseInterface ...$conditions
* @param callable|PromiseInterface|Mutex ...$conditions
* @return PromiseInterface
*/
public static function await(...$conditions): PromiseInterface
public static function await(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
return self::getCurrentContext()->await(...$conditions);
}
Expand All @@ -310,10 +311,10 @@ public static function await(...$conditions): PromiseInterface
* ```
*
* @param DateIntervalValue $interval
* @param callable|PromiseInterface ...$conditions
* @param callable|PromiseInterface|Mutex ...$conditions
* @return PromiseInterface<bool>
*/
public static function awaitWithTimeout($interval, ...$conditions): PromiseInterface
public static function awaitWithTimeout($interval, callable|Mutex|PromiseInterface ...$conditions): PromiseInterface
{
return self::getCurrentContext()->awaitWithTimeout($interval, ...$conditions);
}
Expand Down Expand Up @@ -993,4 +994,27 @@ public static function uuid7(?\DateTimeInterface $dateTime = null): PromiseInter

return $context->uuid7($dateTime);
}

/**
* Run a function when the mutex is released.
* The mutex is locked for the duration of the function.
*
* @template T
* @param Mutex $mutex Mutex name or instance.
* @param callable(): T $callable Function to run.
*
* @return CancellationScopeInterface<T>
*/
public static function runLocked(Mutex $mutex, callable $callable): CancellationScopeInterface
{
return Workflow::async(static function () use ($mutex, $callable): \Generator {
yield $mutex->lock();

try {
return yield $callable();
} finally {
$mutex->unlock();
}
});
}
}
1 change: 1 addition & 0 deletions src/Workflow/CancellationScopeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

/**
* @template T
* @yield T
* @extends PromiseInterface<T>
*/
interface CancellationScopeInterface extends PromiseInterface
Expand Down
82 changes: 82 additions & 0 deletions src/Workflow/Mutex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Temporal\Workflow;

use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Promise;

/**
* If a mutex is yielded without calling `lock()`, the Workflow will continue
* only when the lock is released.
*
* ```
* $this->mutex = new Mutex();
*
* // Continue only when the lock is released
* yield $this->mutex;
* ```
*/
final class Mutex
{
private bool $locked = false;

/** @var Deferred[] */
private array $waiters = [];

/**
* Lock the mutex.
*
* ```
* // Continue only when the lock is acquired
* yield $this->mutex->lock();
* ```
*
* @return PromiseInterface<self> A promise that resolves when the lock is acquired.
*/
public function lock(): PromiseInterface
{
if (!$this->locked) {
$this->locked = true;
return Promise::resolve($this);
}

$deferred = new Deferred();
$this->waiters[] = $deferred;

return $deferred->promise();
}

/**
* Try to lock the mutex.
*
* @return bool Returns true if the mutex was successfully locked, false otherwise.
*/
public function tryLock(): bool
{
return !$this->locked and $this->locked = true;
}

/**
* Release the lock.
*/
public function unlock(): void
{
if ($this->waiters === []) {
$this->locked = false;
return;
}

\array_shift($this->waiters)->resolve($this);
}

/**
* Check if the mutex is locked.
*/
public function isLocked(): bool
{
return $this->locked;
}
}
8 changes: 4 additions & 4 deletions src/Workflow/WorkflowContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,10 @@ public function newUntypedActivityStub(
*
* @see Workflow::await()
*
* @param callable|PromiseInterface ...$conditions
* @param callable|Mutex|PromiseInterface ...$conditions
* @return PromiseInterface
*/
public function await(...$conditions): PromiseInterface;
public function await(callable|Mutex|PromiseInterface ...$conditions): PromiseInterface;

/**
* Checks if any conditions were met or the timeout was reached.
Expand All @@ -321,10 +321,10 @@ public function await(...$conditions): PromiseInterface;
* @see Workflow::awaitWithTimeout()
*
* @param DateIntervalValue $interval
* @param callable|PromiseInterface ...$conditions
* @param callable|Mutex|PromiseInterface ...$conditions
* @return PromiseInterface<bool>
*/
public function awaitWithTimeout($interval, ...$conditions): PromiseInterface;
public function awaitWithTimeout($interval, callable|Mutex|PromiseInterface ...$conditions): PromiseInterface;

/**
* Returns a complete trace of the last calls (for debugging).
Expand Down
20 changes: 19 additions & 1 deletion tests/Acceptance/App/Feature/WorkflowStubInjector.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,25 @@ public function createInjection(
$attribute->memo === [] or $options = $options->withMemo($attribute->memo);

$stub = $client->newUntypedWorkflowStub($attribute->type, $options);
$client->start($stub, ...$attribute->args);
$run = $client->start($stub, ...$attribute->args);

// Wait 5 seconds for the workflow to start
$deadline = \microtime(true) + 5;
checkStart:
$description = $run->describe();
if ($description->info->historyLength <= 2) {
if (\microtime(true) < $deadline) {
goto checkStart;
}

throw new \RuntimeException(
\sprintf(
'Workflow %s did not start. TaskQueue: %s',
$attribute->type,
$feature->taskQueue,
),
);
}

return $stub;
}
Expand Down
Loading
Loading