Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #578 from zalando/aruha-592-subscription-metrics
Browse files Browse the repository at this point in the history
ARUHA-592: faceted metrics
  • Loading branch information
adyach authored Mar 6, 2017
2 parents d0863a7 + 64be482 commit 6b2e65b
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 190 deletions.
87 changes: 87 additions & 0 deletions src/main/java/org/zalando/nakadi/config/MetricsConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.zalando.nakadi.config;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.BufferPoolMetricSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.ryantenney.metrics.spring.config.annotation.EnableMetrics;
import com.ryantenney.metrics.spring.config.annotation.MetricsConfigurerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.lang.management.ManagementFactory;

@Configuration
@EnableMetrics
public class MetricsConfig {
@Bean
public ServletRegistrationBean servletRegistrationBean(final MetricRegistry metricRegistry) {
return new ServletRegistrationBean(new MetricsServlet(metricRegistry), "/metrics/*");
}

class SubscriptionMetricsServlet extends MetricsServlet {
public SubscriptionMetricsServlet(final MetricRegistry metricRegistry) {
super(metricRegistry);
}
}

class StreamMetricsServlet extends MetricsServlet {
public StreamMetricsServlet(final MetricRegistry metricRegistry) {
super(metricRegistry);
}
}

@Bean
public ServletRegistrationBean subscriptionsServletRegistrationBean(
@Qualifier("perPathMetricRegistry") final MetricRegistry metricRegistry) {
return new ServletRegistrationBean(new SubscriptionMetricsServlet(metricRegistry), "/request-metrics/*");
}

@Bean
public ServletRegistrationBean streamMetricsServletRegistrationBean(
@Qualifier("streamMetricsRegistry") final MetricRegistry metricRegistry) {
return new ServletRegistrationBean(new StreamMetricsServlet(metricRegistry), "/stream-metrics/*");
}

@Bean
public MetricsConfigurerAdapter metricsConfigurerAdapter(final MetricRegistry metricRegistry) {
return new MetricsConfigurerAdapter() {
@Override
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
};
}

@Bean
@Qualifier("perPathMetricRegistry")
public MetricRegistry perPathMetricRegistry() {
final MetricRegistry metricRegistry = new MetricRegistry();

return metricRegistry;
}

@Bean
@Qualifier("streamMetricsRegistry")
public MetricRegistry streamMetricRegistry() {
final MetricRegistry metricRegistry = new MetricRegistry();

return metricRegistry;
}

@Bean
public MetricRegistry metricRegistry() {
final MetricRegistry metricRegistry = new MetricRegistry();

metricRegistry.register("jvm.gc", new GarbageCollectorMetricSet());
metricRegistry.register("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
metricRegistry.register("jvm.memory", new MemoryUsageGaugeSet());
metricRegistry.register("jvm.threads", new ThreadStatesGaugeSet());

return metricRegistry;
}
}
39 changes: 0 additions & 39 deletions src/main/java/org/zalando/nakadi/config/NakadiConfig.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
package org.zalando.nakadi.config;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.BufferPoolMetricSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.codahale.metrics.servlets.MetricsServlet;
import com.ryantenney.metrics.spring.config.annotation.EnableMetrics;
import com.ryantenney.metrics.spring.config.annotation.MetricsConfigurerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -27,10 +18,7 @@
import org.zalando.nakadi.repository.zookeeper.ZooKeeperLockFactory;
import org.zalando.nakadi.service.subscription.zk.ZkSubscriptionClientFactory;

import java.lang.management.ManagementFactory;

@Configuration
@EnableMetrics
@EnableScheduling
public class NakadiConfig {

Expand All @@ -41,21 +29,6 @@ public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}

@Bean
public ServletRegistrationBean servletRegistrationBean(final MetricRegistry metricRegistry) {
return new ServletRegistrationBean(new MetricsServlet(metricRegistry), "/metrics/*");
}

@Bean
public MetricsConfigurerAdapter metricsConfigurerAdapter(final MetricRegistry metricRegistry) {
return new MetricsConfigurerAdapter() {
@Override
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}
};
}

