Skip to content

Commit

Permalink
BREAK: Introduce otel middleware and require kstream >= 0.17
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Oct 1, 2024
1 parent 36e6ec9 commit 6184458
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 235 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
Version: `0.3.0`

> [!IMPORTANT]
> This instrumentation works only with [ksterams middlewares](https://kpn.github.io/kstreams/middleware/) after `v0.17.0`
## Installation

```sh
Expand Down
52 changes: 26 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ python = "^3.8"
opentelemetry-api = "^1.27.0"
opentelemetry-instrumentation = "^0.48b0"
opentelemetry-semantic-conventions = "^0.48b0"
kstreams = { version = ">=0.12.0", optional = true }
kstreams = { version = ">=0.17.0", optional = true }

[tool.poetry.group.dev.dependencies]
ruff = "^0.6"
Expand Down
13 changes: 9 additions & 4 deletions src/opentelemetry_instrumentation_kstreams/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from opentelemetry.instrumentation.utils import unwrap
from wrapt import wrap_function_wrapper
from .version import __version__
from .utils import (
from .wrappers import (
_wrap_send,
_wrap_getone,
# _wrap_getone,
_wrap_build_stream_middleware_stack,
)

from .package import _instruments
Expand Down Expand Up @@ -37,8 +38,12 @@ def _instrument(self, **kwargs: Any):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
wrap_function_wrapper(StreamEngine, "send", _wrap_send(tracer))
wrap_function_wrapper(Stream, "getone", _wrap_getone(tracer))
wrap_function_wrapper(
StreamEngine,
"build_stream_middleware_stack",
_wrap_build_stream_middleware_stack(tracer),
)

def _uninstrument(self, **kwargs: Any):
unwrap(StreamEngine, "send")
unwrap(Stream, "getone")
unwrap(Stream, "build_stream_middleware_stack")
112 changes: 112 additions & 0 deletions src/opentelemetry_instrumentation_kstreams/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from typing import Any, Optional

from kstreams import (
ConsumerRecord,
middleware,
)
from kstreams.backends.kafka import Kafka
from opentelemetry import context, propagate, trace
from opentelemetry.context.context import Context

# Enable after 0.49 is released
# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.trace import SpanKind, Tracer

from .utils import (
KStreamsKafkaExtractor,
_enrich_base_span,
_enrich_span_with_record_info,
_get_span_name,
_kstreams_getter,
)


class OpenTelemetryMiddleware(middleware.BaseMiddleware):
"""
Middleware for integrating OpenTelemetry tracing with Kafka Streams.
This middleware extracts tracing information from Kafka consumer records and
creates spans for tracing the processing of these records.
Attributes:
tracer: The OpenTelemetry tracer instance used for creating spans.
Methods:
__call__(cr: ConsumerRecord) -> Any:
Asynchronously processes a Kafka consumer record, creating and enriching
an OpenTelemetry span with tracing information.
"""

def __init__(self, *, tracer: Optional[Tracer] = None, **kwargs) -> None:
super().__init__(**kwargs)
if tracer is None:
tracer = trace.get_tracer(__name__)

# The current tracer instance
self.tracer = tracer

# Initialize variables computed once which are injected into the span
if not isinstance(self.stream.backend, Kafka):
raise NotImplementedError("Only Kafka backend is supported for now")
self.bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers(
self.stream.backend
)
self.consumer_group = KStreamsKafkaExtractor.extract_consumer_group(
self.stream.consumer
)
self.client_id = KStreamsKafkaExtractor.extract_consumer_client_id(self.stream)

async def __call__(self, cr: ConsumerRecord) -> Any:
"""
Asynchronously processes a ConsumerRecord by creating and managing a span.
Args:
cr (ConsumerRecord): The consumer record to be processed.
Returns:
Any: The result of the next call in the processing chain.
This method performs the following steps:
1. Extracts the context from the record headers.
2. Starts a new span with the extracted context.
3. Enriches the span with base and record-specific information.
4. Optionally sets the consumer group attribute (currently commented out).
5. Calls the next processing function in the chain.
6. Detaches the context token.
"""
tracer = self.tracer
record = cr
bootstrap_servers = self.bootstrap_servers
client_id = self.client_id
span_name = _get_span_name("receive", record.topic)
extracted_context: Context = propagate.extract(
record.headers, getter=_kstreams_getter
)

with tracer.start_as_current_span(
span_name,
context=extracted_context,
end_on_exit=True,
kind=SpanKind.CONSUMER,
) as span:
new_context = trace.set_span_in_context(span, extracted_context)
context_token = context.attach(new_context)

_enrich_base_span(
span,
bootstrap_servers,
record.topic,
client_id,
)
_enrich_span_with_record_info(
span, record.topic, record.partition, record.offset
)

# TODO: enable after 0.49 is released
# if self.consumer_group is not None:
# span.set_attribute(
# SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, self.consumer_group
# )

await self.next_call(cr)
context.detach(context_token)
2 changes: 1 addition & 1 deletion src/opentelemetry_instrumentation_kstreams/package.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
_instruments = ("kstreams >= 0.13.0",)
_instruments = ("kstreams >= 0.17.0",)
Loading

0 comments on commit 6184458

Please sign in to comment.