Skip to content

Commit 9db7f6b

Browse files
committed
fix: updated amqp instrumentation, adapted attributes and unittests
Signed-off-by: Cagri Yonca <[email protected]>
1 parent e25f501 commit 9db7f6b

File tree

4 files changed

+82
-26
lines changed

4 files changed

+82
-26
lines changed

src/instana/instrumentation/aioamqp.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,29 @@ async def basic_publish_with_instana(
2626
"aioamqp-publisher", span_context=parent_context
2727
) as span:
2828
try:
29-
span.set_attribute("aioamqp.exchange", argv[0])
30-
return await wrapped(*argv, **kwargs)
29+
span.set_attribute("amqp.command", "publish")
30+
span.set_attribute("amqp.routing_key", kwargs.get("routing_key"))
31+
32+
protocol = getattr(instance, "protocol", None)
33+
transport = getattr(protocol, "_transport", None)
34+
extra = getattr(transport, "_extra", {}) if transport else {}
35+
peername = extra.get("peername")
36+
if (
37+
peername
38+
and isinstance(peername, (list, tuple))
39+
and len(peername) >= 2
40+
):
41+
connection_info = f"{peername[0]}:{peername[1]}"
42+
else:
43+
connection_info = "unknown"
44+
span.set_attribute("amqp.connection", connection_info)
45+
46+
response = await wrapped(*argv, **kwargs)
3147
except Exception as exc:
3248
span.record_exception(exc)
3349
logger.debug(f"aioamqp basic_publish_with_instana error: {exc}")
50+
else:
51+
return response
3452

3553
@wrapt.patch_function_wrapper("aioamqp.channel", "Channel.basic_consume")
3654
async def basic_consume_with_instana(
@@ -58,14 +76,29 @@ async def callback_wrapper(
5876
) as span:
5977
try:
6078
span.set_status(StatusCode.OK)
61-
span.set_attribute("aioamqp.callback", callback)
62-
span.set_attribute("aioamqp.message", args[1])
63-
span.set_attribute("aioamqp.exchange_name", args[2].exchange_name)
64-
span.set_attribute("aioamqp.routing_key", args[2].routing_key)
65-
return await wrapped_callback(*args, **kwargs)
79+
span.set_attribute("amqp.command", "consume")
80+
span.set_attribute("amqp.routing_key", args[2].routing_key)
81+
82+
protocol = getattr(args[0], "protocol", None)
83+
transport = getattr(protocol, "_transport", None)
84+
extra = getattr(transport, "_extra", {}) if transport else {}
85+
peername = extra.get("peername")
86+
if (
87+
peername
88+
and isinstance(peername, (list, tuple))
89+
and len(peername) >= 2
90+
):
91+
connection_info = f"{peername[0]}:{peername[1]}"
92+
else:
93+
connection_info = "unknown"
94+
span.set_attribute("amqp.connection", connection_info)
95+
96+
response = await wrapped_callback(*args, **kwargs)
6697
except Exception as exc:
6798
span.record_exception(exc)
6899
logger.debug(f"aioamqp basic_consume_with_instana error: {exc}")
100+
else:
101+
return response
69102

70103
wrapped_callback = callback_wrapper(callback)
71104
argv = (wrapped_callback,) + argv[1:]

src/instana/span/kind.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
)
2222

2323
ENTRY_SPANS = (
24+
"aioamqp-consumer",
2425
"aiohttp-server",
2526
"aws.lambda.entry",
2627
"celery-worker",
@@ -35,6 +36,7 @@
3536
)
3637

