Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add TopologyRequest #563

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.types import Variables
Expand Down Expand Up @@ -220,3 +221,19 @@ async def publish_message(
message_id=message_id,
tenant_id=tenant_id,
)

async def topology(self) -> TopologyResponse:
"""
Obtains the current topology of the cluster the gateway is part of.

Returns:
TopologyResponse: response from Zeebe.

Raises:
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code

"""
return await self.zeebe_adapter.topology()
27 changes: 18 additions & 9 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import os
from functools import partial, wraps

import grpc

Expand All @@ -14,18 +13,16 @@
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.types import Variables

copy_docstring = partial(wraps, assigned=["__doc__"], updated=[])


class SyncZeebeClient:
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None:
self.loop = asyncio.get_event_loop()
self.client = ZeebeClient(grpc_channel, max_connection_retries)

@copy_docstring(ZeebeClient.run_process)
def run_process(
self,
bpmn_process_id: str,
Expand All @@ -35,7 +32,8 @@ def run_process(
) -> CreateProcessInstanceResponse:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id))

@copy_docstring(ZeebeClient.run_process_with_result)
run_process.__doc__ = ZeebeClient.publish_message.__doc__

def run_process_with_result(
self,
bpmn_process_id: str,
Expand All @@ -51,17 +49,20 @@ def run_process_with_result(
)
)

@copy_docstring(ZeebeClient.cancel_process_instance)
run_process_with_result.__doc__ = ZeebeClient.publish_message.__doc__

def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key))

@copy_docstring(ZeebeClient.deploy_resource)
cancel_process_instance.__doc__ = ZeebeClient.cancel_process_instance.__doc__

def deploy_resource(
self, *resource_file_path: str | os.PathLike[str], tenant_id: str | None = None
) -> DeployResourceResponse:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

@copy_docstring(ZeebeClient.broadcast_signal)
deploy_resource.__doc__ = ZeebeClient.deploy_resource.__doc__

def broadcast_signal(
self,
signal_name: str,
Expand All @@ -76,7 +77,8 @@ def broadcast_signal(
)
)

@copy_docstring(ZeebeClient.publish_message)
broadcast_signal.__doc__ = ZeebeClient.broadcast_signal.__doc__

def publish_message(
self,
name: str,
Expand All @@ -96,3 +98,10 @@ def publish_message(
tenant_id,
)
)

publish_message.__doc__ = ZeebeClient.publish_message.__doc__

def topology(self) -> TopologyResponse:
return self.loop.run_until_complete(self.client.topology())

topology.__doc__ = ZeebeClient.topology.__doc__
54 changes: 54 additions & 0 deletions pyzeebe/grpc_internals/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import enum
from dataclasses import dataclass

from pyzeebe.types import Variables
Expand Down Expand Up @@ -163,3 +164,56 @@ class FailJobResponse:
@dataclass(frozen=True)
class ThrowErrorResponse:
pass


@dataclass(frozen=True)
class TopologyResponse:

@dataclass(frozen=True)
class BrokerInfo:

@dataclass(frozen=True)
class Partition:

class PartitionBrokerRole(enum.IntEnum):
"""Describes the Raft role of the broker for a given partition"""

LEADER = 0
FOLLOWER = 1
INACTIVE = 2

class PartitionBrokerHealth(enum.IntEnum):
"""Describes the current health of the partition"""

HEALTHY = 0
UNHEALTHY = 1
DEAD = 2

partition_id: int
"""the unique ID of this partition"""
role: PartitionBrokerRole
"""the role of the broker for this partition"""
health: PartitionBrokerHealth
"""the health of this partition"""

node_id: int
"""unique (within a cluster) node ID for the broker"""
host: str
"""hostname of the broker"""
port: int
"""port for the broker"""
partitions: list[Partition]
"""list of partitions managed or replicated on this broker"""
version: str
"""broker version"""

brokers: list[BrokerInfo]
"""list of brokers part of this cluster"""
cluster_size: int
"""how many nodes are in the cluster"""
partitions_count: int
"""how many partitions are spread across the cluster"""
replication_factor: int
"""configured replication factor for this cluster"""
gateway_version: str
"""gateway version"""
3 changes: 2 additions & 1 deletion pyzeebe/grpc_internals/zeebe_adapter.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pyzeebe.grpc_internals.zeebe_cluster_adapter import ZeebeClusterAdapter
from pyzeebe.grpc_internals.zeebe_job_adapter import ZeebeJobAdapter
from pyzeebe.grpc_internals.zeebe_message_adapter import ZeebeMessageAdapter
from pyzeebe.grpc_internals.zeebe_process_adapter import ZeebeProcessAdapter


