From 01cdd993afd9fd4daa73695183a921c49a2d6f16 Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Sat, 3 Jul 2021 15:50:06 +0200 Subject: [PATCH 1/4] Change gRPC client to send keepalive pings over the channel This will detect if there's a network issue, and close the channel. Options can be added/changed by monkeypatching GRPC_CHANNEL_OPTIONS. Port of camunda-community-hub/pyzeebe/pull/180 (e43945c). --- pyzeebe/grpc_internals/channel_options.py | 15 ++++ pyzeebe/grpc_internals/grpc_channel_utils.py | 9 +-- .../grpc_internals/channel_options_test.py | 70 +++++++++++++++++++ 3 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 pyzeebe/grpc_internals/channel_options.py create mode 100644 tests/unit/grpc_internals/channel_options_test.py diff --git a/pyzeebe/grpc_internals/channel_options.py b/pyzeebe/grpc_internals/channel_options.py new file mode 100644 index 00000000..fdba65be --- /dev/null +++ b/pyzeebe/grpc_internals/channel_options.py @@ -0,0 +1,15 @@ +GRPC_CHANNEL_OPTIONS = { + # https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals + # "By default, the official Zeebe clients (Java and Go) send keep alive pings every 45 seconds." + "grpc.keepalive_time_ms": 45_000 +} + + +def get_channel_options(): + """ + Channel arguments are expected as a tuple of tuples: + https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + """ + return tuple( + (k, v) for k, v in GRPC_CHANNEL_OPTIONS.items() + ) diff --git a/pyzeebe/grpc_internals/grpc_channel_utils.py b/pyzeebe/grpc_internals/grpc_channel_utils.py index 1ba4d71f..78fd7fd4 100644 --- a/pyzeebe/grpc_internals/grpc_channel_utils.py +++ b/pyzeebe/grpc_internals/grpc_channel_utils.py @@ -4,7 +4,7 @@ import grpc from pyzeebe.credentials.base_credentials import BaseCredentials - +from pyzeebe.grpc_internals.channel_options import get_channel_options def create_connection_uri( hostname: str = None, @@ -23,8 +23,9 @@ def create_channel( credentials: Optional[BaseCredentials] = None, secure_connection: bool = False ) -> grpc.aio.Channel: + options = get_channel_options() if credentials: - return grpc.aio.secure_channel(connection_uri, credentials.grpc_credentials) + return grpc.aio.secure_channel(connection_uri, credentials.grpc_credentials, options=options) if secure_connection: - return grpc.aio.secure_channel(connection_uri, grpc.ssl_channel_credentials()) - return grpc.aio.insecure_channel(connection_uri) + return grpc.aio.secure_channel(connection_uri, grpc.ssl_channel_credentials(), options=options) + return grpc.aio.insecure_channel(connection_uri, options=options) diff --git a/tests/unit/grpc_internals/channel_options_test.py b/tests/unit/grpc_internals/channel_options_test.py new file mode 100644 index 00000000..56ba08e9 --- /dev/null +++ b/tests/unit/grpc_internals/channel_options_test.py @@ -0,0 +1,70 @@ +from copy import deepcopy + +import pytest +from unittest.mock import patch, Mock + +from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase + +import pyzeebe.grpc_internals.channel_options +from pyzeebe.grpc_internals.channel_options import get_channel_options + + +@pytest.fixture +def revert_monkeypatch_after_test(): + """ + This sort of exists in pytest already (docs.pytest.org/en/stable/monkeypatch.html), + however that means a bit of "magic" happens, this is a bit clearer and tests the users + approach to this. + """ + options_before = deepcopy(pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS) + yield + pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS = options_before + + +def test_get_channel_options_returns_tuple_of_tuple_with_options(): + assert get_channel_options() == ( + ("grpc.keepalive_time_ms", 45000), + ) + + +@pytest.mark.parametrize("grpc_method,call_kwargs", + [ + ("grpc.aio.secure_channel", {"secure_connection": True}), + ("grpc.aio.insecure_channel", {}), + ("grpc.aio.secure_channel", {"credentials": Mock()}), + ]) +def test_create_channel_called_with_options(grpc_method, call_kwargs, zeebe_adapter): + with patch(grpc_method) as channel_mock: + ZeebeAdapterBase(**call_kwargs).connect() + expected_options = (('grpc.keepalive_time_ms', 45000),) + # `call_args.kwargs` as it's not available in python <=3.7 + # https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock.call_args + # 0 is args, 1 is kwargs + assert channel_mock.call_args[1]["options"] == expected_options + + +@pytest.mark.usefixtures("revert_monkeypatch_after_test") +def test_monkeypatching_with_options_override(): + pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS["grpc.keepalive_time_ms"] = 4000 + assert get_channel_options() == ( + ("grpc.keepalive_time_ms", 4000), + ) + + +@pytest.mark.usefixtures("revert_monkeypatch_after_test") +def test_monkeypatching_with_options_added(): + pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS.update({ + "grpc.keepalive_timeout_ms": 120000, + "grpc.http2.min_time_between_pings_ms": 60000, + }) + assert get_channel_options() == ( + ("grpc.keepalive_time_ms", 45000), + ("grpc.keepalive_timeout_ms", 120000), + ("grpc.http2.min_time_between_pings_ms", 60000) + ) + + +@pytest.mark.usefixtures("revert_monkeypatch_after_test") +def test_monkeypatching_with_options_removed(): + pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS = {} + assert get_channel_options() == () From 0c09722bee21204bb59c7e1f21745e69420e2579 Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Sat, 3 Jul 2021 16:56:24 +0200 Subject: [PATCH 2/4] Add functionality to pass gRPC channel options when creating channel --- pyzeebe/grpc_internals/channel_options.py | 34 +++++++++++++++---- pyzeebe/grpc_internals/grpc_channel_utils.py | 28 +++++++++++---- .../grpc_internals/grpc_channel_utils_test.py | 23 +++++++++++++ 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/pyzeebe/grpc_internals/channel_options.py b/pyzeebe/grpc_internals/channel_options.py index fdba65be..006330f9 100644 --- a/pyzeebe/grpc_internals/channel_options.py +++ b/pyzeebe/grpc_internals/channel_options.py @@ -1,15 +1,37 @@ +""" gRPC Channel Options + +``grpc.keepalive_time_ms`` +-------------------------- + +Time between keepalive pings. Following the official Zeebe Java/Go client, sending pings every 45 seconds. + +https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals +""" +from typing import Dict, Any, Tuple + GRPC_CHANNEL_OPTIONS = { - # https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals - # "By default, the official Zeebe clients (Java and Go) send keep alive pings every 45 seconds." "grpc.keepalive_time_ms": 45_000 } -def get_channel_options(): +def get_channel_options(options: Dict[str, Any] = None) -> Tuple[Tuple[str, Any], ...]: """ - Channel arguments are expected as a tuple of tuples: - https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + Convert options dict to tuple in expected format for creating the gRPC channel + + Args: + options (Dict[str, Any]): A key/value representation of `gRPC channel arguments_`. + Default: None (will use library defaults) + + Returns: + Tuple[Tuple[str, Any], ...]: Options for the gRPC channel + + .. _gRPC channel arguments: + https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments """ + if options: + options = {**GRPC_CHANNEL_OPTIONS, **options} + else: + options = GRPC_CHANNEL_OPTIONS return tuple( - (k, v) for k, v in GRPC_CHANNEL_OPTIONS.items() + (k, v) for k, v in options.items() ) diff --git a/pyzeebe/grpc_internals/grpc_channel_utils.py b/pyzeebe/grpc_internals/grpc_channel_utils.py index 78fd7fd4..12767e91 100644 --- a/pyzeebe/grpc_internals/grpc_channel_utils.py +++ b/pyzeebe/grpc_internals/grpc_channel_utils.py @@ -1,5 +1,5 @@ import os -from typing import Optional +from typing import Optional, Dict, Any import grpc @@ -21,11 +21,27 @@ def create_connection_uri( def create_channel( connection_uri: str, credentials: Optional[BaseCredentials] = None, - secure_connection: bool = False + secure_connection: bool = False, + options: Dict[str, Any] = None ) -> grpc.aio.Channel: - options = get_channel_options() + """ + Create a gRPC channel. + + Args: + connection_uri (str): The URI to Zeebe. + credentials (BaseCredentials): Credentials for accessing the channel. Default: None + secure_connection (bool): Create a secure channel (set to True when using credentials). Default: False + options (Dict[str, Any]): A key/value representation of `gRPC channel arguments_`. Default: None (will use library defaults) + + Returns: + grpc.aio.Channel: A channel object set up to connect to Zeebe + + .. _gRPC channel arguments: + https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + """ + channel_options = get_channel_options(options) if credentials: - return grpc.aio.secure_channel(connection_uri, credentials.grpc_credentials, options=options) + return grpc.aio.secure_channel(connection_uri, credentials.grpc_credentials, options=channel_options) if secure_connection: - return grpc.aio.secure_channel(connection_uri, grpc.ssl_channel_credentials(), options=options) - return grpc.aio.insecure_channel(connection_uri, options=options) + return grpc.aio.secure_channel(connection_uri, grpc.ssl_channel_credentials(), options=channel_options) + return grpc.aio.insecure_channel(connection_uri, options=channel_options) diff --git a/tests/unit/grpc_internals/grpc_channel_utils_test.py b/tests/unit/grpc_internals/grpc_channel_utils_test.py index 44df7590..618493fa 100644 --- a/tests/unit/grpc_internals/grpc_channel_utils_test.py +++ b/tests/unit/grpc_internals/grpc_channel_utils_test.py @@ -26,6 +26,29 @@ def test_creates_secure_channel_when_secure_connection_is_enabled(self): channel_mock.assert_called_once() + def test_creates_secure_channel_with_default_options(self): + with patch("grpc.aio.insecure_channel") as channel_mock: + create_channel(str(uuid4()), options=None) + + expected_options = ( + ("grpc.keepalive_time_ms", 45000), + ) + assert channel_mock.call_args[1]["options"] == expected_options + + def test_creates_insecure_channel_with_options(self): + with patch("grpc.aio.insecure_channel") as channel_mock: + additional_options = { + "grpc.keepalive_timeout_ms": 120000, + "grpc.http2.min_time_between_pings_ms": 60000, + } + create_channel(str(uuid4()), options=additional_options) + + expected_options = ( + ("grpc.keepalive_time_ms", 45000), + ("grpc.keepalive_timeout_ms", 120000), + ("grpc.http2.min_time_between_pings_ms", 60000), + ) + assert channel_mock.call_args[1]["options"] == expected_options class TestCreateConnectionUri: def test_uses_credentials_first(self): From e661d4a391e8d93f1c0a979f27d881c08868c71e Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Sat, 3 Jul 2021 17:08:08 +0200 Subject: [PATCH 3/4] Fix isort errors --- pyzeebe/grpc_internals/channel_options.py | 2 +- pyzeebe/grpc_internals/grpc_channel_utils.py | 3 ++- tests/unit/grpc_internals/channel_options_test.py | 5 ++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyzeebe/grpc_internals/channel_options.py b/pyzeebe/grpc_internals/channel_options.py index 006330f9..9e20d256 100644 --- a/pyzeebe/grpc_internals/channel_options.py +++ b/pyzeebe/grpc_internals/channel_options.py @@ -7,7 +7,7 @@ https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals """ -from typing import Dict, Any, Tuple +from typing import Any, Dict, Tuple GRPC_CHANNEL_OPTIONS = { "grpc.keepalive_time_ms": 45_000 diff --git a/pyzeebe/grpc_internals/grpc_channel_utils.py b/pyzeebe/grpc_internals/grpc_channel_utils.py index 12767e91..dfd6978e 100644 --- a/pyzeebe/grpc_internals/grpc_channel_utils.py +++ b/pyzeebe/grpc_internals/grpc_channel_utils.py @@ -1,11 +1,12 @@ import os -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional import grpc from pyzeebe.credentials.base_credentials import BaseCredentials from pyzeebe.grpc_internals.channel_options import get_channel_options + def create_connection_uri( hostname: str = None, port: int = None, diff --git a/tests/unit/grpc_internals/channel_options_test.py b/tests/unit/grpc_internals/channel_options_test.py index 56ba08e9..71a967fd 100644 --- a/tests/unit/grpc_internals/channel_options_test.py +++ b/tests/unit/grpc_internals/channel_options_test.py @@ -1,12 +1,11 @@ from copy import deepcopy +from unittest.mock import Mock, patch import pytest -from unittest.mock import patch, Mock - -from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase import pyzeebe.grpc_internals.channel_options from pyzeebe.grpc_internals.channel_options import get_channel_options +from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase @pytest.fixture From 3c40f25a5921e2cc51fca5563cadb046c7434817 Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Sat, 3 Jul 2021 22:39:18 +0200 Subject: [PATCH 4/4] Slim down docstring for internal function --- pyzeebe/grpc_internals/grpc_channel_utils.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pyzeebe/grpc_internals/grpc_channel_utils.py b/pyzeebe/grpc_internals/grpc_channel_utils.py index dfd6978e..f31dea18 100644 --- a/pyzeebe/grpc_internals/grpc_channel_utils.py +++ b/pyzeebe/grpc_internals/grpc_channel_utils.py @@ -26,16 +26,7 @@ def create_channel( options: Dict[str, Any] = None ) -> grpc.aio.Channel: """ - Create a gRPC channel. - - Args: - connection_uri (str): The URI to Zeebe. - credentials (BaseCredentials): Credentials for accessing the channel. Default: None - secure_connection (bool): Create a secure channel (set to True when using credentials). Default: False - options (Dict[str, Any]): A key/value representation of `gRPC channel arguments_`. Default: None (will use library defaults) - - Returns: - grpc.aio.Channel: A channel object set up to connect to Zeebe + options: A key/value representation of `gRPC channel arguments_`. Default: None (will use library defaults) .. _gRPC channel arguments: https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments