Span context propagation from Consumer to Producer #59

Open
adurai81 opened this issue Oct 17, 2019 · 13 comments

Comments

@adurai81

I have producer-1 (SignalsProducer), whose records are consumed by consumer-1 (SignalsConsumer). This consumer enriches the record and then acts as producer-2 (EnrichedLeadProducer), whose records are consumed by consumer-2 (EnrichedLeadConsumer).

This leads to 2 different traces (each having 2 spans), looking like this -
image

Sample spans within the first trace (similarly the 2nd trace) -
image

Expectation: In this case there should be only 1 trace with 4 spans.

Question: How do I propagate the span context from Consumer-1 to Producer-2?

Reviewed issue - #49

The above issue seems very similar to my ask (maybe slightly different). Is there an example of how the context could be propagated from consumer to producer?

@adurai81
Author

Here is my code -

Consumer 1

public static void subscribe() {
    final KafkaConsumer<String, String>
            consumer =
            new KafkaConsumer(consumerrConfig);

    BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
            (operationName, consumerRecord) -> "SignalsConsumer";

    TracingKafkaConsumer<String, String> tracingKafkaConsumer = new TracingKafkaConsumer<String, String>(consumer, tracer(), consumerSpanNameProvider);

    tracingKafkaConsumer.subscribe(Arrays.asList(listenTopic));

    while (true) {
        ConsumerRecords<String, String> records = tracingKafkaConsumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {

            String leadJson = record.value();

            Lead lead = convertJsonToLead(leadJson);

            System.out.printf("CustomerEmail: %s;     LeadName: %s;     CustomerId: %s;     Record-Key: %s;     Record-Partition: %d;     Record-offset: %d\n",
                    lead.getCustomerEmail(), lead.getLeadName(), lead.getCustomerId(),
                    record.key(), record.partition(), record.offset());

            publish(lead);  // This calls Producer 2
        }
    }
}

Producer 2

private static void publish(Lead lead)  {

    BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
            (operationName, producerRecord) -> "EnrichedLeadProducer";

    Producer<String, String> producer = createProducer();
    TracingKafkaProducer<String, String> tracingKafkaProducer = new TracingKafkaProducer<String, String>(producer, tracer(), producerSpanNameProvider);

    lead = enrichLead(lead);

    String leadJsonStr = convertLeadToJsonStr(lead);

    ProducerRecord<String, String> record = new ProducerRecord<String, String>(publishTopic,
            lead.getLeadName(), leadJsonStr);

    tracingKafkaProducer.send(record);

    tracingKafkaProducer.close();
}

@adurai81
Author

@burkaa01 @thall any thoughts on this, please?

@malafeev
Contributor

malafeev commented Oct 21, 2019

@adurai81 in the consumer you can get the SpanContext from the ConsumerRecord:

SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Then

  1. You can use this spanContext as the parent of a new span, which should be active when you publish.

  2. Another solution is to add a new method to the producer, send(record, spanContext), that uses spanContext as the parent of the producer span.
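To make the idea behind both options concrete, here is a minimal toy model of header-based propagation in plain Java. It does not use the OpenTracing or Kafka APIs: `ToyContext`, `ToyRecord`, `ToyTracing`, and the header names are all hypothetical stand-ins, and the real library's internals may differ. It only illustrates why dropping the extracted context yields a second trace, while passing it as the parent keeps everything in one trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model only: hypothetical stand-ins, not the OpenTracing or Kafka classes.
final class ToyContext {
    final String traceId;
    final String spanId;

    ToyContext(String traceId, String spanId) {
        this.traceId = traceId;
        this.spanId = spanId;
    }
}

final class ToyRecord {
    final String value;
    final Map<String, String> headers = new HashMap<>();

    ToyRecord(String value) {
        this.value = value;
    }
}

final class ToyTracing {
    // Hypothetical header names, chosen for this sketch only.
    static final String TRACE_HEADER = "toy-trace-id";
    static final String SPAN_HEADER = "toy-span-id";

    // Models extracting a span context from record headers; null if none was injected.
    static ToyContext extract(ToyRecord record) {
        String traceId = record.headers.get(TRACE_HEADER);
        String spanId = record.headers.get(SPAN_HEADER);
        return (traceId == null || spanId == null) ? null : new ToyContext(traceId, spanId);
    }

    // Models a traced send: creates a producer span and injects its context into the headers.
    // Without a parent, the producer span starts a brand-new trace.
    static ToyContext send(ToyRecord record, ToyContext parent) {
        String traceId = (parent != null) ? parent.traceId : newId();
        ToyContext producerSpan = new ToyContext(traceId, newId());
        record.headers.put(TRACE_HEADER, producerSpan.traceId);
        record.headers.put(SPAN_HEADER, producerSpan.spanId);
        return producerSpan;
    }

    static String newId() {
        return UUID.randomUUID().toString();
    }
}

final class PropagationSketch {
    public static void main(String[] args) {
        // producer-1 has no parent, so it starts the trace.
        ToyRecord signal = new ToyRecord("lead");
        ToyContext producer1 = ToyTracing.send(signal, null);

        // consumer-1 reads the context back out of the record headers.
        ToyContext consumed = ToyTracing.extract(signal);

        // producer-2 without the consumed context: a second, unrelated trace.
        ToyContext withoutParent = ToyTracing.send(new ToyRecord("enriched lead"), null);

        // producer-2 with the consumed context as parent: same trace as producer-1.
        ToyContext withParent = ToyTracing.send(new ToyRecord("enriched lead"), consumed);

        System.out.println("same trace without parent: "
                + producer1.traceId.equals(withoutParent.traceId));
        System.out.println("same trace with parent:    "
                + producer1.traceId.equals(withParent.traceId));
    }
}
```

In this model, option 1 corresponds to making the extracted context the parent implicitly (through an active span), and option 2 to passing it explicitly as the second argument of `send`.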

@adurai81
Author

adurai81 commented Oct 21, 2019

Thanks @malafeev for the response.

For option-1:
I tried this -

        Tracer tracer =  tracer();
        tracer.buildSpan("EnrichedLeadProducer").asChildOf(consumerSpanContext).start();
        TracingKafkaProducer<String, String> tracingKafkaProducer = new TracingKafkaProducer<String, String>(producer, tracer, producerSpanNameProvider);

        lead = enrichLead(lead);

        String leadJsonStr = convertLeadToJsonStr(lead);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(publishTopic,
                lead.getLeadName(), leadJsonStr);

        tracingKafkaProducer.send(record);

        tracingKafkaProducer.close();

The value of consumerSpanContext is fetched with -
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer());

For option-2:
Are you suggesting a change that overloads the send API in TracingKafkaProducer with this signature?
public Future<RecordMetadata> send(ProducerRecord<K, V> record, SpanContext span) {

@adurai81
Author

Thanks for the idea @malafeev

I was able to inject the consumer's SpanContext into the 2nd producer's record with this:

tracer.inject(consumerSpanContext, Format.Builtin.TEXT_MAP, new LeadHeadersMapInjectAdapter(record.headers()));

@adurai81
Author

@malafeev Should inject in TracingKafkaUtils be made public so that users don't have to work around it when injecting the parent SpanContext?

https://github.com/opentracing-contrib/java-kafka-client/blob/master/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaUtils.java#L52

@yurishkuro

@adurai81 this example has the multi-hop-through-Kafka scenario: https://github.com/PacktPublishing/Mastering-Distributed-Tracing/tree/master/Chapter05

@malafeev
Contributor

@adurai81
yes, I propose overloading the send API in TracingKafkaProducer:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, SpanContext parentSpanContext) {

So the provided parentSpanContext will be used as the parent of the producer span.

I don't think inject in TracingKafkaUtils should be public.

@adurai81
Author

Thanks @yurishkuro, I'll take a look at the example.

@malafeev, yes, an overload would help. How do I take this forward? Will this issue be used to track the overloaded API?

@malafeev
Contributor

@adurai81
let's use this issue for that

@malafeev
Contributor

done in #62

@adurai81
Author

adurai81 commented Nov 4, 2019

Thanks @malafeev, I'll test this out and let you know.

@JapuDCret

@adurai81 have you tested it?
