Skip to content

Commit

Permalink
Merge pull request #70 from JonatanMartens/development
Browse files Browse the repository at this point in the history
v2.2.2
  • Loading branch information
JonatanMartens authored Oct 22, 2020
2 parents 212b7ff + 544d933 commit f83e992
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

version = "2.2.1"
version = "2.2.2"

master_doc = 'index'
2 changes: 1 addition & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.2.1"
__version__ = "2.2.2"

from pyzeebe import exceptions
from pyzeebe.client.client import ZeebeClient
Expand Down
11 changes: 6 additions & 5 deletions pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pyzeebe.credentials.base_credentials import BaseCredentials
from pyzeebe.exceptions import ZeebeBackPressure, ZeebeGatewayUnavailable, ZeebeInternalError

logger = logging.getLogger(__name__)

class ZeebeAdapterBase(object):
def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None,
Expand Down Expand Up @@ -44,21 +45,21 @@ def _create_channel(connection_uri: str, credentials: BaseCredentials = None,
return grpc.insecure_channel(connection_uri)

def _check_connectivity(self, value: grpc.ChannelConnectivity) -> None:
logging.debug(f"Grpc channel connectivity changed to: {value}")
logger.debug(f"Grpc channel connectivity changed to: {value}")
if value in [grpc.ChannelConnectivity.READY, grpc.ChannelConnectivity.IDLE]:
logging.debug(f"Connected to {self.connection_uri or 'zeebe'}")
logger.debug(f"Connected to {self.connection_uri or 'zeebe'}")
self.connected = True
self.retrying_connection = False
elif value == grpc.ChannelConnectivity.CONNECTING:
logging.debug(f"Connecting to {self.connection_uri or 'zeebe'}.")
logger.debug(f"Connecting to {self.connection_uri or 'zeebe'}.")
self.connected = False
self.retrying_connection = True
elif value == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
logging.warning(f"Lost connection to {self.connection_uri or 'zeebe'}. Retrying...")
logger.warning(f"Lost connection to {self.connection_uri or 'zeebe'}. Retrying...")
self.connected = False
self.retrying_connection = True
elif value == grpc.ChannelConnectivity.SHUTDOWN:
logging.error(f"Failed to establish connection to {self.connection_uri or 'zeebe'}. Non recoverable")
logger.error(f"Failed to establish connection to {self.connection_uri or 'zeebe'}. Non recoverable")
self.connected = False
self.retrying_connection = False
raise ConnectionAbortedError(f"Lost connection to {self.connection_uri or 'zeebe'}")
Expand Down
4 changes: 3 additions & 1 deletion pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from pyzeebe.job.job import Job


logger = logging.getLogger(__name__)

class ZeebeJobAdapter(ZeebeAdapterBase):
def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int,
variables_to_fetch: List[str], request_timeout: int) -> Generator[Job, None, None]:
Expand All @@ -21,7 +23,7 @@ def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_a
fetchVariable=variables_to_fetch, requestTimeout=request_timeout)):
for raw_job in response.jobs:
job = self._create_job_from_raw_job(raw_job)
logging.debug(f"Got job: {job} from zeebe")
logger.debug(f"Got job: {job} from zeebe")
yield job
except grpc.RpcError as rpc_error:
if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT):
Expand Down
4 changes: 3 additions & 1 deletion pyzeebe/worker/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from pyzeebe.task.task_decorator import TaskDecorator


logger = logging.getLogger(__name__)

def default_exception_handler(e: Exception, job: Job) -> None:
logging.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.")
logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.")
job.set_failure_status(f"Failed job. Error: {e}")


Expand Down
18 changes: 10 additions & 8 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from pyzeebe.worker.task_router import ZeebeTaskRouter


logger = logging.getLogger(__name__)

class ZeebeWorker(ZeebeTaskHandler):
"""A zeebe worker that can connect to a zeebe instance and perform tasks."""

Expand Down Expand Up @@ -58,22 +60,22 @@ def stop(self) -> None:
self.stop_event.set()

def _handle_task(self, task: Task) -> None:
logging.debug(f"Handling task {task}")
logger.debug(f"Handling task {task}")
while not self.stop_event.is_set() and self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection:
if self.zeebe_adapter.retrying_connection:
logging.info(f"Retrying connection to {self.zeebe_adapter.connection_uri or 'zeebe'}")
logger.info(f"Retrying connection to {self.zeebe_adapter.connection_uri or 'zeebe'}")
continue

self._handle_jobs(task)

def _handle_jobs(self, task: Task) -> None:
for job in self._get_jobs(task):
thread = Thread(target=task.handler, args=(job,))
logging.debug(f"Running job: {job}")
logger.debug(f"Running job: {job}")
thread.start()

def _get_jobs(self, task: Task) -> Generator[Job, None, None]:
logging.debug(f"Activating jobs for task: {task}")
logger.debug(f"Activating jobs for task: {task}")
return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.timeout,
max_jobs_to_activate=task.max_jobs_to_activate,
variables_to_fetch=task.variables_to_fetch,
Expand Down Expand Up @@ -154,17 +156,17 @@ def _run_task_inner_function(task: Task, job: Job) -> Tuple[Job, bool]:
job.variables = task.inner_function(**job.variables)
task_succeeded = True
except Exception as e:
logging.debug(f"Failed job: {job}. Error: {e}.")
logger.debug(f"Failed job: {job}. Error: {e}.")
task.exception_handler(e, job)
finally:
return job, task_succeeded

def _complete_job(self, job: Job) -> None:
try:
logging.debug(f"Completing job: {job}")
logger.debug(f"Completing job: {job}")
self.zeebe_adapter.complete_job(job_key=job.key, variables=job.variables)
except Exception as e:
logging.warning(f"Failed to complete job: {job}. Error: {e}")
logger.warning(f"Failed to complete job: {job}. Error: {e}")

def _create_before_decorator_runner(self, task: Task) -> Callable[[Job], Job]:
decorators = task._before.copy()
Expand All @@ -190,5 +192,5 @@ def _run_decorator(decorator: TaskDecorator, job: Job) -> Job:
try:
return decorator(job)
except Exception as e:
logging.warning(f"Failed to run decorator {decorator}. Error: {e}")
logger.warning(f"Failed to run decorator {decorator}. Error: {e}")
return job
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyzeebe",
version="2.2.1",
version="2.2.2",
author="Jonatan Martens",
author_email="[email protected]",
description="Zeebe client api",
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/grpc_internals/zeebe_adapter_base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ def test_connectivity_transient_failure():
assert not zeebe_adapter.connected


def test_connectivity_transient_failure_logs_warning(caplog):
zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.TRANSIENT_FAILURE)
expected_logger = "pyzeebe.grpc_internals.zeebe_adapter_base"
expected_level = "WARNING"
matching_logs = [
log
for log in caplog.records
if log.name == expected_logger and log.levelname == expected_level
]
assert len(matching_logs) > 0


def test_connectivity_shutdown():
with pytest.raises(ConnectionAbortedError):
zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.SHUTDOWN)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/worker/task_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def no_dict_fn(x):


def test_default_exception_handler():
with patch("logging.warning") as logging_mock:
with patch("pyzeebe.worker.task_handler.logger.warning") as logging_mock:
with patch("pyzeebe.job.job.Job.set_failure_status") as failure_mock:
failure_mock.return_value = None
job = random_job()
Expand Down

0 comments on commit f83e992

Please sign in to comment.