Skip to content

Commit

Permalink
TH2-4545 (#24)
Browse files Browse the repository at this point in the history
* message publishing to Kafka
* books&pages support
* alias groups support
* events added
  • Loading branch information
lumber1000 authored Feb 17, 2023
1 parent a741dfa commit 9c179de
Show file tree
Hide file tree
Showing 20 changed files with 1,525 additions and 682 deletions.
28 changes: 14 additions & 14 deletions .github/workflows/ci-unwelcome-words.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ jobs:
test:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
with:
ref: ${{ github.sha }}
- name: Checkout tool
uses: actions/checkout@v2
with:
repository: exactpro-th2/ci-github-action
ref: master
token: ${{ secrets.PAT_CI_ACTION }}
path: ci-github-action
- name: Run CI action
uses: ./ci-github-action
with:
ref: ${{ github.sha }}
- uses: actions/checkout@v2
with:
ref: ${{ github.sha }}
- name: Checkout tool
uses: actions/checkout@v2
with:
repository: exactpro-th2/ci-github-action
ref: master
token: ${{ secrets.PAT_CI_ACTION }}
path: ci-github-action
- name: Run CI action
uses: ./ci-github-action
with:
ref: ${{ github.sha }}
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM gradle:6.8-jdk11 AS build
FROM gradle:7.6-jdk11 AS build
ARG release_version
COPY ./ .
RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version}

FROM adoptopenjdk/openjdk11:alpine
WORKDIR /home
COPY --from=build /home/gradle/build/docker .
ENTRYPOINT ["/home/service/bin/service"]
ENTRYPOINT ["/home/service/bin/service"]
125 changes: 96 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,52 +1,119 @@
# KafkaConnect (0.0.2)
# KafkaConnect (0.1.0)
The "KafkaConnect" component is responsible for the communication with Kafka;

## Configuration

This configuration should be specified in the custom configuration block in schema editor.
```yaml
customConfig:

```json
{
"topicToAlias": {
"topicName" : "session-alias"
},
"groupId": 1,
"acceptableBreak" : 8,
"acceptableBreakTimeUnit": "HOURS",
"batchSize": 100,
"timespan": 1000,
"timespanUnit" : "MILLISECONDS",
"reconnectBackoffMs": 50,
"reconnectBackoffMaxMs": 1000
}
aliasToTopic:
session_alias_01:
topic: "topic_01"
subscribe: true
session_alias_02:
topic: "topic_02"
subscribe: true

aliasToTopicAndKey:
session_alias_03:
topic: "topic_03"
key: "key_01"
subscribe: false
session_alias_04:
topic: "topic_04"
key: null

sessionGroups:
session_alias_group_01: ["session_alias_01", "session_alias_02"]
session_alias_group_02: ["session_alias_03"]

defaultSessionGroup: "session_alias_group_02"
bootstrapServers: "cp-kafka:9092"
groupId: "consumer_group_01"

