diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b197d4d6..a32e8186 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,20 +1,20 @@ [bumpversion] -current_version = 3.0.0rc2 +current_version = 3.0.0rc3 commit = True parse = (?P\d+)\.(?P\d+)\.(?P\d+)(?P.*) -serialize = - {major}.{minor}.{patch}{rc} - {major}.{minor}.{patch} +serialize = + {major}.{minor}.{patch}{rc} + {major}.{minor}.{patch} [bumpversion:part:rc] optional_value = final -values = - rc1 - rc2 - rc3 - rc4 - rc5 - final +values = + rc1 + rc2 + rc3 + rc4 + rc5 + final [bumpversion:file:setup.py] diff --git a/Pipfile b/Pipfile index ec96bc3a..4ad95598 100644 --- a/Pipfile +++ b/Pipfile @@ -19,6 +19,7 @@ pytest-mock = "*" pytest-asyncio = "~=0.15.1" asyncmock = "~=0.4.2" bump2version = "~=1.0.1" +responses = "~=0.13.3" [packages] oauthlib = "~=3.1.0" diff --git a/Pipfile.lock b/Pipfile.lock index 7bf1a6e0..c75b2916 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "013f47e136ae22953d5753fab997869ea41e9b1c2e23d92ee634f98dea441ded" + "sha256": "b8432193550dff748b3608533603cefbabf9fe872bef663ba85f6f86498c3bba" }, "pipfile-spec": 6, "requires": {}, @@ -171,7 +171,7 @@ "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4", "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4.0'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "version": "==1.26.6" }, "zeebe-grpc": { @@ -431,11 +431,11 @@ }, "isort": { "hashes": [ - "sha256:83510593e07e433b77bd5bff0f6f607dbafa06d1a89022616f02d8b699cfcd56", - "sha256:8e2c107091cfec7286bc0f68a547d0ba4c094d460b732075b6fba674f1035c0c" + "sha256:eed17b53c3e7912425579853d078a0832820f023191561fcee9d7cae424e0813", + "sha256:f65ce5bd4cbc6abdfbe29afc2f0245538ab358c14590912df638033f157d555e" ], "markers": "python_version < '4.0' and python_full_version >= '3.6.1'", - "version": "==5.9.1" + "version": "==5.9.2" }, "jinja2": { "hashes": [ @@ -574,11 +574,11 @@ }, "packaging": { "hashes": [ - "sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5", - "sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a" + "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7", + "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==20.9" + "markers": "python_version >= '3.6'", + "version": "==21.0" }, "pluggy": { "hashes": [ @@ -720,6 +720,14 @@ "index": "pypi", "version": "==1.3.0" }, + "responses": { + "hashes": [ + "sha256:18a5b88eb24143adbf2b4100f328a2f5bfa72fbdacf12d97d41f07c26c45553d", + "sha256:b54067596f331786f5ed094ff21e8d79e6a1c68ef625180a7d34808d6f36c11b" + ], + "index": "pypi", + "version": "==0.13.3" + }, "six": { "hashes": [ "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", @@ -820,7 +828,7 @@ "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4", "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f" ], - "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4.0'", + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "version": "==1.26.6" }, "wrapt": { @@ -839,11 +847,11 @@ }, "zipp": { "hashes": [ - "sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76", - "sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098" + "sha256:957cfda87797e389580cb8b9e3870841ca991e2125350677b2ca83a0e99390a3", + "sha256:f5812b1e007e48cff63449a5e9f4e7ebea716b4111f9c4f9a645f91d579bf0c4" ], "markers": "python_version >= '3.6'", - "version": "==3.4.1" + "version": "==3.5.0" } } } diff --git a/README.md b/README.md index 9284f00b..0b8b0342 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,11 @@ The `ZeebeWorker` class uses threading to get and run jobs. ```python import asyncio -from pyzeebe import ZeebeWorker, Job +from pyzeebe import ZeebeWorker, Job, create_insecure_channel + + +channel = create_insecure_channel(hostname="localhost", port=26500) # Create grpc channel +worker = ZeebeWorker(channel) # Create a zeebe worker async def on_error(exception: Exception, job: Job): @@ -48,9 +52,6 @@ async def on_error(exception: Exception, job: Job): await job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}") - -worker = ZeebeWorker(hostname="", port=26500) # Create a zeebe worker - @worker.task(task_type="example", exception_handler=on_error) def example_task(input: str) -> dict: return {"output": f"Hello world, {input}!"} @@ -72,10 +73,11 @@ await zeebe_worker.stop() # Stops worker after all running jobs have been comple ### Client ```python -from pyzeebe import ZeebeClient +from pyzeebe import ZeebeClient, create_insecure_channel # Create a zeebe client -zeebe_client = ZeebeClient(hostname="localhost", port=26500) +channel = create_insecure_channel(hostname="localhost", port=26500) +zeebe_client = ZeebeClient(channel) # Run a Zeebe process instance process_instance_key = await zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={}) diff --git a/docs/channels.rst b/docs/channels.rst new file mode 100644 index 00000000..6cd848f5 --- /dev/null +++ b/docs/channels.rst @@ -0,0 +1,64 @@ +======== +Channels +======== + +In order to instantiate a ZeebeWorker or ZeebeClient you will need to provide an instance of a `grpc.aio.Channel`. + +Pyzeebe provides a couple standard ways to achieve this: + + +Insecure +-------- + +Create a grpc channel connected to a Zeebe Gateway with tls disabled + + +.. autoclass:: pyzeebe.create_insecure_channel + :members: + + +Example: + +.. code-block:: python + + from pyzeebe import create_insecure_channel + + channel = create_insecure_channel(hostname="zeebe", port=443) + + +Secure +------ + +Create a grpc channel with a secure connection to a Zeebe Gateway with tls + +.. autoclass:: pyzeebe.create_secure_channel + :members: + +Example: + +.. code-block:: python + + import grpc + from pyzeebe import create_secure_channel + + + grpc.ssl_channel_credentials(root_certificates="", private_key="") + channel = create_secure_channel(channel_credentials=credentials) + + +Camunda Cloud +------------- + +Create a grpc channel connected to a Zeebe Gateway running in camunda cloud + +.. autoclass:: pyzeebe.create_camunda_cloud_channel + :members: + +Example: + +.. code-block:: python + + from pyzeebe import create_camunda_cloud_channel + + + channel = create_camunda_cloud_channel("client_id", "client_secret", "cluster_id") \ No newline at end of file diff --git a/docs/client_quickstart.rst b/docs/client_quickstart.rst index 10e2e3b7..2ab7ebe1 100644 --- a/docs/client_quickstart.rst +++ b/docs/client_quickstart.rst @@ -9,23 +9,17 @@ To create a client with default configuration: .. code-block:: python - from pyzeebe import ZeebeClient + from pyzeebe import ZeebeClient, create_insecure_channel - client = ZeebeClient() # Will use ZEEBE_ADDRESS environment variable or localhost:26500 - - -To create a client with custom hostname and port: - -.. code-block:: python - - client = ZeebeClient(hostname="zeebe_gateway", port=26500) + channel = create_insecure_channel() # Will use ZEEBE_ADDRESS environment variable or localhost:26500 + client = ZeebeClient(channel) To change connection retries: .. code-block:: python - client = ZeebeClient(max_connection_retries=1) # Will only accept one failure and disconnect upon the second + client = ZeebeClient(grpc_channel, max_connection_retries=1) # Will only accept one failure and disconnect upon the second This means the client will disconnect upon two consecutive failures. Each time the client connects successfully the counter is reset. @@ -36,33 +30,6 @@ This means the client will disconnect upon two consecutive failures. Each time t -To create a client with a secure connection: - -.. code-block:: python - - client = ZeebeClient(secure_connection=True) - -To create a client with OAuth 2.0 authentication: - -.. code-block:: python - - from pyzeebe import ZeebeClient, OAuthCredentials - - credentials = OAuthCredentials(url="oauth_token_url", client_id="client_id", client_secret="client_secret", - audience="audience") - client = ZeebeClient() - -To create a client for a Camunda Cloud instance: - -.. code-block:: python - - from pyzeebe import ZeebeClient, CamundaCloudCredentials - - credentials = CamundaCloudCredentials(client_id="client_id", client_secret="client_secret", - cluster_id="cluster_id") - client = ZeebeClient() - - Run a Zeebe process instance ---------------------------- diff --git a/docs/conf.py b/docs/conf.py index 90cc25bf..d1f3d60d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,7 +26,7 @@ author = 'Jonatan Martens' # The full version, including alpha/beta/rc tags -release = '3.0.0rc2' +release = '3.0.0rc3' # -- General configuration --------------------------------------------------- @@ -59,6 +59,6 @@ # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ['_static'] -version = "3.0.0rc2" +version = "3.0.0rc3" master_doc = 'index' diff --git a/docs/index.rst b/docs/index.rst index c2a557fe..301ea9e6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,9 +19,10 @@ Creating a worker .. code-block:: python - from pyzeebe import ZeebeWorker + from pyzeebe import ZeebeWorker, create_insecure_channel - worker = ZeebeWorker() + channel = create_insecure_channel() + worker = ZeebeWorker(channel) @worker.task(task_type="my_task") async def my_task(x: int): @@ -33,9 +34,10 @@ Creating a client .. code-block:: python - from pyzeebe import ZeebeClient + from pyzeebe import ZeebeClient, create_insecure_channel - client = ZeebeClient() + channel = create_insecure_channel() + client = ZeebeClient(channel) await client.run_process("my_process") @@ -61,5 +63,6 @@ Table Of Contents Client Worker + Channels Decorators Exceptions diff --git a/docs/worker_quickstart.rst b/docs/worker_quickstart.rst index 790bcc4c..af529201 100644 --- a/docs/worker_quickstart.rst +++ b/docs/worker_quickstart.rst @@ -9,10 +9,10 @@ Create and start a worker import asyncio - from pyzeebe import ZeebeWorker + from pyzeebe import ZeebeWorker, create_insecure_channel - - worker = ZeebeWorker() + channel = create_insecure_channel() + worker = ZeebeWorker(channel) @worker.task(task_type="my_task") @@ -25,26 +25,11 @@ Create and start a worker Worker connection options ------------------------- -To create a worker with default configuration: - -.. code-block:: python - - from pyzeebe import ZeebeWorker - - worker = ZeebeWorker() # Will use ZEEBE_ADDRESS environment variable or localhost:26500 - - -To create a worker with custom hostname and port: - -.. code-block:: python - - worker = ZeebeWorker(hostname="zeebe_gateway", port=26500) - To change connection retries: .. code-block:: python - worker = ZeebeWorker(max_connection_retries=1) # Will only accept one failure and disconnect upon the second + worker = ZeebeWorker(grpc_channel, max_connection_retries=1) # Will only accept one failure and disconnect upon the second This means the worker will disconnect upon two consecutive failures. Each time the worker connects successfully the counter is reset. @@ -52,32 +37,6 @@ This means the worker will disconnect upon two consecutive failures. Each time t The default behavior is 10 retries. If you want infinite retries just set to -1. -To create a worker with a secure connection: - -.. code-block:: python - - worker = ZeebeWorker(secure_connection=True) - -To create a worker with OAuth 2.0 authentication: - -.. code-block:: python - - from pyzeebe import ZeebeWorker, OAuthCredentials - - credentials = OAuthCredentials(url="oauth_token_url", client_id="client_id", client_secret="client_secret", - audience="audience") - worker = ZeebeWorker() - -To create a worker for a Camunda Cloud instance: - -.. code-block:: python - - from pyzeebe import ZeebeWorker, CamundaCloudCredentials - - credentials = CamundaCloudCredentials(client_id="client_id", client_secret="client_secret", - cluster_id="cluster_id") - worker = ZeebeWorker() - Add a task ---------- diff --git a/examples/client.py b/examples/client.py index 199f09d9..6858ee55 100644 --- a/examples/client.py +++ b/examples/client.py @@ -1,25 +1,30 @@ -from pyzeebe import CamundaCloudCredentials, ZeebeClient +from pyzeebe import (ZeebeClient, create_camunda_cloud_channel, + create_insecure_channel, create_secure_channel) # Create a zeebe client without credentials -zeebe_client = ZeebeClient(hostname="localhost", port=26500) +grpc_channel = create_insecure_channel(hostname="localhost", port=26500) +zeebe_client = ZeebeClient(grpc_channel) # Create a zeebe client with TLS -zeebe_client = ZeebeClient( - hostname="localhost", port=26500, secure_connection=True) +grpc_channel = create_secure_channel() +zeebe_client = ZeebeClient(grpc_channel) # Create a zeebe client for camunda cloud -camunda_cloud_credentials = CamundaCloudCredentials(client_id="", client_secret="", - cluster_id="") -zeebe_client = ZeebeClient(credentials=camunda_cloud_credentials) +grpc_channel = create_camunda_cloud_channel( + client_id="", + client_secret="", + cluster_id="", +) +zeebe_client = ZeebeClient(grpc_channel) # Run a Zeebe instance process process_instance_key = await zeebe_client.run_process( - bpmn_process_id="My zeebe process", variables={}) + bpmn_process_id="My zeebe process", variables={} +) # Run a Zeebe process instance and receive the result process_instance_key, process_result = await zeebe_client.run_process_with_result( - bpmn_process_id="My zeebe process", - timeout=10000 + bpmn_process_id="My zeebe process", timeout=10000 ) # Will wait 10000 milliseconds (10 seconds) # Deploy a bpmn process definition diff --git a/examples/worker.py b/examples/worker.py index cfbc01f8..edff196c 100644 --- a/examples/worker.py +++ b/examples/worker.py @@ -1,7 +1,8 @@ import asyncio from typing import Dict -from pyzeebe import CamundaCloudCredentials, Job, ZeebeWorker +from pyzeebe import (Job, ZeebeWorker, create_camunda_cloud_channel, + create_insecure_channel, create_secure_channel) from pyzeebe.errors import BusinessError @@ -12,15 +13,30 @@ async def example_logging_task_decorator(job: Job) -> Job: # Will use environment variable ZEEBE_ADDRESS or localhost:26500 and NOT use TLS -worker = ZeebeWorker() +# create_insecure_channel returns a grpc.aio.Channel instance. If needed you +# can build one on your own +grpc_channel = create_insecure_channel() +worker = ZeebeWorker(grpc_channel) + +# With custom hostname/port +grpc_channel = create_insecure_channel(hostname="zeebe-gateway.mydomain", port=443) +worker = ZeebeWorker(grpc_channel) # Will use environment variable ZEEBE_ADDRESS or localhost:26500 and use TLS -worker = ZeebeWorker(secure_connection=True) +grpc_channel = create_secure_channel() +worker = ZeebeWorker(grpc_channel) + +# With custom hostname/port +grpc_channel = create_secure_channel(hostname="zeebe-gateway.mydomain", port=443) +worker = ZeebeWorker(grpc_channel) # Connect to zeebe cluster in camunda cloud -camunda_cloud_credentials = CamundaCloudCredentials(client_id="", client_secret="", - cluster_id="") -worker = ZeebeWorker(credentials=camunda_cloud_credentials) +grpc_channel = create_camunda_cloud_channel( + client_id="", + client_secret="", + cluster_id="", +) +worker = ZeebeWorker(grpc_channel) # Decorators allow us to add functionality before and after each job worker.before(example_logging_task_decorator) @@ -32,11 +48,13 @@ async def example_logging_task_decorator(job: Job) -> Job: def example_task() -> Dict: return {"output": f"Hello world, test!"} + # Or like this: @worker.task(task_type="test2") async def second_example_task() -> Dict: return {"output": f"Hello world, test2!"} + # Create a task that will return a single value (not a dict) like this: # This task will return to zeebe: { y: x + 1 } @worker.task(task_type="add_one", single_value=True, variable_name="y") @@ -56,9 +74,7 @@ def exception_task(): async def example_exception_handler(exception: Exception, job: Job) -> None: print(exception) print(job) - await job.set_failure_status( - f"Failed to run task {job.type}. Reason: {exception}" - ) + await job.set_failure_status(f"Failed to run task {job.type}. Reason: {exception}") @worker.task(task_type="exception_task", exception_handler=example_exception_handler) @@ -70,8 +86,11 @@ async def exception_task(): # The order of the decorators will be as follows: # Worker decorators -> Task decorators -> Task -> Task decorators -> Worker decorators # Here is how: -@worker.task(task_type="decorator_task", before=[example_logging_task_decorator], - after=[example_logging_task_decorator]) +@worker.task( + task_type="decorator_task", + before=[example_logging_task_decorator], + after=[example_logging_task_decorator], +) async def decorator_task() -> Dict: return {"output": "Hello world, test!"} diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index 37bac1c3..d77607fe 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -1,11 +1,9 @@ -__version__ = "3.0.0rc2" +__version__ = "3.0.0rc3" from pyzeebe import errors +from pyzeebe.channel import * from pyzeebe.client.client import ZeebeClient from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore -from pyzeebe.credentials.camunda_cloud_credentials import \ - CamundaCloudCredentials -from pyzeebe.credentials.oauth_credentials import OAuthCredentials from pyzeebe.job.job import Job from pyzeebe.job.job_status import JobStatus from pyzeebe.task.exception_handler import ExceptionHandler diff --git a/pyzeebe/channel/__init__.py b/pyzeebe/channel/__init__.py new file mode 100644 index 00000000..f07fedd4 --- /dev/null +++ b/pyzeebe/channel/__init__.py @@ -0,0 +1,3 @@ +from pyzeebe.channel.camunda_cloud_channel import create_camunda_cloud_channel +from pyzeebe.channel.insecure_channel import create_insecure_channel +from pyzeebe.channel.secure_channel import create_secure_channel diff --git a/pyzeebe/channel/camunda_cloud_channel.py b/pyzeebe/channel/camunda_cloud_channel.py new file mode 100644 index 00000000..9d470b5d --- /dev/null +++ b/pyzeebe/channel/camunda_cloud_channel.py @@ -0,0 +1,88 @@ +from typing import Dict, Optional + +import grpc +from oauthlib import oauth2 +from requests import HTTPError +from requests_oauthlib import OAuth2Session + +from pyzeebe.channel.channel_options import get_channel_options +from pyzeebe.errors import (InvalidCamundaCloudCredentialsError, + InvalidOAuthCredentialsError) + + +def create_camunda_cloud_channel( + client_id: str, + client_secret: str, + cluster_id: str, + channel_options: Optional[Dict] = None, +) -> grpc.aio.Channel: + """ + Create channel connected to a Camunda Cloud cluster + + Args: + client_id (str): The client id provided by Camunda Cloud + client_secret (str): The client secret provided by Camunda Cloud + cluster_id (str): The zeebe cluster id to connect to + channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html + + Returns: + grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway. + + Raises: + InvalidCamundaCloudCredentialsError: One of the provided camunda credentials is not correct + """ + channel_credentials = _create_camunda_cloud_credentials( + client_id, client_secret, cluster_id + ) + + return grpc.aio.secure_channel( + f"{cluster_id}.zeebe.camunda.io:443", + channel_credentials, + options=get_channel_options(channel_options), + ) + + +def _create_camunda_cloud_credentials( + client_id: str, client_secret: str, cluster_id: str +) -> grpc.ChannelCredentials: + try: + access_token = _get_access_token( + "https://login.cloud.camunda.io/oauth/token", + client_id, + client_secret, + cluster_id, + ) + return _create_oauth_credentials(access_token) + except InvalidOAuthCredentialsError as oauth_error: + raise InvalidCamundaCloudCredentialsError( + client_id, cluster_id + ) from oauth_error + + +def _get_access_token( + url: str, client_id: str, client_secret: str, audience: str +) -> str: + try: + client = oauth2.BackendApplicationClient(client_id) + client.prepare_request_body(include_client_id=True) + with OAuth2Session(client=client) as session: + response = session.post( + url, + data={ + "client_id": client_id, + "client_secret": client_secret, + "audience": audience, + }, + ) + response.raise_for_status() + return response.json()["access_token"] + except HTTPError as http_error: + raise InvalidOAuthCredentialsError( + url=url, client_id=client_id, audience=audience + ) from http_error + + +def _create_oauth_credentials(access_token: str) -> grpc.ChannelCredentials: + token_credentials = grpc.access_token_call_credentials(access_token) + ssl_credentials = grpc.ssl_channel_credentials() + return grpc.composite_channel_credentials(ssl_credentials, token_credentials) diff --git a/pyzeebe/grpc_internals/channel_options.py b/pyzeebe/channel/channel_options.py similarity index 100% rename from pyzeebe/grpc_internals/channel_options.py rename to pyzeebe/channel/channel_options.py diff --git a/pyzeebe/channel/insecure_channel.py b/pyzeebe/channel/insecure_channel.py new file mode 100644 index 00000000..931d6e58 --- /dev/null +++ b/pyzeebe/channel/insecure_channel.py @@ -0,0 +1,26 @@ +from typing import Dict, Optional + +import grpc + +from pyzeebe.channel.channel_options import get_channel_options +from pyzeebe.channel.utils import create_address + + +def create_insecure_channel( + hostname: Optional[str] = None, + port: Optional[int] = None, + channel_options: Optional[Dict] = None +) -> grpc.aio.Channel: + """ + Create an insecure channel + + Args: + hostname (Optional[str], optional): Zeebe gateway hostname + port (Optional[int], optional): Zeebe gateway port + channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + + Returns: + grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway. + """ + address = create_address(hostname, port) + return grpc.aio.insecure_channel(address, options=get_channel_options(channel_options)) diff --git a/pyzeebe/channel/secure_channel.py b/pyzeebe/channel/secure_channel.py new file mode 100644 index 00000000..e553021e --- /dev/null +++ b/pyzeebe/channel/secure_channel.py @@ -0,0 +1,31 @@ +from typing import Dict, Optional + +import grpc + +from pyzeebe.channel.channel_options import get_channel_options +from pyzeebe.channel.utils import create_address + + +def create_secure_channel( + hostname: Optional[str] = None, + port: Optional[int] = None, + channel_options: Optional[Dict] = None, + channel_credentials: Optional[grpc.ChannelCredentials] = None, +) -> grpc.aio.Channel: + """ + Create a secure channel + + Args: + hostname (Optional[str], optional): Zeebe gateway hostname + port (Optional[int], optional): Zeebe gateway port + channel_options (Optional[Dict], optional): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments + channel_credentials (Optional[grpc.ChannelCredentials]): Channel credentials to use. Will use grpc.ssl_channel_credentials() if not provided. + + Returns: + grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway. + """ + address = create_address(hostname, port) + credentials = channel_credentials or grpc.ssl_channel_credentials() + return grpc.aio.secure_channel( + address, credentials, options=get_channel_options(channel_options) + ) diff --git a/pyzeebe/channel/utils.py b/pyzeebe/channel/utils.py new file mode 100644 index 00000000..8b768c36 --- /dev/null +++ b/pyzeebe/channel/utils.py @@ -0,0 +1,15 @@ +import os +from typing import Optional + +DEFAULT_HOSTNAME = "localhost" +DEFAULT_PORT = 26500 +DEFAULT_ADDRESS = f"{DEFAULT_HOSTNAME}:{DEFAULT_PORT}" + + +def create_address( + hostname: Optional[str] = None, + port: Optional[int] = None, +) -> str: + if hostname or port: + return f"{hostname or DEFAULT_HOSTNAME}:{port or DEFAULT_PORT}" + return os.getenv("ZEEBE_ADDRESS", DEFAULT_ADDRESS) diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index 411b8427..8d089a25 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -2,28 +2,26 @@ import grpc -from pyzeebe.credentials.base_credentials import BaseCredentials from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter class ZeebeClient(object): """A zeebe client that can connect to a zeebe instance and perform actions.""" - def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None, - secure_connection: bool = False, max_connection_retries: int = 10): + def __init__( + self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10 + ): """ Args: - hostname (str): Zeebe instance hostname - port (int): Port of the zeebe + grpc_channel (grpc.aio.Channel): GRPC Channel connected to a Zeebe gateway max_connection_retries (int): Amount of connection retries before client gives up on connecting to zeebe. To setup with infinite retries use -1 """ - self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, credentials=credentials, - secure_connection=secure_connection, - max_connection_retries=max_connection_retries) - self.zeebe_adapter.connect() + self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries) - async def run_process(self, bpmn_process_id: str, variables: Dict = None, version: int = -1) -> int: + async def run_process( + self, bpmn_process_id: str, variables: Dict = None, version: int = -1 + ) -> int: """ Run process @@ -44,10 +42,18 @@ async def run_process(self, bpmn_process_id: str, variables: Dict = None, versio ZeebeInternalError: If Zeebe experiences an internal error """ - return await self.zeebe_adapter.create_process_instance(bpmn_process_id=bpmn_process_id, variables=variables or {}, version=version) - - async def run_process_with_result(self, bpmn_process_id: str, variables: Dict = None, version: int = -1, - timeout: int = 0, variables_to_fetch: List[str] = None) -> Tuple[int, Dict]: + return await self.zeebe_adapter.create_process_instance( + bpmn_process_id=bpmn_process_id, variables=variables or {}, version=version + ) + + async def run_process_with_result( + self, + bpmn_process_id: str, + variables: Dict = None, + version: int = -1, + timeout: int = 0, + variables_to_fetch: List[str] = None, + ) -> Tuple[int, Dict]: """ Run process and wait for the result. @@ -70,10 +76,13 @@ async def run_process_with_result(self, bpmn_process_id: str, variables: Dict = ZeebeInternalError: If Zeebe experiences an internal error """ - return await self.zeebe_adapter.create_process_instance_with_result(bpmn_process_id=bpmn_process_id, - variables=variables or {}, version=version, - timeout=timeout, - variables_to_fetch=variables_to_fetch or []) + return await self.zeebe_adapter.create_process_instance_with_result( + bpmn_process_id=bpmn_process_id, + variables=variables or {}, + version=version, + timeout=timeout, + variables_to_fetch=variables_to_fetch or [], + ) async def cancel_process_instance(self, process_instance_key: int) -> int: """ @@ -92,7 +101,9 @@ async def cancel_process_instance(self, process_instance_key: int) -> int: ZeebeInternalError: If Zeebe experiences an internal error """ - await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key) + await self.zeebe_adapter.cancel_process_instance( + process_instance_key=process_instance_key + ) return process_instance_key async def deploy_process(self, *process_file_path: str) -> None: @@ -111,8 +122,14 @@ async def deploy_process(self, *process_file_path: str) -> None: """ await self.zeebe_adapter.deploy_process(*process_file_path) - async def publish_message(self, name: str, correlation_key: str, variables: Dict = None, - time_to_live_in_milliseconds: int = 60000, message_id: str = None) -> None: + async def publish_message( + self, + name: str, + correlation_key: str, + variables: Dict = None, + time_to_live_in_milliseconds: int = 60000, + message_id: str = None, + ) -> None: """ Publish a message @@ -131,6 +148,10 @@ async def publish_message(self, name: str, correlation_key: str, variables: Dict ZeebeInternalError: If Zeebe experiences an internal error """ - await self.zeebe_adapter.publish_message(name=name, correlation_key=correlation_key, - time_to_live_in_milliseconds=time_to_live_in_milliseconds, - variables=variables or {}, message_id=message_id) + await self.zeebe_adapter.publish_message( + name=name, + correlation_key=correlation_key, + time_to_live_in_milliseconds=time_to_live_in_milliseconds, + variables=variables or {}, + message_id=message_id, + ) diff --git a/pyzeebe/client/sync_client.py b/pyzeebe/client/sync_client.py index 705bbf52..8044981a 100644 --- a/pyzeebe/client/sync_client.py +++ b/pyzeebe/client/sync_client.py @@ -3,29 +3,61 @@ import asyncio from typing import Dict, List, Tuple +import grpc + from pyzeebe import ZeebeClient -from pyzeebe.credentials.base_credentials import BaseCredentials class SyncZeebeClient(ZeebeClient): - def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None, - secure_connection: bool = False, max_connection_retries: int = 10): - super().__init__(hostname, port, credentials, - secure_connection, max_connection_retries) + def __init__( + self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10 + ): + super().__init__(grpc_channel, max_connection_retries) self.loop = asyncio.get_event_loop() - def run_process(self, bpmn_process_id: str, variables: Dict = None, version: int = -1) -> int: - return self.loop.run_until_complete(super().run_process(bpmn_process_id, variables, version)) - - def run_process_with_result(self, bpmn_process_id: str, variables: Dict = None, version: int = -1, - timeout: int = 0, variables_to_fetch: List[str] = None) -> Tuple[int, Dict]: - return self.loop.run_until_complete(super().run_process_with_result(bpmn_process_id, variables, version, timeout, variables_to_fetch)) + def run_process( + self, bpmn_process_id: str, variables: Dict = None, version: int = -1 + ) -> int: + return self.loop.run_until_complete( + super().run_process(bpmn_process_id, variables, version) + ) + + def run_process_with_result( + self, + bpmn_process_id: str, + variables: Dict = None, + version: int = -1, + timeout: int = 0, + variables_to_fetch: List[str] = None, + ) -> Tuple[int, Dict]: + return self.loop.run_until_complete( + super().run_process_with_result( + bpmn_process_id, variables, version, timeout, variables_to_fetch + ) + ) def cancel_process_instance(self, process_instance_key: int) -> int: - return self.loop.run_until_complete(super().cancel_process_instance(process_instance_key)) + return self.loop.run_until_complete( + super().cancel_process_instance(process_instance_key) + ) def deploy_process(self, *process_file_path: str) -> None: return self.loop.run_until_complete(super().deploy_process(*process_file_path)) - def publish_message(self, name: str, correlation_key: str, variables: Dict = None, time_to_live_in_milliseconds: int = 60000, message_id: str = None) -> None: - return self.loop.run_until_complete(super().publish_message(name, correlation_key, variables, time_to_live_in_milliseconds, message_id)) + def publish_message( + self, + name: str, + correlation_key: str, + variables: Dict = None, + time_to_live_in_milliseconds: int = 60000, + message_id: str = None, + ) -> None: + return self.loop.run_until_complete( + super().publish_message( + name, + correlation_key, + variables, + time_to_live_in_milliseconds, + message_id, + ) + ) diff --git a/pyzeebe/credentials/base_credentials.py b/pyzeebe/credentials/base_credentials.py deleted file mode 100644 index 34927808..00000000 --- a/pyzeebe/credentials/base_credentials.py +++ /dev/null @@ -1,11 +0,0 @@ -from abc import ABC, abstractmethod - -from grpc import ChannelCredentials - - -class BaseCredentials(ABC): - grpc_credentials: ChannelCredentials - - @abstractmethod - def get_connection_uri(self) -> str: - pass diff --git a/pyzeebe/credentials/camunda_cloud_credentials.py b/pyzeebe/credentials/camunda_cloud_credentials.py deleted file mode 100644 index 8edcaa37..00000000 --- a/pyzeebe/credentials/camunda_cloud_credentials.py +++ /dev/null @@ -1,15 +0,0 @@ -from pyzeebe.credentials.oauth_credentials import OAuthCredentials -from pyzeebe.errors import (InvalidCamundaCloudCredentialsError, - InvalidOAuthCredentialsError) - - -class CamundaCloudCredentials(OAuthCredentials): - def __init__(self, client_id: str, client_secret: str, cluster_id: str): - try: - super().__init__(url="https://login.cloud.camunda.io/oauth/token", client_id=client_id, - client_secret=client_secret, audience=f"{cluster_id}.zeebe.camunda.io") - except InvalidOAuthCredentialsError: - raise InvalidCamundaCloudCredentialsError(client_id=client_id, cluster_id=cluster_id) - - def get_connection_uri(self) -> str: - return f"{self.audience}:443" diff --git a/pyzeebe/credentials/oauth_credentials.py b/pyzeebe/credentials/oauth_credentials.py deleted file mode 100644 index a0e9c04a..00000000 --- a/pyzeebe/credentials/oauth_credentials.py +++ /dev/null @@ -1,40 +0,0 @@ -import grpc -from oauthlib import oauth2 -from requests import HTTPError -from requests_oauthlib import OAuth2Session - -from pyzeebe.credentials.base_credentials import BaseCredentials -from pyzeebe.errors import InvalidOAuthCredentialsError - - -class OAuthCredentials(BaseCredentials): - def __init__(self, url: str, client_id: str, client_secret: str, audience: str): - self.url = url - self.client_id = client_id - self.client_secret = client_secret - self.audience = audience - - self.access_token = self.get_access_token(url, client_id, client_secret, audience) - token_credentials = grpc.access_token_call_credentials(self.access_token) - ssl_credentials = grpc.ssl_channel_credentials() - self.grpc_credentials = grpc.composite_channel_credentials(ssl_credentials, token_credentials) - - @staticmethod - def get_access_token(url: str, client_id: str, client_secret: str, audience: str) -> str: - try: - client = oauth2.BackendApplicationClient(client_id) - client.prepare_request_body(include_client_id=True) - with OAuth2Session(client=client) as session: - response = session.post(url, - data={ - "client_id": client_id, - "client_secret": client_secret, - "audience": audience - }) - response.raise_for_status() - return response.json()["access_token"] - except HTTPError: - raise InvalidOAuthCredentialsError(url=url, client_id=client_id, audience=audience) - - def get_connection_uri(self) -> str: - return "" diff --git a/pyzeebe/grpc_internals/grpc_channel_utils.py b/pyzeebe/grpc_internals/grpc_channel_utils.py deleted file mode 100644 index f31dea18..00000000 --- a/pyzeebe/grpc_internals/grpc_channel_utils.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -from typing import Any, Dict, Optional - -import grpc - -from pyzeebe.credentials.base_credentials import BaseCredentials -from pyzeebe.grpc_internals.channel_options import get_channel_options - - -def create_connection_uri( - hostname: str = None, - port: int = None, - credentials: BaseCredentials = None -) -> str: - if credentials and credentials.get_connection_uri(): - return credentials.get_connection_uri() - if hostname or port: - return f"{hostname or 'localhost'}:{port or 26500}" - return os.getenv("ZEEBE_ADDRESS", "localhost:26500") - - -def create_channel( - connection_uri: str, - credentials: Optional[BaseCredentials] = None, - secure_connection: bool = False, - options: Dict[str, Any] = None -) -> grpc.aio.Channel: - """ - options: A key/value representation of `gRPC channel arguments_`. Default: None (will use library defaults) - - .. _gRPC channel arguments: - https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments - """ - channel_options = get_channel_options(options) - if credentials: - return grpc.aio.secure_channel(connection_uri, credentials.grpc_credentials, options=channel_options) - if secure_connection: - return grpc.aio.secure_channel(connection_uri, grpc.ssl_channel_credentials(), options=channel_options) - return grpc.aio.insecure_channel(connection_uri, options=channel_options) diff --git a/pyzeebe/grpc_internals/zeebe_adapter_base.py b/pyzeebe/grpc_internals/zeebe_adapter_base.py index ef84c453..cf549d84 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_base.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_base.py @@ -5,49 +5,28 @@ import grpc from zeebe_grpc.gateway_pb2_grpc import GatewayStub -from pyzeebe.credentials.base_credentials import BaseCredentials from pyzeebe.errors import (ZeebeBackPressureError, ZeebeGatewayUnavailableError, ZeebeInternalError) -from pyzeebe.grpc_internals.grpc_channel_utils import (create_channel, - create_connection_uri) logger = logging.getLogger(__name__) -class ZeebeAdapterBase(object): - _channel: grpc.aio.Channel - _gateway_stub: GatewayStub - - def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None, - secure_connection: bool = False, max_connection_retries: int = -1): - self.credentials = credentials - self.secure_connection = secure_connection - self.connection_uri = create_connection_uri( - hostname, port, credentials - ) - self.secure_connection = secure_connection - self.connected = False +class ZeebeAdapterBase: + def __init__( + self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = -1 + ): + self._channel = grpc_channel + self._gateway_stub = GatewayStub(grpc_channel) + self.connected = True self.retrying_connection = False self._max_connection_retries = max_connection_retries self._current_connection_retries = 0 - def connect(self, channel: Optional[grpc.aio.Channel] = None): - self.retrying_connection = True - self.connected = True - if channel: - self._channel = channel - else: - self._channel = create_channel( - self.connection_uri, self.credentials, self.secure_connection - ) - - self._gateway_stub = GatewayStub(self._channel) - - async def disconnect(self): - await self._close() - def _should_retry(self): - return self._max_connection_retries == -1 or self._current_connection_retries < self._max_connection_retries + return ( + self._max_connection_retries == -1 + or self._current_connection_retries < self._max_connection_retries + ) async def _common_zeebe_grpc_errors(self, rpc_error: grpc.aio.AioRpcError): if self.is_error_status(rpc_error, grpc.StatusCode.RESOURCE_EXHAUSTED): diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 355c5b72..8baff27e 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -1,10 +1,11 @@ import asyncio import logging import socket -from typing import List +from typing import List, Optional + +import grpc from pyzeebe import TaskDecorator -from pyzeebe.credentials.base_credentials import BaseCredentials from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.worker.job_executor import JobExecutor from pyzeebe.worker.job_poller import JobPoller @@ -17,15 +18,20 @@ class ZeebeWorker(ZeebeTaskRouter): """A zeebe worker that can connect to a zeebe instance and perform tasks.""" - def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = None, port: int = None, - credentials: BaseCredentials = None, secure_connection: bool = False, - before: List[TaskDecorator] = None, after: List[TaskDecorator] = None, - max_connection_retries: int = 10, watcher_max_errors_factor: int = 3, - max_task_count: int = 32): + def __init__( + self, + grpc_channel: grpc.aio.Channel, + name: Optional[str] = None, + request_timeout: int = 0, + before: List[TaskDecorator] = None, + after: List[TaskDecorator] = None, + max_connection_retries: int = 10, + watcher_max_errors_factor: int = 3, + max_task_count: int = 32, + ): """ Args: - hostname (str): Zeebe instance hostname - port (int): Port of the zeebe + grpc_channel (grpc.aio.Channel): GRPC Channel connected to a Zeebe gateway name (str): Name of zeebe worker request_timeout (int): Longpolling timeout for getting tasks from zeebe. If 0 default value is used before (List[TaskDecorator]): Decorators to be performed before each task @@ -35,9 +41,7 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N max_task_count (int): The maximum amount of tasks the worker can handle simultaniously """ super().__init__(before, after) - self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, credentials=credentials, - secure_connection=secure_connection, - max_connection_retries=max_connection_retries) + self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries) self.name = name or socket.gethostname() self.request_timeout = request_timeout self.watcher_max_errors_factor = watcher_max_errors_factor @@ -58,19 +62,26 @@ async def work(self) -> None: ZeebeInternalError: If Zeebe experiences an internal error """ - self.zeebe_adapter.connect() self._job_executors, self._job_pollers = [], [] for task in self.tasks: jobs_queue: asyncio.Queue = asyncio.Queue() - poller = JobPoller(self.zeebe_adapter, task, jobs_queue, - self.name, self.request_timeout, self._task_state, self.max_task_count) + poller = JobPoller( + self.zeebe_adapter, + task, + jobs_queue, + self.name, + self.request_timeout, + self._task_state, + self.max_task_count, + ) executor = JobExecutor(task, jobs_queue, self._task_state) self._job_pollers.append(poller) self._job_executors.append(executor) - coroutines = [poller.poll() for poller in self._job_pollers] + \ - [executor.execute() for executor in self._job_executors] + coroutines = [poller.poll() for poller in self._job_pollers] + [ + executor.execute() for executor in self._job_executors + ] return await asyncio.gather(*coroutines) @@ -84,8 +95,6 @@ async def stop(self) -> None: for executor in self._job_executors: await executor.stop() - await self.zeebe_adapter.disconnect() - def include_router(self, *routers: ZeebeTaskRouter) -> None: """ Adds all router's tasks to the worker. diff --git a/setup.py b/setup.py index 597a6e98..95297dc5 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pyzeebe", - version="3.0.0rc2", + version="3.0.0rc3", author="Jonatan Martens", author_email="jonatanmartenstav@gmail.com", description="Zeebe client api", diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index 0ac7b4ee..4a8e4986 100644 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -3,30 +3,36 @@ from typing import Dict from uuid import uuid4 +import grpc import pytest -from pyzeebe import Job, ZeebeClient, ZeebeWorker +from pyzeebe import Job, ZeebeClient, ZeebeWorker, create_insecure_channel from pyzeebe.errors import ProcessDefinitionNotFoundError -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def event_loop(): loop = asyncio.new_event_loop() yield loop loop.close() -@pytest.fixture(scope="session") -def zeebe_client(): - return ZeebeClient() +@pytest.fixture(scope="module") +def grpc_channel(): + return create_insecure_channel() -@pytest.fixture(scope="session") -def zeebe_worker(): - return ZeebeWorker() +@pytest.fixture(scope="module") +def zeebe_client(grpc_channel: grpc.aio.Channel): + return ZeebeClient(grpc_channel) -@pytest.fixture(autouse=True, scope="session") +@pytest.fixture(scope="module") +def zeebe_worker(grpc_channel): + return ZeebeWorker(grpc_channel) + + +@pytest.fixture(autouse=True, scope="module") def task(zeebe_worker: ZeebeWorker): def exception_handler(exc: Exception, job: Job) -> None: job.set_error_status(f"Failed to run task {job.type}. Reason: {exc}") @@ -40,7 +46,7 @@ async def task_handler(should_throw: bool, input: str) -> Dict: @pytest.mark.asyncio -@pytest.fixture(autouse=True, scope="session") +@pytest.fixture(autouse=True, scope="module") async def deploy_process(zeebe_client: ZeebeClient): try: integration_tests_path = os.path.join("tests", "integration") @@ -51,7 +57,7 @@ async def deploy_process(zeebe_client: ZeebeClient): await zeebe_client.deploy_process("test.bpmn") -@pytest.fixture(autouse=True, scope="session") +@pytest.fixture(autouse=True, scope="module") def start_worker(event_loop: asyncio.AbstractEventLoop, zeebe_worker: ZeebeWorker): event_loop.create_task(zeebe_worker.work()) yield @@ -61,8 +67,7 @@ def start_worker(event_loop: asyncio.AbstractEventLoop, zeebe_worker: ZeebeWorke @pytest.mark.asyncio async def test_run_process(zeebe_client: ZeebeClient): process_key = await zeebe_client.run_process( - "test", - {"input": str(uuid4()), "should_throw": False} + "test", {"input": str(uuid4()), "should_throw": False} ) assert isinstance(process_key, int) @@ -77,8 +82,7 @@ async def test_non_existent_process(zeebe_client: ZeebeClient): async def test_run_process_with_result(zeebe_client: ZeebeClient): input = str(uuid4()) process_instance_key, process_result = await zeebe_client.run_process_with_result( - "test", - {"input": input, "should_throw": False} + "test", {"input": input, "should_throw": False} ) assert isinstance(process_instance_key, int) assert isinstance(process_result["output"], str) @@ -88,7 +92,6 @@ async def test_run_process_with_result(zeebe_client: ZeebeClient): @pytest.mark.asyncio async def test_cancel_process(zeebe_client: ZeebeClient): process_key = await zeebe_client.run_process( - "test", - {"input": str(uuid4()), "should_throw": False} + "test", {"input": str(uuid4()), "should_throw": False} ) await zeebe_client.cancel_process_instance(process_key) diff --git a/pyzeebe/credentials/__init__.py b/tests/unit/channel/__init__.py similarity index 100% rename from pyzeebe/credentials/__init__.py rename to tests/unit/channel/__init__.py diff --git a/tests/unit/channel/camunda_cloud_channel_test.py b/tests/unit/channel/camunda_cloud_channel_test.py new file mode 100644 index 00000000..1bc161b4 --- /dev/null +++ b/tests/unit/channel/camunda_cloud_channel_test.py @@ -0,0 +1,130 @@ +from uuid import uuid4 + +import grpc +import pytest +import responses +from mock import Mock, patch +from requests import HTTPError + +from pyzeebe import create_camunda_cloud_channel +from pyzeebe.errors import (InvalidCamundaCloudCredentialsError, + InvalidOAuthCredentialsError) + + +@pytest.fixture +def mocked_responses() -> responses.RequestsMock: + with responses.RequestsMock() as response_mock: + yield response_mock + + +@pytest.fixture +def client_id() -> str: + return str(uuid4()) + + +@pytest.fixture +def client_secret() -> str: + return str(uuid4()) + + +@pytest.fixture +def cluster_id() -> str: + return str(uuid4()) + + +@pytest.fixture +def url() -> str: + return "https://login.cloud.camunda.io/oauth/token" + + +@pytest.fixture +def access_token() -> str: + return str(uuid4()) + + +@pytest.fixture +def mock_access_token_response( + mocked_responses: responses.RequestsMock, url: str, access_token: str +): + mocked_responses.add( + responses.POST, url, json={"access_token": access_token}, status=200 + ) + + +class TestCamundaCloudChannel: + @pytest.fixture(autouse=True) + def secure_channel_mock(self, aio_grpc_channel: grpc.aio.Channel) -> Mock: + with patch("grpc.aio.secure_channel", return_value=aio_grpc_channel) as mock: + yield mock + + @pytest.mark.usefixtures("mock_access_token_response") + def test_returns_grpc_channel( + self, + mocked_responses: responses.RequestsMock, + client_id: str, + client_secret: str, + cluster_id: str, + ): + channel = create_camunda_cloud_channel(client_id, client_secret, cluster_id) + + assert isinstance(channel, grpc.aio.Channel) + + @pytest.mark.usefixtures("mock_access_token_response") + def test_gets_access_token_from_camunda_cloud( + self, + mocked_responses: responses.RequestsMock, + client_id: str, + client_secret: str, + cluster_id: str, + ): + create_camunda_cloud_channel(client_id, client_secret, cluster_id) + + assert len(mocked_responses.calls) == 1 + + @pytest.mark.usefixtures("mock_access_token_response") + def test_gets_access_token_using_correct_parameters( + self, + mocked_responses: responses.RequestsMock, + client_id: str, + client_secret: str, + cluster_id: str, + ): + expected_request_body = ( + f"client_id={client_id}&client_secret={client_secret}&audience={cluster_id}" + ) + + create_camunda_cloud_channel(client_id, client_secret, cluster_id) + + request = mocked_responses.calls[0].request + assert request.body == expected_request_body + + def test_raises_on_invalid_credentials( + self, + mocked_responses: responses.RequestsMock, + url: str, + client_id: str, + client_secret: str, + cluster_id: str, + ): + mocked_responses.add( + responses.POST, url, status=400 + ) # Camunda cloud returns 400 when invalid credentials are provided + + with pytest.raises(InvalidCamundaCloudCredentialsError): + create_camunda_cloud_channel(client_id, client_secret, cluster_id) + + @pytest.mark.usefixtures("mock_access_token_response") + def test_creates_channel_using_grpc_credentials( + self, + client_id: str, + client_secret: str, + cluster_id: str, + secure_channel_mock: Mock, + ): + create_camunda_cloud_channel(client_id, client_secret, cluster_id) + + secure_channel_call = secure_channel_mock.mock_calls[0] + arguments = [arg for arg in secure_channel_call.args] + assert any( + isinstance(arg, grpc.ChannelCredentials) for arg in arguments + ), "None of the arguments to grpc.aio.create_secure_channel were of type grpc.ChannelCredentials" diff --git a/tests/unit/channel/channel_options_test.py b/tests/unit/channel/channel_options_test.py new file mode 100644 index 00000000..bf5388f5 --- /dev/null +++ b/tests/unit/channel/channel_options_test.py @@ -0,0 +1,29 @@ +from copy import deepcopy + +import pytest + +import pyzeebe.channel.channel_options +from pyzeebe.channel.channel_options import get_channel_options + + +def test_get_channel_options_returns_tuple_of_tuple_with_options(): + assert get_channel_options() == (("grpc.keepalive_time_ms", 45000),) + + +def test_overrides_default_values_if_provided(): + grpc_options = {"grpc.keepalive_time_ms": 4000} + + assert get_channel_options(grpc_options) == (("grpc.keepalive_time_ms", 4000),) + + +def test_adds_custom_options(): + grpc_options = { + "grpc.keepalive_timeout_ms": 120000, + "grpc.http2.min_time_between_pings_ms": 60000, + } + + assert get_channel_options(grpc_options) == ( + ("grpc.keepalive_time_ms", 45000), + ("grpc.keepalive_timeout_ms", 120000), + ("grpc.http2.min_time_between_pings_ms", 60000), + ) diff --git a/tests/unit/channel/insecure_channel_test.py b/tests/unit/channel/insecure_channel_test.py new file mode 100644 index 00000000..ee1ead80 --- /dev/null +++ b/tests/unit/channel/insecure_channel_test.py @@ -0,0 +1,50 @@ +from uuid import uuid4 + +import grpc +import pytest +from mock import Mock, patch + +from pyzeebe import create_insecure_channel +from pyzeebe.channel.channel_options import get_channel_options +from pyzeebe.channel.utils import (DEFAULT_HOSTNAME, DEFAULT_PORT, + create_address) + + +class TestCreateInsecureChannel: + @pytest.fixture(autouse=True) + def insecure_channel_mock(self, aio_grpc_channel: grpc.aio.Channel): + with patch("grpc.aio.insecure_channel", return_value=aio_grpc_channel) as mock: + yield mock + + def test_returns_aio_grpc_channel(self): + channel = create_insecure_channel() + + assert isinstance(channel, grpc.aio.Channel) + + def test_calls_using_default_grpc_options(self, insecure_channel_mock: Mock): + create_insecure_channel() + + insecure_channel_call = insecure_channel_mock.mock_calls[0] + assert insecure_channel_call.kwargs["options"] == get_channel_options() + + def test_uses_default_address(self, insecure_channel_mock: Mock): + create_insecure_channel() + + insecure_channel_call = insecure_channel_mock.mock_calls[0] + assert insecure_channel_call.args[0] == create_address() + + def test_overrides_default_port_if_provided(self, insecure_channel_mock: Mock): + port = 123 + + create_insecure_channel(port=port) + + insecure_channel_call = insecure_channel_mock.mock_calls[0] + assert insecure_channel_call.args[0] == f"{DEFAULT_HOSTNAME}:{port}" + + def test_overrides_default_hostname_if_provided(self, insecure_channel_mock: Mock): + hostname = str(uuid4()) + + create_insecure_channel(hostname=hostname) + + insecure_channel_call = insecure_channel_mock.mock_calls[0] + assert insecure_channel_call.args[0] == f"{hostname}:{DEFAULT_PORT}" diff --git a/tests/unit/channel/secure_channel_test.py b/tests/unit/channel/secure_channel_test.py new file mode 100644 index 00000000..fc3bd843 --- /dev/null +++ b/tests/unit/channel/secure_channel_test.py @@ -0,0 +1,56 @@ +from uuid import uuid4 + +import grpc +import pytest +from mock import Mock, patch + +from pyzeebe import create_secure_channel +from pyzeebe.channel.channel_options import get_channel_options +from pyzeebe.channel.utils import (DEFAULT_HOSTNAME, DEFAULT_PORT, + create_address) + + +class TestCreateSecureChannel: + @pytest.fixture(autouse=True) + def secure_channel_mock(self, aio_grpc_channel: grpc.aio.Channel): + with patch("grpc.aio.secure_channel", return_value=aio_grpc_channel) as mock: + yield mock + + def test_returns_grpc_channel(self): + channel = create_secure_channel() + + assert isinstance(channel, grpc.aio.Channel) + + def test_uses_ssl_credentials_if_no_channel_credentials_provided(self): + with patch("grpc.ssl_channel_credentials") as ssl_mock: + create_secure_channel() + + ssl_mock.assert_called_once() + + def test_calls_using_default_grpc_options(self, secure_channel_mock: Mock): + create_secure_channel() + + secure_channel_call = secure_channel_mock.mock_calls[0] + assert secure_channel_call.kwargs["options"] == get_channel_options() + + def test_uses_default_address(self, secure_channel_mock: Mock): + create_secure_channel() + + secure_channel_call = secure_channel_mock.mock_calls[0] + assert secure_channel_call.args[0] == create_address() + + def test_overrides_default_port_if_provided(self, secure_channel_mock: Mock): + port = 123 + + create_secure_channel(port=port) + + secure_channel_call = secure_channel_mock.mock_calls[0] + assert secure_channel_call.args[0] == f"{DEFAULT_HOSTNAME}:{port}" + + def test_overrides_default_hostname_if_provided(self, secure_channel_mock: Mock): + hostname = str(uuid4()) + + create_secure_channel(hostname=hostname) + + secure_channel_call = secure_channel_mock.mock_calls[0] + assert secure_channel_call.args[0] == f"{hostname}:{DEFAULT_PORT}" diff --git a/tests/unit/channel/utils_test.py b/tests/unit/channel/utils_test.py new file mode 100644 index 00000000..7d262ff8 --- /dev/null +++ b/tests/unit/channel/utils_test.py @@ -0,0 +1,29 @@ +import os +from uuid import uuid4 + +from pyzeebe.channel.utils import DEFAULT_ADDRESS, create_address + + +class TestCreateAddress: + def test_returns_default_address(self): + address = create_address() + + assert address == DEFAULT_ADDRESS + + def test_default_port_is_26500(self): + address = create_address(hostname=str(uuid4())) + + assert address.split(":")[1] == "26500" + + def test_default_hostname_is_localhost(self): + address = create_address(port=12) + + assert address.split(":")[0] == "localhost" + + def test_returns_env_var_if_provided(self): + zeebe_address = str(uuid4()) + os.environ["ZEEBE_ADDRESS"] = zeebe_address + + address = create_address() + + assert address == zeebe_address diff --git a/tests/unit/client/sync_client_test.py b/tests/unit/client/sync_client_test.py index a7d46449..07e61720 100644 --- a/tests/unit/client/sync_client_test.py +++ b/tests/unit/client/sync_client_test.py @@ -1,5 +1,6 @@ from uuid import uuid4 +import grpc import pytest from mock import AsyncMock @@ -13,9 +14,8 @@ @pytest.fixture -def sync_zeebe_client(zeebe_adapter: ZeebeAdapter) -> SyncZeebeClient: - client = SyncZeebeClient() - client.zeebe_adapter = zeebe_adapter +def sync_zeebe_client(aio_grpc_channel: grpc.aio.Channel) -> SyncZeebeClient: + client = SyncZeebeClient(aio_grpc_channel) return client diff --git a/tests/conftest.py b/tests/unit/conftest.py similarity index 92% rename from tests/conftest.py rename to tests/unit/conftest.py index a7d1ace8..1ad88e2d 100644 --- a/tests/conftest.py +++ b/tests/unit/conftest.py @@ -43,22 +43,19 @@ def job_from_task(task): @pytest.fixture def zeebe_adapter(aio_grpc_channel: grpc.aio.Channel): - adapter = ZeebeAdapter() - adapter.connect(channel=aio_grpc_channel) + adapter = ZeebeAdapter(aio_grpc_channel) return adapter @pytest.fixture -def zeebe_client(zeebe_adapter): - client = ZeebeClient() - client.zeebe_adapter = zeebe_adapter +def zeebe_client(aio_grpc_channel: grpc.aio.Channel): + client = ZeebeClient(aio_grpc_channel) return client @pytest.fixture -def zeebe_worker(zeebe_adapter): - worker = ZeebeWorker() - worker.zeebe_adapter = zeebe_adapter +def zeebe_worker(aio_grpc_channel: grpc.aio.Channel): + worker = ZeebeWorker(aio_grpc_channel) return worker @@ -84,7 +81,7 @@ def task_config(task_type): single_value=False, variable_name="", before=[], - after=[] + after=[], ) diff --git a/tests/unit/credentials/__init__.py b/tests/unit/credentials/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/unit/credentials/camunda_cloud_credentials_test.py b/tests/unit/credentials/camunda_cloud_credentials_test.py deleted file mode 100644 index a180bf4c..00000000 --- a/tests/unit/credentials/camunda_cloud_credentials_test.py +++ /dev/null @@ -1,28 +0,0 @@ -from uuid import uuid4 - -import pytest -from mock import MagicMock, patch - -from pyzeebe.credentials.camunda_cloud_credentials import \ - CamundaCloudCredentials -from pyzeebe.errors import (InvalidCamundaCloudCredentialsError, - InvalidOAuthCredentialsError) - - -def test_init(): - client_id = str(uuid4()) - client_secret = str(uuid4()) - cluster_id = str(uuid4()) - - with patch("pyzeebe.credentials.oauth_credentials.OAuthCredentials.__init__") as init: - CamundaCloudCredentials(client_id, client_secret, cluster_id) - init.assert_called_with(url=f"https://login.cloud.camunda.io/oauth/token", client_id=client_id, - client_secret=client_secret, audience=f"{cluster_id}.zeebe.camunda.io") - - -def test_invalid_credentials(): - CamundaCloudCredentials.get_access_token = MagicMock( - side_effect=InvalidOAuthCredentialsError(str(uuid4()), str(uuid4()), str(uuid4()))) - - with pytest.raises(InvalidCamundaCloudCredentialsError): - CamundaCloudCredentials(str(uuid4()), str(uuid4()), str(uuid4())) diff --git a/tests/unit/credentials/oauth_credentials_test.py b/tests/unit/credentials/oauth_credentials_test.py deleted file mode 100644 index 44e1d82c..00000000 --- a/tests/unit/credentials/oauth_credentials_test.py +++ /dev/null @@ -1,33 +0,0 @@ -from uuid import uuid4 - -import pytest -from mock import patch -from requests import HTTPError - -from pyzeebe.credentials.oauth_credentials import OAuthCredentials -from pyzeebe.errors import InvalidOAuthCredentialsError - - -def test_get_access_token(): - with patch("requests_oauthlib.OAuth2Session.post") as post_mock: - url = f"https://{str(uuid4())}/oauth/token" - client_id = str(uuid4()) - client_secret = str(uuid4()) - audience = str(uuid4()) - OAuthCredentials.get_access_token( - url=url, client_id=client_id, client_secret=client_secret, audience=audience) - post_mock.assert_called_with(url, - data={ - "client_id": client_id, - "client_secret": client_secret, - "audience": audience - }) - - -def test_get_invalid_access_token(): - with patch("requests_oauthlib.OAuth2Session.post") as post_mock: - post_mock.side_effect = HTTPError() - - with pytest.raises(InvalidOAuthCredentialsError): - OAuthCredentials.get_access_token(url=f"https://{str(uuid4())}/oauth/token", client_id=str(uuid4()), - client_secret=str(uuid4()), audience=str(uuid4())) diff --git a/tests/unit/grpc_internals/channel_options_test.py b/tests/unit/grpc_internals/channel_options_test.py deleted file mode 100644 index 71a967fd..00000000 --- a/tests/unit/grpc_internals/channel_options_test.py +++ /dev/null @@ -1,69 +0,0 @@ -from copy import deepcopy -from unittest.mock import Mock, patch - -import pytest - -import pyzeebe.grpc_internals.channel_options -from pyzeebe.grpc_internals.channel_options import get_channel_options -from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase - - -@pytest.fixture -def revert_monkeypatch_after_test(): - """ - This sort of exists in pytest already (docs.pytest.org/en/stable/monkeypatch.html), - however that means a bit of "magic" happens, this is a bit clearer and tests the users - approach to this. - """ - options_before = deepcopy(pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS) - yield - pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS = options_before - - -def test_get_channel_options_returns_tuple_of_tuple_with_options(): - assert get_channel_options() == ( - ("grpc.keepalive_time_ms", 45000), - ) - - -@pytest.mark.parametrize("grpc_method,call_kwargs", - [ - ("grpc.aio.secure_channel", {"secure_connection": True}), - ("grpc.aio.insecure_channel", {}), - ("grpc.aio.secure_channel", {"credentials": Mock()}), - ]) -def test_create_channel_called_with_options(grpc_method, call_kwargs, zeebe_adapter): - with patch(grpc_method) as channel_mock: - ZeebeAdapterBase(**call_kwargs).connect() - expected_options = (('grpc.keepalive_time_ms', 45000),) - # `call_args.kwargs` as it's not available in python <=3.7 - # https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock.call_args - # 0 is args, 1 is kwargs - assert channel_mock.call_args[1]["options"] == expected_options - - -@pytest.mark.usefixtures("revert_monkeypatch_after_test") -def test_monkeypatching_with_options_override(): - pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS["grpc.keepalive_time_ms"] = 4000 - assert get_channel_options() == ( - ("grpc.keepalive_time_ms", 4000), - ) - - -@pytest.mark.usefixtures("revert_monkeypatch_after_test") -def test_monkeypatching_with_options_added(): - pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS.update({ - "grpc.keepalive_timeout_ms": 120000, - "grpc.http2.min_time_between_pings_ms": 60000, - }) - assert get_channel_options() == ( - ("grpc.keepalive_time_ms", 45000), - ("grpc.keepalive_timeout_ms", 120000), - ("grpc.http2.min_time_between_pings_ms", 60000) - ) - - -@pytest.mark.usefixtures("revert_monkeypatch_after_test") -def test_monkeypatching_with_options_removed(): - pyzeebe.grpc_internals.channel_options.GRPC_CHANNEL_OPTIONS = {} - assert get_channel_options() == () diff --git a/tests/unit/grpc_internals/grpc_channel_utils_test.py b/tests/unit/grpc_internals/grpc_channel_utils_test.py deleted file mode 100644 index 618493fa..00000000 --- a/tests/unit/grpc_internals/grpc_channel_utils_test.py +++ /dev/null @@ -1,90 +0,0 @@ -from random import randint -from uuid import uuid4 - -from mock import MagicMock, patch - -from pyzeebe.grpc_internals.grpc_channel_utils import (create_channel, - create_connection_uri) - - -class TestCreateChannel: - def test_creates_insecure_channel_on_default(self): - with patch("grpc.aio.insecure_channel") as channel_mock: - create_channel(str(uuid4())) - - channel_mock.assert_called_once() - - def test_creates_secure_channel_when_given_credentials(self): - with patch("grpc.aio.secure_channel") as channel_mock: - create_channel(str(uuid4()), credentials=MagicMock()) - - channel_mock.assert_called_once() - - def test_creates_secure_channel_when_secure_connection_is_enabled(self): - with patch("grpc.aio.secure_channel") as channel_mock: - create_channel(str(uuid4()), secure_connection=True) - - channel_mock.assert_called_once() - - def test_creates_secure_channel_with_default_options(self): - with patch("grpc.aio.insecure_channel") as channel_mock: - create_channel(str(uuid4()), options=None) - - expected_options = ( - ("grpc.keepalive_time_ms", 45000), - ) - assert channel_mock.call_args[1]["options"] == expected_options - - def test_creates_insecure_channel_with_options(self): - with patch("grpc.aio.insecure_channel") as channel_mock: - additional_options = { - "grpc.keepalive_timeout_ms": 120000, - "grpc.http2.min_time_between_pings_ms": 60000, - } - create_channel(str(uuid4()), options=additional_options) - - expected_options = ( - ("grpc.keepalive_time_ms", 45000), - ("grpc.keepalive_timeout_ms", 120000), - ("grpc.http2.min_time_between_pings_ms", 60000), - ) - assert channel_mock.call_args[1]["options"] == expected_options - -class TestCreateConnectionUri: - def test_uses_credentials_first(self): - credentials = MagicMock(return_value=str(uuid4())) - - connection_uri = create_connection_uri( - credentials=credentials, hostname="localhost", port=123 - ) - - assert connection_uri == credentials.get_connection_uri() - - def test_uses_hostname_and_port_when_given(self): - hostname = str(uuid4()) - port = randint(0, 10000) - - connection_uri = create_connection_uri(hostname, port) - - assert connection_uri == f"{hostname}:{port}" - - def test_default_port_value_is_26500(self): - hostname = str(uuid4()) - - connection_uri = create_connection_uri(hostname) - - assert connection_uri == f"{hostname}:26500" - - def test_default_hostname_is_localhost(self): - port = randint(0, 10000) - - connection_uri = create_connection_uri(port=port) - - assert connection_uri == f"localhost:{port}" - - def test_default_values_are_taken_from_env_variables(self): - address = f"{str(uuid4())}:{randint(0, 10000)}" - with patch("os.getenv", return_value=address): - connection_uri = create_connection_uri() - - assert connection_uri == address diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 8bd46da3..bd94f43b 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -1,6 +1,7 @@ from typing import List from uuid import uuid4 +import grpc import pytest from pyzeebe import TaskDecorator, ZeebeTaskRouter @@ -55,13 +56,17 @@ def test_add_after_decorator(self, zeebe_worker: ZeebeWorker, decorator: TaskDec assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after - def test_add_constructor_before_decorator(self, decorator: TaskDecorator): - zeebe_worker = ZeebeWorker(before=[decorator]) + def test_add_constructor_before_decorator( + self, aio_grpc_channel: grpc.aio.Channel, decorator: TaskDecorator + ): + zeebe_worker = ZeebeWorker(aio_grpc_channel, before=[decorator]) assert len(zeebe_worker._before) == 1 assert decorator in zeebe_worker._before - def test_add_constructor_after_decorator(self, decorator: TaskDecorator): - zeebe_worker = ZeebeWorker(after=[decorator]) + def test_add_constructor_after_decorator( + self, aio_grpc_channel: grpc.aio.Channel, decorator: TaskDecorator + ): + zeebe_worker = ZeebeWorker(aio_grpc_channel, after=[decorator]) assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after