Skip to content

Commit

Permalink
opentelemetry spans removed
Browse files Browse the repository at this point in the history
  • Loading branch information
surabhi-mahawar committed Jan 17, 2022
1 parent 97877eb commit d800e91
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class ReactiveConsumer {
public XMessageRepository xMessageRepository;

@Autowired
public SimpleProducer1 kafkaProducer;
public SimpleProducer kafkaProducer;

@Autowired
public ReactiveProducer reactiveProducer;
Expand Down Expand Up @@ -130,61 +130,44 @@ public void accept(ConsumerRecord<String, String> stringMessage) {
logTimeTaken(startTime, 0);
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));

Context extracted = GlobalOpenTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), stringMessage.headers(), null);
log.info("extracted: "+extracted);
// extracted.getCurrentSpan();
Span rootSpan = tracer.spanBuilder("orchestrator-processMessage").startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
Context currentContext = Context.current();
try {
SenderReceiverInfo from = msg.getFrom();
Span childSpan1 = createChildSpan("fetchAdapterID", currentContext, rootSpan);
logTimeTaken(startTime, 1);
fetchAdapterID(msg.getApp()).doOnNext(new Consumer<String>() {
@Override
public void accept(String adapterID) {
childSpan1.end();
Span childSpan2 = createChildSpan("resolveUserNew", currentContext, rootSpan);
logTimeTaken(startTime, 2);
from.setCampaignID(msg.getApp());
from.setDeviceType(DeviceType.PHONE);
resolveUserNew(msg).doOnNext(new Consumer<XMessage>() {
@Override
public void accept(XMessage msg) {
childSpan2.end();
Span childSpan3 = createChildSpan("getLastMessageID", currentContext, rootSpan);
SenderReceiverInfo from = msg.getFrom();
// msg.setFrom(from);
getLastMessageID(msg).doOnNext(lastMessageID -> {
childSpan3.end();
Span childSpan4 = createChildSpan("sendMessageToKafka", currentContext,
rootSpan);
logTimeTaken(startTime, 3);
msg.setLastMessageID(lastMessageID);
msg.setAdapterId(adapterID);
if (msg.getMessageState().equals(XMessage.MessageState.REPLIED)
|| msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) {
try {
kafkaProducer.send(odkTransformerTopic, msg.toXML());
childSpan4.end();
rootSpan.end();
// reactiveProducer.sendMessages(odkTransformerTopic,
// msg.toXML());
} catch (JAXBException e) {
e.printStackTrace();
}
logTimeTaken(startTime, 15);
}
}).doOnError(genericError("getLastMessageID", childSpan3)).subscribe();
}).doOnError(genericError("getLastMessageID")).subscribe();
}
}).doOnError(genericError("resolveUserNew", childSpan2)).subscribe();
}).doOnError(genericError("resolveUserNew")).subscribe();

}
}).doOnError(genericError("fetchAdapterID", childSpan1)).subscribe();
}).doOnError(genericError("fetchAdapterID")).subscribe();

} catch (Throwable e) {
genericException(e.getMessage(), rootSpan);
} finally {
// rootSpan.end();
genericException(e.getMessage());
}
} catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -221,6 +204,16 @@ private void genericException(String eMsg, Span span) {
span.end();
}
}

/**
* Log Exceptions
*
* @param eMsg
*/
private void genericException(String eMsg) {
eMsg = "Exception: " + eMsg;
log.error(eMsg);
}

/**
* Log Exception & if span exists, add error to span
Expand All @@ -240,6 +233,19 @@ private Consumer<Throwable> genericError(String s, Span span) {
}
};
}

/**
* Log Exception
*
* @param s
* @return
*/
private Consumer<Throwable> genericError(String s) {
return c -> {
String msg = "Error in " + s + "::" + c.getMessage();
log.error(msg);
};
}

private Mono<XMessage> resolveUserNew(XMessage xmsg) {
try {
Expand Down

0 comments on commit d800e91

Please sign in to comment.