Skip to content

Commit

Permalink
[TH2-4929] Transport protocol (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
lumber1000 authored Aug 7, 2024
1 parent 21a4408 commit e7fad5e
Show file tree
Hide file tree
Showing 10 changed files with 508 additions and 270 deletions.
49 changes: 38 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# KafkaConnect (0.2.0)
# KafkaConnect (0.3.0)
The "KafkaConnect" component is responsible for the communication with Kafka;

## Configuration

This configuration should be specified in the custom configuration block in schema editor.

```yaml
customConfig:

useTransport: true
aliasToTopic:
session_alias_01:
topic: "topic_01"
Expand Down Expand Up @@ -46,6 +46,7 @@ This configuration should be specified in the custom configuration block in sche
```
Parameters:
+ useTransport - use th2 transport or protobuf protocol to publish incoming/outgoing messages (false by default)
+ aliasToTopic - matches th2 sessions with Kafka topics **Note: Kafka does not guarantee message ordering within topic if topic contains more than one partition**
+ aliasToTopicAndKey - matches th2 sessions with Kafka topics and keys **Note: Kafka guarantees message ordering only within messages with the same non null key if topic contains more than one partition**
+ sessionGroups - match session group with sessions (key: session group, value: list of session aliases)
Expand Down Expand Up @@ -87,17 +88,19 @@ If the consumer loses connection to which one, then it will try to reconnect to

## Pins

Messages that were received to 'to_send' pin will be send to Kafka.
Messages that were received to 'to_send' pin will be sent to Kafka.
Messages that were received from / sent to the Kafka will be sent to the `out_raw` pin:

Example of pins configuration:

protobuf
```yaml
spec:
imageName: ghcr.io/th2-net/th2-conn-kafka
imageVersion: 0.2.0
imageVersion: 0.3.0
type: th2-conn
customConfig:
useTransport: false
pins:
mq:
subscribers:
Expand All @@ -112,10 +115,38 @@ spec:
attributes: ["raw", "publish", "store"]
```

th2 transport
```yaml
spec:
imageName: ghcr.io/th2-net/th2-conn-kafka
imageVersion: 0.3.0
type: th2-conn
customConfig:
useTransport: true
pins:
mq:
subscribers:
- name: to_send
attributes: ["send", "transport-group", "subscribe"]
linkTo:
- box: script
pin: to_conn
publishers:
- name: out_raw
attributes: ["transport-group", "publish"]
```

## Release notes

### 0.2.0
### 0.3.0
+ TH2 transport protocol support
Updated bom: `4.5.0`
+ Updated common: `5.4.0-dev`
+ Updated common-utils: `2.1.1-dev`
+ Updated kafka-clients: `3.5.1`

### 0.2.0
+ Secure connection support
+ Kafka batching settings
+ Message events publishing setting
Expand All @@ -126,23 +157,19 @@ spec:
+ bump library versions

### 0.1.0

+ Migrated to Books & Pages concept

### 0.0.4
+ th2-common upgrade to `3.44.1`
+ th2-bom upgrade to `4.2.0`

### 0.0.3

+ Publishing to Kafka support
+ Kafka keys support
+ Session groups support

### 0.0.2

+ Reusable workflow with dependency check

### 0.0.1

+ Initial version
97 changes: 15 additions & 82 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,99 +1,32 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id 'application'
id 'com.palantir.docker' version '0.25.0'
id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}"
id "org.owasp.dependencycheck" version "8.1.2"
}

group 'com.exactpro.th2'
version release_version

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

