From 55e09e40292c347b81d0aae79ae358d4d9e2e9ee Mon Sep 17 00:00:00 2001
From: Erik
Date: Thu, 2 Jan 2025 17:13:41 -0800
Subject: [PATCH] Pull out confusing stuff with logger and try/excepts in eventhub (#1)

* Add method to ConnectionPool to allow callers to expire a particular connection
* Expire unconnected Eventhub clients when send methods fail
* Stop swallowing send exception failures
* Improve test coverage for eventhub module to 99%
---
 aio_azure_clients_toolbox/clients/eventhub.py | 156 +++++++-----
 .../connection_pooling.py                     |  27 ++-
 pyproject.toml                                |   2 +-
 tests/clients/test_eventhub.py                | 146 ++++++++++++++++
 uv.lock                                       |   4 +-
 5 files changed, 237 insertions(+), 98 deletions(-)

diff --git a/aio_azure_clients_toolbox/clients/eventhub.py b/aio_azure_clients_toolbox/clients/eventhub.py
index 2d58637..91a9aa3 100644
--- a/aio_azure_clients_toolbox/clients/eventhub.py
+++ b/aio_azure_clients_toolbox/clients/eventhub.py
@@ -3,7 +3,13 @@
 
 from azure.eventhub import EventData, EventDataBatch, TransportType
 from azure.eventhub.aio import EventHubProducerClient
-from azure.eventhub.exceptions import EventHubError
+from azure.eventhub.exceptions import (
+    AuthenticationError,
+    ClientClosedError,
+    ConnectError,
+    ConnectionLostError,
+    EventHubError,
+)
 from azure.identity.aio import DefaultAzureCredential
 
 from aio_azure_clients_toolbox import connection_pooling
@@ -39,7 +45,7 @@ def __init__(
         )
         self._client: EventHubProducerClient | None = self.get_client()
 
-    def get_client(self):
+    def get_client(self) -> EventHubProducerClient:
         return EventHubProducerClient(
             fully_qualified_namespace=self.evhub_namespace,
             eventhub_name=self.evhub_name,
@@ -48,7 +54,7 @@ def get_client(self):
         )
 
     @property
-    def client(self):
+    def client(self) -> EventHubProducerClient:
         if self._client is None:
             self._client = self.get_client()
         return self._client
@@ -62,8 +68,7 @@ async def send_event_data(
         self,
         event: EventData,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event which is already
         encoded as `EventData`.
@@ -79,15 +84,8 @@
         # Add events to the batch.
         event_data_batch.add(event)
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
 
         return event_data_batch
 
@@ -95,8 +93,7 @@ async def send_event(
         self,
         event: bytes | str,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event.
         See `send_events_batch` for sending multiple events
@@ -113,15 +110,8 @@
         # Add events to the batch.
         event_data_batch.add(EventData(event))
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
 
         return event_data_batch
 
@@ -129,8 +119,7 @@ async def send_events_batch(
         self,
         events_list: list[bytes | str],
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant
         than sending individual events.
@@ -147,33 +136,20 @@
         for event in events_list:
             event_data_batch.add(EventData(event))
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
+        return event_data_batch
 
     async def send_events_data_batch(
         self,
         event_data_batch: EventDataBatch,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant
         than sending individual events.
         """
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
+        return event_data_batch
 
 
 class ManagedAzureEventhubProducer(connection_pooling.AbstractorConnector):
@@ -219,9 +195,6 @@ def __init__(
         )
         self.ready_message = ready_message
 
-    def __getattr__(self, key):
-        return getattr
-
     async def create(self):
         """Creates a new connection for our pool"""
         client = Eventhub(
@@ -249,6 +222,10 @@ async def ready(self, conn: EventHubProducerClient) -> bool:
                 # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
                 return True
+            except AuthenticationError:
+                logger.warning("Eventhub readiness check failed due to authentication error. Cancelling.")
+                logger.error(f"{traceback.format_exc()}")
+                return False
             except EventHubError:
                 logger.warning(f"Eventhub readiness check #{3 - attempts} failed; trying again.")
                 logger.error(f"{traceback.format_exc()}")
@@ -262,8 +239,7 @@ async def send_event_data(
         self,
         event: EventData,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event which is already
         encoded as `EventData`.
@@ -280,26 +256,24 @@
             # Add events to the batch.
             event_data_batch.add(event)
 
+            logger.debug("Sending eventhub batch")
+            # Send the batch of events to the event hub.
             try:
-                logger.debug("Sending eventhub batch")
-                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
-
-            return event_data_batch
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {event}")
+                logger.error(f"{traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_event")
     async def send_event(
         self,
         event: bytes | str,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event.
         See `send_events_batch` for sending multiple events
@@ -317,17 +291,17 @@
             # Add events to the batch.
             event_data_batch.add(EventData(event))
 
+            logger.debug("Sending eventhub batch")
+
             try:
                 # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
-
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {event}")
+                logger.error(f"{traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
             return event_data_batch
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_events_batch")
     async def send_events_batch(
         self,
         events_list: list[bytes | str],
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant
         than sending individual events.
@@ -354,34 +327,33 @@
             for event in events_list:
                 event_data_batch.add(EventData(event))
 
+            logger.debug("Sending eventhub batch")
+            # Send the batch of events to the event hub.
             try:
-                logger.debug("Sending eventhub batch")
-                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_events_data_batch")
     async def send_events_data_batch(
         self,
         event_data_batch: EventDataBatch,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant
         than sending individual events.
         """
         async with self.pool.get() as conn:
+            logger.debug("Sending eventhub batch")
+            # Send the batch of events to the event hub.
             try:
-                logger.debug("Sending eventhub batch")
-                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending batch: {traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
diff --git a/aio_azure_clients_toolbox/connection_pooling.py b/aio_azure_clients_toolbox/connection_pooling.py
index 7824e7f..c15743e 100644
--- a/aio_azure_clients_toolbox/connection_pooling.py
+++ b/aio_azure_clients_toolbox/connection_pooling.py
@@ -190,6 +190,14 @@ def __init__(
         self._id = (binascii.hexlify(os.urandom(3))).decode()
         self._ready = anyio.Event()
 
+    def __bool__(self):
+        if self._connection is None:
+            return False
+        return self.is_ready and not self.expired
+
+    def eq(self, connection: AbstractConnection) -> bool:
+        return self._connection is connection
+
     @property
     def available(self):
         """Check if connection exists and client usage limit has been reached"""
@@ -436,8 +444,6 @@ def __init__(
         self.max_size = max_size
         if self.max_size < 1:
             raise ValueError("max_size must be a positive integer")
-        # Number of available connections
-        self.connections: int = 0
         self.connector = connector
 
         # A pool is just a heap of connection-managing things
@@ -497,3 +503,20 @@ async def closeall(self) -> None:
         async with create_task_group() as tg:
             for conn in self._pool:
                 tg.start_soon(conn.close)
+
+    @property
+    def ready_connection_count(self):
+        return sum(1 for conn in self._pool if conn)
+
+    async def expire_conn(self, connection: AbstractConnection) -> None:
+        """
+        Expire a connection.
+        Because we yield AbstractConnections while our pool is SharedTransportConnections,
+        we need to give clients a way to look up a connection and expire it directly.
+ """ + for conn in self._pool: + if conn.eq(connection): + await conn.close() + break + heapq.heapify(self._pool) + return None diff --git a/pyproject.toml b/pyproject.toml index f9d4d2e..3ce17d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "aio-azure-clients-toolbox" -version = "0.1.0" +version = "0.2.0" description = "Async Azure Clients Mulligan Python projects" authors = [ { "name" = "Erik Aker", "email" = "eaker@mulliganfunding.com" }, diff --git a/tests/clients/test_eventhub.py b/tests/clients/test_eventhub.py index 4b7c9de..6927096 100644 --- a/tests/clients/test_eventhub.py +++ b/tests/clients/test_eventhub.py @@ -2,6 +2,12 @@ import pytest from aio_azure_clients_toolbox.clients import eventhub +from azure.eventhub import EventData, EventDataBatch +from azure.eventhub.exceptions import AuthenticationError, ClientClosedError, ConnectError + +# Three calls to send a ready message +READY_MESSAGE_METHOD_CALLS = ["create_batch", "add", "send"] +READY_MESSAGE_CALL_COUNT = len(READY_MESSAGE_METHOD_CALLS) @pytest.fixture() @@ -13,6 +19,15 @@ def ehub(mockehub): ) +@pytest.fixture() +def managed_ehub(mockehub): + return eventhub.ManagedAzureEventhubProducer( + "namespace_url.example.net", + "name", + mock.AsyncMock(), # credential + ) + + def test_get_client(ehub, mockehub): assert ehub.get_client() == mockehub ehub._client = None @@ -34,6 +49,137 @@ async def test_evhub_send_event(ehub): assert len(ehub._client.method_calls) == 3 +async def test_evhub_send_event_data(ehub): + data = EventData(body=b"test") + await ehub.send_event_data(data) + assert len(ehub._client.method_calls) == 3 + + async def test_evhub_send_event_batch(ehub): await ehub.send_events_batch(["test1", "test2"]) assert len(ehub._client.method_calls) == 4 + + +async def test_evhub_send_events_data_batch(ehub): + batch = EventDataBatch() + batch.add(EventData(body=b"test1")) + batch.add(EventData(body=b"test2")) + + await ehub.send_events_data_batch(batch) + assert len(ehub._client.method_calls) == 1 + + +# # # # # # # # # # # # # # # # # # +# ---**--> Managed Client <--**--- +# # # # # # # # # # # # # # # # # # + + +async def test_managed_get_create(managed_ehub, mockehub): + assert await managed_ehub.create() == mockehub + + +async def test_managed_close(managed_ehub): + # set up + async with managed_ehub.pool.get() as _conn1: + async with managed_ehub.pool.get() as _conn2: + pass + assert managed_ehub.pool.ready_connection_count == 2 + await managed_ehub.close() + assert managed_ehub.pool.ready_connection_count == 0 + + +def get_mock_connection_from_pool(pool): + # This is expected to be a mock thing buried in here + return pool._pool[0]._connection + + +@pytest.fixture(params=[False, True]) +def mockehub_throwing(request, mockehub): + if request.param: + # We need the *first* (readiness) call to succeed, and the second to fail + mockehub.send_batch.side_effect = [None, ClientClosedError("test")] + return (mockehub, request.param) + + +async def test_ready_auth_failure(mockehub, managed_ehub): + mockehub.send_batch.side_effect = AuthenticationError("test") + assert not await managed_ehub.ready(mockehub) + assert mockehub.send_batch.call_count == 1 + + +async def test_ready_connect_failure(mockehub, managed_ehub): + mockehub.send_batch.side_effect = ConnectError("test") + assert not await managed_ehub.ready(mockehub) + assert mockehub.send_batch.call_count == 2 + + +async def test_managed_evhub_send_event(mockehub_throwing, managed_ehub): + _mockehub, should_throw = mockehub_throwing 
+    expect_call_count = READY_MESSAGE_CALL_COUNT + 3
+    if should_throw:
+        with pytest.raises(ClientClosedError):
+            await managed_ehub.send_event("test")
+        # Connection should be closed
+        assert managed_ehub.pool.ready_connection_count == 0
+        expect_call_count += 1  # for the close
+    else:
+        await managed_ehub.send_event("test")
+    assert (
+        len(get_mock_connection_from_pool(managed_ehub.pool).method_calls)
+        == expect_call_count
+    )
+
+
+async def test_managed_evhub_send_event_data(mockehub_throwing, managed_ehub):
+    _mockehub, should_throw = mockehub_throwing
+    data = EventData(body=b"test")
+    expect_call_count = READY_MESSAGE_CALL_COUNT + 3
+    if should_throw:
+        with pytest.raises(ClientClosedError):
+            await managed_ehub.send_event_data(data)
+        # Connection should be closed
+        assert managed_ehub.pool.ready_connection_count == 0
+        expect_call_count += 1  # for the close
+    else:
+        await managed_ehub.send_event_data(data)
+    assert (
+        len(get_mock_connection_from_pool(managed_ehub.pool).method_calls)
+        == expect_call_count
+    )
+
+
+async def test_managed_evhub_send_event_batch(mockehub_throwing, managed_ehub):
+    _mockehub, should_throw = mockehub_throwing
+    expect_call_count = READY_MESSAGE_CALL_COUNT + 4
+
+    if should_throw:
+        with pytest.raises(ClientClosedError):
+            await managed_ehub.send_events_batch(["test1", "test2"])
+        assert managed_ehub.pool.ready_connection_count == 0
+        expect_call_count += 1  # for the close
+
+    else:
+        await managed_ehub.send_events_batch(["test1", "test2"])
+    assert (
+        len(get_mock_connection_from_pool(managed_ehub.pool).method_calls)
+        == expect_call_count
+    )
+
+
+async def test_managed_evhub_send_events_data_batch(mockehub_throwing, managed_ehub):
+    batch = EventDataBatch()
+    batch.add(EventData(body=b"test1"))
+    batch.add(EventData(body=b"test2"))
+    _mockehub, should_throw = mockehub_throwing
+    expect_call_count = READY_MESSAGE_CALL_COUNT + 1
+    if should_throw:
+        with pytest.raises(ClientClosedError):
+            await managed_ehub.send_events_data_batch(batch)
+        assert managed_ehub.pool.ready_connection_count == 0
+        expect_call_count += 1  # for the close
+    else:
+        await managed_ehub.send_events_data_batch(batch)
+    assert (
+        len(get_mock_connection_from_pool(managed_ehub.pool).method_calls)
+        == expect_call_count
+    )
diff --git a/uv.lock b/uv.lock
index 8bf032e..5e5b238 100644
--- a/uv.lock
+++ b/uv.lock
@@ -3,7 +3,7 @@ requires-python = ">=3.11"
 
 [[package]]
 name = "aio-azure-clients-toolbox"
-version = "0.1.0"
+version = "0.2.0"
 source = { editable = "." }
 dependencies = [
     { name = "aiofiles" },
@@ -496,7 +496,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/98/65/13d9e76ca19b0ba5603d71ac8424b5694415b348e719db277b5edc985ff5/cryptography-44.0.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:761817a3377ef15ac23cd7834715081791d4ec77f9297ee694ca1ee9c2c7e5eb", size = 3915420 },
     { url = "https://files.pythonhosted.org/packages/b1/07/40fe09ce96b91fc9276a9ad272832ead0fddedcba87f1190372af8e3039c/cryptography-44.0.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3c672a53c0fb4725a29c303be906d3c1fa99c32f58abe008a82705f9ee96f40b", size = 4154498 },
     { url = "https://files.pythonhosted.org/packages/75/ea/af65619c800ec0a7e4034207aec543acdf248d9bffba0533342d1bd435e1/cryptography-44.0.0-cp37-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:4ac4c9f37eba52cb6fbeaf5b59c152ea976726b865bd4cf87883a7e7006cc543", size = 3932569 },
-    { url = "https://files.pythonhosted.org/packages/4e/d5/9cc182bf24c86f542129565976c21301d4ac397e74bf5a16e48241aab8a6/cryptography-44.0.0-cp37-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:60eb32934076fa07e4316b7b2742fa52cbb190b42c2df2863dbc4230a0a9b385", size = 4164756 },
     { url = "https://files.pythonhosted.org/packages/c7/af/d1deb0c04d59612e3d5e54203159e284d3e7a6921e565bb0eeb6269bdd8a/cryptography-44.0.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:ed3534eb1090483c96178fcb0f8893719d96d5274dfde98aa6add34614e97c8e", size = 4016721 },
     { url = "https://files.pythonhosted.org/packages/bd/69/7ca326c55698d0688db867795134bdfac87136b80ef373aaa42b225d6dd5/cryptography-44.0.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:f3f6fdfa89ee2d9d496e2c087cebef9d4fcbb0ad63c40e821b39f74bf48d9c5e", size = 4240915 },
     { url = "https://files.pythonhosted.org/packages/ef/d4/cae11bf68c0f981e0413906c6dd03ae7fa864347ed5fac40021df1ef467c/cryptography-44.0.0-cp37-abi3-win32.whl", hash = "sha256:eb33480f1bad5b78233b0ad3e1b0be21e8ef1da745d8d2aecbb20671658b9053", size = 2757925 },
@@ -507,7 +506,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/d0/c7/c656eb08fd22255d21bc3129625ed9cd5ee305f33752ef2278711b3fa98b/cryptography-44.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:c5eb858beed7835e5ad1faba59e865109f3e52b3783b9ac21e7e47dc5554e289", size = 3915417 },
     { url = "https://files.pythonhosted.org/packages/ef/82/72403624f197af0db6bac4e58153bc9ac0e6020e57234115db9596eee85d/cryptography-44.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f53c2c87e0fb4b0c00fa9571082a057e37690a8f12233306161c8f4b819960b7", size = 4155160 },
     { url = "https://files.pythonhosted.org/packages/a2/cd/2f3c440913d4329ade49b146d74f2e9766422e1732613f57097fea61f344/cryptography-44.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:9e6fc8a08e116fb7c7dd1f040074c9d7b51d74a8ea40d4df2fc7aa08b76b9e6c", size = 3932331 },
-    { url = "https://files.pythonhosted.org/packages/31/d9/90409720277f88eb3ab72f9a32bfa54acdd97e94225df699e7713e850bd4/cryptography-44.0.0-cp39-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:9abcc2e083cbe8dde89124a47e5e53ec38751f0d7dfd36801008f316a127d7ba", size = 4165207 },
     { url = "https://files.pythonhosted.org/packages/7f/df/8be88797f0a1cca6e255189a57bb49237402b1880d6e8721690c5603ac23/cryptography-44.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:d2436114e46b36d00f8b72ff57e598978b37399d2786fd39793c36c6d5cb1c64", size = 4017372 },
     { url = "https://files.pythonhosted.org/packages/af/36/5ccc376f025a834e72b8e52e18746b927f34e4520487098e283a719c205e/cryptography-44.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:a01956ddfa0a6790d594f5b34fc1bfa6098aca434696a03cfdbe469b8ed79285", size = 4239657 },
     { url = "https://files.pythonhosted.org/packages/46/b0/f4f7d0d0bcfbc8dd6296c1449be326d04217c57afb8b2594f017eed95533/cryptography-44.0.0-cp39-abi3-win32.whl", hash = "sha256:eca27345e1214d1b9f9490d200f9db5a874479be914199194e746c893788d417", size = 2758672 },