Skip to content

Commit

Permalink
* Splits Elastic module into elastic-common and elastic8
Browse files Browse the repository at this point in the history
* Includes refactoring of all connectors to ensure that Maps are passed around and util.Maps are only used on entry and exit from scala code and converted soon after
* Adding Opensearch module
* Adding Opensearch unit tests
* Opensearch SSL Test - needs completing
  • Loading branch information
davidsloan committed Sep 29, 2023
1 parent d594a41 commit aea66a8
Show file tree
Hide file tree
Showing 202 changed files with 3,283 additions and 1,665 deletions.
51 changes: 49 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Dependencies.globalExcludeDeps
import Dependencies.gson
import Dependencies.bouncyCastle

import Settings._
import sbt.Keys.libraryDependencies
Expand All @@ -16,8 +17,8 @@ lazy val subProjects: Seq[Project] = Seq(
`aws-s3`,
`azure-documentdb`,
cassandra,
//elastic6,
//elastic7,
`elastic-common`,
opensearch,
elastic8,
ftp,
hazelcast,
Expand Down Expand Up @@ -135,8 +136,31 @@ lazy val cassandra = (project in file("kafka-connect-cassandra"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val `elastic-common` = (project in file("kafka-connect-elastic-common"))
.dependsOn(common)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-elastic-common",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectElasticBaseDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly()
.configureTests(baseTestDeps)
.configureIntegrationTests(kafkaConnectElastic8TestDeps)
.configureFunctionalTests()
.disablePlugins(PackPlugin)

lazy val elastic8 = (project in file("kafka-connect-elastic8"))
.dependsOn(common)
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Expand All @@ -157,6 +181,29 @@ lazy val elastic8 = (project in file("kafka-connect-elastic8"))
.configureFunctionalTests()
.enablePlugins(PackPlugin)

lazy val opensearch = (project in file("kafka-connect-opensearch"))
.dependsOn(common)
.dependsOn(`elastic-common`)
.dependsOn(`test-common` % "fun->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-opensearch",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectOpenSearchDeps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
"zookeeper-.*\\.jar",
),
),
)
.configureAssembly()
.configureTests(baseTestDeps)
//.configureIntegrationTests(kafkaConnectOpenSearchTestDeps)
.configureFunctionalTests(bouncyCastle)
.enablePlugins(PackPlugin)

lazy val hazelcast = (project in file("kafka-connect-hazelcast"))
.dependsOn(common)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ import cats.implicits.toBifunctorOps
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.TASK_INDEX
import io.lenses.streamreactor.connect.aws.s3.source.distribution.PartitionHasher

import java.util

case class ConnectorTaskId(name: String, maxTasks: Int, taskNo: Int) {
def ownsDir(dirPath: String): Boolean =
if (maxTasks == 1) true
else PartitionHasher.hash(maxTasks, dirPath) == taskNo
}

object ConnectorTaskId {
def fromProps(props: util.Map[String, String]): Either[Throwable, ConnectorTaskId] = {
def fromProps(props: Map[String, String]): Either[Throwable, ConnectorTaskId] = {
for {
taskIndexString <- Option(props.get(TASK_INDEX)).toRight(s"Missing $TASK_INDEX")
taskIndexString <- props.get(TASK_INDEX).toRight(s"Missing $TASK_INDEX")
taskIndex = taskIndexString.split(":")
_ <- if (taskIndex.size != 2) Left(s"Invalid $TASK_INDEX. Expecting TaskNumber:MaxTask format.") else Right(())
maxTasks <- taskIndex(1).toIntOption.toRight(
Expand All @@ -44,7 +42,7 @@ object ConnectorTaskId {
)
_ <- if (taskNumber < 0) Left(s"Invalid $TASK_INDEX. Expecting a positive integer but found:${taskIndex(0)}")
else Right(())
maybeTaskName <- Option(props.get("name")).filter(_.trim.nonEmpty).toRight("Missing connector name")
maybeTaskName <- props.get("name").filter(_.trim.nonEmpty).toRight("Missing connector name")
} yield ConnectorTaskId(maybeTaskName, maxTasks, taskNumber)
}.leftMap(new IllegalArgumentException(_))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class S3SinkTask extends SinkTask with ErrorHandler {

printAsciiHeader(manifest, "/aws-s3-sink-ascii.txt")

ConnectorTaskId.fromProps(fallbackProps) match {
ConnectorTaskId.fromProps(fallbackProps.asScala.toMap) match {
case Left(value) => throw new IllegalArgumentException(value)
case Right(value) => connectorTaskId = value
}
Expand All @@ -67,7 +67,7 @@ class S3SinkTask extends SinkTask with ErrorHandler {
val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty)
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap).asJava
val errOrWriterMan = for {
config <- S3SinkConfig.fromProps(props)
config <- S3SinkConfig.fromProps(props.asScala.toMap)
s3Client <- AwsS3ClientCreator.make(config.s3Config)
storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
_ <- Try(setErrorRetryInterval(config.s3Config)).toEither
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ package io.lenses.streamreactor.connect.aws.s3.sink.config
import cats.syntax.all._
import com.datamountaineer.kcql.Kcql
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.config.ConnectorTaskId
import io.lenses.streamreactor.connect.aws.s3.config.DataStorageSettings
import io.lenses.streamreactor.connect.aws.s3.config.FormatSelection
import io.lenses.streamreactor.connect.aws.s3.config.S3Config
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.SEEK_MAX_INDEX_FILES
import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.aws.s3.model.CompressionCodec
Expand All @@ -30,18 +26,17 @@ import io.lenses.streamreactor.connect.aws.s3.sink.commit.Count
import io.lenses.streamreactor.connect.aws.s3.sink.config.kcqlprops.S3SinkProps
import io.lenses.streamreactor.connect.aws.s3.sink.config.kcqlprops.S3SinkPropsSchema
import io.lenses.streamreactor.connect.aws.s3.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.KeyNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.TopicPartitionOffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.TopicPartitionOffsetS3FileNamer

import java.util
import scala.jdk.CollectionConverters._

object S3SinkConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
)(
implicit
connectorTaskId: ConnectorTaskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class S3SinkConfigDef() extends ConfigDef with LazyLogging {

}

case class S3SinkConfigDefBuilder(props: util.Map[String, String])
case class S3SinkConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SinkConfigDef.config, props)
with KcqlSettings
with ErrorPolicySettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class S3SourceTask extends SourceTask with LazyLogging {
val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava
(for {
result <- S3SourceState.make(mergedProperties, contextOffsetFn)
result <- S3SourceState.make(mergedProperties.asScala.toMap, contextOffsetFn)
fiber <- result.partitionDiscoveryLoop.start
} yield {
s3SourceTaskState = result.state.some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ import io.lenses.streamreactor.connect.aws.s3.storage.FileMetadata
import io.lenses.streamreactor.connect.aws.s3.storage.ListResponse
import io.lenses.streamreactor.connect.aws.s3.storage.StorageInterface

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala
import scala.util.Try

object S3SourceConfig {

def fromProps(
props: util.Map[String, String],
props: Map[String, String],
): Either[Throwable, S3SourceConfig] =
S3SourceConfig(S3SourceConfigDefBuilder(props))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class S3SourceConfigDef() extends ConfigDef with LazyLogging {

}

case class S3SourceConfigDefBuilder(props: util.Map[String, String])
case class S3SourceConfigDefBuilder(props: Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3SourceConfigDef.config, props)
with KcqlSettings
with ErrorPolicySettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import io.lenses.streamreactor.connect.aws.s3.source.reader.ReaderManager
import io.lenses.streamreactor.connect.aws.s3.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3StorageInterface

import java.util
import scala.jdk.CollectionConverters.IteratorHasAsScala

object S3SourceState extends StrictLogging {
def make(
props: util.Map[String, String],
props: Map[String, String],
contextOffsetFn: S3Location => Option[S3Location],
): IO[BuilderResult] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,54 @@ import io.lenses.streamreactor.connect.aws.s3.source.distribution.PartitionHashe
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.jdk.CollectionConverters._
class ConnectorTaskIdTest extends AnyWordSpec with Matchers {
private val connectorName = "connectorName"
"ConnectorTaskId" should {
"create the instance" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0:2", "name" -> connectorName)
ConnectorTaskId.fromProps(from.asJava) shouldBe ConnectorTaskId(connectorName, 2, 0).asRight[String]
ConnectorTaskId.fromProps(from) shouldBe ConnectorTaskId(connectorName, 2, 0).asRight[String]
}
"fail if max tasks is not valid integer" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0:2a", "name" -> connectorName)
val actual = ConnectorTaskId.fromProps(from.asJava)
val actual = ConnectorTaskId.fromProps(from)
actual match {
case Left(e) => e.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:2a"
case Right(_) => fail("Should have failed")
}
}
"fail if task number is not a valid integer" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0a:2", "name" -> connectorName)
ConnectorTaskId.fromProps(from.asJava) match {
ConnectorTaskId.fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:0a"
case Right(_) => fail("Should have failed")
}
}
"fail if task number < 0" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "-1:2", "name" -> connectorName)
ConnectorTaskId.fromProps(from.asJava) match {
ConnectorTaskId.fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1"
case Right(value) => fail(s"Should have failed but got $value")
}

}
"fail if max tasks is zero" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0:0", "name" -> connectorName)
ConnectorTaskId.fromProps(from.asJava) match {
ConnectorTaskId.fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:0"
case Right(value) => fail(s"Should have failed but got $value")
}
}
"fail if max tasks is negative" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0:-1", "name" -> connectorName)
ConnectorTaskId.fromProps(from.asJava) match {
ConnectorTaskId.fromProps(from) match {
case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1"
case Right(value) => fail(s"Should have failed but got $value")
}
}

"own the partitions when max task is 1" in {
val from = Map("a" -> "1", "b" -> "2", S3ConfigSettings.TASK_INDEX -> "0:1", "name" -> connectorName)
val actual = ConnectorTaskId.fromProps(from.asJava).getOrElse(fail("Should be valid"))
val actual = ConnectorTaskId.fromProps(from).getOrElse(fail("Should be valid"))

Seq("/myTopic/", "/anotherTopic/", "/thirdTopic/")
.flatMap { value =>
Expand All @@ -86,12 +85,12 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers {
"b" -> "2",
S3ConfigSettings.TASK_INDEX -> "0:2",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val two = ConnectorTaskId.fromProps(Map("a" -> "1",
"b" -> "2",
S3ConfigSettings.TASK_INDEX -> "1:2",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))

PartitionHasher.hash(2, "1") shouldBe 1
one.ownsDir("1") shouldBe false
Expand All @@ -108,17 +107,17 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers {
"b" -> "2",
S3ConfigSettings.TASK_INDEX -> "0:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val two = ConnectorTaskId.fromProps(Map("a" -> "1",
"b" -> "2",
S3ConfigSettings.TASK_INDEX -> "1:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))
val three = ConnectorTaskId.fromProps(Map("a" -> "1",
"b" -> "2",
S3ConfigSettings.TASK_INDEX -> "2:3",
"name" -> connectorName,
).asJava).getOrElse(fail("Should be valid"))
)).getOrElse(fail("Should be valid"))

PartitionHasher.hash(3, "1") shouldBe 1
one.ownsDir("1") shouldBe false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import io.lenses.streamreactor.connect.aws.s3.source.config.S3SourceConfigDefBui
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters.MapHasAsJava

class S3SourceConfigTest extends AnyFunSuite with Matchers {
private val Identity: String = "identity"
private val Credential: String = "credential"
Expand Down Expand Up @@ -62,7 +60,7 @@ class S3SourceConfigTest extends AnyFunSuite with Matchers {
"connect.s3.partition.search.recurse.levels" -> "0",
)

S3SourceConfig(S3SourceConfigDefBuilder(props.asJava)) match {
S3SourceConfig(S3SourceConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(config) =>
config.bucketOptions.size shouldBe 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._

import scala.jdk.CollectionConverters.MapHasAsJava

class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
private val deleteModeMap = Table[String, String, Boolean](
("testName", "value", "expected"),
Expand All @@ -36,7 +34,7 @@ class DeleteModeSettingsTest extends AnyFlatSpec with Matchers with LazyLogging
S3SinkConfigDefBuilder(Map(
"connect.s3.kcql" -> "abc",
"connect.s3.delete.mode" -> value,
).asJava).batchDelete() should be(expected)
)).batchDelete() should be(expected)
}
}
}
Loading

0 comments on commit aea66a8

Please sign in to comment.