Skip to content

Commit

Permalink
new sqs dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Neelab Chaudhuri authored and wajda committed Jul 9, 2024
1 parent 6df9652 commit 02e0c88
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>snakeyaml</artifactId>
<version>1.33</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>2.20.18</version>
</dependency>

<!-- dependencies shaded by us-->

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package za.co.absa.spline.harvester.dispatcher.sqsdispatcher

import org.apache.commons.configuration.Configuration
import za.co.absa.commons.config.ConfigurationImplicits._
import za.co.absa.commons.version.Version
import za.co.absa.spline.harvester.dispatcher.sqsdispatcher.SqsLineageDispatcherConfig._

import java.time.Duration
import java.time.temporal.ChronoUnit

object SqsLineageDispatcherConfig {
val QueueUrl = "queue.url"
val ApiVersion = "apiVersion"

def apply(c: Configuration) = new SqsLineageDispatcherConfig(c)
}

class SqsLineageDispatcherConfig(config: Configuration) {
val queueUrl: String = config.getRequiredString(QueueUrl)
val apiVersion: Version = Version.asSimple(config.getString(ApiVersion, "1.2"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package za.co.absa.spline.harvester.dispatcher.sqsdispatcher

import org.apache.commons.configuration.Configuration
import org.apache.spark.internal.Logging
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import za.co.absa.commons.version.Version
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.harvester.dispatcher.modelmapper.ModelMapper
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

class SqsLineageDispatcherImpl(sqsClient: SqsClient,
sqsUrl: String,
apiVersion: Version) extends LineageDispatcher with Logging {
import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._
def this(dispatcherConfig: SqsLineageDispatcherConfig) = this(
SqsLineageDispatcherImpl.createSqsClient(dispatcherConfig),
dispatcherConfig.queueUrl,
dispatcherConfig.apiVersion
)

def this(configuration: Configuration) = this(new SqsLineageDispatcherConfig(configuration))

override def name = "Sqs"

logInfo(s"Using Producer API version: ${apiVersion.asString}")
logInfo(s"Sqs url: $sqsUrl")

private val modelMapper = ModelMapper.forApiVersion(apiVersion)

private var cachedPlan: ExecutionPlan = _

override def send(plan: ExecutionPlan): Unit = {
cachedPlan = plan
}

override def send(event: ExecutionEvent): Unit = {
assert(cachedPlan != null)
val plan = cachedPlan
for {
execPlanDTO <- modelMapper.toDTO(plan)
eventDTO <- modelMapper.toDTO(event)
} {
val jsonPlan = execPlanDTO.toJson
val jsonEvent = eventDTO.toJson
val json =
s"""
| {
| "plan": $jsonPlan,
| "event": $jsonEvent
| }
|""".stripMargin
sendToSqs(json)
}
}

private def sendToSqs(json: String,
objectType: String = "Spline"): Unit = {
val body =
s"""
| { "requestType": "SparkJobRunInfo",
| "objectType": "$objectType",
| "body": $json
| }
|""".stripMargin
val sendMsgRequest = SendMessageRequest.builder()
.queueUrl(sqsUrl)
.messageBody(body)
.build()
sqsClient.sendMessage(sendMsgRequest)
}
}


object SqsLineageDispatcherImpl extends Logging {

private def createSqsClient(config: SqsLineageDispatcherConfig): SqsClient = {
SqsClient
.builder()
.build()
}
}

0 comments on commit 02e0c88

Please sign in to comment.