Skip to content

Commit

Permalink
Merge pull request #146 from JonatanMartens/development
Browse files Browse the repository at this point in the history
v2.3.1
  • Loading branch information
JonatanMartens authored Mar 15, 2021
2 parents 3074209 + 8773095 commit 9162778
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 361 deletions.
10 changes: 5 additions & 5 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ verify_ssl = true

[dev-packages]
autopep8 = "~=1.5.4"
pylint = "~=2.6.0"
coverage = "~=5.4"
pylint = "~=2.7.2"
coverage = "~=5.5"
pytest = "~=6.2.2"
pytest-grpc = "~=0.8.0"
mypy = "~=0.800"
mypy = "~=0.812"
coveralls = "~=2.2.0"
importlib-metadata = "~=3.4.0"
importlib-metadata = "~=3.7.3"
pyzeebe = {editable = true, path = "."}
sphinx = "~=3.5.1"
sphinx = "~=3.5.2"
sphinx-rtd-theme = "*"
pytest-mock = "*"

Expand Down
205 changes: 104 additions & 101 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
author = 'Jonatan Martens'

# The full version, including alpha/beta/rc tags
release = '2.3.0'
release = '2.3.1'

# -- General configuration ---------------------------------------------------

Expand Down Expand Up @@ -59,6 +59,6 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

version = "2.3.0"
version = "2.3.1"

master_doc = 'index'
2 changes: 1 addition & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.3.0"
__version__ = "2.3.1"

from pyzeebe import exceptions
from pyzeebe.client.client import ZeebeClient
Expand Down
18 changes: 10 additions & 8 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, name: str = None, request_timeout: int = 0, hostname: str = N
before (List[TaskDecorator]): Decorators to be performed before each task
after (List[TaskDecorator]): Decorators to be performed after each task
max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1
watcher_max_errors_factor (int): Number of consequtive errors for a task watcher will accept before raising MaxConsecutiveTaskThreadError
"""
super().__init__(before, after)
self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, credentials=credentials,
Expand Down Expand Up @@ -120,20 +121,21 @@ def _should_watch_threads(self) -> bool:
return not self.stop_event.is_set() and bool(self._task_threads)

def _watch_task_threads_runner(self, frequency: int = 10) -> None:
consecutive_errors = 0
consecutive_errors = {}
while self._should_watch_threads():
logger.debug("Checking task thread status")
# converting to list to avoid "RuntimeError: dictionary changed size during iteration"
for task_type in list(self._task_threads.keys()):
consecutive_errors.setdefault(task_type, 0)
# thread might be none, if dict changed size, in that case we'll consider it
# an error, and check if we should handle it
thread = self._task_threads.get(task_type)
if not thread or not thread.is_alive():
consecutive_errors += 1
self._check_max_errors(consecutive_errors)
consecutive_errors[task_type] += 1
self._check_max_errors(consecutive_errors[task_type], task_type)
self._handle_not_alive_thread(task_type)
else:
consecutive_errors = 0
consecutive_errors[task_type] = 0
time.sleep(frequency)

def _handle_not_alive_thread(self, task_type: str):
Expand All @@ -143,11 +145,11 @@ def _handle_not_alive_thread(self, task_type: str):
else:
logger.warning(f"Task thread {task_type} is not alive, but condition not met for restarting")

def _check_max_errors(self, consecutive_errors: int):
max_errors = self.watcher_max_errors_factor * len(self.tasks)
if consecutive_errors >= max_errors:
def _check_max_errors(self, consecutive_errors: int, task_type: str):
if consecutive_errors >= self.watcher_max_errors_factor:
raise MaxConsecutiveTaskThreadError(f"Number of consecutive errors ({consecutive_errors}) exceeded "
f"max allowed number of errors ({max_errors})")
f"max allowed number of errors ({self.watcher_max_errors_factor}) "
f" for task {task_type}", task_type)

def _restart_task_thread(self, task_type: str) -> None:
task = self.get_task(task_type)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyzeebe",
version="2.3.0",
version="2.3.1",
author="Jonatan Martens",
author_email="[email protected]",
description="Zeebe client api",
Expand Down
28 changes: 24 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from random import randint
from threading import Event
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from uuid import uuid4

import pytest

from pyzeebe import ZeebeClient, ZeebeWorker, ZeebeTaskRouter
from pyzeebe import ZeebeClient, ZeebeWorker, ZeebeTaskRouter, Job
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.task.task import Task
from pyzeebe.worker.task_handler import ZeebeTaskHandler
Expand All @@ -23,6 +23,13 @@ def job_without_adapter():
return random_job()


@pytest.fixture
def job_from_task(task):
job = random_job(task)
job.variables = dict(x=str(uuid4()))
return job


@pytest.fixture
def zeebe_adapter(grpc_create_channel):
return ZeebeAdapter(channel=grpc_create_channel())
Expand All @@ -41,8 +48,13 @@ def zeebe_worker(zeebe_adapter):


@pytest.fixture
def task():
return Task(str(uuid4()), lambda x: {"x": x}, lambda x, y, z: x)
def task(task_type):
return Task(task_type, MagicMock(wraps=lambda x: dict(x=x)), MagicMock(wraps=lambda x, y, z: x))


@pytest.fixture
def task_type():
return str(uuid4())


@pytest.fixture
Expand Down Expand Up @@ -85,6 +97,14 @@ def task_handler():
return ZeebeTaskHandler()


@pytest.fixture
def decorator():
def simple_decorator(job: Job) -> Job:
return job

return MagicMock(wraps=simple_decorator)


@pytest.fixture(scope="module")
def grpc_add_to_server():
from zeebe_grpc.gateway_pb2_grpc import add_GatewayServicer_to_server
Expand Down
Loading

0 comments on commit 9162778

Please sign in to comment.