Commit bfbb26c
* Splits Elastic module into elastic-common and elastic8
* Includes refactoring of all connectors to ensure that immutable Scala Maps are passed around internally; java.util.Maps are used only on entry to and exit from the Scala code and are converted soon after (see the sketch below the message)
* Adds OpenSearch module
* Adds OpenSearch unit tests
* OpenSearch SSL test - still needs completing
* Replaces Elastic6 and Elastic7 with Elastic8
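A minimal, hypothetical sketch of the Map-handling convention described in the refactoring bullet above (the connector class and field names are illustrative, not taken from this commit):

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

// The java.util.Map handed over by the Kafka Connect API is converted to an
// immutable Scala Map immediately on entry; only Scala Maps circulate inside.
class ExampleSinkConnector {
  private var configProps: Map[String, String] = Map.empty

  def start(props: util.Map[String, String]): Unit =
    configProps = props.asScala.toMap // convert once, at the boundary
}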

Removing deleted modules

Source package renamer

Package rename

Dir rename
davidsloan committed Feb 21, 2024
1 parent 7d8f2dc commit bfbb26c
Showing 135 changed files with 3,171 additions and 4,143 deletions.
52 changes: 39 additions & 13 deletions build.sbt
@@ -1,5 +1,7 @@
 import Dependencies.globalExcludeDeps
 import Dependencies.gson
+import Dependencies.bouncyCastle
+
 import Settings.*
 import sbt.Keys.libraryDependencies
 import sbt.*
@@ -17,8 +19,9 @@ lazy val subProjects: Seq[Project] = Seq(
   `azure-documentdb`,
   `azure-datalake`,
   cassandra,
-  elastic6,
-  elastic7,
+  `elastic-common`,
+  opensearch,
+  elastic8,
   ftp,
   `gcp-storage`,
   http,
@@ -201,17 +204,16 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
   .configureFunctionalTests()
   .enablePlugins(PackPlugin)
 
-lazy val elastic6 = (project in file("kafka-connect-elastic6"))
+lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
   .dependsOn(common)
   .dependsOn(`test-common` % "fun->compile")
   .settings(
     settings ++
       Seq(
-        name := "kafka-connect-elastic6",
+        name := "kafka-connect-elastic-common",
         description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
-        libraryDependencies ++= baseDeps ++ kafkaConnectElastic6Deps,
+        libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
         publish / skip := true,
-        FunctionalTest / baseDirectory := (LocalRootProject / baseDirectory).value,
         packExcludeJars := Seq(
           "scala-.*\\.jar",
           "zookeeper-.*\\.jar",
@@ -220,19 +222,20 @@ lazy val elastic6 = (project in file("kafka-connect-elastic6"))
   )
   .configureAssembly(true)
   .configureTests(baseTestDeps)
-  .configureIntegrationTests(kafkaConnectElastic6TestDeps)
+  .configureIntegrationTests(kafkaConnectElastic8TestDeps)
   .configureFunctionalTests()
-  .enablePlugins(PackPlugin)
+  .disablePlugins(PackPlugin)
 
-lazy val elastic7 = (project in file("kafka-connect-elastic7"))
+lazy val elastic8 = (project in file("kafka-connect-elastic8"))
   .dependsOn(common)
-  .dependsOn(`test-common` % "fun->compile")
+  .dependsOn(`elastic-common`)
+  .dependsOn(`test-common` % "fun->compile;it->compile")
   .settings(
     settings ++
       Seq(
-        name := "kafka-connect-elastic7",
+        name := "kafka-connect-elastic8",
         description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
-        libraryDependencies ++= baseDeps ++ kafkaConnectElastic7Deps,
+        libraryDependencies ++= baseDeps ++ kafkaConnectElastic8Deps,
         publish / skip := true,
         packExcludeJars := Seq(
           "scala-.*\\.jar",
@@ -242,10 +245,33 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
   )
   .configureAssembly(true)
   .configureTests(baseTestDeps)
-  .configureIntegrationTests(kafkaConnectElastic7TestDeps)
+  .configureIntegrationTests(kafkaConnectElastic8TestDeps)
   .configureFunctionalTests()
   .enablePlugins(PackPlugin)
 
+lazy val opensearch = (project in file("kafka-connect-opensearch"))
+  .dependsOn(common)
+  .dependsOn(`elastic-common`)
+  .dependsOn(`test-common` % "fun->compile;it->compile")
+  .settings(
+    settings ++
+      Seq(
+        name := "kafka-connect-opensearch",
+        description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
+        libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
+        publish / skip := true,
+        packExcludeJars := Seq(
+          "scala-.*\\.jar",
+          "zookeeper-.*\\.jar",
+        ),
+      ),
+  )
+  .configureAssembly(false)
+  .configureTests(baseTestDeps)
+  //.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
+  .configureFunctionalTests(bouncyCastle)
+  .enablePlugins(PackPlugin)
+
 lazy val http = (project in file("kafka-connect-http"))
   .dependsOn(common)
   //.dependsOn(`test-common` % "fun->compile")
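Distilled from the build definition above, the new Elastic module graph is roughly the following sketch (settings omitted): elastic-common carries the shared connector code and is neither published nor packed on its own, while the concrete elastic8 and opensearch connectors build on it.

lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
  .disablePlugins(PackPlugin) // shared code only; publish / skip := true

lazy val elastic8 = (project in file("kafka-connect-elastic8"))
  .dependsOn(`elastic-common`)
  .enablePlugins(PackPlugin)

lazy val opensearch = (project in file("kafka-connect-opensearch"))
  .dependsOn(`elastic-common`)
  .enablePlugins(PackPlugin)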
@@ -49,7 +49,7 @@ object S3SinkConfig {
       s3ConfigDefBuilder.getInt(SEEK_MAX_INDEX_FILES),
     )
   } yield S3SinkConfig(
-    S3Config(s3ConfigDefBuilder.getParsedValues),
+    S3Config(s3ConfigDefBuilder.props),
     sinkBucketOptions,
     offsetSeekerOptions,
     s3ConfigDefBuilder.getCompressionCodec(),
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2024 Lenses.io Ltd
+ * Copyright 2017-2023 Lenses.io Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,19 +18,15 @@ package io.lenses.streamreactor.connect.aws.s3.sink.config
 import io.lenses.streamreactor.common.config.base.traits._
 import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
 import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
+import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
 import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder
 
-import scala.jdk.CollectionConverters.MapHasAsScala
-
 case class S3SinkConfigDefBuilder(props: Map[String, String])
   extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SinkConfigDef.config, props)
     with CloudSinkConfigDefBuilder
     with ErrorPolicySettings
     with NumberRetriesSettings
     with UserSettings
     with ConnectionSettings
-    with DeleteModeSettings {
-
-  def getParsedValues: Map[String, _] = values().asScala.toMap
-
-}
+    with CompressionCodecSettings
+    with DeleteModeSettings {}
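The getParsedValues removal here (and in the source and Datalake builders below) applies the commit's Map convention: instead of re-exporting the type-coerced result of values() and converting it back out of java.util, callers reuse the immutable Map[String, String] the builder was constructed with. A simplified sketch of the two shapes (the class name is a stand-in, not from this commit):

// The builder is a case class, so its original props are already available.
case class ExampleConfigDefBuilder(props: Map[String, String])
// old shape: builder.getParsedValues  (values().asScala.toMap, a Map[String, _])
// new shape: builder.props            (the original Map[String, String], no conversion)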
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2024 Lenses.io Ltd
+ * Copyright 2017-2023 Lenses.io Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -63,9 +63,8 @@ class S3SourceTask extends SourceTask with LazyLogging {

     logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")
 
-    val contextProperties: Map[String, String] =
-      Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
-    val mergedProperties: Map[String, String] = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
+    val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
+    val mergedProperties  = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
     (for {
       result <- S3SourceState.make(mergedProperties, contextOffsetFn)
       fiber  <- result.partitionDiscoveryLoop.start
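The start-up sequence above merges any connector-context overrides into the task properties via MapUtils.mergeProps before building the source state. A sketch of the assumed semantics (the precedence shown is an assumption; the authoritative behaviour lives in MapUtils):

// Assumed: context-supplied values win over task props on duplicate keys.
def mergeProps(context: Map[String, String], props: Map[String, String]): Map[String, String] =
  props ++ context // the right-hand operand wins on duplicates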
@@ -50,7 +50,7 @@ object S3SourceConfig {
     S3SourceConfig(S3SourceConfigDefBuilder(props))
 
   def apply(s3ConfigDefBuilder: S3SourceConfigDefBuilder): Either[Throwable, S3SourceConfig] = {
-    val parsedValues = s3ConfigDefBuilder.getParsedValues
+    val parsedValues = s3ConfigDefBuilder.props
     for {
       sbo <- SourceBucketOptions(
         s3ConfigDefBuilder,
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2024 Lenses.io Ltd
+ * Copyright 2017-2023 Lenses.io Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,8 +20,6 @@ import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
 import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
 import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
 
-import scala.jdk.CollectionConverters.MapHasAsScala
-
 case class S3SourceConfigDefBuilder(props: Map[String, String])
   extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SourceConfigDef.config, props)
     with KcqlSettings
@@ -31,8 +29,4 @@ case class S3SourceConfigDefBuilder(props: Map[String, String])
     with ConnectionSettings
     with CompressionCodecSettings
     with SourcePartitionSearcherSettings
-    with DeleteModeSettings {
-
-  def getParsedValues: Map[String, _] = values().asScala.toMap
-
-}
+    with DeleteModeSettings {}
@@ -49,7 +49,7 @@ object DatalakeSinkConfig {
       s3ConfigDefBuilder.getInt(SEEK_MAX_INDEX_FILES),
     )
   } yield DatalakeSinkConfig(
-    AzureConfig(s3ConfigDefBuilder.getParsedValues, authMode),
+    AzureConfig(s3ConfigDefBuilder.props, authMode),
     sinkBucketOptions,
     offsetSeekerOptions,
     s3ConfigDefBuilder.getCompressionCodec(),
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2024 Lenses.io Ltd
+ * Copyright 2017-2023 Lenses.io Ltd
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,21 +16,17 @@
 package io.lenses.streamreactor.connect.datalake.sink.config
 
 import io.lenses.streamreactor.common.config.base.traits._
+import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
 import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder
 import io.lenses.streamreactor.connect.datalake.config.AuthModeSettings
 import io.lenses.streamreactor.connect.datalake.config.AzureConfigSettings
 
-import scala.jdk.CollectionConverters.MapHasAsScala
-
 case class DatalakeSinkConfigDefBuilder(props: Map[String, String])
   extends BaseConfig(AzureConfigSettings.CONNECTOR_PREFIX, DatalakeSinkConfigDef.config, props)
     with CloudSinkConfigDefBuilder
     with ErrorPolicySettings
     with NumberRetriesSettings
     with UserSettings
     with ConnectionSettings
-    with AuthModeSettings {
-
-  def getParsedValues: Map[String, _] = values().asScala.toMap
-
-}
+    with CompressionCodecSettings
+    with AuthModeSettings {}
@@ -15,6 +15,7 @@
  */
 package io.lenses.streamreactor.connect.azure.documentdb.sink
 
+import cats.implicits.toBifunctorOps
 import io.lenses.streamreactor.common.config.Helpers
 import io.lenses.streamreactor.common.utils.JarManifest
 import io.lenses.streamreactor.connect.azure.documentdb.DocumentClientProvider
@@ -100,7 +101,7 @@ class DocumentDbSinkConnector private[sink] (builder: DocumentDbSinkSettings =>
     configProps = props
 
     //check input topics
-    Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap)
+    Helpers.checkInputTopics(DocumentDbConfigConstants.KCQL_CONFIG, props.asScala.toMap).leftMap(throw _)
 
     val settings = DocumentDbSinkSettings(config)

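The new .leftMap(throw _) call here (and in the Cassandra sink connector below) reflects Helpers.checkInputTopics now returning an Either instead of throwing internally; connectors keep their fail-fast start-up by rethrowing the Left. A self-contained sketch, with a simplified stand-in for the real validation and a made-up config key:

import cats.implicits.toBifunctorOps

// Simplified stand-in for Helpers.checkInputTopics: report a missing KCQL
// setting as a Left rather than throwing inside the helper.
def checkInputTopics(kcqlKey: String, props: Map[String, String]): Either[Throwable, Unit] =
  Either.cond(props.contains(kcqlKey), (), new IllegalArgumentException(s"Missing $kcqlKey"))

// Fail fast at start-up by rethrowing the error side.
checkInputTopics("connect.example.kcql", Map("connect.example.kcql" -> "INSERT INTO target SELECT * FROM source"))
  .leftMap(throw _)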
@@ -15,9 +15,9 @@
  */
 package io.lenses.streamreactor.connect.cassandra
 
-import io.lenses.streamreactor.common.config.SSLConfig
-import io.lenses.streamreactor.common.config.SSLConfigContext
 import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
+import io.lenses.streamreactor.connect.cassandra.config.SSLConfig
+import io.lenses.streamreactor.connect.cassandra.config.SSLConfigContext
 import io.lenses.streamreactor.connect.cassandra.config.LoadBalancingPolicy
 import com.datastax.driver.core.Cluster.Builder
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
@@ -0,0 +1,98 @@
/*
 * Copyright 2017-2023 Lenses.io Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.lenses.streamreactor.connect.cassandra.config

import java.io.FileInputStream
import java.security.KeyStore
import java.security.SecureRandom
import javax.net.ssl._

/**
 * Created by [email protected] on 14/04/16.
 * stream-reactor
 */
object SSLConfigContext {

  def apply(config: SSLConfig): SSLContext =
    getSSLContext(config)

  /**
   * Get an SSLContext for a given set of credentials
   *
   * @param config An SSLConfig containing key and truststore credentials
   * @return an SSLContext
   */
  def getSSLContext(config: SSLConfig): SSLContext = {
    val useClientCertAuth = config.useClientCert

    //is client certificate authentication set
    val keyManagers: Array[KeyManager] = if (useClientCertAuth) {
      getKeyManagers(config)
    } else {
      Array[KeyManager]()
    }

    val ctx: SSLContext = SSLContext.getInstance("SSL")
    val trustManagers = getTrustManagers(config)
    ctx.init(keyManagers, trustManagers, new SecureRandom())
    ctx
  }

  /**
   * Get an array of Trust Managers
   *
   * @param config An SSLConfig containing key and truststore credentials
   * @return An Array of TrustManagers
   */
  def getTrustManagers(config: SSLConfig): Array[TrustManager] = {
    val tsf = new FileInputStream(config.trustStorePath)
    val ts  = KeyStore.getInstance(config.trustStoreType)
    ts.load(tsf, config.trustStorePass.toCharArray)
    val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    tmf.init(ts)
    tmf.getTrustManagers
  }

  /**
   * Get an array of Key Managers
   *
   * @param config An SSLConfig containing key and truststore credentials
   * @return An Array of KeyManagers
   */
  def getKeyManagers(config: SSLConfig): Array[KeyManager] = {
    require(config.keyStorePath.nonEmpty, "Key store path is not set!")
    require(config.keyStorePass.nonEmpty, "Key store password is not set!")
    val ksf = new FileInputStream(config.keyStorePath.get)
    val ks  = KeyStore.getInstance(config.keyStoreType)
    ks.load(ksf, config.keyStorePass.get.toCharArray)
    val kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
    kmf.init(ks, config.keyStorePass.get.toCharArray)
    kmf.getKeyManagers
  }

}

/**
 * Class for holding key and truststore settings
 */
case class SSLConfig(
  trustStorePath: String,
  trustStorePass: String,
  keyStorePath:   Option[String],
  keyStorePass:   Option[String],
  useClientCert:  Boolean = false,
  keyStoreType:   String = "JKS",
  trustStoreType: String = "JKS",
)
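A hypothetical use of the relocated SSLConfigContext (store paths and passwords are invented for illustration):

import javax.net.ssl.SSLContext

val sslConfig = SSLConfig(
  trustStorePath = "/var/private/ssl/truststore.jks",
  trustStorePass = "changeit",
  keyStorePath   = Some("/var/private/ssl/keystore.jks"),
  keyStorePass   = Some("changeit"),
  useClientCert  = true,
)
val context: SSLContext = SSLConfigContext(sslConfig)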
@@ -15,18 +15,18 @@
  */
 package io.lenses.streamreactor.connect.cassandra.sink
 
+import cats.implicits.toBifunctorOps
+import com.typesafe.scalalogging.StrictLogging
 import io.lenses.streamreactor.common.config.Helpers
 import io.lenses.streamreactor.common.utils.JarManifest
 
-import java.util
 import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigConstants
 import io.lenses.streamreactor.connect.cassandra.config.CassandraConfigSink
-import com.typesafe.scalalogging.StrictLogging
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.connect.connector.Task
 import org.apache.kafka.connect.errors.ConnectException
 import org.apache.kafka.connect.sink.SinkConnector
 
+import java.util
 import scala.jdk.CollectionConverters.MapHasAsScala
 import scala.jdk.CollectionConverters.SeqHasAsJava
 import scala.util.Failure
@@ -66,7 +66,7 @@ class CassandraSinkConnector extends SinkConnector with StrictLogging {
    */
   override def start(props: util.Map[String, String]): Unit = {
     //check input topics
-    Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap)
+    Helpers.checkInputTopics(CassandraConfigConstants.KCQL, props.asScala.toMap).leftMap(throw _)
     configProps = props
     Try(new CassandraConfigSink(props.asScala.toMap)) match {
       case Failure(f) =>
(Diff truncated; the remaining changed files of the 135 in this commit are not shown.)