Skip to content

Commit b42b50a

Browse files
authored
Merge branch 'main' into chore/otel-updates-1.24.0
2 parents 2cc8fd0 + b7c7891 commit b42b50a

File tree

4 files changed

+34
-64
lines changed

4 files changed

+34
-64
lines changed

src/accountingservice/kafka/trace_interceptor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type OTelInterceptor struct {
2424
// headers with the span data.
2525
func NewOTelInterceptor(groupID string) *OTelInterceptor {
2626
oi := OTelInterceptor{}
27-
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")
27+
oi.tracer = otel.Tracer("accountingservice")
2828

2929
oi.fixedAttrs = []attribute.KeyValue{
3030
semconv.MessagingSystemKafka,

src/checkoutservice/kafka/producer.go

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd
1717
saramaConfig.Version = ProtocolVersion
1818
// So we can know the partition and offset of messages.
1919
saramaConfig.Producer.Return.Successes = true
20-
saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()}
2120

2221
producer, err := sarama.NewAsyncProducer(brokers, saramaConfig)
2322
if err != nil {

src/checkoutservice/kafka/trace_interceptor.go

-60
This file was deleted.

src/checkoutservice/main.go

+33-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"encoding/json"
99
"fmt"
10+
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
1011
"net"
1112
"net/http"
1213
"os"
@@ -310,7 +311,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq
310311

311312
// send to kafka only if kafka broker address is set
312313
if cs.kafkaBrokerSvcAddr != "" {
313-
cs.sendToPostProcessor(orderResult)
314+
cs.sendToPostProcessor(ctx, orderResult)
314315
}
315316

316317
resp := &pb.PlaceOrderResponse{Order: orderResult}
@@ -473,7 +474,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
473474
return resp.GetTrackingId(), nil
474475
}
475476

476-
func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
477+
func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
477478
message, err := proto.Marshal(result)
478479
if err != nil {
479480
log.Errorf("Failed to marshal message to protobuf: %+v", err)
@@ -485,7 +486,37 @@ func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
485486
Value: sarama.ByteEncoder(message),
486487
}
487488

489+
// Inject tracing info into message
490+
span := createProducerSpan(ctx, &msg)
491+
defer span.End()
492+
488493
cs.KafkaProducerClient.Input() <- &msg
489494
successMsg := <-cs.KafkaProducerClient.Successes()
490495
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
491496
}
497+
498+
func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
499+
spanContext, span := tracer.Start(
500+
ctx,
501+
fmt.Sprintf("%s publish", msg.Topic),
502+
trace.WithSpanKind(trace.SpanKindProducer),
503+
trace.WithAttributes(
504+
semconv.PeerService("kafka"),
505+
semconv.NetworkTransportTCP,
506+
semconv.MessagingSystemKafka,
507+
semconv.MessagingDestinationName(msg.Topic),
508+
semconv.MessagingOperationPublish,
509+
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
510+
),
511+
)
512+
513+
carrier := propagation.MapCarrier{}
514+
propagator := otel.GetTextMapPropagator()
515+
propagator.Inject(spanContext, carrier)
516+
517+
for key, value := range carrier {
518+
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
519+
}
520+
521+
return span
522+
}

0 commit comments

Comments
 (0)