@Bean
public ZooKeeperLockFactory zooKeeperLockFactory(final ZooKeeperHolder zooKeeperHolder) {
return new ZooKeeperLockFactory(zooKeeperHolder);
Expand All @@ -66,18 +39,6 @@ public ZkSubscriptionClientFactory zkSubscriptionClientFactory(final ZooKeeperHo
return new ZkSubscriptionClientFactory(zooKeeperHolder);
}

@Bean
public MetricRegistry metricRegistry() {
final MetricRegistry metricRegistry = new MetricRegistry();

metricRegistry.register("jvm.gc", new GarbageCollectorMetricSet());
metricRegistry.register("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()));
metricRegistry.register("jvm.memory", new MemoryUsageGaugeSet());
metricRegistry.register("jvm.threads", new ThreadStatesGaugeSet());

return metricRegistry;
}

@Bean
public SystemProperties systemProperties(final ApplicationContext context) {
return name -> context.getEnvironment().getProperty(name);
Expand Down
13 changes: 3 additions & 10 deletions src/main/java/org/zalando/nakadi/config/WebConfig.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package org.zalando.nakadi.config;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.zalando.nakadi.metrics.MonitoringRequestFilter;
import org.zalando.nakadi.security.ClientResolver;
import org.zalando.nakadi.util.FlowIdRequestFilter;
import org.zalando.nakadi.util.GzipBodyRequestFilter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.embedded.FilterRegistrationBean;
Expand All @@ -23,6 +18,9 @@
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.zalando.nakadi.security.ClientResolver;
import org.zalando.nakadi.util.FlowIdRequestFilter;
import org.zalando.nakadi.util.GzipBodyRequestFilter;

import javax.servlet.Filter;
import java.util.List;
Expand Down Expand Up @@ -61,11 +59,6 @@ public FilterRegistrationBean gzipBodyRequestFilter(final ObjectMapper mapper) {
new GzipBodyRequestFilter(mapper), Ordered.HIGHEST_PRECEDENCE + 2);
}

@Bean
public FilterRegistrationBean monitoringRequestFilter(final MetricRegistry metricRegistry) {
return createFilterRegistrationBean(new MonitoringRequestFilter(metricRegistry), Ordered.HIGHEST_PRECEDENCE);
}

@Bean
public MappingJackson2HttpMessageConverter mappingJackson2HttpMessageConverter() {
final MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadi.controller;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -9,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestHeader;
Expand All @@ -28,6 +30,7 @@
import org.zalando.nakadi.exceptions.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.ServiceUnavailableException;
import org.zalando.nakadi.exceptions.UnparseableCursorException;
import org.zalando.nakadi.metrics.MetricUtils;
import org.zalando.nakadi.repository.EventConsumer;
import org.zalando.nakadi.repository.EventTypeRepository;
import org.zalando.nakadi.repository.TopicRepository;
Expand Down Expand Up @@ -82,13 +85,15 @@ public class EventStreamController {
private final ConsumerLimitingService consumerLimitingService;
private final FeatureToggleService featureToggleService;
private final CursorConverter cursorConverter;
private final MetricRegistry streamMetrics;

@Autowired
public EventStreamController(final EventTypeRepository eventTypeRepository,
final TimelineService timelineService,
final ObjectMapper jsonMapper,
final EventStreamFactory eventStreamFactory,
final MetricRegistry metricRegistry,
@Qualifier("streamMetricsRegistry") final MetricRegistry streamMetrics,
final ClosedConnectionsCrutch closedConnectionsCrutch,
final BlacklistService blacklistService,
final ConsumerLimitingService consumerLimitingService,
Expand All @@ -99,6 +104,7 @@ public EventStreamController(final EventTypeRepository eventTypeRepository,
this.jsonMapper = jsonMapper;
this.eventStreamFactory = eventStreamFactory;
this.metricRegistry = metricRegistry;
this.streamMetrics = streamMetrics;
this.closedConnectionsCrutch = closedConnectionsCrutch;
this.blacklistService = blacklistService;
this.consumerLimitingService = consumerLimitingService;
Expand Down Expand Up @@ -183,7 +189,6 @@ public StreamingResponseBody streamEvents(
final AtomicBoolean connectionReady = closedConnectionsCrutch.listenForConnectionClose(request);
Counter consumerCounter = null;
EventStream eventStream = null;

List<ConnectionSlot> connectionSlots = ImmutableList.of();

try {
Expand Down Expand Up @@ -231,8 +236,16 @@ public StreamingResponseBody streamEvents(
final EventConsumer eventConsumer = topicRepository.createEventConsumer(
kafkaQuotaClientId,
streamConfig.getCursors());

final String bytesFlushedMetricName = MetricUtils.metricNameForLoLAStream(
client.getClientId(),
eventTypeName);

final Meter bytesFlushedMeter = this.streamMetrics.meter(bytesFlushedMetricName);

eventStream = eventStreamFactory.createEventStream(
outputStream, eventConsumer, streamConfig, blacklistService, cursorConverter);
outputStream, eventConsumer, streamConfig, blacklistService, cursorConverter,
bytesFlushedMeter);

outputStream.flush(); // Flush status code to client

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.zalando.nakadi.controller;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
Expand All @@ -14,8 +17,8 @@
import org.zalando.nakadi.config.NakadiSettings;
import org.zalando.nakadi.exceptions.NakadiException;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.ClosedConnectionsCrutch;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.ClosedConnectionsCrutch;
import org.zalando.nakadi.service.subscription.StreamParameters;
import org.zalando.nakadi.service.subscription.SubscriptionOutput;
import org.zalando.nakadi.service.subscription.SubscriptionStreamer;
Expand All @@ -31,10 +34,12 @@
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.zalando.nakadi.metrics.MetricUtils.metricNameForSubscription;
import static org.zalando.nakadi.util.FeatureToggleService.Feature.HIGH_LEVEL_API;

@RestController
public class SubscriptionStreamController {
public static final String CONSUMERS_COUNT_METRIC_NAME = "consumers";
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStreamController.class);

private final SubscriptionStreamerFactory subscriptionStreamerFactory;
Expand All @@ -43,20 +48,23 @@ public class SubscriptionStreamController {
private final ClosedConnectionsCrutch closedConnectionsCrutch;
private final NakadiSettings nakadiSettings;
private final BlacklistService blacklistService;
private final MetricRegistry metricRegistry;

@Autowired
public SubscriptionStreamController(final SubscriptionStreamerFactory subscriptionStreamerFactory,
final FeatureToggleService featureToggleService,
final ObjectMapper objectMapper,
final ClosedConnectionsCrutch closedConnectionsCrutch,
final NakadiSettings nakadiSettings,
final BlacklistService blacklistService) {
final BlacklistService blacklistService,
@Qualifier("perPathMetricRegistry") final MetricRegistry metricRegistry) {
this.subscriptionStreamerFactory = subscriptionStreamerFactory;
this.featureToggleService = featureToggleService;
this.jsonMapper = objectMapper;
this.closedConnectionsCrutch = closedConnectionsCrutch;
this.nakadiSettings = nakadiSettings;
this.blacklistService = blacklistService;
this.metricRegistry = metricRegistry;
}

private class SubscriptionOutputImpl implements SubscriptionOutput {
Expand Down Expand Up @@ -124,12 +132,15 @@ public StreamingResponseBody streamEvents(
throws IOException {

return outputStream -> {

if (!featureToggleService.isFeatureEnabled(HIGH_LEVEL_API)) {
response.setStatus(HttpServletResponse.SC_NOT_IMPLEMENTED);
return;
}

final String metricName = metricNameForSubscription(subscriptionId, CONSUMERS_COUNT_METRIC_NAME);
final Counter consumerCounter = metricRegistry.counter(metricName);
consumerCounter.inc();

final AtomicBoolean connectionReady = closedConnectionsCrutch.listenForConnectionClose(request);

SubscriptionStreamer streamer = null;
Expand All @@ -153,6 +164,7 @@ public StreamingResponseBody streamEvents(
} catch (final Exception e) {
output.onException(e);
} finally {
consumerCounter.dec();
outputStream.close();
}
};
Expand Down
29 changes: 27 additions & 2 deletions src/main/java/org/zalando/nakadi/metrics/MetricUtils.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,37 @@
package org.zalando.nakadi.metrics;

import com.codahale.metrics.MetricRegistry;

public class MetricUtils {

public static final String NAKADI_PREFIX = "nakadi.";
public static final String EVENTTYPES_PREFIX = NAKADI_PREFIX + "eventtypes.";
public static final String EVENTTYPES_PREFIX = NAKADI_PREFIX + "eventtypes";
public static final String SUBSCRIPTION_PREFIX = NAKADI_PREFIX + "subscriptions";
private static final String LOW_LEVEL_STREAM = "lola";
private static final String HIGH_LEVEL_STREAM = "hila";
private static final String BYTES_FLUSHED = "bytes-flushed";

public static String metricNameFor(final String eventTypeName, final String metricName) {
return EVENTTYPES_PREFIX + eventTypeName.replace('.', '#') + "." + metricName;
return MetricRegistry.name(EVENTTYPES_PREFIX, eventTypeName.replace('.', '#'), metricName);
}

public static String metricNameForSubscription(final String subscriptionId, final String metricName) {
return MetricRegistry.name(SUBSCRIPTION_PREFIX, subscriptionId, metricName);
}

public static String metricNameForLoLAStream(final String applicationId, final String eventTypeName) {
return MetricRegistry.name(
LOW_LEVEL_STREAM,
applicationId.replace(".", "#"),
eventTypeName.replace(".", "#"),
BYTES_FLUSHED);
}

public static String metricNameForHiLAStream(final String applicationId, final String subscriptionId) {
return MetricRegistry.name(
HIGH_LEVEL_STREAM,
applicationId.replace(".", "#"),
subscriptionId,
BYTES_FLUSHED);
}
}
Loading

0 comments on commit 6b2e65b

Please sign in to comment.