# Mixin class
class ZeebeAdapter(ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter):
class ZeebeAdapter(ZeebeClusterAdapter, ZeebeProcessAdapter, ZeebeJobAdapter, ZeebeMessageAdapter):
pass
40 changes: 40 additions & 0 deletions pyzeebe/grpc_internals/zeebe_cluster_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

import grpc

from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.proto.gateway_pb2 import TopologyRequest

from .types import TopologyResponse


class ZeebeClusterAdapter(ZeebeAdapterBase):
async def topology(self) -> TopologyResponse:
try:
response = await self._gateway_stub.Topology(TopologyRequest())
except grpc.aio.AioRpcError as grpc_error:
await self._handle_grpc_error(grpc_error)

return TopologyResponse(
brokers=[
TopologyResponse.BrokerInfo(
node_id=broker.nodeId,
host=broker.host,
port=broker.port,
partitions=[
TopologyResponse.BrokerInfo.Partition(
partition_id=partition.partitionId,
role=TopologyResponse.BrokerInfo.Partition.PartitionBrokerRole(partition.role),
health=TopologyResponse.BrokerInfo.Partition.PartitionBrokerHealth(partition.health),
)
for partition in broker.partitions
],
version=broker.version,
)
for broker in response.brokers
],
cluster_size=response.clusterSize,
partitions_count=response.partitionsCount,
replication_factor=response.replicationFactor,
gateway_version=response.gatewayVersion,
)
10 changes: 10 additions & 0 deletions tests/integration/topology_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pytest

from pyzeebe import ZeebeClient


@pytest.mark.e2e
async def test_topology(zeebe_client: ZeebeClient):
topology = await zeebe_client.topology()

assert topology.cluster_size == 1
2 changes: 1 addition & 1 deletion tests/unit/channel/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class TestGetCamundaAddress:
def test_is_calculated_from_parameters_as_highest_priority(self):
result = get_camunda_address("cluster_id_param", "camunda_region_param")

assert result == f"cluster_id_param.camunda_region_param.zeebe.camunda.io:443"
assert result == "cluster_id_param.camunda_region_param.zeebe.camunda.io:443"

def test_raises_error_if_cluster_id_is_none(self):
with pytest.raises(SettingsError):
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/client/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ async def test_broadcast_signal(zeebe_client: ZeebeClient):
@pytest.mark.asyncio
async def test_publish_message(zeebe_client: ZeebeClient):
await zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4()))


@pytest.mark.asyncio
async def test_topology(zeebe_client: ZeebeClient):
await zeebe_client.topology()
9 changes: 9 additions & 0 deletions tests/unit/client/sync_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,12 @@ def test_calls_publish_message_of_zeebe_client(self, sync_zeebe_client: SyncZeeb
sync_zeebe_client.publish_message(name, correlation_key)

sync_zeebe_client.client.publish_message.assert_called_once()


class TestTopology:
def test_calls_topology_of_zeebe_client(self, sync_zeebe_client: SyncZeebeClient):
sync_zeebe_client.client.topology = AsyncMock()

sync_zeebe_client.topology()

sync_zeebe_client.client.topology.assert_called_once()
12 changes: 12 additions & 0 deletions tests/unit/grpc_internals/zeebe_cluster_adapter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pytest

from pyzeebe.grpc_internals.types import TopologyResponse
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter


@pytest.mark.asyncio
class TestTopology:
async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeAdapter):
response = await zeebe_adapter.topology()

assert isinstance(response, TopologyResponse)
4 changes: 4 additions & 0 deletions tests/unit/utils/gateway_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
FormMetadata,
ProcessMetadata,
PublishMessageResponse,
TopologyResponse,
)
from pyzeebe.proto.gateway_pb2_grpc import GatewayServicer
from pyzeebe.task.task import Task
Expand Down Expand Up @@ -218,3 +219,6 @@ def mock_deploy_process(self, bpmn_process_id: str, version: int, tasks: list[Ta
"version": version,
"tasks": tasks,
}

def Topology(self, request, context):
return TopologyResponse()
Loading