You can add streams-bootstrap via Maven Central.

#### Gradle

```gradle
implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap', version: '3.0.0'
```

#### Maven

```xml
<dependency>
    <groupId>com.bakdata.kafka</groupId>
    <artifactId>streams-bootstrap</artifactId>
    <version>3.0.0</version>
</dependency>
```

#### Kafka Streams

Create a subclass of `KafkaStreamsApplication` and implement `createApp()`. It returns a `StreamsApp` that defines `buildTopology()`
and `getUniqueAppId()`. You can define the topology of your application in `buildTopology()`.

```java
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsBootstrapApplication extends KafkaStreamsApplication {
    public static void main(final String[] args) {
        startApplication(new StreamsBootstrapApplication(), args);
    }

    @Override
    public StreamsApp createApp(final boolean cleanUp) {
        return new StreamsApp() {
            @Override
            public void buildTopology(final TopologyBuilder builder) {
                final KStream<String, String> input = builder.streamInput();

                // your topology

                input.to(builder.getTopics().getOutputTopic());
            }

            @Override
            public String getUniqueAppId(final StreamsTopicConfig topics) {
                return "streams-bootstrap-app-" + topics.getOutputTopic();
            }

            // Optionally you can define custom Kafka properties
            @Override
            public Map<String, Object> createKafkaProperties() {
                return Map.of(
                        // your config
                );
            }
        };
    }
}
```
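
The `// your topology` placeholder above can hold any regular Kafka Streams processing logic. A minimal sketch, assuming String keys and values; the upper-casing step is just a placeholder:

```java
@Override
public void buildTopology(final TopologyBuilder builder) {
    final KStream<String, String> input = builder.streamInput();

    // Placeholder processing step: upper-case every value
    final KStream<String, String> processed = input.mapValues(value -> value.toUpperCase());

    processed.to(builder.getTopics().getOutputTopic());
}
```
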
The following configuration options are available (see the sketch after this list for one way to supply them):

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka Streams configuration (`<String=String>[,<String=String>...]`)

- `--input-topics`: List of input topics (comma-separated)

- `--input-pattern`: Pattern of input topics

- `--error-topic`: A topic to write errors to

- `--extra-input-topics`: Additional named input topics if you need to specify multiple topics with different message
types (`<String=String>[,<String=String>...]`)

- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it will change on a Streams shutdown.

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate
topics associated with the Kafka Streams application.

- `reset`: Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams
application.
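
For illustration, the sketch below shows one way these options could be supplied when starting the application from code. All topic names and config values are placeholders, and required options not listed above (such as the Kafka brokers) are omitted:

```java
public class StreamsBootstrapRunner {
    public static void main(final String[] args) {
        // Placeholder arguments; in practice they usually come from the real command line
        final String[] cliArgs = {
                "--input-topics", "my-input-topic",
                "--output-topic", "my-output-topic",
                "--kafka-config", "max.poll.records=250"
        };
        // Delegates to the main method of the example application above
        StreamsBootstrapApplication.main(cliArgs);
    }
}
```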

#### Kafka producer

Create a subclass of `KafkaProducerApplication`.

```java
import com.bakdata.kafka.KafkaProducerApplication;
import com.bakdata.kafka.ProducerApp;
import com.bakdata.kafka.ProducerBuilder;
import com.bakdata.kafka.ProducerRunnable;
import java.util.Map;
import org.apache.kafka.clients.producer.Producer;

public class StreamsBootstrapApplication extends KafkaProducerApplication {
    public static void main(final String[] args) {
        startApplication(new StreamsBootstrapApplication(), args);
    }

    @Override
    public ProducerApp createApp(final boolean cleanUp) {
        return new ProducerApp() {
            @Override
            public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
                return () -> {
                    try (final Producer<Object, Object> producer = builder.createProducer()) {
                        // your producer
                    }
                };
            }

            // Optionally you can define custom Kafka properties
            @Override
            public Map<String, Object> createKafkaProperties() {
                return Map.of(
                        // your config
                );
            }
        };
    }
}
```
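
The `// your producer` placeholder can contain ordinary Kafka producer calls. A minimal sketch, assuming String serializers are configured and using a placeholder topic name (this also needs `import org.apache.kafka.clients.producer.ProducerRecord;`):

```java
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
    return () -> {
        try (final Producer<Object, Object> producer = builder.createProducer()) {
            // Placeholder record and topic name
            producer.send(new ProducerRecord<>("my-output-topic", "key", "value"));
            producer.flush();
        }
    };
}
```
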
The following configuration options are available:

- `--schema-registry-url`: The URL of the Schema Registry

- `--kafka-config`: Kafka producer configuration (`<String=String>[,<String=String>...]`)

- `--output-topic`: The output topic

- `--extra-output-topics`: Additional named output topics (`<String=String>[,<String=String>...]`)

- `--debug`: Configure logging to debug

Additionally, the following commands are available:

- `clean`: Delete all output topics associated with the Kafka Producer application.

### Helm Charts

For the configuration and deployment to Kubernetes, you can use the provided Helm charts.
