Skip to content

Commit

Permalink
* Splits Elastic module into elastic-common and elastic8
Browse files Browse the repository at this point in the history
* Adding Opensearch module
* Adding Opensearch unit tests
* Opensearch SSL Test - needs completing
* Replace Elastic6+7 with Elastic8
  • Loading branch information
davidsloan committed Mar 25, 2024
1 parent 9b634a8 commit 5832ac9
Show file tree
Hide file tree
Showing 124 changed files with 3,019 additions and 3,977 deletions.
53 changes: 39 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson
import Dependencies.bouncyCastle

import Settings.*
import sbt.Keys.libraryDependencies
import sbt.*
Expand All @@ -18,8 +20,9 @@ lazy val subProjects: Seq[Project] = Seq(
`azure-documentdb`,
`azure-datalake`,
cassandra,
elastic6,
elastic7,
`elastic-common`,
opensearch,
elastic8,
ftp,
`gcp-storage`,
http,
Expand Down Expand Up @@ -219,18 +222,17 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val elastic6 = (project in file("kafka-connect-elastic6"))
lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
.dependsOn(common)
.dependsOn(`sql-common`)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic6",
name := "kafka-connect-elastic-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic6Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
publish / skip := true,
FunctionalTest / baseDirectory := (LocalRootProject / baseDirectory).value,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
Expand All @@ -239,20 +241,20 @@ lazy val elastic6 = (project in file("kafka-connect-elastic6"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic6TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)
.disablePlugins(PackPlugin)

lazy val elastic7 = (project in file("kafka-connect-elastic7"))
lazy val elastic8 = (project in file("kafka-connect-elastic8"))
.dependsOn(common)
.dependsOn(`sql-common`)
.dependsOn(`test-common` % "fun->compile")
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic7",
name := "kafka-connect-elastic8",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic7Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElastic8Deps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
Expand All @@ -262,10 +264,33 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic7TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val opensearch = (project in file("kafka-connect-opensearch"))
.dependsOn(common)
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-opensearch",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps)
//.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
.configureFunctionalTests(bouncyCastle)
.enablePlugins(PackPlugin)

lazy val http = (project in file("kafka-connect-http"))
.dependsOn(common)
//.dependsOn(`test-common` % "fun->compile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.connect.azure.documentdb.sink

import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.common.config.Helpers
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.azure.documentdb.DocumentClientProvider
Expand Down Expand Up @@ -100,7 +101,7 @@ class DocumentDbSinkConnector private[sink] (builder: DocumentDbSinkSettings =>
configProps = props

//check input topics
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap)
Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap).leftMap(throw _)

val settings = DocumentDbSinkSettings(config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.lenses.streamreactor.connect.cassandra

import io.lenses.streamreactor.common.config.SSLConfig
import io.lenses.streamreactor.common.config.SSLConfigContext
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
import io.lenses.streamreactor.connect.cassandra.config.SSLConfig
import io.lenses.streamreactor.connect.cassandra.config.SSLConfigContext
import io.lenses.streamreactor.connect.cassandra.config.LoadBalancingPolicy
import com.datastax.driver.core.Cluster.Builder
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cassandra.config

import java.io.FileInputStream
import java.security.KeyStore
import java.security.SecureRandom
import javax.net.ssl._

/**
* Created by [email protected] on 14/04/16.
* stream-reactor
*/
object SSLConfigContext {
def apply(config: SSLConfig): SSLContext =
getSSLContext(config)

/**
* Get a SSL Connect for a given set of credentials
*
* @param config An SSLConfig containing key and truststore credentials
* @return a SSLContext
*/
def getSSLContext(config: SSLConfig): SSLContext = {
val useClientCertAuth = config.useClientCert

//is client certification authentication set
val keyManagers: Array[KeyManager] = if (useClientCertAuth) {
getKeyManagers(config)
} else {
Array[KeyManager]()
}

val ctx: SSLContext = SSLContext.getInstance("SSL")
val trustManagers = getTrustManagers(config)
ctx.init(keyManagers, trustManagers, new SecureRandom())
ctx
}

/**
* Get an array of Trust Managers
*
* @param config An SSLConfig containing key and truststore credentials
* @return An Array of TrustManagers
*/
def getTrustManagers(config: SSLConfig): Array[TrustManager] = {
val tsf = new FileInputStream(config.trustStorePath)
val ts = KeyStore.getInstance(config.trustStoreType)
ts.load(tsf, config.trustStorePass.toCharArray)
val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
tmf.init(ts)
tmf.getTrustManagers
}

/**
* Get an array of Key Managers
*
* @param config An SSLConfig containing key and truststore credentials
* @return An Array of KeyManagers
*/
def getKeyManagers(config: SSLConfig): Array[KeyManager] = {
require(config.keyStorePath.nonEmpty, "Key store path is not set!")
require(config.keyStorePass.nonEmpty, "Key store password is not set!")
val ksf = new FileInputStream(config.keyStorePath.get)
val ks = KeyStore.getInstance(config.keyStoreType)
ks.load(ksf, config.keyStorePass.get.toCharArray)
val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
kmf.init(ks, config.keyStorePass.get.toCharArray)
kmf.getKeyManagers
}

}

/**
* Class for holding key and truststore settings
*/
case class SSLConfig(
trustStorePath: String,
trustStorePass: String,
keyStorePath: Option[String],
keyStorePass: Option[String],
useClientCert: Boolean = false,
keyStoreType: String = "JKS",
trustStoreType: String = "JKS",
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package io.lenses.streamreactor.connect.cassandra.sink

import cats.implicits.toBifunctorOps
import com.typesafe.scalalogging.StrictLogging
import io.lenses.streamreactor.common.config.Helpers
import io.lenses.streamreactor.common.utils.JarManifest

import java.util
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigSink
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkConnector

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.jdk.CollectionConverters.SeqHasAsJava
import scala.util.Failure
Expand Down Expand Up @@ -66,7 +66,7 @@ class CassandraSinkConnector extends SinkConnector with StrictLogging {
*/
override def start(props: util.Map[String, String]): Unit = {
//check input topics
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap)
Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap).leftMap(throw _)
configProps = props
Try(new CassandraConfigSink(props.asScala.toMap)) match {
case Failure(f) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cassandra.config

import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import javax.net.ssl.KeyManager
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager

/**
* Created by [email protected] on 19/04/16.
* stream-reactor
*/
class TestSSLConfigContext extends AnyWordSpec with Matchers with BeforeAndAfter {
var sslConfig: SSLConfig = null
var sslConfigNoClient: SSLConfig = null

before {
val trustStorePath = getClass.getResource("/stc_truststore.jks").getPath
val keystorePath = getClass.getResource("/stc_keystore.jks").getPath
val trustStorePassword = "erZHDS9Eo0CcNo"
val keystorePassword = "8yJQLUnGkwZxOw"
sslConfig = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), true)
sslConfigNoClient = SSLConfig(trustStorePath, trustStorePassword, Some(keystorePath), Some(keystorePassword), false)
}

"SSLConfigContext" should {
"should return an Array of KeyManagers" in {
val keyManagers = SSLConfigContext.getKeyManagers(sslConfig)
keyManagers.length shouldBe 1
val entry = keyManagers.head
entry shouldBe a[KeyManager]
}

"should return an Array of TrustManagers" in {
val trustManager = SSLConfigContext.getTrustManagers(sslConfig)
trustManager.length shouldBe 1
val entry = trustManager.head
entry shouldBe a[TrustManager]
}

"should return a SSLContext" in {
val context = SSLConfigContext(sslConfig)
context.getProtocol shouldBe "SSL"
context shouldBe a[SSLContext]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.lenses.streamreactor.common.config

import cats.implicits.catsSyntaxEitherId
import io.lenses.kcql.Kcql
import com.typesafe.scalalogging.StrictLogging
import org.apache.kafka.common.config.ConfigException
Expand All @@ -26,32 +27,32 @@ import org.apache.kafka.common.config.ConfigException

object Helpers extends StrictLogging {

def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Boolean = {
def checkInputTopics(kcqlConstant: String, props: Map[String, String]): Either[Throwable, Unit] = {
val topics = props("topics").split(",").map(t => t.trim).toSet
val raw = props(kcqlConstant)
if (raw.isEmpty) {
throw new ConfigException(s"Missing $kcqlConstant")
return new ConfigException(s"Missing $kcqlConstant").asLeft
}
val kcql = raw.split(";").map(r => Kcql.parse(r)).toSet
val sources = kcql.map(k => k.getSource)
val res = topics.subsetOf(sources)

if (!res) {
val missing = topics.diff(sources)
throw new ConfigException(
return new ConfigException(
s"Mandatory `topics` configuration contains topics not set in $kcqlConstant: ${missing}, kcql contains $sources",
)
).asLeft
}

val res1 = sources.subsetOf(topics)

if (!res1) {
val missing = topics.diff(sources)
throw new ConfigException(
return new ConfigException(
s"$kcqlConstant configuration contains topics not set in mandatory `topic` configuration: ${missing}, kcql contains $sources",
)
).asLeft
}

true
().asRight
}
}
Loading

0 comments on commit 5832ac9

Please sign in to comment.