Skip to content

Commit

Permalink
feat: add EvaluateDecision
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Jan 13, 2025
1 parent d2822f6 commit 4fb6e8d
Show file tree
Hide file tree
Showing 14 changed files with 448 additions and 3 deletions.
35 changes: 35 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
PublishMessageResponse,
TopologyResponse,
)
Expand Down Expand Up @@ -151,6 +152,40 @@ async def deploy_resource(
"""
return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
"""Evaluates a decision.
You specify the decision to evaluate either by using its unique KEY (as returned by :py:method:`ZeebeClient.deploy_resource`), or using the decision ID.
When using the decision ID, the latest deployed version of the decision is used.
Args:
decision_key (int): The unique key identifying the decision to be evaluated
(e.g. returned from a decision in the DeployResourceResponse message)
decision_id (str): The ID of the decision to be evaluated
variables (dict): A dictionary containing all variables for the decision to be evaluated. Must be JSONable.
tenant_id (strc): The tenant ID of the resources to deploy. New in Zeebe 8.3.
Returns:
EvaluateDecisionResponse: response from Zeebe.
Raises:
DecisionNotFoundError: No decision with decision_key/decision_id exists
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
return await self.zeebe_adapter.evaluate_decision(
decision_key, decision_id, variables=variables or {}, tenant_id=tenant_id
)

async def broadcast_signal(
self,
signal_name: str,
Expand Down
14 changes: 14 additions & 0 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
PublishMessageResponse,
TopologyResponse,
)
Expand Down Expand Up @@ -63,6 +64,19 @@ def deploy_resource(

deploy_resource.__doc__ = ZeebeClient.deploy_resource.__doc__

def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables | None = None,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
return self.loop.run_until_complete(
self.client.evaluate_decision(decision_key, decision_id, variables=variables, tenant_id=tenant_id)
)

evaluate_decision.__doc__ = ZeebeClient.evaluate_decision.__doc__

def broadcast_signal(
self,
signal_name: str,
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
)
from .message_errors import MessageAlreadyExistsError
from .process_errors import (
DecisionNotFoundError,
InvalidJSONError,
ProcessDefinitionHasNoStartEventError,
ProcessDefinitionNotFoundError,
Expand Down Expand Up @@ -42,6 +43,7 @@
"ProcessInstanceNotFoundError",
"ProcessInvalidError",
"ProcessTimeoutError",
"DecisionNotFoundError",
"BusinessError",
"DuplicateTaskTypeError",
"NoVariableNameGivenError",
Expand Down
13 changes: 13 additions & 0 deletions pyzeebe/errors/process_errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from pyzeebe.errors.pyzeebe_errors import PyZeebeError


Expand Down Expand Up @@ -32,3 +34,14 @@ class ProcessTimeoutError(PyZeebeError, TimeoutError):
def __init__(self, bpmn_process_id: str):
super().__init__(f"Timeout while waiting for process {bpmn_process_id} to complete")
self.bpmn_process_id = bpmn_process_id


class DecisionNotFoundError(PyZeebeError):
def __init__(self, decision_key: int | None, decision_id: str | None):
if decision_id is not None:
msg = f"Decision with id '{decision_id}' was not found"
else:
msg = f"Decision with key '{decision_key}' was not found"
super().__init__(msg)
self.decision_key = decision_key
self.decision_id = decision_id
97 changes: 96 additions & 1 deletion pyzeebe/grpc_internals/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import enum
from dataclasses import dataclass

from pyzeebe.types import Variables
from pyzeebe.types import JsonType, Variables


@dataclass(frozen=True)
Expand Down Expand Up @@ -135,6 +135,101 @@ class FormMetadata:
"""the tenant ID of the deployed resources"""


@dataclass(frozen=True)
class EvaluateDecisionResponse:

@dataclass(frozen=True)
class EvaluatedDecision:

@dataclass(frozen=True)
class MatchedDecisionRule:

@dataclass(frozen=True)
class EvaluatedDecisionOutput:
output_id: str
"""the id of the evaluated decision output"""
output_name: str
"""the name of the evaluated decision output"""
output_value: JsonType
"""the value of the evaluated decision output"""

rule_id: str
"""the id of the matched rule"""
rule_index: int
"""the index of the matched rule"""
evaluated_outputs: list[EvaluatedDecisionOutput]
"""the evaluated decision outputs"""

@dataclass(frozen=True)
class EvaluatedDecisionInput:
input_id: str
"""the id of the evaluated decision input"""
input_name: str
"""the name of the evaluated decision input"""
input_value: JsonType
"""the value of the evaluated decision input"""

decision_key: int
"""the unique key identifying the decision which was evaluated (e.g. returned
from a decision in the DeployResourceResponse message)
"""
decision_id: str
"""the ID of the decision which was evaluated"""
decision_name: str
"""the name of the decision which was evaluated"""
decision_version: int
"""the version of the decision which was evaluated"""
decision_type: str
"""the type of the decision which was evaluated"""
decision_output: JsonType
"""JSON document that will instantiate the result of the decision which was
evaluated; it will be a JSON object, as the result output will be mapped
in a key-value fashion, e.g. { "a": 1 }.
"""
matched_rules: list[MatchedDecisionRule]
"""the decision rules that matched within this decision evaluation"""
evaluated_inputs: list[EvaluatedDecisionInput]
"""the decision inputs that were evaluated within this decision evaluation"""
tenant_id: str | None
"""the tenant identifier of the evaluated decision"""

decision_key: int
"""the unique key identifying the decision which was evaluated (e.g. returned
from a decision in the DeployResourceResponse message)
"""
decision_id: str
"""the ID of the decision which was evaluated"""
decision_name: str
"""the name of the decision which was evaluated"""
decision_version: int
"""the version of the decision which was evaluated"""
decision_requirements_id: str
"""the ID of the decision requirements graph that the decision which was
evaluated is part of.
"""
decision_requirements_key: int
"""the unique key identifying the decision requirements graph that the
decision which was evaluated is part of.
"""
decision_output: JsonType
"""JSON document that will instantiate the result of the decision which was
evaluated; it will be a JSON object, as the result output will be mapped
in a key-value fashion, e.g. { "a": 1 }.
"""
evaluated_decisions: list[EvaluatedDecision]
"""a list of decisions that were evaluated within the requested decision evaluation"""
failed_decision_id: str
"""an optional string indicating the ID of the decision which
failed during evaluation
"""
failure_message: str
"""an optional message describing why the decision which was evaluated failed"""
tenant_id: str | None
"""the tenant identifier of the evaluated decision"""
decision_instance_key: int
"""the unique key identifying this decision evaluation"""


@dataclass(frozen=True)
class BroadcastSignalResponse:
key: int
Expand Down
102 changes: 102 additions & 0 deletions pyzeebe/grpc_internals/zeebe_process_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import grpc

from pyzeebe.errors import (
DecisionNotFoundError,
InvalidJSONError,
ProcessDefinitionHasNoStartEventError,
ProcessDefinitionNotFoundError,
Expand All @@ -25,7 +26,17 @@
DecisionMetadata,
DecisionRequirementsMetadata,
DeployResourceRequest,
EvaluatedDecision,
EvaluatedDecisionInput,
EvaluatedDecisionOutput,
EvaluateDecisionRequest,
)
from pyzeebe.proto.gateway_pb2 import (
EvaluateDecisionResponse as EvaluateDecisionResponseStub,
)
from pyzeebe.proto.gateway_pb2 import (
FormMetadata,
MatchedDecisionRule,
ProcessMetadata,
Resource,
)
Expand All @@ -36,6 +47,7 @@
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployResourceResponse,
EvaluateDecisionResponse,
)


Expand Down Expand Up @@ -205,6 +217,96 @@ def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse
tenant_id=response.tenantId,
)

async def evaluate_decision(
self,
decision_key: int | None,
decision_id: str | None,
variables: Variables,
tenant_id: str | None = None,
) -> EvaluateDecisionResponse:
if decision_id is None and decision_key is None:
raise ValueError("decision_key or decision_id must be not None")

try:
response: EvaluateDecisionResponseStub = await self._gateway_stub.EvaluateDecision(
EvaluateDecisionRequest(
decisionKey=decision_key, # type: ignore[arg-type]
decisionId=decision_id, # type: ignore[arg-type]
variables=json.dumps(variables),
tenantId=tenant_id, # type: ignore[arg-type]
)
)
except grpc.aio.AioRpcError as grpc_error:
if is_error_status(grpc_error, grpc.StatusCode.INVALID_ARGUMENT) and (details := grpc_error.details()):
if "but no decision found for" in details:
raise DecisionNotFoundError(decision_id=decision_id, decision_key=decision_key) from grpc_error
await self._handle_grpc_error(grpc_error)

return EvaluateDecisionResponse(
decision_key=response.decisionKey,
decision_id=response.decisionId,
decision_name=response.decisionName,
decision_version=response.decisionVersion,
decision_requirements_id=response.decisionRequirementsId,
decision_requirements_key=response.decisionRequirementsKey,
decision_output=json.loads(response.decisionOutput),
evaluated_decisions=[
self._create_evaluated_decision_from_raw(evaluated_decision)
for evaluated_decision in response.evaluatedDecisions
],
failed_decision_id=response.failedDecisionId,
failure_message=response.failureMessage,
tenant_id=response.tenantId,
decision_instance_key=response.decisionInstanceKey,
)

def _create_evaluated_decision_from_raw(
self, response: EvaluatedDecision
) -> EvaluateDecisionResponse.EvaluatedDecision:
return EvaluateDecisionResponse.EvaluatedDecision(
decision_key=response.decisionKey,
decision_id=response.decisionId,
decision_name=response.decisionName,
decision_version=response.decisionVersion,
decision_type=response.decisionType,
decision_output=json.loads(response.decisionOutput),
matched_rules=[self._create_matched_rule_from_raw(matched_rule) for matched_rule in response.matchedRules],
evaluated_inputs=[
self._create_evaluated_input_from_raw(evaluated_input) for evaluated_input in response.evaluatedInputs
],
tenant_id=response.tenantId,
)

def _create_matched_rule_from_raw(
self, response: MatchedDecisionRule
) -> EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule:
return EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule(
rule_id=response.ruleId,
rule_index=response.ruleIndex,
evaluated_outputs=[
self._create_evaluated_output_from_raw(evaluated_output)
for evaluated_output in response.evaluatedOutputs
],
)

def _create_evaluated_input_from_raw(
self, response: EvaluatedDecisionInput
) -> EvaluateDecisionResponse.EvaluatedDecision.EvaluatedDecisionInput:
return EvaluateDecisionResponse.EvaluatedDecision.EvaluatedDecisionInput(
input_id=response.inputId,
input_name=response.inputName,
input_value=json.loads(response.inputValue),
)

def _create_evaluated_output_from_raw(
self, response: EvaluatedDecisionOutput
) -> EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule.EvaluatedDecisionOutput:
return EvaluateDecisionResponse.EvaluatedDecision.MatchedDecisionRule.EvaluatedDecisionOutput(
output_id=response.outputId,
output_name=response.outputName,
output_value=json.loads(response.outputValue),
)


_METADATA_PARSERS: dict[
str,
Expand Down
7 changes: 5 additions & 2 deletions pyzeebe/types.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from collections.abc import Mapping, Sequence
from typing import Any
from typing import Any, Union

from typing_extensions import TypeAlias

Headers: TypeAlias = Mapping[str, Any]
Variables: TypeAlias = Mapping[str, Any]
Unset = "UNSET"

ChannelArgumentType: TypeAlias = Sequence[tuple[str, Any]]

JsonType: TypeAlias = Union[Mapping[str, "JsonType"], Sequence["JsonType"], str, int, float, bool, None]
JsonDictType: TypeAlias = Mapping[str, JsonType]
Variables: TypeAlias = JsonDictType
Loading

0 comments on commit 4fb6e8d

Please sign in to comment.