repositories {
mavenCentral()
maven {
name 'Sonatype_snapshots'
url 'https://s01.oss.sonatype.org/content/repositories/snapshots/'
}
maven {
name 'Sonatype_releases'
url 'https://s01.oss.sonatype.org/content/repositories/releases/'
buildscript {
repositories {
gradlePluginPortal()
maven {
url = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
}
}

configurations.configureEach {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds'
dependencies {
classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915772757-13a28ae-SNAPSHOT"
}
}

jar {
manifest {
attributes(
'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})",
'Specification-Title': '',
'Specification-Vendor': 'Exactpro Systems LLC',
'Implementation-Title': project.archivesBaseName,
'Implementation-Vendor': 'Exactpro Systems LLC',
'Implementation-Vendor-Id': 'com.exactpro',
'Implementation-Version': project.version
)
}
}
apply plugin: "com.exactpro.th2.common-conventions"
apply plugin: "com.exactpro.th2.docker-conventions"

dependencies {
api platform("com.exactpro.th2:bom:4.3.0")
implementation "com.exactpro.th2:common:5.2.1-dev"
implementation "com.exactpro.th2:common-utils:2.0.0-dev"
api platform("com.exactpro.th2:bom:4.5.0")
implementation "com.exactpro.th2:common:5.4.0-dev"
implementation "com.exactpro.th2:common-utils:2.2.0-dev"

implementation "org.apache.kafka:kafka-clients:3.4.1"
implementation "org.apache.kafka:kafka-clients:3.5.1"
implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation "org.jetbrains.kotlin:kotlin-test-junit5"
testImplementation "org.mockito.kotlin:mockito-kotlin:5.0.0"
testImplementation "org.mockito:mockito-inline:5.2.0"
testImplementation "org.assertj:assertj-core:3.24.2"
}

test {
useJUnitPlatform()
}

applicationName = 'service'

application {
mainClass = 'com.exactpro.th2.kafka.client.Main'
}

distTar {
archiveFileName = "${applicationName}.tar"
}

dockerPrepare {
dependsOn distTar
}

docker {
copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
}

tasks.withType(KotlinCompile).configureEach {
kotlinOptions.jvmTarget = JavaVersion.VERSION_11
}

dependencyCheck {
formats=['SARIF', 'JSON', 'HTML']
failBuildOnCVSS=5

analyzers {
assemblyEnabled = false
nugetconfEnabled = false
nodeEnabled = false
}
}

configurations {
compileClasspath {
resolutionStrategy.activateDependencyLocking()
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kotlin.code.style=official
kotlin_version=1.6.21
release_version = 0.2.0
release_version = 0.3.0
description='Kafka Client'
vcs_url=https://github.com/th2-net/th2-conn-kafka
app_main_class=com.exactpro.th2.kafka.client.Main
7 changes: 6 additions & 1 deletion src/main/kotlin/com/exactpro/th2/kafka/client/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,15 @@ class Config(
@JsonProperty(SaslConfigs.SASL_JAAS_CONFIG)
val kafkaSaslJaasConfig: String? = null,

val extraConsumerProps: Map<String, String> = emptyMap(),
val extraProducerProps: Map<String, String> = emptyMap(),

val createTopics: Boolean = false,
val topicsToCreate: List<String> = emptyList(),
val newTopicsPartitions: Int = 1,
val newTopicsReplicationFactor: Short = 1
val newTopicsReplicationFactor: Short = 1,

val useTransport: Boolean = false
) {
@JsonIgnore
val maxInactivityPeriodDuration: Duration = maxInactivityPeriodUnit.toMillis(maxInactivityPeriod).toDuration(DurationUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class KafkaClientsFactory(private val config: Config) {
put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, config.reconnectBackoffMs)
put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.reconnectBackoffMaxMs)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.kafkaAutoOffsetReset)
config.extraConsumerProps.forEach { put(it.key, it.value) }
addSecuritySettings()
}
)
Expand All @@ -55,6 +56,7 @@ class KafkaClientsFactory(private val config: Config) {
put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.reconnectBackoffMaxMs)
put(ProducerConfig.BATCH_SIZE_CONFIG, config.kafkaBatchSize)
put(ProducerConfig.LINGER_MS_CONFIG, config.kafkaLingerMillis)
config.extraProducerProps.forEach { put(it.key, it.value) }
addSecuritySettings()
}
)
Expand Down
Loading

0 comments on commit e7fad5e

Please sign in to comment.