From c44eecffd775a102870bc41d2e0b8e12e846db5f Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Thu, 22 Oct 2020 19:26:43 +0200 Subject: [PATCH 1/5] Create a logger for the module and use it for logging --- pyzeebe/grpc_internals/zeebe_adapter_base.py | 11 ++++++----- pyzeebe/grpc_internals/zeebe_job_adapter.py | 4 +++- pyzeebe/worker/task_handler.py | 4 +++- pyzeebe/worker/worker.py | 18 ++++++++++-------- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pyzeebe/grpc_internals/zeebe_adapter_base.py b/pyzeebe/grpc_internals/zeebe_adapter_base.py index 6e4cbba9..d68f30d0 100644 --- a/pyzeebe/grpc_internals/zeebe_adapter_base.py +++ b/pyzeebe/grpc_internals/zeebe_adapter_base.py @@ -7,6 +7,7 @@ from pyzeebe.credentials.base_credentials import BaseCredentials from pyzeebe.exceptions import ZeebeBackPressure, ZeebeGatewayUnavailable, ZeebeInternalError +logger = logging.getLogger(__name__) class ZeebeAdapterBase(object): def __init__(self, hostname: str = None, port: int = None, credentials: BaseCredentials = None, @@ -44,21 +45,21 @@ def _create_channel(connection_uri: str, credentials: BaseCredentials = None, return grpc.insecure_channel(connection_uri) def _check_connectivity(self, value: grpc.ChannelConnectivity) -> None: - logging.debug(f"Grpc channel connectivity changed to: {value}") + logger.debug(f"Grpc channel connectivity changed to: {value}") if value in [grpc.ChannelConnectivity.READY, grpc.ChannelConnectivity.IDLE]: - logging.debug(f"Connected to {self.connection_uri or 'zeebe'}") + logger.debug(f"Connected to {self.connection_uri or 'zeebe'}") self.connected = True self.retrying_connection = False elif value == grpc.ChannelConnectivity.CONNECTING: - logging.debug(f"Connecting to {self.connection_uri or 'zeebe'}.") + logger.debug(f"Connecting to {self.connection_uri or 'zeebe'}.") self.connected = False self.retrying_connection = True elif value == grpc.ChannelConnectivity.TRANSIENT_FAILURE: - logging.warning(f"Lost connection to {self.connection_uri or 'zeebe'}. Retrying...") + logger.warning(f"Lost connection to {self.connection_uri or 'zeebe'}. Retrying...") self.connected = False self.retrying_connection = True elif value == grpc.ChannelConnectivity.SHUTDOWN: - logging.error(f"Failed to establish connection to {self.connection_uri or 'zeebe'}. Non recoverable") + logger.error(f"Failed to establish connection to {self.connection_uri or 'zeebe'}. Non recoverable") self.connected = False self.retrying_connection = False raise ConnectionAbortedError(f"Lost connection to {self.connection_uri or 'zeebe'}") diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index b6782571..f967425b 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -11,6 +11,8 @@ from pyzeebe.job.job import Job +logger = logging.getLogger(__name__) + class ZeebeJobAdapter(ZeebeAdapterBase): def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int, variables_to_fetch: List[str], request_timeout: int) -> Generator[Job, None, None]: @@ -21,7 +23,7 @@ def activate_jobs(self, task_type: str, worker: str, timeout: int, max_jobs_to_a fetchVariable=variables_to_fetch, requestTimeout=request_timeout)): for raw_job in response.jobs: job = self._create_job_from_raw_job(raw_job) - logging.debug(f"Got job: {job} from zeebe") + logger.debug(f"Got job: {job} from zeebe") yield job except grpc.RpcError as rpc_error: if self.is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT): diff --git a/pyzeebe/worker/task_handler.py b/pyzeebe/worker/task_handler.py index fc7a62b1..cfec777c 100644 --- a/pyzeebe/worker/task_handler.py +++ b/pyzeebe/worker/task_handler.py @@ -10,8 +10,10 @@ from pyzeebe.task.task_decorator import TaskDecorator +logger = logging.getLogger(__name__) + def default_exception_handler(e: Exception, job: Job) -> None: - logging.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.") + logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.") job.set_failure_status(f"Failed job. Error: {e}") diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index f2148bd5..f8471c13 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -13,6 +13,8 @@ from pyzeebe.worker.task_router import ZeebeTaskRouter +logger = logging.getLogger(__name__) + class ZeebeWorker(ZeebeTaskHandler): """A zeebe worker that can connect to a zeebe instance and perform tasks.""" @@ -58,10 +60,10 @@ def stop(self) -> None: self.stop_event.set() def _handle_task(self, task: Task) -> None: - logging.debug(f"Handling task {task}") + logger.debug(f"Handling task {task}") while not self.stop_event.is_set() and self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection: if self.zeebe_adapter.retrying_connection: - logging.info(f"Retrying connection to {self.zeebe_adapter.connection_uri or 'zeebe'}") + logger.info(f"Retrying connection to {self.zeebe_adapter.connection_uri or 'zeebe'}") continue self._handle_jobs(task) @@ -69,11 +71,11 @@ def _handle_task(self, task: Task) -> None: def _handle_jobs(self, task: Task) -> None: for job in self._get_jobs(task): thread = Thread(target=task.handler, args=(job,)) - logging.debug(f"Running job: {job}") + logger.debug(f"Running job: {job}") thread.start() def _get_jobs(self, task: Task) -> Generator[Job, None, None]: - logging.debug(f"Activating jobs for task: {task}") + logger.debug(f"Activating jobs for task: {task}") return self.zeebe_adapter.activate_jobs(task_type=task.type, worker=self.name, timeout=task.timeout, max_jobs_to_activate=task.max_jobs_to_activate, variables_to_fetch=task.variables_to_fetch, @@ -154,17 +156,17 @@ def _run_task_inner_function(task: Task, job: Job) -> Tuple[Job, bool]: job.variables = task.inner_function(**job.variables) task_succeeded = True except Exception as e: - logging.debug(f"Failed job: {job}. Error: {e}.") + logger.debug(f"Failed job: {job}. Error: {e}.") task.exception_handler(e, job) finally: return job, task_succeeded def _complete_job(self, job: Job) -> None: try: - logging.debug(f"Completing job: {job}") + logger.debug(f"Completing job: {job}") self.zeebe_adapter.complete_job(job_key=job.key, variables=job.variables) except Exception as e: - logging.warning(f"Failed to complete job: {job}. Error: {e}") + logger.warning(f"Failed to complete job: {job}. Error: {e}") def _create_before_decorator_runner(self, task: Task) -> Callable[[Job], Job]: decorators = task._before.copy() @@ -190,5 +192,5 @@ def _run_decorator(decorator: TaskDecorator, job: Job) -> Job: try: return decorator(job) except Exception as e: - logging.warning(f"Failed to run decorator {decorator}. Error: {e}") + logger.warning(f"Failed to run decorator {decorator}. Error: {e}") return job From 14ef70df7bfe48455d75f840fa73c5c4ba0d2863 Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Thu, 22 Oct 2020 19:29:02 +0200 Subject: [PATCH 2/5] Fix test after changing to logger instantiated in module --- tests/unit/worker/task_handler_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/worker/task_handler_test.py b/tests/unit/worker/task_handler_test.py index b5a8b24a..98502031 100644 --- a/tests/unit/worker/task_handler_test.py +++ b/tests/unit/worker/task_handler_test.py @@ -108,7 +108,7 @@ def no_dict_fn(x): def test_default_exception_handler(): - with patch("logging.warning") as logging_mock: + with patch("pyzeebe.worker.task_handler.logger.warning") as logging_mock: with patch("pyzeebe.job.job.Job.set_failure_status") as failure_mock: failure_mock.return_value = None job = random_job() From 19f7bd1206039b8179d4e9af19dcd52e64cf9bc6 Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Thu, 22 Oct 2020 19:29:28 +0200 Subject: [PATCH 3/5] Add a test that ensures that we're logging with a named logger --- tests/unit/grpc_internals/zeebe_adapter_base_test.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/unit/grpc_internals/zeebe_adapter_base_test.py b/tests/unit/grpc_internals/zeebe_adapter_base_test.py index 0f312ce0..5742a920 100644 --- a/tests/unit/grpc_internals/zeebe_adapter_base_test.py +++ b/tests/unit/grpc_internals/zeebe_adapter_base_test.py @@ -44,6 +44,18 @@ def test_connectivity_transient_failure(): assert not zeebe_adapter.connected +def test_connectivity_transient_failure_logs_warning(caplog): + zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.TRANSIENT_FAILURE) + expected_logger = 'pyzeebe.grpc_internals.zeebe_adapter_base' + expected_level = "WARNING" + matching_logs = [ + l + for l in caplog.records + if l.name == expected_logger and l.levelname == expected_level + ] + assert len(matching_logs) > 0 + + def test_connectivity_shutdown(): with pytest.raises(ConnectionAbortedError): zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.SHUTDOWN) From ac0db46332405a87df82d481ebe39f690a09cdad Mon Sep 17 00:00:00 2001 From: Kristoffer Bakkejord Date: Thu, 22 Oct 2020 21:26:07 +0200 Subject: [PATCH 4/5] Use preferred double over single quotes, meaningful variable name --- tests/unit/grpc_internals/zeebe_adapter_base_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/grpc_internals/zeebe_adapter_base_test.py b/tests/unit/grpc_internals/zeebe_adapter_base_test.py index 5742a920..691f6a53 100644 --- a/tests/unit/grpc_internals/zeebe_adapter_base_test.py +++ b/tests/unit/grpc_internals/zeebe_adapter_base_test.py @@ -46,12 +46,12 @@ def test_connectivity_transient_failure(): def test_connectivity_transient_failure_logs_warning(caplog): zeebe_adapter._check_connectivity(grpc.ChannelConnectivity.TRANSIENT_FAILURE) - expected_logger = 'pyzeebe.grpc_internals.zeebe_adapter_base' + expected_logger = "pyzeebe.grpc_internals.zeebe_adapter_base" expected_level = "WARNING" matching_logs = [ - l - for l in caplog.records - if l.name == expected_logger and l.levelname == expected_level + log + for log in caplog.records + if log.name == expected_logger and log.levelname == expected_level ] assert len(matching_logs) > 0 From 544d933b5a3f966245bb7ad27a811afbb9a8d8b0 Mon Sep 17 00:00:00 2001 From: Jonatan Martens Date: Thu, 22 Oct 2020 22:32:31 +0300 Subject: [PATCH 5/5] Bumped version to v2.2.2 --- docs/conf.py | 2 +- pyzeebe/__init__.py | 2 +- setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 86364970..632b2cff 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -59,6 +59,6 @@ # so a file named "default.css" will overwrite the builtin "default.css". html_static_path = ['_static'] -version = "2.2.1" +version = "2.2.2" master_doc = 'index' diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index e754804d..234a7a69 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.2.1" +__version__ = "2.2.2" from pyzeebe import exceptions from pyzeebe.client.client import ZeebeClient diff --git a/setup.py b/setup.py index 55567333..4b3426ca 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pyzeebe", - version="2.2.1", + version="2.2.2", author="Jonatan Martens", author_email="jonatanmartenstav@gmail.com", description="Zeebe client api",