* Splits Elastic module into elastic-common and elastic8
* Includes refactoring of all connectors so that Scala Maps are passed around internally; java.util.Maps are used only on entry to and exit from Scala code and are converted soon after (see the sketch after this list)
* Adds an OpenSearch module
* Adds OpenSearch unit tests
* OpenSearch SSL test - still needs completing
* Replaces Elastic6 and Elastic7 with Elastic8
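
As a rough illustration of the Map-handling convention described above (this is not code from the commit; ExampleSinkTask and its configure helper are made-up names), a connector entry point converts the framework-supplied java.util.Map once, at the boundary, and passes an immutable Scala Map everywhere else:

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

// Hypothetical sketch of the boundary-conversion pattern: the java.util.Map handed
// over by the Kafka Connect framework is converted to a Scala Map as soon as it
// enters Scala code, and only Scala Maps are passed around afterwards.
class ExampleSinkTask {

  // Entry point: receives a java.util.Map from the framework.
  def start(props: util.Map[String, String]): Unit = {
    val scalaProps: Map[String, String] = props.asScala.toMap // convert once, at the boundary
    configure(scalaProps)
  }

  // Internal code only ever sees Scala Maps, mirroring signatures such as
  // S3SinkConfig.fromProps(props: Map[String, String]) in the diff below.
  private def configure(props: Map[String, String]): Unit =
    println(s"configured with ${props.size} properties")
}

The same pattern shows up throughout the S3 connector changes below: fromProps and the config-def builders now take Map[String, String], and the asJava/asScala conversions at their call sites have been removed.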

Removing deleted modules

Source package renamer

Package rename

Dir rename
davidsloan committed Nov 29, 2023
1 parent e2e9adb commit 4e0c353
Showing 216 changed files with 3,511 additions and 4,703 deletions.
51 changes: 38 additions & 13 deletions build.sbt
@@ -1,5 +1,6 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson
import Dependencies.bouncyCastle

import Settings._
import sbt.Keys.libraryDependencies
@@ -18,8 +19,9 @@ lazy val subProjects: Seq[Project] = Seq(
`azure-documentdb`,
`azure-datalake`,
cassandra,
elastic6,
elastic7,
`elastic-common`,
opensearch,
elastic8,
ftp,
`gcp-storage`,
influxdb,
@@ -201,17 +203,16 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val elastic6 = (project in file("kafka-connect-elastic6"))
lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
.dependsOn(common)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic6",
name := "kafka-connect-elastic-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic6Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
publish / skip := true,
FunctionalTest / baseDirectory := (LocalRootProject / baseDirectory).value,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
@@ -220,19 +221,20 @@ lazy val elastic6 = (project in file("kafka-connect-elastic6"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic6TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)
.disablePlugins(PackPlugin)

lazy val elastic7 = (project in file("kafka-connect-elastic7"))
lazy val elastic8 = (project in file("kafka-connect-elastic8"))
.dependsOn(common)
.dependsOn(`test-common` % "fun->compile")
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic7",
name := "kafka-connect-elastic8",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElastic7Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectElastic8Deps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
@@ -242,10 +244,33 @@ lazy val elastic7 = (project in file("kafka-connect-elastic7"))
)
.configureAssembly(true)
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic7TestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val opensearch = (project in file("kafka-connect-opensearch"))
.dependsOn(common)
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-opensearch",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly(false)
.configureTests(baseTestDeps)
//.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
.configureFunctionalTests(bouncyCastle)
.enablePlugins(PackPlugin)

lazy val influxdb = (project in file("kafka-connect-influxdb"))
.dependsOn(common)
.settings(
@@ -28,7 +28,6 @@ import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask
import io.lenses.streamreactor.connect.cloud.common.sink.WriterManagerCreator
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager

import scala.jdk.CollectionConverters.MapHasAsJava
import scala.util.Try

object S3SinkTask {}
@@ -46,7 +45,7 @@ class S3SinkTask

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] =
for {
config <- S3SinkConfig.fromProps(props.asJava)
config <- S3SinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.s3Config)
storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
_ <- Try(setErrorRetryInterval(config.s3Config)).toEither
@@ -25,12 +25,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketO
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.sink.config.OffsetSeekerOptions

import java.util

object S3SinkConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
)(
implicit
connectorTaskId: ConnectorTaskId,
@@ -51,7 +49,7 @@ object S3SinkConfig {
s3ConfigDefBuilder.getInt(SEEK_MAX_INDEX_FILES),
)
} yield S3SinkConfig(
S3Config(s3ConfigDefBuilder.getParsedValues),
S3Config(s3ConfigDefBuilder.props),
sinkBucketOptions,
offsetSeekerOptions,
s3ConfigDefBuilder.getCompressionCodec(),
@@ -21,19 +21,12 @@ import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkConfigDefBuilder

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case class S3SinkConfigDefBuilder(props: util.Map[String, String])
case class S3SinkConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SinkConfigDef.config, props)
with CloudSinkConfigDefBuilder
with ErrorPolicySettings
with NumberRetriesSettings
with UserSettings
with ConnectionSettings
with CompressionCodecSettings
with DeleteModeSettings {

def getParsedValues: Map[String, _] = values().asScala.toMap

}
with DeleteModeSettings {}
@@ -64,7 +64,7 @@ class S3SourceTask extends SourceTask with LazyLogging {
logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")

val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
(for {
result <- S3SourceState.make(mergedProperties, contextOffsetFn)
fiber <- result.partitionDiscoveryLoop.start
@@ -40,18 +40,17 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

import java.util
import scala.util.Try

object S3SourceConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
): Either[Throwable, S3SourceConfig] =
S3SourceConfig(S3SourceConfigDefBuilder(props))

def apply(s3ConfigDefBuilder: S3SourceConfigDefBuilder): Either[Throwable, S3SourceConfig] = {
val parsedValues = s3ConfigDefBuilder.getParsedValues
val parsedValues = s3ConfigDefBuilder.props
for {
sbo <- SourceBucketOptions(
s3ConfigDefBuilder,
@@ -20,10 +20,7 @@ import io.lenses.streamreactor.connect.aws.s3.config.DeleteModeSettings
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.cloud.common.config.CompressionCodecSettings

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case class S3SourceConfigDefBuilder(props: util.Map[String, String])
case class S3SourceConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SourceConfigDef.config, props)
with KcqlSettings
with ErrorPolicySettings
@@ -32,8 +29,4 @@ case class S3SourceConfigDefBuilder(props: util.Map[String, String])
with ConnectionSettings
with CompressionCodecSettings
with SourcePartitionSearcherSettings
with DeleteModeSettings {

def getParsedValues: Map[String, _] = values().asScala.toMap

}
with DeleteModeSettings {}
@@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState

import java.util
import scala.jdk.CollectionConverters.IteratorHasAsScala

object S3SourceState extends StrictLogging {
def make(
props: util.Map[String, String],
props: Map[String, String],
contextOffsetFn: CloudLocation => Option[CloudLocation],
)(
implicit
@@ -20,8 +20,6 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._

import scala.jdk.CollectionConverters.MapHasAsJava

class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
private val deleteModeMap = Table[String, String, Boolean](
("testName", "value", "expected"),
@@ -36,7 +34,7 @@ class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging
S3SinkConfigDefBuilder(Map(
"connect.s3.kcql" -> "abc",
"connect.s3.delete.mode" -> value,
).asJava).batchDelete() should be(expected)
)).batchDelete() should be(expected)
}
}
}
@@ -32,7 +32,6 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.MapHasAsJava

class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues {

@@ -48,7 +47,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

val kcql = S3SinkConfigDefBuilder(props.asJava).getKCQL
val kcql = S3SinkConfigDefBuilder(props).getKCQL
kcql should have size 1

val element = kcql.head
@@ -65,7 +64,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default))
}
@@ -76,7 +75,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled))
}
@@ -87,7 +86,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false)))
@@ -116,7 +115,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
|""".stripMargin,
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false),
@@ -131,7 +130,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -149,7 +148,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -165,7 +164,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
)

val commitPolicy =
S3SinkConfigDefBuilder(props.asJava).commitPolicy(S3SinkConfigDefBuilder(props.asJava).getKCQL.head)
S3SinkConfigDefBuilder(props).commitPolicy(S3SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -181,7 +180,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName BATCH = 150 STOREAS `CSV` LIMIT 550",
)

val kcql = S3SinkConfigDefBuilder(props.asJava).getKCQL
val kcql = S3SinkConfigDefBuilder(props).getKCQL

kcql.head.getBatchSize should be(150)
kcql.head.getLimit should be(550)
@@ -192,7 +191,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
@@ -209,7 +208,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)) match {
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
@@ -226,7 +225,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES_VALUEONLY` WITH_FLUSH_COUNT = 1",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"Unsupported format - BYTES_VALUEONLY. Please note",
)
}
@@ -236,7 +235,7 @@ class S3S3S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES` WITH_FLUSH_COUNT = 3",
)

config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
config.CloudSinkBucketOptions(S3SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"FLUSH_COUNT > 1 is not allowed for BYTES",
)
}