diff --git a/core/pom.xml b/core/pom.xml
index 48b69622..9a1f19c7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -88,6 +88,11 @@
snakeyaml
1.33
+
+ software.amazon.awssdk
+ sqs
+ 2.20.18
+
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherConfig.scala b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherConfig.scala
new file mode 100644
index 00000000..d480d070
--- /dev/null
+++ b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherConfig.scala
@@ -0,0 +1,21 @@
+package za.co.absa.spline.harvester.dispatcher.sqsdispatcher
+
+import org.apache.commons.configuration.Configuration
+import za.co.absa.commons.config.ConfigurationImplicits._
+import za.co.absa.commons.version.Version
+import za.co.absa.spline.harvester.dispatcher.sqsdispatcher.SqsLineageDispatcherConfig._
+
+import java.time.Duration
+import java.time.temporal.ChronoUnit
+
+object SqsLineageDispatcherConfig {
+ val QueueUrl = "queue.url"
+ val ApiVersion = "apiVersion"
+
+ def apply(c: Configuration) = new SqsLineageDispatcherConfig(c)
+}
+
+class SqsLineageDispatcherConfig(config: Configuration) {
+ val queueUrl: String = config.getRequiredString(QueueUrl)
+ val apiVersion: Version = Version.asSimple(config.getString(ApiVersion, "1.2"))
+}
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherImpl.scala b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherImpl.scala
new file mode 100644
index 00000000..e3e8bdf1
--- /dev/null
+++ b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/sqsdispatcher/SqsLineageDispatcherImpl.scala
@@ -0,0 +1,82 @@
+package za.co.absa.spline.harvester.dispatcher.sqsdispatcher
+
+import org.apache.commons.configuration.Configuration
+import org.apache.spark.internal.Logging
+import software.amazon.awssdk.services.sqs.SqsClient
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest
+import za.co.absa.commons.version.Version
+import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
+import za.co.absa.spline.harvester.dispatcher.modelmapper.ModelMapper
+import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
+
+class SqsLineageDispatcherImpl(sqsClient: SqsClient,
+ sqsUrl: String,
+ apiVersion: Version) extends LineageDispatcher with Logging {
+ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._
+ def this(dispatcherConfig: SqsLineageDispatcherConfig) = this(
+ SqsLineageDispatcherImpl.createSqsClient(dispatcherConfig),
+ dispatcherConfig.queueUrl,
+ dispatcherConfig.apiVersion
+ )
+
+ def this(configuration: Configuration) = this(new SqsLineageDispatcherConfig(configuration))
+
+ override def name = "Sqs"
+
+ logInfo(s"Using Producer API version: ${apiVersion.asString}")
+ logInfo(s"Sqs url: $sqsUrl")
+
+ private val modelMapper = ModelMapper.forApiVersion(apiVersion)
+
+ private var cachedPlan: ExecutionPlan = _
+
+ override def send(plan: ExecutionPlan): Unit = {
+ cachedPlan = plan
+ }
+
+ override def send(event: ExecutionEvent): Unit = {
+ assert(cachedPlan != null)
+ val plan = cachedPlan
+ for {
+ execPlanDTO <- modelMapper.toDTO(plan)
+ eventDTO <- modelMapper.toDTO(event)
+ } {
+ val jsonPlan = execPlanDTO.toJson
+ val jsonEvent = eventDTO.toJson
+ val json =
+ s"""
+ | {
+ | "plan": $jsonPlan,
+ | "event": $jsonEvent
+ | }
+ |""".stripMargin
+ sendToSqs(json)
+ }
+ }
+
+ private def sendToSqs(json: String,
+ objectType: String = "Spline"): Unit = {
+ val body =
+ s"""
+ | { "requestType": "SparkJobRunInfo",
+ | "objectType": "$objectType",
+ | "body": $json
+ | }
+ |""".stripMargin
+ val sendMsgRequest = SendMessageRequest.builder()
+ .queueUrl(sqsUrl)
+ .messageBody(body)
+ .build()
+ sqsClient.sendMessage(sendMsgRequest)
+ }
+}
+
+
+object SqsLineageDispatcherImpl extends Logging {
+
+ private def createSqsClient(config: SqsLineageDispatcherConfig): SqsClient = {
+ SqsClient
+ .builder()
+ .build()
+ }
+}