3738
EXIT_SPANS = (
39+
"aioamqp-publisher",
3840
"aiohttp-client",
3941
"boto3",
4042
"cassandra",

src/instana/span/registered_span.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def __init__(
5454
if "kafka" in span.name:
5555
self.n = "kafka"
5656

57+
# unify the span name for aioamqp-producer and aioamqp-consumer
58+
if "amqp" in span.name:
59+
self.n = "amqp"
60+
5761
# Logic to store custom attributes for registered spans (not used yet)
5862
if len(span.attributes) > 0:
5963
self.data["sdk"]["custom"]["tags"] = self._validate_attributes(
@@ -64,6 +68,16 @@ def _populate_entry_span_data(self, span: "InstanaSpan") -> None:
6468
if span.name in HTTP_SPANS:
6569
self._collect_http_attributes(span)
6670

71+
elif span.name == "aioamqp-consumer":
72+
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
73+
self.data["amqp"]["routingkey"] = span.attributes.pop(
74+
"amqp.routing_key", None
75+
)
76+
self.data["amqp"]["connection"] = span.attributes.pop(
77+
"amqp.connection", None
78+
)
79+
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)
80+
6781
elif span.name == "aws.lambda.entry":
6882
self.data["lambda"]["arn"] = span.attributes.pop("lambda.arn", "Unknown")
6983
self.data["lambda"]["alias"] = None
@@ -167,6 +181,16 @@ def _populate_exit_span_data(self, span: "InstanaSpan") -> None:
167181
if span.name in HTTP_SPANS:
168182
self._collect_http_attributes(span)
169183

184+
elif span.name == "aioamqp-publisher":
185+
self.data["amqp"]["command"] = span.attributes.pop("amqp.command", None)
186+
self.data["amqp"]["routingkey"] = span.attributes.pop(
187+
"amqp.routing_key", None
188+
)
189+
self.data["amqp"]["connection"] = span.attributes.pop(
190+
"amqp.connection", None
191+
)
192+
self.data["amqp"]["error"] = span.attributes.pop("amqp.error", None)
193+
170194
elif span.name == "boto3":
171195
# boto3 also sends http attributes
172196
self._collect_http_attributes(span)

tests/frameworks/test_aioamqp.py renamed to tests/clients/test_aioamqp.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def _resource(self) -> Generator[None, None, None]:
2727
self.loop.close()
2828

2929
async def delete_queue(self) -> None:
30-
transport, protocol = await aioamqp.connect(
30+
_, protocol = await aioamqp.connect(
3131
testenv["rabbitmq_host"],
3232
testenv["rabbitmq_port"],
3333
)
@@ -79,8 +79,11 @@ def test_basic_publish(self) -> None:
7979
publisher_span = spans[0]
8080
test_span = spans[1]
8181

82-
assert publisher_span.n == "sdk"
83-
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
82+
assert publisher_span.n == "amqp"
83+
assert publisher_span.data["amqp"]["command"] == "publish"
84+
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
85+
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"
86+
8487
assert publisher_span.p == test_span.s
8588

8689
assert test_span.n == "sdk"
@@ -100,31 +103,25 @@ def test_basic_consumer(self) -> None:
100103
consumer_span = spans[2]
101104
test_span = spans[3]
102105

103-
assert publisher_span.n == "sdk"
104-
assert publisher_span.data["sdk"]["name"] == "aioamqp-publisher"
106+
assert publisher_span.n == "amqp"
107+
assert publisher_span.data["amqp"]["command"] == "publish"
108+
assert publisher_span.data["amqp"]["routingkey"] == "message_queue"
109+
assert publisher_span.data["amqp"]["connection"] == "127.0.0.1:5672"
105110
assert publisher_span.p == test_span.s
106-
assert (
107-
publisher_span.data["sdk"]["custom"]["tags"]["aioamqp.exchange"]
108-
== "b'Instana test message'"
109-
)
110111

111112
assert callback_span.n == "sdk"
112113
assert callback_span.data["sdk"]["name"] == "callback-span"
113114
assert callback_span.data["sdk"]["type"] == "intermediate"
114115
assert callback_span.p == consumer_span.s
115116

116-
assert consumer_span.n == "sdk"
117-
assert consumer_span.data["sdk"]["name"] == "aioamqp-consumer"
118-
assert consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.callback"]
119-
assert (
120-
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.message"]
121-
== "b'Instana test message'"
122-
)
117+
assert consumer_span.n == "amqp"
118+
assert consumer_span.data["amqp"]["command"] == "consume"
119+
assert consumer_span.data["amqp"]["routingkey"] == "message_queue"
120+
assert consumer_span.data["amqp"]["connection"] == "127.0.0.1:5672"
123121
assert (
124-
consumer_span.data["sdk"]["custom"]["tags"]["aioamqp.routing_key"]
125-
== "message_queue"
122+
consumer_span.data["amqp"]["connection"]
123+
== publisher_span.data["amqp"]["connection"]
126124
)
127-
assert not consumer_span.data["sdk"]["custom"]["tags"]["exchange_name"]
128125
assert consumer_span.p == test_span.s
129126

130127
assert test_span.n == "sdk"

0 commit comments

Comments
 (0)