diff --git a/Dockerfile b/Dockerfile
index c8d09ecc..6f0f1155 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -32,9 +32,9 @@ RUN ./gradlew distTar && \
     tar xf build/distributions/*.tar && \
     rm build/distributions/*.tar
 
-FROM confluentinc/cp-base:3.3.1
+FROM confluentinc/cp-base:4.1.0
 
-MAINTAINER Nivethika M , Joris Borgdorff
+MAINTAINER Nivethika M , Joris Borgdorff , Yatharth Ranjan
 
 LABEL description="RADAR-CNS Backend streams and monitor"
 
diff --git a/build.gradle b/build.gradle
index 72af8a08..4947956d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -11,7 +11,8 @@ plugins {
 //---------------------------------------------------------------------------//
 
 group = 'org.radarcns'
-version = '0.2.3'
+version = '0.3.0'
+
 ext.description = 'Kafka backend for processing device data.'
 mainClassName = 'org.radarcns.RadarBackend'
 
@@ -24,9 +25,9 @@ sourceCompatibility = '1.8'
 
 ext.boundaryVersion = '1.0.6'
 ext.codacyVersion = '1.0.10'
-ext.confluentVersion = '3.3.1'
+ext.confluentVersion = '4.1.0'
 ext.hamcrestVersion = '1.3'
-ext.kafkaVersion = '0.11.0.2'
+ext.kafkaVersion = '1.1.0'
 ext.jacksonVersion='2.8.5'
 ext.javaMailVersion = '1.5.6'
 ext.junitVersion = '4.12'
diff --git a/gradle/codacy.gradle b/gradle/codacy.gradle
index 15da8846..34cd9905 100644
--- a/gradle/codacy.gradle
+++ b/gradle/codacy.gradle
@@ -8,17 +8,21 @@ configurations {
     codacy
 }
 
+jacoco {
+    toolVersion = "0.8.1"
+}
+
 dependencies {
-    codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '2.0.1'
+    codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '4.0.1'
 }
 
 jacocoTestReport {
-    executionData test, integrationTest
     reports {
         xml.enabled true
         csv.enabled false
         html.enabled true
     }
+    executionData test, integrationTest
 }
 
 task sendCoverageToCodacy(type: JavaExec, dependsOn: jacocoTestReport) {
diff --git a/gradle/test.gradle b/gradle/test.gradle
index fc44a733..25e1ee78 100644
--- a/gradle/test.gradle
+++ b/gradle/test.gradle
@@ -34,7 +34,7 @@ dependencies {
     integrationTestImplementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion
 
     // For Topic name validation based on Kafka classes
-    testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
+    testCompile (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
         exclude group: 'org.apache.kafka', module: 'kafka-clients'
         exclude group: 'net.sf.jopt-simple'
         exclude group: 'com.yammer.metrics'
diff --git a/radar.yml b/radar.yml
index 045b9a61..6d14b1fa 100644
--- a/radar.yml
+++ b/radar.yml
@@ -19,7 +19,6 @@ broker:
 
 #Kafka internal parameters
 stream_properties:
-  auto_commit_interval_ms: 1000
   max.request.size: 3500042 #Set message.max.bytes for kafka brokers higher than or equal to this value
   retries: 15
   session_timeout_ms: 20000
diff --git a/src/integrationTest/docker/docker-compose.yml b/src/integrationTest/docker/docker-compose.yml
index 90836073..a2d66ea2 100644
--- a/src/integrationTest/docker/docker-compose.yml
+++ b/src/integrationTest/docker/docker-compose.yml
@@ -1,12 +1,17 @@
 ---
 version: '2'
 
+networks:
+  kafka:
+    driver: bridge
 services:
   #---------------------------------------------------------------------------#
   # Zookeeper Cluster                                                          #
   #---------------------------------------------------------------------------#
   zookeeper-1:
-    image: confluentinc/cp-zookeeper:3.3.1
+    image: confluentinc/cp-zookeeper:4.1.0
+    networks:
+      - kafka
     environment:
       ZOOKEEPER_SERVER_ID: 1
       ZOOKEEPER_CLIENT_PORT: 2181
@@ -19,9 +24,11 @@ services:
   # Kafka Cluster                                                              #
   #---------------------------------------------------------------------------#
   kafka-1:
-    image: confluentinc/cp-kafka:3.3.1
+    image: confluentinc/cp-kafka:4.1.0
     depends_on:
       - zookeeper-1
+    networks:
+      - kafka
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -33,9 +40,11 @@ services:
       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
   kafka-2:
-    image: confluentinc/cp-kafka:3.3.1
+    image: confluentinc/cp-kafka:4.1.0
     depends_on:
       - zookeeper-1
+    networks:
+      - kafka
     environment:
       KAFKA_BROKER_ID: 2
       KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -47,9 +56,11 @@ services:
       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
   kafka-3:
-    image: confluentinc/cp-kafka:3.3.1
+    image: confluentinc/cp-kafka:4.1.0
     depends_on:
       - zookeeper-1
+    networks:
+      - kafka
     environment:
       KAFKA_BROKER_ID: 3
       KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
@@ -64,12 +75,14 @@ services:
   # Schema Registry                                                            #
   #---------------------------------------------------------------------------#
   schema-registry-1:
-    image: confluentinc/cp-schema-registry:3.3.1
+    image: confluentinc/cp-schema-registry:4.1.0
     depends_on:
       - zookeeper-1
       - kafka-1
       - kafka-2
       - kafka-3
+    networks:
+      - kafka
     restart: always
     ports:
       - "8081:8081"
@@ -83,12 +96,14 @@ services:
   # REST proxy                                                                 #
   #---------------------------------------------------------------------------#
   rest-proxy-1:
-    image: confluentinc/cp-kafka-rest:3.3.1
+    image: confluentinc/cp-kafka-rest:4.1.0
     depends_on:
       - kafka-1
      - kafka-2
       - kafka-3
       - schema-registry-1
+    networks:
+      - kafka
     ports:
       - "8082:8082"
     environment:
@@ -109,8 +124,9 @@ services:
     depends_on:
       - kafka-1
       - schema-registry-1
-    command:
-      - integrationTest
+    networks:
+      - kafka
+    command: integrationTest
     volumes:
       - ../../../build/jacoco:/code/build/jacoco
       - ../../../build/reports:/code/build/reports
diff --git a/src/main/java/org/radarcns/monitor/SourceStatisticsMonitor.java b/src/main/java/org/radarcns/monitor/SourceStatisticsMonitor.java
index 41ecefb7..a6ff6988 100644
--- a/src/main/java/org/radarcns/monitor/SourceStatisticsMonitor.java
+++ b/src/main/java/org/radarcns/monitor/SourceStatisticsMonitor.java
@@ -60,7 +60,7 @@ public SourceStatisticsMonitor(RadarPropertyHandler radar,
             SourceStatisticsMonitorConfig config) {
         super(radar, config.getTopics(),
                 Objects.requireNonNull(config.getName(), "Source statistics monitor must have a name"), "1-"
-                + config.getOutputTopic(),
+                + config.getOutputTopic() + UUID.randomUUID(),
                 new SourceStatisticsState());
 
         if (getStateStore() == null) {
diff --git a/src/main/java/org/radarcns/stream/GeneralStreamGroup.java b/src/main/java/org/radarcns/stream/GeneralStreamGroup.java
index 7e1a2c21..8d320347 100644
--- a/src/main/java/org/radarcns/stream/GeneralStreamGroup.java
+++ b/src/main/java/org/radarcns/stream/GeneralStreamGroup.java
@@ -111,7 +111,7 @@ protected Collection<StreamDefinition> createWindowedSensorStream(String input,
         Collection<StreamDefinition> streams = Arrays.stream(TimeWindowMetadata.values())
                 .map(w -> new StreamDefinition(
                         new KafkaTopic(input), new KafkaTopic(w.getTopicLabel(outputBase)),
-                        w.getIntervalInMilliSec()))
+                        w.getIntervalInMilliSec(), getCommitIntervalForTimeWindow(w)))
                 .collect(Collectors.toList());
 
         topicNames.addAll(streams.stream()
@@ -135,6 +135,25 @@ public void addTopicNames(Collection<String> topicNames) {
         this.topicNames.addAll(topicNames);
     }
 
+    public long getCommitIntervalForTimeWindow(TimeWindowMetadata metadata) {
+        switch (metadata) {
+            case ONE_DAY:
+                return CommitInterval.COMMIT_INTERVAL_FOR_ONE_DAY.getCommitInterval();
+            case ONE_MIN:
+                return CommitInterval.COMMIT_INTERVAL_FOR_ONE_MIN.getCommitInterval();
+            case TEN_MIN:
+                return CommitInterval.COMMIT_INTERVAL_FOR_TEN_MIN.getCommitInterval();
+            case ONE_HOUR:
+                return CommitInterval.COMMIT_INTERVAL_FOR_ONE_HOUR.getCommitInterval();
+            case ONE_WEEK:
+                return CommitInterval.COMMIT_INTERVAL_FOR_ONE_WEEK.getCommitInterval();
+            case TEN_SECOND:
+                return CommitInterval.COMMIT_INTERVAL_FOR_TEN_SECOND.getCommitInterval();
+            default:
+                return CommitInterval.COMMIT_INTERVAL_DEFAULT.getCommitInterval();
+        }
+    }
+
     @Override
     public Collection<StreamDefinition> getStreamDefinition(String inputTopic) {
         Collection<StreamDefinition> topic = topicMap.get(inputTopic);
@@ -150,4 +169,25 @@ public List<String> getTopicNames() {
         topicList.sort(String.CASE_INSENSITIVE_ORDER);
         return topicList;
     }
+
+    public enum CommitInterval {
+        COMMIT_INTERVAL_FOR_TEN_SECOND(10_000L),
+        COMMIT_INTERVAL_FOR_ONE_MIN(30_000L),
+        COMMIT_INTERVAL_FOR_TEN_MIN(300_000L),
+        COMMIT_INTERVAL_FOR_ONE_HOUR(1800_000L),
+        COMMIT_INTERVAL_FOR_ONE_DAY(7200_000L),
+        COMMIT_INTERVAL_FOR_ONE_WEEK(10800_000L),
+        COMMIT_INTERVAL_DEFAULT(30_000L);
+
+        private final long commitInterval;
+
+
+        CommitInterval(long commitInterval) {
+            this.commitInterval = commitInterval;
+        }
+
+        public long getCommitInterval() {
+            return commitInterval;
+        }
+    }
 }
diff --git a/src/main/java/org/radarcns/stream/KStreamWorker.java b/src/main/java/org/radarcns/stream/KStreamWorker.java
index deaa7c22..5c1069f6 100644
--- a/src/main/java/org/radarcns/stream/KStreamWorker.java
+++ b/src/main/java/org/radarcns/stream/KStreamWorker.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -31,6 +32,7 @@
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -126,8 +128,14 @@ protected Properties getStreamProperties(@Nonnull StreamDefinition definition) {
             localClientId += '-' + window.sizeMs + '-' + window.advanceMs;
         }
 
-        return kafkaProperty.getStreamProperties(localClientId, numThreads,
+        Properties props = kafkaProperty.getStreamProperties(localClientId, numThreads,
                 DeviceTimestampExtractor.class);
+        long interval = (long)(ThreadLocalRandom.current().nextDouble(0.75, 1.25)
+                * definition.getCommitIntervalMs());
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
+                String.valueOf(interval));
+
+        return props;
     }
 
     /**
diff --git a/src/main/java/org/radarcns/stream/StreamDefinition.java b/src/main/java/org/radarcns/stream/StreamDefinition.java
index 3bcf6fd2..07c3c177 100644
--- a/src/main/java/org/radarcns/stream/StreamDefinition.java
+++ b/src/main/java/org/radarcns/stream/StreamDefinition.java
@@ -16,6 +16,7 @@
 
 package org.radarcns.stream;
 
+import static org.radarcns.stream.GeneralStreamGroup.CommitInterval.COMMIT_INTERVAL_DEFAULT;
 import static org.radarcns.util.Comparison.compare;
 
 import java.util.Objects;
@@ -24,10 +25,12 @@
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.radarcns.topic.KafkaTopic;
 
+
 public class StreamDefinition implements Comparable<StreamDefinition> {
     private final KafkaTopic inputTopic;
     private final KafkaTopic outputTopic;
     private final TimeWindows window;
+    private final long commitIntervalMs;
 
     /**
      * Constructor. It takes in input the topic name to be consumed and to topic name where the
      * related stream will write the computed values.
@@ -36,7 +39,7 @@ public class StreamDefinition implements Comparable<StreamDefinition> {
      * @param output output {@link KafkaTopic}
      */
     public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
-        this(input, output, 0L);
+        this(input, output, 0L, COMMIT_INTERVAL_DEFAULT.getCommitInterval());
     }
 
     /**
@@ -47,24 +50,41 @@ public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
      * @param window time window for aggregation.
      */
     public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window) {
-        this(input, output, window == 0 ? null : TimeWindows.of(window));
+        this(input, output, window == 0 ? null : TimeWindows.of(window),
+                COMMIT_INTERVAL_DEFAULT.getCommitInterval());
+    }
+
+    /**
+     * Constructor. It takes in input the topic name to be consumed and to topic name where the
+     * related stream will write the computed values.
+     * @param input source {@link KafkaTopic}
+     * @param output output {@link KafkaTopic}
+     * @param window time window for aggregation.
+     * @param commitIntervalMs The commit.interval.ms config for the stream
+     */
+    public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window,
+            long commitIntervalMs) {
+        this(input, output, window == 0 ? null : TimeWindows.of(window), commitIntervalMs);
     }
 
+
     /**
      * Constructor. It takes in input the topic name to be consumed and to topic name where the
      * related stream will write the computed values.
      * @param input source {@link KafkaTopic}
      * @param output output {@link KafkaTopic}
      * @param window time window for aggregation.
+     * @param commitIntervalMs The commit.interval.ms config for the stream
      */
     public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output,
-            @Nullable TimeWindows window) {
+            @Nullable TimeWindows window, @Nonnull long commitIntervalMs) {
         Objects.requireNonNull(input);
         Objects.requireNonNull(output);
 
         this.inputTopic = input;
         this.outputTopic = output;
         this.window = window;
+        this.commitIntervalMs = commitIntervalMs;
     }
 
     @Nonnull
@@ -94,6 +114,11 @@ public TimeWindows getTimeWindows() {
         return window;
     }
 
+    @Nullable
+    public long getCommitIntervalMs(){
+        return commitIntervalMs;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
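
The change set above drops the global auto_commit_interval_ms from radar.yml and instead gives each stream definition its own commit interval: GeneralStreamGroup maps every TimeWindowMetadata window to a CommitInterval value, and KStreamWorker scales that value by a random factor in [0.75, 1.25) before writing commit.interval.ms into the stream properties. The standalone sketch below illustrates that calculation using only values taken from the diff; the CommitIntervalExample class name and variables are illustrative and not part of the code base.

import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of the commit-interval handling introduced in this change set.
public class CommitIntervalExample {

    public static void main(String[] args) {
        // Base commit interval for the one-hour aggregation window,
        // COMMIT_INTERVAL_FOR_ONE_HOUR = 1_800_000 ms (30 minutes) in GeneralStreamGroup.
        long baseCommitIntervalMs = 1_800_000L;

        // KStreamWorker scales the base interval by a random factor in [0.75, 1.25)
        // so that the individual stream workers do not all commit at the same moment.
        long jitteredMs = (long) (ThreadLocalRandom.current().nextDouble(0.75, 1.25)
                * baseCommitIntervalMs);

        Properties streamProperties = new Properties();
        // "commit.interval.ms" is the key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG.
        streamProperties.put("commit.interval.ms", String.valueOf(jitteredMs));

        System.out.println("commit.interval.ms = "
                + streamProperties.getProperty("commit.interval.ms"));
    }
}

For the one-hour window the 1,800,000 ms base value therefore yields a commit interval between 1,350,000 and 2,250,000 ms (22.5 to 37.5 minutes), and each worker draws its own factor, so the aggregation streams commit at staggered times.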