-
Notifications
You must be signed in to change notification settings - Fork 292
Commit
ARUHA-2199: Added open tracing;
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package org.zalando.nakadi.filters; | ||
|
||
import com.google.common.net.HttpHeaders; | ||
import io.opentracing.util.GlobalTracer; | ||
import org.json.JSONObject; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -22,6 +23,7 @@ | |
import javax.servlet.http.HttpServletResponse; | ||
import java.io.IOException; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
@Component | ||
public class LoggingFilter extends OncePerRequestFilter { | ||
|
@@ -31,7 +33,6 @@ public class LoggingFilter extends OncePerRequestFilter { | |
|
||
private final NakadiKpiPublisher nakadiKpiPublisher; | ||
private final String accessLogEventType; | ||
|
||
private final AuthorizationService authorizationService; | ||
|
||
@Autowired | ||
|
@@ -61,10 +62,8 @@ private RequestLogInfo(final HttpServletRequest request, final long requestTime) | |
this.method = request.getMethod(); | ||
this.path = request.getRequestURI(); | ||
this.query = Optional.ofNullable(request.getQueryString()).map(q -> "?" + q).orElse(""); | ||
this.contentEncoding = Optional.ofNullable(request.getHeader(HttpHeaders.CONTENT_ENCODING)) | ||
.orElse("-"); | ||
this.acceptEncoding = Optional.ofNullable(request.getHeader(HttpHeaders.ACCEPT_ENCODING)) | ||
.orElse("-"); | ||
this.contentEncoding = Optional.ofNullable(request.getHeader(HttpHeaders.CONTENT_ENCODING)).orElse("-"); | ||
this.acceptEncoding = Optional.ofNullable(request.getHeader(HttpHeaders.ACCEPT_ENCODING)).orElse("-"); | ||
this.contentLength = request.getContentLengthLong() == -1 ? 0 : request.getContentLengthLong(); | ||
this.requestTime = requestTime; | ||
} | ||
|
@@ -76,27 +75,17 @@ private class AsyncRequestListener implements AsyncListener { | |
private final RequestLogInfo requestLogInfo; | ||
|
||
private AsyncRequestListener(final HttpServletRequest request, final HttpServletResponse response, | ||
final long startTime, final String flowId) { | ||
final long startTime, final String flowId) { | ||
this.response = response; | ||
this.flowId = flowId; | ||
|
||
this.requestLogInfo = new RequestLogInfo(request, startTime); | ||
ACCESS_LOGGER.info("{} \"{}{}\" \"{}\" \"{}\" {} {}ms \"{}\" \"{}\" {}B", | ||
requestLogInfo.method, | ||
requestLogInfo.path, | ||
requestLogInfo.query, | ||
requestLogInfo.userAgent, | ||
requestLogInfo.user, | ||
HttpStatus.PROCESSING.value(), | ||
0, | ||
requestLogInfo.contentEncoding, | ||
requestLogInfo.acceptEncoding, | ||
requestLogInfo.contentLength); | ||
logToAccessLog(this.requestLogInfo, HttpStatus.PROCESSING.value(), 0L); | ||
} | ||
|
||
private void logOnEvent() { | ||
FlowIdUtils.push(this.flowId); | ||
writeToAccessLogAndEventType(this.requestLogInfo, this.response); | ||
logRequest(this.requestLogInfo, this.response.getStatus()); | ||
FlowIdUtils.clear(); | ||
} | ||
|
||
|
@@ -124,7 +113,7 @@ public void onStartAsync(final AsyncEvent event) { | |
@Override | ||
protected void doFilterInternal(final HttpServletRequest request, | ||
final HttpServletResponse response, final FilterChain filterChain) | ||
throws IOException, ServletException{ | ||
throws IOException, ServletException { | ||
final long start = System.currentTimeMillis(); | ||
try { | ||
//execute request | ||
|
@@ -134,35 +123,75 @@ protected void doFilterInternal(final HttpServletRequest request, | |
request.getAsyncContext().addListener(new AsyncRequestListener(request, response, start, flowId)); | ||
} | ||
} finally { | ||
if(!request.isAsyncStarted()) { | ||
if (!request.isAsyncStarted()) { | ||
final RequestLogInfo requestLogInfo = new RequestLogInfo(request, start); | ||
writeToAccessLogAndEventType(requestLogInfo, response); | ||
logRequest(requestLogInfo, response.getStatus()); | ||
} | ||
} | ||
} | ||
|
||
private void writeToAccessLogAndEventType(final RequestLogInfo requestLogInfo, final HttpServletResponse response) { | ||
final long currentTime = System.currentTimeMillis(); | ||
final Long timing = currentTime - requestLogInfo.requestTime; | ||
|
||
ACCESS_LOGGER.info("{} \"{}{}\" \"{}\" \"{}\" {} {}ms \"{}\" \"{}\" {}B", | ||
requestLogInfo.method, | ||
requestLogInfo.path, | ||
requestLogInfo.query, | ||
requestLogInfo.userAgent, | ||
requestLogInfo.user, | ||
response.getStatus(), | ||
timing, | ||
requestLogInfo.contentEncoding, | ||
requestLogInfo.acceptEncoding, | ||
requestLogInfo.contentLength); | ||
nakadiKpiPublisher.publish(accessLogEventType, () -> new JSONObject() | ||
.put("method", requestLogInfo.method) | ||
.put("path", requestLogInfo.path) | ||
.put("query", requestLogInfo.query) | ||
.put("app", requestLogInfo.user) | ||
.put("app_hashed", nakadiKpiPublisher.hash(requestLogInfo.user)) | ||
.put("status_code", response.getStatus()) | ||
.put("response_time_ms", timing)); | ||
private void logRequest(final RequestLogInfo requestLogInfo, final int statusCode) { | ||
final Long timeSpentMs = System.currentTimeMillis() - requestLogInfo.requestTime; | ||
|
||
logToAccessLog(requestLogInfo, statusCode, timeSpentMs); | ||
logToNakadi(requestLogInfo, statusCode, timeSpentMs); | ||
traceRequest(requestLogInfo, statusCode, timeSpentMs); | ||
} | ||
|
||
private void logToNakadi(final RequestLogInfo requestLogInfo, final int statusCode, final Long timeSpentMs) { | ||
nakadiKpiPublisher.publish(accessLogEventType, () -> new JSONObject() | ||
.put("method", requestLogInfo.method) | ||
.put("path", requestLogInfo.path) | ||
.put("query", requestLogInfo.query) | ||
.put("app", requestLogInfo.user) | ||
.put("app_hashed", nakadiKpiPublisher.hash(requestLogInfo.user)) | ||
.put("status_code", statusCode) | ||
.put("response_time_ms", timeSpentMs)); | ||
} | ||
|
||
private void logToAccessLog(final RequestLogInfo requestLogInfo, final int statusCode, final Long timeSpentMs) { | ||
ACCESS_LOGGER.info("{} \"{}{}\" \"{}\" \"{}\" {} {}ms \"{}\" \"{}\" {}B", | ||
requestLogInfo.method, | ||
requestLogInfo.path, | ||
requestLogInfo.query, | ||
requestLogInfo.userAgent, | ||
requestLogInfo.user, | ||
statusCode, | ||
timeSpentMs, | ||
requestLogInfo.contentEncoding, | ||
requestLogInfo.acceptEncoding, | ||
requestLogInfo.contentLength); | ||
} | ||
|
||
private void traceRequest(final RequestLogInfo requestLogInfo, final int statusCode, final Long timeSpentMs) { | ||
if (requestLogInfo.path != null && "POST".equals(requestLogInfo.method) && | ||
requestLogInfo.path.startsWith("/event-types/") && requestLogInfo.path.contains("/events")) { | ||
|
||
final String eventType = requestLogInfo.path.substring("/event-types/".length(), | ||
requestLogInfo.path.lastIndexOf("/events")); | ||
|
||
String sloBucket = "5K-50K"; | ||
if (requestLogInfo.contentLength < 5000) { | ||
sloBucket = "<5K"; | ||
} else if (requestLogInfo.contentLength > 50000) { | ||
sloBucket = ">50K"; | ||
} | ||
|
||
GlobalTracer.get() | ||
.buildSpan("publish_events") | ||
.withStartTimestamp(TimeUnit.MILLISECONDS.toMicros(requestLogInfo.requestTime)) | ||
.start() | ||
.setTag("client_id", requestLogInfo.user) | ||
.setTag("event_type", eventType) | ||
.setTag("error", statusCode == 207 || statusCode >= 500) | ||
.setTag("http.status_code", statusCode) | ||
.setTag("http.url", requestLogInfo.path + requestLogInfo.query) | ||
.setTag("http.header.content_encoding", requestLogInfo.contentEncoding) | ||
.setTag("http.header.accept_encoding", requestLogInfo.acceptEncoding) | ||
.setTag("http.header.user_agent", requestLogInfo.userAgent) | ||
.setTag("slo_bucket", sloBucket) | ||
.setTag("content_length", requestLogInfo.contentLength) | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
v-stepanov
Author
Contributor
|
||
.finish(TimeUnit.MILLISECONDS.toMicros(requestLogInfo.requestTime + timeSpentMs)); | ||
} | ||
} | ||
} |
content_length
would probably be a better fit for a span log