From c90754b057ef5c9b897fc337d6cf9bfa5ee4b4fe Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 13 Jan 2025 12:27:37 +0500 Subject: [PATCH] feat: add TopologyRequest --- pyzeebe/client/client.py | 17 ++++++ pyzeebe/client/sync_client.py | 27 ++++++---- pyzeebe/grpc_internals/types.py | 54 +++++++++++++++++++ pyzeebe/grpc_internals/zeebe_adapter.py | 3 +- .../grpc_internals/zeebe_cluster_adapter.py | 40 ++++++++++++++ tests/integration/topology_test.py | 10 ++++ tests/unit/channel/utils_test.py | 2 +- tests/unit/client/client_test.py | 5 ++ tests/unit/client/sync_client_test.py | 9 ++++ .../zeebe_cluster_adapter_test.py | 12 +++++ tests/unit/utils/gateway_mock.py | 4 ++ 11 files changed, 172 insertions(+), 11 deletions(-) create mode 100644 pyzeebe/grpc_internals/zeebe_cluster_adapter.py create mode 100644 tests/integration/topology_test.py create mode 100644 tests/unit/grpc_internals/zeebe_cluster_adapter_test.py diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index 0d9f3870..0ae4217b 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -12,6 +12,7 @@ CreateProcessInstanceWithResultResponse, DeployResourceResponse, PublishMessageResponse, + TopologyResponse, ) from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.types import Variables @@ -220,3 +221,19 @@ async def publish_message( message_id=message_id, tenant_id=tenant_id, ) + + async def topology(self) -> TopologyResponse: + """ + Obtains the current topology of the cluster the gateway is part of. + + Returns: + TopologyResponse: response from Zeebe. + + Raises: + ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) + ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable + ZeebeInternalError: If Zeebe experiences an internal error + UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code + + """ + return await self.zeebe_adapter.topology() diff --git a/pyzeebe/client/sync_client.py b/pyzeebe/client/sync_client.py index 4fee6b96..8d65d34b 100644 --- a/pyzeebe/client/sync_client.py +++ b/pyzeebe/client/sync_client.py @@ -2,7 +2,6 @@ import asyncio import os -from functools import partial, wraps import grpc @@ -14,18 +13,16 @@ CreateProcessInstanceWithResultResponse, DeployResourceResponse, PublishMessageResponse, + TopologyResponse, ) from pyzeebe.types import Variables -copy_docstring = partial(wraps, assigned=["__doc__"], updated=[]) - class SyncZeebeClient: def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None: self.loop = asyncio.get_event_loop() self.client = ZeebeClient(grpc_channel, max_connection_retries) - @copy_docstring(ZeebeClient.run_process) def run_process( self, bpmn_process_id: str, @@ -35,7 +32,8 @@ def run_process( ) -> CreateProcessInstanceResponse: return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id)) - @copy_docstring(ZeebeClient.run_process_with_result) + run_process.__doc__ = ZeebeClient.publish_message.__doc__ + def run_process_with_result( self, bpmn_process_id: str, @@ -51,17 +49,20 @@ def run_process_with_result( ) ) - @copy_docstring(ZeebeClient.cancel_process_instance) + run_process_with_result.__doc__ = ZeebeClient.publish_message.__doc__ + def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key)) - @copy_docstring(ZeebeClient.deploy_resource) + cancel_process_instance.__doc__ = ZeebeClient.cancel_process_instance.__doc__ + def deploy_resource( self, *resource_file_path: str | os.PathLike[str], tenant_id: str | None = None ) -> DeployResourceResponse: return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id)) - @copy_docstring(ZeebeClient.broadcast_signal) + deploy_resource.__doc__ = ZeebeClient.deploy_resource.__doc__ + def broadcast_signal( self, signal_name: str, @@ -76,7 +77,8 @@ def broadcast_signal( ) ) - @copy_docstring(ZeebeClient.publish_message) + broadcast_signal.__doc__ = ZeebeClient.broadcast_signal.__doc__ + def publish_message( self, name: str, @@ -96,3 +98,10 @@ def publish_message( tenant_id, ) ) + + publish_message.__doc__ = ZeebeClient.publish_message.__doc__ + + def topology(self) -> TopologyResponse: + return self.loop.run_until_complete(self.client.topology()) + + topology.__doc__ = ZeebeClient.topology.__doc__ diff --git a/pyzeebe/grpc_internals/types.py b/pyzeebe/grpc_internals/types.py index 4a4d4a7e..eb56d7f1 100644 --- a/pyzeebe/grpc_internals/types.py +++ b/pyzeebe/grpc_internals/types.py @@ -1,5 +1,6 @@ from __future__ import annotations +import enum from dataclasses import dataclass from pyzeebe.types import Variables @@ -163,3 +164,56 @@ class FailJobResponse: @dataclass(frozen=True) class ThrowErrorResponse: pass + + +@dataclass(frozen=True) +class TopologyResponse: + + @dataclass(frozen=True) + class BrokerInfo: + + @dataclass(frozen=True) + class Partition: + + class PartitionBrokerRole(enum.IntEnum): + """Describes the Raft role of the broker for a given partition""" + + LEADER = 0 + FOLLOWER = 1 + INACTIVE = 2 + + class PartitionBrokerHealth(enum.IntEnum): + """Describes the current health of the partition""" + + HEALTHY = 0 + UNHEALTHY = 1 + DEAD = 2 + + partition_id: int + """the unique ID of this partition""" + role: PartitionBrokerRole + """the role of the broker for this partition""" + health: PartitionBrokerHealth + """the health of this partition""" + + node_id: int + """unique (within a cluster) node ID for the broker""" + host: str + """hostname of the broker""" + port: int + """port for the broker""" + partitions: list[Partition] + """list of partitions managed or replicated on this broker""" + version: str + """broker version""" + + brokers: list[BrokerInfo] + """list of brokers part of this cluster""" + cluster_size: int + """how many nodes are in the cluster""" + partitions_count: int + """how many partitions are spread across the cluster""" + replication_factor: int + """configured replication factor for this cluster""" + gateway_version: str + """gateway version""" diff --git a/pyzeebe/grpc_internals/zeebe_adapter.py b/pyzeebe/grpc_internals/zeebe_adapter.py index 6cf64767..df215f58 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_adapter.py @@ -1,8 +1,9 @@ +from pyzeebe.grpc_internals.zeebe_cluster_adapter import ZeebeClusterAdapter from pyzeebe.grpc_internals.zeebe_job_adapter import ZeebeJobAdapter from pyzeebe.grpc_internals.zeebe_message_adapter import ZeebeMessageAdapter from pyzeebe.grpc_internals.zeebe_process_adapter import ZeebeProcessAdapter # Mixin class -class ZeebeAdapter(ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter): +class ZeebeAdapter(ZeebeClusterAdapter, ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter): pass diff --git a/pyzeebe/grpc_internals/zeebe_cluster_adapter.py b/pyzeebe/grpc_internals/zeebe_cluster_adapter.py new file mode 100644 index 00000000..3694600c --- /dev/null +++ b/pyzeebe/grpc_internals/zeebe_cluster_adapter.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import grpc + +from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase +from pyzeebe.proto.gateway_pb2 import TopologyRequest + +from .types import TopologyResponse + + +class ZeebeClusterAdapter(ZeebeAdapterBase): + async def topology(self) -> TopologyResponse: + try: + response = await self._gateway_stub.Topology(TopologyRequest()) + except grpc.aio.AioRpcError as grpc_error: + await self._handle_grpc_error(grpc_error) + + return TopologyResponse( + brokers=[ + TopologyResponse.BrokerInfo( + node_id=broker.nodeId, + host=broker.host, + port=broker.port, + partitions=[ + TopologyResponse.BrokerInfo.Partition( + partition_id=partition.partitionId, + role=TopologyResponse.BrokerInfo.Partition.PartitionBrokerRole(partition.role), + health=TopologyResponse.BrokerInfo.Partition.PartitionBrokerHealth(partition.health), + ) + for partition in broker.partitions + ], + version=broker.version, + ) + for broker in response.brokers + ], + cluster_size=response.clusterSize, + partitions_count=response.partitionsCount, + replication_factor=response.replicationFactor, + gateway_version=response.gatewayVersion, + ) diff --git a/tests/integration/topology_test.py b/tests/integration/topology_test.py new file mode 100644 index 00000000..1a74a822 --- /dev/null +++ b/tests/integration/topology_test.py @@ -0,0 +1,10 @@ +import pytest + +from pyzeebe import ZeebeClient + + +@pytest.mark.e2e +async def test_topology(zeebe_client: ZeebeClient): + topology = await zeebe_client.topology() + + assert topology.cluster_size == 1 diff --git a/tests/unit/channel/utils_test.py b/tests/unit/channel/utils_test.py index 4cbc3b80..c5455f42 100644 --- a/tests/unit/channel/utils_test.py +++ b/tests/unit/channel/utils_test.py @@ -176,7 +176,7 @@ class TestGetCamundaAddress: def test_is_calculated_from_parameters_as_highest_priority(self): result = get_camunda_address("cluster_id_param", "camunda_region_param") - assert result == f"cluster_id_param.camunda_region_param.zeebe.camunda.io:443" + assert result == "cluster_id_param.camunda_region_param.zeebe.camunda.io:443" def test_raises_error_if_cluster_id_is_none(self): with pytest.raises(SettingsError): diff --git a/tests/unit/client/client_test.py b/tests/unit/client/client_test.py index d98010a8..0ee20470 100644 --- a/tests/unit/client/client_test.py +++ b/tests/unit/client/client_test.py @@ -92,3 +92,8 @@ async def test_broadcast_signal(zeebe_client: ZeebeClient): @pytest.mark.asyncio async def test_publish_message(zeebe_client: ZeebeClient): await zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4())) + + +@pytest.mark.asyncio +async def test_topology(zeebe_client: ZeebeClient): + await zeebe_client.topology() diff --git a/tests/unit/client/sync_client_test.py b/tests/unit/client/sync_client_test.py index 27e1ea0e..2cc12db1 100644 --- a/tests/unit/client/sync_client_test.py +++ b/tests/unit/client/sync_client_test.py @@ -103,3 +103,12 @@ def test_calls_publish_message_of_zeebe_client(self, sync_zeebe_client: SyncZeeb sync_zeebe_client.publish_message(name, correlation_key) sync_zeebe_client.client.publish_message.assert_called_once() + + +class TestTopology: + def test_calls_topology_of_zeebe_client(self, sync_zeebe_client: SyncZeebeClient): + sync_zeebe_client.client.topology = AsyncMock() + + sync_zeebe_client.topology() + + sync_zeebe_client.client.topology.assert_called_once() diff --git a/tests/unit/grpc_internals/zeebe_cluster_adapter_test.py b/tests/unit/grpc_internals/zeebe_cluster_adapter_test.py new file mode 100644 index 00000000..2a358d8f --- /dev/null +++ b/tests/unit/grpc_internals/zeebe_cluster_adapter_test.py @@ -0,0 +1,12 @@ +import pytest + +from pyzeebe.grpc_internals.types import TopologyResponse +from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter + + +@pytest.mark.asyncio +class TestTopology: + async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeAdapter): + response = await zeebe_adapter.topology() + + assert isinstance(response, TopologyResponse) diff --git a/tests/unit/utils/gateway_mock.py b/tests/unit/utils/gateway_mock.py index dbcc271e..bfe931f1 100644 --- a/tests/unit/utils/gateway_mock.py +++ b/tests/unit/utils/gateway_mock.py @@ -22,6 +22,7 @@ FormMetadata, ProcessMetadata, PublishMessageResponse, + TopologyResponse, ) from pyzeebe.proto.gateway_pb2_grpc import GatewayServicer from pyzeebe.task.task import Task @@ -218,3 +219,6 @@ def mock_deploy_process(self, bpmn_process_id: str, version: int, tasks: list[Ta "version": version, "tasks": tasks, } + + def Topology(self, request, context): + return TopologyResponse()