Skip to content

Commit

Permalink
Merge pull request #556 from camunda-community-hub/issue493
Browse files Browse the repository at this point in the history
feat: add BroadcastSignalRequest
  • Loading branch information
dimastbk authored Jan 5, 2025
2 parents f3b3fa0 + 46fbc2b commit 0b09f03
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/zeebe_adapter_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ Zeebe GRPC Responses
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.BroadcastSignalResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.PublishMessageResponse
:members:
:undoc-members:
Expand Down
31 changes: 31 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import grpc

from pyzeebe.grpc_internals.types import (
BroadcastSignalResponse,
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
Expand Down Expand Up @@ -146,6 +147,36 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: str | None
"""
return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def broadcast_signal(
self,
signal_name: str,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> BroadcastSignalResponse:
"""
Broadcasts a signal
Args:
signal_name (str): The name of the signal
variables (dict): The variables the signal should contain.
tenant_id (str): The tenant ID of the message. New in Zeebe 8.4.
Returns:
BroadcastSignalResponse: response from Zeebe.
Raises:
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
return await self.zeebe_adapter.broadcast_signal(
signal_name=signal_name,
variables=variables or {},
tenant_id=tenant_id,
)

async def publish_message(
self,
name: str,
Expand Down
16 changes: 16 additions & 0 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from pyzeebe import ZeebeClient
from pyzeebe.grpc_internals.types import (
BroadcastSignalResponse,
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
Expand Down Expand Up @@ -57,6 +58,21 @@ def cancel_process_instance(self, process_instance_key: int) -> CancelProcessIns
def deploy_resource(self, *resource_file_path: str, tenant_id: str | None = None) -> DeployResourceResponse:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

@copy_docstring(ZeebeClient.broadcast_signal)
def broadcast_signal(
self,
signal_name: str,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> BroadcastSignalResponse:
return self.loop.run_until_complete(
self.client.broadcast_signal(
signal_name,
variables,
tenant_id,
)
)

@copy_docstring(ZeebeClient.publish_message)
def publish_message(
self,
Expand Down
8 changes: 8 additions & 0 deletions pyzeebe/grpc_internals/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ class FormMetadata:
"""the tenant ID of the deployed resources"""


@dataclass(frozen=True)
class BroadcastSignalResponse:
key: int
"""the unique ID of the signal that was broadcasted"""
tenant_id: str | None
"""the tenant ID of the signal that was broadcasted"""


@dataclass(frozen=True)
class PublishMessageResponse:
key: int
Expand Down
23 changes: 21 additions & 2 deletions pyzeebe/grpc_internals/zeebe_message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,32 @@
from pyzeebe.errors import MessageAlreadyExistsError
from pyzeebe.grpc_internals.grpc_utils import is_error_status
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.proto.gateway_pb2 import PublishMessageRequest
from pyzeebe.proto.gateway_pb2 import BroadcastSignalRequest, PublishMessageRequest
from pyzeebe.types import Variables

from .types import PublishMessageResponse
from .types import BroadcastSignalResponse, PublishMessageResponse


class ZeebeMessageAdapter(ZeebeAdapterBase):
async def broadcast_signal(
self,
signal_name: str,
variables: Variables,
tenant_id: str | None = None,
) -> BroadcastSignalResponse:
try:
response = await self._gateway_stub.BroadcastSignal(
BroadcastSignalRequest(
signalName=signal_name,
variables=json.dumps(variables),
tenantId=tenant_id, # type: ignore[arg-type]
)
)
except grpc.aio.AioRpcError as grpc_error:
await self._handle_grpc_error(grpc_error)

return BroadcastSignalResponse(key=response.key, tenant_id=response.tenantId)

async def publish_message(
self,
name: str,
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/client/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ async def test_cancel_process_instance(zeebe_client: ZeebeClient, grpc_servicer:
)


@pytest.mark.asyncio
async def test_broadcast_signal(zeebe_client: ZeebeClient):
await zeebe_client.broadcast_signal(signal_name=str(uuid4()))


@pytest.mark.asyncio
async def test_publish_message(zeebe_client: ZeebeClient):
await zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4()))
10 changes: 10 additions & 0 deletions tests/unit/client/sync_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ def test_calls_deploy_resource_of_zeebe_client(self, sync_zeebe_client: SyncZeeb
sync_zeebe_client.client.deploy_resource.assert_called_with(file_path, tenant_id=None)


class TestBroadcastSignal:
def test_calls_broadcast_signal_of_zeebe_client(self, sync_zeebe_client: SyncZeebeClient):
sync_zeebe_client.client.broadcast_signal = AsyncMock()
name = str(uuid4())

sync_zeebe_client.broadcast_signal(name)

sync_zeebe_client.client.broadcast_signal.assert_called_once()


class TestPublishMessage:
def test_calls_publish_message_of_zeebe_client(self, sync_zeebe_client: SyncZeebeClient):
sync_zeebe_client.client.publish_message = AsyncMock()
Expand Down
23 changes: 22 additions & 1 deletion tests/unit/grpc_internals/zeebe_message_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

from pyzeebe.errors import MessageAlreadyExistsError
from pyzeebe.grpc_internals.types import PublishMessageResponse
from pyzeebe.grpc_internals.types import BroadcastSignalResponse, PublishMessageResponse
from pyzeebe.grpc_internals.zeebe_message_adapter import ZeebeMessageAdapter
from tests.unit.utils.random_utils import RANDOM_RANGE

Expand Down Expand Up @@ -52,3 +52,24 @@ async def test_raises_on_duplicate_message(self):
with pytest.raises(MessageAlreadyExistsError):
await self.publish_message(message_id=message_id)
await self.publish_message(message_id=message_id)


@pytest.mark.asyncio
class TestBroadcastSignal:
zeebe_message_adapter: ZeebeMessageAdapter

@pytest.fixture(autouse=True)
def set_up(self, zeebe_adapter: ZeebeMessageAdapter):
self.zeebe_message_adapter = zeebe_adapter

async def broadcast_signal(
self,
name=str(uuid4()),
variables={},
):
return await self.zeebe_message_adapter.broadcast_signal(name, variables)

async def test_response_is_of_correct_type(self):
response = await self.broadcast_signal()

assert isinstance(response, BroadcastSignalResponse)
4 changes: 4 additions & 0 deletions tests/unit/utils/gateway_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyzeebe.proto.gateway_pb2 import (
ActivatedJob,
ActivateJobsResponse,
BroadcastSignalResponse,
CancelProcessInstanceResponse,
CompleteJobResponse,
CreateProcessInstanceResponse,
Expand Down Expand Up @@ -201,6 +202,9 @@ def DeployResource(self, request, context):

return DeployResourceResponse(key=randint(0, RANDOM_RANGE), deployments=resources, tenantId=request.tenantId)

def BroadcastSignal(self, request, context):
return BroadcastSignalResponse()

def PublishMessage(self, request, context):
if request.messageId in self.messages.keys():
context.set_code(grpc.StatusCode.ALREADY_EXISTS)
Expand Down

0 comments on commit 0b09f03

Please sign in to comment.