Pull out confusing stuff with logger and try/excepts in eventhub (#1)
Browse files Browse the repository at this point in the history
* Add method to ConnectionPool to allow callers to expire a particular connection

* Expire unconnected Eventhub clients when send methods fail

* Stop swallowing send exception failures

* Improve test coverage for eventhub module to 99%
erewok authored Jan 3, 2025
1 parent 99e5313 commit 55e09e4
Showing 5 changed files with 237 additions and 98 deletions.
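
Note on the change as a whole: the plain Eventhub client below no longer takes a log= argument and no longer swallows send failures, so a ValueError (batch size exceeded) or EventHubError now surfaces at the call site. A minimal sketch of the new calling pattern (the constructor arguments are illustrative assumptions, not taken from this diff):

    import asyncio

    from azure.eventhub.exceptions import EventHubError
    from azure.identity.aio import DefaultAzureCredential

    from aio_azure_clients_toolbox.clients.eventhub import Eventhub

    async def main() -> None:
        # Assumed constructor shape; check the class __init__ for the real one.
        producer = Eventhub("my-namespace.servicebus.windows.net", "my-hub", DefaultAzureCredential())
        try:
            await producer.send_event(b"hello")
        except ValueError:
            print("Batch size exceeded: previously logged-and-swallowed, now raised")
        except EventHubError:
            print("Transport/service error: also propagates to the caller now")

    asyncio.run(main())
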
156 changes: 64 additions & 92 deletions aio_azure_clients_toolbox/clients/eventhub.py
@@ -3,7 +3,13 @@
 
 from azure.eventhub import EventData, EventDataBatch, TransportType
 from azure.eventhub.aio import EventHubProducerClient
-from azure.eventhub.exceptions import EventHubError
+from azure.eventhub.exceptions import (
+    AuthenticationError,
+    ClientClosedError,
+    ConnectError,
+    ConnectionLostError,
+    EventHubError,
+)
 from azure.identity.aio import DefaultAzureCredential
 
 from aio_azure_clients_toolbox import connection_pooling
@@ -39,7 +45,7 @@ def __init__(
         )
         self._client: EventHubProducerClient | None = self.get_client()
 
-    def get_client(self):
+    def get_client(self) -> EventHubProducerClient:
         return EventHubProducerClient(
             fully_qualified_namespace=self.evhub_namespace,
             eventhub_name=self.evhub_name,
@@ -48,7 +54,7 @@ def get_client(self):
         )
 
     @property
-    def client(self):
+    def client(self) -> EventHubProducerClient:
         if self._client is None:
             self._client = self.get_client()
         return self._client
@@ -62,8 +68,7 @@ async def send_event_data(
         self,
         event: EventData,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event which is already encoded as `EventData`.
@@ -79,24 +84,16 @@ async def send_event_data(
         # Add events to the batch.
         event_data_batch.add(event)
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
 
         return event_data_batch
 
     async def send_event(
         self,
         event: bytes | str,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event. See `send_events_batch` for
         sending multiple events
@@ -113,24 +110,16 @@ async def send_event(
         # Add events to the batch.
         event_data_batch.add(EventData(event))
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
 
         return event_data_batch
 
     async def send_events_batch(
         self,
         events_list: list[bytes | str],
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant than sending individual events.
@@ -147,33 +136,20 @@ async def send_events_batch(
         for event in events_list:
             event_data_batch.add(EventData(event))
 
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
         return event_data_batch
 
     async def send_events_data_batch(
         self,
         event_data_batch: EventDataBatch,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant than sending individual events.
         """
-        try:
-            # Send the batch of events to the event hub.
-            await self.client.send_batch(event_data_batch)
-        except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-            if log is not None:
-                log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-        except EventHubError as eh_err:
-            if log is not None:
-                log.error("Eventhub Sending error: ", eh_err)
+        # Send the batch of events to the event hub.
+        await self.client.send_batch(event_data_batch)
         return event_data_batch
 
 
 class ManagedAzureEventhubProducer(connection_pooling.AbstractorConnector):
@@ -219,9 +195,6 @@ def __init__(
         )
         self.ready_message = ready_message
 
-    def __getattr__(self, key):
-        return getattr
-
     async def create(self):
         """Creates a new connection for our pool"""
         client = Eventhub(
@@ -249,6 +222,10 @@ async def ready(self, conn: EventHubProducerClient) -> bool:
                 # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
                 return True
+            except AuthenticationError:
+                logger.warning("Eventhub readiness check failed due to authentication error. Cancelling.")
+                logger.error(f"{traceback.format_exc()}")
+                return False
             except EventHubError:
                 logger.warning(f"Eventhub readiness check #{3 - attempts} failed; trying again.")
                 logger.error(f"{traceback.format_exc()}")
@@ -262,8 +239,7 @@ async def send_event_data(
         self,
         event: EventData,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event which is already encoded as `EventData`.
@@ -280,26 +256,24 @@ async def send_event_data(
             # Add events to the batch.
             event_data_batch.add(event)
 
-            logger.debug("Sending eventhub batch")
-            # Send the batch of events to the event hub.
             try:
+                logger.debug("Sending eventhub batch")
+                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
-
-        return event_data_batch
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {event}")
+                logger.error(f"{traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_event")
     async def send_event(
         self,
         event: bytes | str,
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Send a *single* EventHub event. See `send_events_batch` for
         sending multiple events
@@ -317,26 +291,25 @@ async def send_event(
             # Add events to the batch.
             event_data_batch.add(EventData(event))
 
-            logger.debug("Sending eventhub batch")
-
             try:
+                logger.debug("Sending eventhub batch")
                 # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
-
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {event}")
+                logger.error(f"{traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
         return event_data_batch
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_events_batch")
     async def send_events_batch(
         self,
         events_list: list[bytes | str],
         partition_key: str | None = None,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant than sending individual events.
@@ -354,34 +327,33 @@ async def send_events_batch(
             for event in events_list:
                 event_data_batch.add(EventData(event))
 
-            logger.debug("Sending eventhub batch")
-            # Send the batch of events to the event hub.
             try:
+                logger.debug("Sending eventhub batch")
+                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending event: {traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
 
     @connection_pooling.send_time_deco(logger, "Eventhub.send_events_data_batch")
     async def send_events_data_batch(
         self,
         event_data_batch: EventDataBatch,
-        log=None,
-    ):
+    ) -> EventDataBatch:
         """
         Sending events in a batch is more performant than sending individual events.
         """
         async with self.pool.get() as conn:
-            logger.debug("Sending eventhub batch")
-            # Send the batch of events to the event hub.
             try:
+                logger.debug("Sending eventhub batch")
+                # Send the batch of events to the event hub.
                 await conn.send_batch(event_data_batch)
-            except ValueError:  # Size exceeds limit. This shouldn't happen if you make sure before hand.
-                if log is not None:
-                    log.error("Eventhub Size of the event data list exceeds the size limit of a single send")
-            except EventHubError as eh_err:
-                if log is not None:
-                    log.error("Eventhub Sending error: ", eh_err)
+                return event_data_batch
+            except (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError):
+                logger.error(f"Error sending batch {traceback.format_exc()}")
+                # Mark this connection closed so it won't be reused
+                await self.pool.expire_conn(conn)
+                raise
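
Reviewer note: for the managed producer, each of the four connection-level exceptions (AuthenticationError, ClientClosedError, ConnectionLostError, ConnectError) now expires the pooled connection before re-raising, so a simple caller-side retry picks up a fresh connection. A hedged sketch; the send_with_retry helper is hypothetical, not part of this library:

    from azure.eventhub.exceptions import ConnectError, ConnectionLostError

    async def send_with_retry(producer, payload: bytes, attempts: int = 2):
        # producer is assumed to be a ManagedAzureEventhubProducer.
        for attempt in range(attempts):
            try:
                return await producer.send_event(payload)
            except (ConnectError, ConnectionLostError):
                # send_event has already expired the bad pooled connection via
                # pool.expire_conn(conn) before re-raising, so the next attempt
                # checks out (or creates) a healthy one.
                if attempt == attempts - 1:
                    raise
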
27 changes: 25 additions & 2 deletions aio_azure_clients_toolbox/connection_pooling.py
@@ -190,6 +190,14 @@ def __init__(
         self._id = (binascii.hexlify(os.urandom(3))).decode()
         self._ready = anyio.Event()
 
+    def __bool__(self):
+        if self._connection is None:
+            return False
+        return self.is_ready and not self.expired
+
+    def eq(self, connection: AbstractConnection) -> bool:
+        return self._connection is connection
+
     @property
     def available(self):
         """Check if connection exists and client usage limit has been reached"""
@@ -436,8 +444,6 @@ def __init__(
         self.max_size = max_size
         if self.max_size < 1:
             raise ValueError("max_size must a postive integer")
-        # Number of available connections
-        self.connections: int = 0
         self.connector = connector
 
         # A pool is just a heap of connection-managing things
@@ -497,3 +503,20 @@ async def closeall(self) -> None:
         async with create_task_group() as tg:
             for conn in self._pool:
                 tg.start_soon(conn.close)
+
+    @property
+    def ready_connection_count(self):
+        return sum(1 for conn in self._pool if conn)
+
+    async def expire_conn(self, connection: AbstractConnection) -> None:
+        """
+        Expire a connection.
+        Because we yield AbstractConnections while our pool is SharedTransportConnections,
+        we need to give clients a way to look up a connection and expire it directly.
+        """
+        for conn in self._pool:
+            if conn.eq(connection):
+                await conn.close()
+                break
+        heapq.heapify(self._pool)
+        return None
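
Reviewer note: expire_conn and ready_connection_count lean on the two small additions above: eq compares the wrapped client by identity (is, not ==), and __bool__ makes a connection truthy only when it is ready and unexpired. A toy illustration with a stand-in wrapper class (not the library's actual types):

    class Wrapper:
        """Stand-in for SharedTransportConnection: wraps one underlying client."""

        def __init__(self, connection: object) -> None:
            self._connection = connection
            self.closed = False

        def eq(self, connection: object) -> bool:
            # Identity comparison: only the exact same client object matches.
            return self._connection is connection

        def __bool__(self) -> bool:
            return not self.closed

    raw = object()
    pool = [Wrapper(raw), Wrapper(object())]

    # Expire by identity, the way ConnectionPool.expire_conn looks up a connection.
    for wrapper in pool:
        if wrapper.eq(raw):
            wrapper.closed = True
            break

    # Count "ready" wrappers via __bool__, like ready_connection_count does.
    assert sum(1 for w in pool if w) == 1
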
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "aio-azure-clients-toolbox"
-version = "0.1.0"
+version = "0.2.0"
 description = "Async Azure Clients Mulligan Python projects"
 authors = [
     { "name" = "Erik Aker", "email" = "[email protected]" },