diff --git a/src/instana/instrumentation/aioamqp.py b/src/instana/instrumentation/aioamqp.py index 00931158..01efdc4a 100644 --- a/src/instana/instrumentation/aioamqp.py +++ b/src/instana/instrumentation/aioamqp.py @@ -26,11 +26,29 @@ async def basic_publish_with_instana( "aioamqp-publisher", span_context=parent_context ) as span: try: - span.set_attribute("aioamqp.exchange", argv[0]) - return await wrapped(*argv, **kwargs) + span.set_attribute("amqp.command", "publish") + span.set_attribute("amqp.routing_key", kwargs.get("routing_key")) + + protocol = getattr(instance, "protocol", None) + transport = getattr(protocol, "_transport", None) + extra = getattr(transport, "_extra", {}) if transport else {} + peername = extra.get("peername") + if ( + peername + and isinstance(peername, (list, tuple)) + and len(peername) >= 2 + ): + connection_info = f"{peername[0]}:{peername[1]}" + else: + connection_info = "unknown" + span.set_attribute("amqp.connection", connection_info) + + response = await wrapped(*argv, **kwargs) except Exception as exc: span.record_exception(exc) logger.debug(f"aioamqp basic_publish_with_instana error: {exc}") + else: + return response @wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume") async def basic_consume_with_instana( @@ -58,14 +76,29 @@ async def callback_wrapper( ) as span: try: span.set_status(StatusCode.OK) - span.set_attribute("aioamqp.callback", callback) - span.set_attribute("aioamqp.message", args[1]) - span.set_attribute("aioamqp.exchange_name", args[2].exchange_name) - span.set_attribute("aioamqp.routing_key", args[2].routing_key) - return await wrapped_callback(*args, **kwargs) + span.set_attribute("amqp.command", "consume") + span.set_attribute("amqp.routing_key", args[2].routing_key) + + protocol = getattr(args[0], "protocol", None) + transport = getattr(protocol, "_transport", None) + extra = getattr(transport, "_extra", {}) if transport else {} + peername = extra.get("peername") + if ( + peername + and isinstance(peername, (list, tuple)) + and len(peername) >= 2 + ): + connection_info = f"{peername[0]}:{peername[1]}" + else: + connection_info = "unknown" + span.set_attribute("amqp.connection", connection_info) + + response = await wrapped_callback(*args, **kwargs) except Exception as exc: span.record_exception(exc) logger.debug(f"aioamqp basic_consume_with_instana error: {exc}") + else: + return response wrapped_callback = callback_wrapper(callback) argv = (wrapped_callback,) + argv[1:] diff --git a/src/instana/span/kind.py b/src/instana/span/kind.py index b93fa207..c86f8b97 100644 --- a/src/instana/span/kind.py +++ b/src/instana/span/kind.py @@ -21,6 +21,7 @@ ) ENTRY_SPANS = ( + "aioamqp-consumer", "aiohttp-server", "aws.lambda.entry", "celery-worker", @@ -35,6 +36,7 @@ ) EXIT_SPANS = ( + "aioamqp-publisher", "aiohttp-client", "boto3", "cassandra", diff --git a/src/instana/span/registered_span.py b/src/instana/span/registered_span.py index 852cf8bd..68557c1f 100644 --- a/src/instana/span/registered_span.py +++ b/src/instana/span/registered_span.py @@ -54,6 +54,10 @@ def __init__( if "kafka" in span.name: self.n = "kafka" + # unify the span name for aioamqp-producer and aioamqp-consumer + if "amqp" in span.name: + self.n = "amqp" + # Logic to store custom attributes for registered spans (not used yet) if len(span.attributes) > 0: self.data["sdk"]["custom"]["tags"] = self._validate_attributes( @@ -64,6 +68,16 @@ def _populate_entry_span_data(self, span: "InstanaSpan") -> None: if span.name in HTTP_SPANS: self._collect_http_attributes(span) + elif span.name == "aioamqp-consumer": + self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None) + self.data["amqp"]["routingkey"] = span.attributes.pop( + "amqp.routing_key", None + ) + self.data["amqp"]["connection"] = span.attributes.pop( + "amqp.connection", None + ) + self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None) + elif span.name == "aws.lambda.entry": self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown") self.data["lambda"]["alias"] = None @@ -167,6 +181,16 @@ def _populate_exit_span_data(self, span: "InstanaSpan") -> None: if span.name in HTTP_SPANS: self._collect_http_attributes(span) + elif span.name == "aioamqp-publisher": + self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None) + self.data["amqp"]["routingkey"] = span.attributes.pop( + "amqp.routing_key", None + ) + self.data["amqp"]["connection"] = span.attributes.pop( + "amqp.connection", None + ) + self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None) + elif span.name == "boto3": # boto3 also sends http attributes self._collect_http_attributes(span) diff --git a/tests/frameworks/test_aioamqp.py b/tests/clients/test_aioamqp.py similarity index 78% rename from tests/frameworks/test_aioamqp.py rename to tests/clients/test_aioamqp.py index aa2deb78..7afa04b9 100644 --- a/tests/frameworks/test_aioamqp.py +++ b/tests/clients/test_aioamqp.py @@ -27,7 +27,7 @@ def _resource(self) -> Generator[None, None, None]: self.loop.close() async def delete_queue(self) -> None: - transport, protocol = await aioamqp.connect( + _, protocol = await aioamqp.connect( testenv["rabbitmq_host"], testenv["rabbitmq_port"], ) @@ -79,8 +79,11 @@ def test_basic_publish(self) -> None: publisher_span = spans[0] test_span = spans[1] - assert publisher_span.n == "sdk" - assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher" + assert publisher_span.n == "amqp" + assert publisher_span.data["amqp"]["command"] == "publish" + assert publisher_span.data["amqp"]["routingkey"] == "message_queue" + assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672" + assert publisher_span.p == test_span.s assert test_span.n == "sdk" @@ -100,31 +103,25 @@ def test_basic_consumer(self) -> None: consumer_span = spans[2] test_span = spans[3] - assert publisher_span.n == "sdk" - assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher" + assert publisher_span.n == "amqp" + assert publisher_span.data["amqp"]["command"] == "publish" + assert publisher_span.data["amqp"]["routingkey"] == "message_queue" + assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672" assert publisher_span.p == test_span.s - assert ( - publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"] - == "b'Instana test message'" - ) assert callback_span.n == "sdk" assert callback_span.data["sdk"]["name"] == "callback-span" assert callback_span.data["sdk"]["type"] == "intermediate" assert callback_span.p == consumer_span.s - assert consumer_span.n == "sdk" - assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer" - assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"] - assert ( - consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"] - == "b'Instana test message'" - ) + assert consumer_span.n == "amqp" + assert consumer_span.data["amqp"]["command"] == "consume" + assert consumer_span.data["amqp"]["routingkey"] == "message_queue" + assert consumer_span.data["amqp"]["connection"] == "127.0.0.1:5672" assert ( - consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"] - == "message_queue" + consumer_span.data["amqp"]["connection"] + == publisher_span.data["amqp"]["connection"] ) - assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"] assert consumer_span.p == test_span.s assert test_span.n == "sdk"