Skip to content

Commit

Permalink
Merge pull request #77 from RADAR-base/release-0.3.0
Browse files Browse the repository at this point in the history
Release 0.3.0
  • Loading branch information
yatharthranjan authored Jul 17, 2018
2 parents 7adbef1 + 599809b commit b5191c6
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ RUN ./gradlew distTar && \
tar xf build/distributions/*.tar && \
rm build/distributions/*.tar

FROM confluentinc/cp-base:3.3.1
FROM confluentinc/cp-base:4.1.0

MAINTAINER Nivethika M <[email protected]> , Joris Borgdorff <[email protected]>
MAINTAINER Nivethika M <[email protected]> , Joris Borgdorff <[email protected]> , Yatharth Ranjan <[email protected]>

LABEL description="RADAR-CNS Backend streams and monitor"

Expand Down
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ plugins {
//---------------------------------------------------------------------------//

group = 'org.radarcns'
version = '0.2.3'
version = '0.3.0'

ext.description = 'Kafka backend for processing device data.'

mainClassName = 'org.radarcns.RadarBackend'
Expand All @@ -24,9 +25,9 @@ sourceCompatibility = '1.8'

ext.boundaryVersion = '1.0.6'
ext.codacyVersion = '1.0.10'
ext.confluentVersion = '3.3.1'
ext.confluentVersion = '4.1.0'
ext.hamcrestVersion = '1.3'
ext.kafkaVersion = '0.11.0.2'
ext.kafkaVersion = '1.1.0'
ext.jacksonVersion='2.8.5'
ext.javaMailVersion = '1.5.6'
ext.junitVersion = '4.12'
Expand Down
8 changes: 6 additions & 2 deletions gradle/codacy.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@ configurations {
codacy
}

jacoco {
toolVersion = "0.8.1"
}

dependencies {
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '2.0.1'
codacy group: 'com.github.codacy', name: 'codacy-coverage-reporter', version: '4.0.1'
}

jacocoTestReport {
executionData test, integrationTest
reports {
xml.enabled true
csv.enabled false
html.enabled true
}
executionData test, integrationTest
}

task sendCoverageToCodacy(type: JavaExec, dependsOn: jacocoTestReport) {
Expand Down
2 changes: 1 addition & 1 deletion gradle/test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
integrationTestImplementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion

// For Topic name validation based on Kafka classes
testImplementation (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
testCompile (group: 'org.apache.kafka', name: 'kafka_2.11', version: kafkaVersion) {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
exclude group: 'net.sf.jopt-simple'
exclude group: 'com.yammer.metrics'
Expand Down
1 change: 0 additions & 1 deletion radar.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ broker:

#Kafka internal parameters
stream_properties:
auto_commit_interval_ms: 1000
max.request.size: 3500042 #Set message.max.bytes for kafka brokers higher than or equal to this value
retries: 15
session_timeout_ms: 20000
Expand Down
32 changes: 24 additions & 8 deletions src/integrationTest/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
---
version: '2'

networks:
kafka:
driver: bridge
services:
#---------------------------------------------------------------------------#
# Zookeeper Cluster #
#---------------------------------------------------------------------------#
zookeeper-1:
image: confluentinc/cp-zookeeper:3.3.1
image: confluentinc/cp-zookeeper:4.1.0
networks:
- kafka
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
Expand All @@ -19,9 +24,11 @@ services:
# Kafka Cluster #
#---------------------------------------------------------------------------#
kafka-1:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -33,9 +40,11 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

kafka-2:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -47,9 +56,11 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

kafka-3:
image: confluentinc/cp-kafka:3.3.1
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper-1
networks:
- kafka
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
Expand All @@ -64,12 +75,14 @@ services:
# Schema Registry #
#---------------------------------------------------------------------------#
schema-registry-1:
image: confluentinc/cp-schema-registry:3.3.1
image: confluentinc/cp-schema-registry:4.1.0
depends_on:
- zookeeper-1
- kafka-1
- kafka-2
- kafka-3
networks:
- kafka
restart: always
ports:
- "8081:8081"
Expand All @@ -83,12 +96,14 @@ services:
# REST proxy #
#---------------------------------------------------------------------------#
rest-proxy-1:
image: confluentinc/cp-kafka-rest:3.3.1
image: confluentinc/cp-kafka-rest:4.1.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry-1
networks:
- kafka
ports:
- "8082:8082"
environment:
Expand All @@ -109,8 +124,9 @@ services:
depends_on:
- kafka-1
- schema-registry-1
command:
- integrationTest
networks:
- kafka
command: integrationTest
volumes:
- ../../../build/jacoco:/code/build/jacoco
- ../../../build/reports:/code/build/reports
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public SourceStatisticsMonitor(RadarPropertyHandler radar,
SourceStatisticsMonitorConfig config) {
super(radar, config.getTopics(), Objects.requireNonNull(config.getName(),
"Source statistics monitor must have a name"), "1-"
+ config.getOutputTopic(),
+ config.getOutputTopic() + UUID.randomUUID(),
new SourceStatisticsState());

if (getStateStore() == null) {
Expand Down
42 changes: 41 additions & 1 deletion src/main/java/org/radarcns/stream/GeneralStreamGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected Collection<StreamDefinition> createWindowedSensorStream(String input,
Collection<StreamDefinition> streams = Arrays.stream(TimeWindowMetadata.values())
.map(w -> new StreamDefinition(
new KafkaTopic(input), new KafkaTopic(w.getTopicLabel(outputBase)),
w.getIntervalInMilliSec()))
w.getIntervalInMilliSec(), getCommitIntervalForTimeWindow(w)))
.collect(Collectors.toList());

topicNames.addAll(streams.stream()
Expand All @@ -135,6 +135,25 @@ public void addTopicNames(Collection<String> topicNames) {
this.topicNames.addAll(topicNames);
}

public long getCommitIntervalForTimeWindow(TimeWindowMetadata metadata) {
switch (metadata) {
case ONE_DAY:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_DAY.getCommitInterval();
case ONE_MIN:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_MIN.getCommitInterval();
case TEN_MIN:
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_MIN.getCommitInterval();
case ONE_HOUR:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_HOUR.getCommitInterval();
case ONE_WEEK:
return CommitInterval.COMMIT_INTERVAL_FOR_ONE_WEEK.getCommitInterval();
case TEN_SECOND:
return CommitInterval.COMMIT_INTERVAL_FOR_TEN_SECOND.getCommitInterval();
default:
return CommitInterval.COMMIT_INTERVAL_DEFAULT.getCommitInterval();
}
}

@Override
public Collection<StreamDefinition> getStreamDefinition(String inputTopic) {
Collection<StreamDefinition> topic = topicMap.get(inputTopic);
Expand All @@ -150,4 +169,25 @@ public List<String> getTopicNames() {
topicList.sort(String.CASE_INSENSITIVE_ORDER);
return topicList;
}

public enum CommitInterval {
COMMIT_INTERVAL_FOR_TEN_SECOND(10_000L),
COMMIT_INTERVAL_FOR_ONE_MIN(30_000L),
COMMIT_INTERVAL_FOR_TEN_MIN(300_000L),
COMMIT_INTERVAL_FOR_ONE_HOUR(1800_000L),
COMMIT_INTERVAL_FOR_ONE_DAY(7200_000L),
COMMIT_INTERVAL_FOR_ONE_WEEK(10800_000L),
COMMIT_INTERVAL_DEFAULT(30_000L);

private final long commitInterval;


CommitInterval(long commitInterval) {
this.commitInterval = commitInterval;
}

public long getCommitInterval() {
return commitInterval;
}
}
}
10 changes: 9 additions & 1 deletion src/main/java/org/radarcns/stream/KStreamWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
Expand Down Expand Up @@ -126,8 +128,14 @@ protected Properties getStreamProperties(@Nonnull StreamDefinition definition) {
localClientId += '-' + window.sizeMs + '-' + window.advanceMs;
}

return kafkaProperty.getStreamProperties(localClientId, numThreads,
Properties props = kafkaProperty.getStreamProperties(localClientId, numThreads,
DeviceTimestampExtractor.class);
long interval = (long)(ThreadLocalRandom.current().nextDouble(0.75, 1.25)
* definition.getCommitIntervalMs());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
String.valueOf(interval));

return props;
}

/**
Expand Down
31 changes: 28 additions & 3 deletions src/main/java/org/radarcns/stream/StreamDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.radarcns.stream;

import static org.radarcns.stream.GeneralStreamGroup.CommitInterval.COMMIT_INTERVAL_DEFAULT;
import static org.radarcns.util.Comparison.compare;

import java.util.Objects;
Expand All @@ -24,10 +25,12 @@
import org.apache.kafka.streams.kstream.TimeWindows;
import org.radarcns.topic.KafkaTopic;


public class StreamDefinition implements Comparable<StreamDefinition> {
private final KafkaTopic inputTopic;
private final KafkaTopic outputTopic;
private final TimeWindows window;
private final long commitIntervalMs;

/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
Expand All @@ -36,7 +39,7 @@ public class StreamDefinition implements Comparable<StreamDefinition> {
* @param output output {@link KafkaTopic}
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
this(input, output, 0L);
this(input, output, 0L, COMMIT_INTERVAL_DEFAULT.getCommitInterval());
}

/**
Expand All @@ -47,24 +50,41 @@ public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output) {
* @param window time window for aggregation.
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window) {
this(input, output, window == 0 ? null : TimeWindows.of(window));
this(input, output, window == 0 ? null : TimeWindows.of(window),
COMMIT_INTERVAL_DEFAULT.getCommitInterval());
}

/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
* related stream will write the computed values.
* @param input source {@link KafkaTopic}
* @param output output {@link KafkaTopic}
* @param window time window for aggregation.
* @param commitIntervalMs The commit.interval.ms config for the stream
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output, long window,
long commitIntervalMs) {
this(input, output, window == 0 ? null : TimeWindows.of(window), commitIntervalMs);
}


/**
* Constructor. It takes in input the topic name to be consumed and to topic name where the
* related stream will write the computed values.
* @param input source {@link KafkaTopic}
* @param output output {@link KafkaTopic}
* @param window time window for aggregation.
* @param commitIntervalMs The commit.interval.ms config for the stream
*/
public StreamDefinition(@Nonnull KafkaTopic input, @Nonnull KafkaTopic output,
@Nullable TimeWindows window) {
@Nullable TimeWindows window, @Nonnull long commitIntervalMs) {
Objects.requireNonNull(input);
Objects.requireNonNull(output);

this.inputTopic = input;
this.outputTopic = output;
this.window = window;
this.commitIntervalMs = commitIntervalMs;
}

@Nonnull
Expand Down Expand Up @@ -94,6 +114,11 @@ public TimeWindows getTimeWindows() {
return window;
}

@Nullable
public long getCommitIntervalMs(){
return commitIntervalMs;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down

0 comments on commit b5191c6

Please sign in to comment.