maxInactivityPeriod: 8
maxInactivityPeriodUnit: "HOURS"
batchSize: 100
timeSpan: 1000
timeSpanUnit : "MILLISECONDS"
reconnectBackoffMs: 50
reconnectBackoffMaxMs: 1000
kafkaConnectionEvents: true
```
Parameters:
+ session-alias - that session alias will be set for all messages sent by this component. **It should be unique for each "KafkaConnect" topic**;
+ aliasToTopic - matches th2 sessions with Kafka topics **Note: Kafka does not guarantee message ordering within topic if topic contains more than one partition**
+ aliasToTopicAndKey - matches th2 sessions with Kafka topics and keys **Note: Kafka guarantees message ordering only within messages with the same non null key if topic contains more than one partition**
+ sessionGroups - match session group with sessions (key: session group, value: list of session aliases)
+ defaultSessionGroup - this session group will be set for sessions not mentioned in `sessionGroups`
+ groupId - that ID will be used for Kafka connection
+ bootstrapServers - URL of one of the Kafka brokers which you give to fetch the initial metadata about your Kafka cluster
+ acceptableBreak - if the period of inactivity is longer than this time, then start reading Kafka messages from the current moment. Should be positive.
+ timeUnit - time unit for `acceptableBreak` and `timespan` classification
+ maxInactivityPeriod - if the period of inactivity is longer than this time, then start reading Kafka messages from the current moment. Should be positive.
+ maxInactivityPeriodUnit - time unit for `maxInactivityPeriod`
+ DAYS
+ HOURS
+ MINUTES
+ SECONDS
+ MILLISECONDS
+ MICROSECONDS
+ NANOSECONDS
+ batchSize - the size of one batch. Should be positive.
+ timespan - The period router collects messages before it should be sent. Should be positive.
+ batchSize - the size of one batch (number of messages). Should be positive.
+ timeSpan - The period router collects messages before it should be sent. Should be positive.
+ timeSpanUnit time unit for `timeSpan`
+ reconnectBackoffMs - The amount of time in milliseconds to wait before attempting to reconnect to a given host. Should be positive.
+ reconnectBackoffMaxMs - The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that
has repeatedly failed to connect. If provided, the backoff per host will increase
exponentially for each consecutive connection failure, up to this maximum.
Once the maximum is reached, reconnection attempts will continue periodically with
this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to
the backoff resulting in a random range between 20% below and 20% above the computed value. Should be positive.
+ reconnectBackoffMaxMs - The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Should be positive.
+ kafkaConnectionEvents - Generate TH2 events on lost connection and restore connection to Kafka. `false` by default.

## Reconnect behaviour
If the consumer loses connection to which one, then it will try to reconnect to it indefinitely
in accordance with `reconnectBackoffMs` and `reconnectBackoffMaxMs`
You can see a warning in the log files

If the consumer loses connection to which one, then it will try to reconnect to it indefinitely in accordance with `reconnectBackoffMs` and `reconnectBackoffMaxMs`You can see a warning in the log files

`[kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.`

## Pins

Messages that were received to 'to_send' pin will be send to Kafka.
Messages that were received from / sent to the Kafka will be sent to the `out_raw` pin:

Example of pins configuration:

```yaml
spec:
imageName: ghcr.io/th2-net/th2-conn-kafka
imageVersion: 0.0.3
type: th2-conn
pins:
mq:
subscribers:
- name: to_send
attributes: ["send", "raw", "subscribe"]
linkTo:
- box: script
pin: to_conn
publishers:
- name: out_raw
attributes: ["raw", "publish", "store"]
```

## Release notes

### 0.1.0

+ Migrated to Books & Pages concept

### 0.0.3

+ Publishing to Kafka support
+ Kafka keys support
+ Session groups support

### 0.0.2

+ Reusable workflow with dependency check

### 0.0.1

+ Initial version
56 changes: 37 additions & 19 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id 'java'
id 'java-library'
id 'maven-publish'
id 'com.palantir.docker' version '0.25.0'
id 'application'
id 'com.palantir.docker' version '0.25.0'
id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}"
id "org.owasp.dependencycheck" version "7.4.4"
}

group 'com.exactpro.th2'
version release_version

sourceCompatibility = 11
targetCompatibility = 11

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

repositories {
mavenCentral()
Expand All @@ -24,9 +23,8 @@ repositories {
name 'Sonatype_releases'
url 'https://s01.oss.sonatype.org/content/repositories/releases/'
}
mavenLocal()

configurations.all {
configurations.configureEach {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds'
}
Expand All @@ -48,23 +46,43 @@ jar {

dependencies {
api platform('com.exactpro.th2:bom:4.1.0')
implementation 'com.exactpro.th2:common:3.44.0'

implementation "org.slf4j:slf4j-api"
implementation 'com.exactpro.th2:common:5.1.0-dev-version-5-4085018593-8adee33-SNAPSHOT'
implementation 'com.exactpro.th2:common-utils:0.0.1-book-and-page-3836172629-SNAPSHOT'

implementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '3.3.1'
implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'org.apache.kafka:kafka-clients:3.4.0'

testImplementation 'org.junit.jupiter:junit-jupiter:5.7.2'
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation 'org.mockito.kotlin:mockito-kotlin:4.1.0'
testImplementation 'org.mockito:mockito-inline:5.1.1'
testImplementation 'org.assertj:assertj-core:3.24.2'
}

test {
useJUnitPlatform()
}

applicationName = 'service'

application {
mainClassName 'com.exactpro.th2.KafkaMain'
mainClass = 'com.exactpro.th2.kafka.client.Main'
}
applicationName = 'service'

dependencyCheck {
formats=['SARIF']
distTar {
archiveFileName = "${applicationName}.tar"
}

dockerPrepare {
dependsOn distTar
}

docker {
copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
}

tasks.withType(KotlinCompile).configureEach {
kotlinOptions.jvmTarget = JavaVersion.VERSION_11
}

dependencyCheck {
formats=['SARIF']
}
6 changes: 5 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
release_version = 0.0.2
kotlin.code.style=official
kotlin_version=1.6.21
release_version = 0.1.0
description='Kafka Client'
vcs_url=https://github.com/th2-net/th2-conn-kafka
6 changes: 3 additions & 3 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
# Copyright 2021-2023 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,8 +14,8 @@
# limitations under the License.
#

distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStoreBase=GRADLE_USER_HOME
Loading

0 comments on commit 9c179de

Please sign in to comment.