Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass sourceQueueName via System Message Attribute for DeadLetterQueueSourceArn #1086

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/org/elasticmq/MessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ case class MessageData(
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/elasticmq/NewMessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ case class NewMessageData(
id: Option[MessageId],
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
nextDelivery: NextDelivery,
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
orderIndex: Int,
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ case class InternalMessage(
var nextDelivery: Long,
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
created: OffsetDateTime,
orderIndex: Int,
var firstReceive: Received,
Expand All @@ -22,7 +21,8 @@ case class InternalMessage(
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) extends Comparable[InternalMessage] {

// Priority queues have biggest elements first
Expand Down Expand Up @@ -67,21 +67,22 @@ case class InternalMessage(
messageGroupId,
messageDeduplicationId,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def toNewMessageData =
NewMessageData(
Some(MessageId(id)),
content,
messageAttributes,
messageSystemAttributes,
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId,
orderIndex,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
Expand All @@ -97,7 +98,6 @@ object InternalMessage {
newMessageData.nextDelivery.toMillis(now, queueData.delay.toMillis).millis,
newMessageData.content,
newMessageData.messageAttributes,
newMessageData.messageSystemAttributes,
OffsetDateTime.now(),
newMessageData.orderIndex,
NeverReceived,
Expand All @@ -106,7 +106,8 @@ object InternalMessage {
newMessageData.messageGroupId,
newMessageData.messageDeduplicationId,
newMessageData.tracingId,
newMessageData.sequenceNumber
newMessageData.sequenceNumber,
newMessageData.deadLetterSourceQueueName
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ trait QueueActorMessageOps
receiveMessages(visibilityTimeout, count, receiveRequestAttemptId).send()
case DeleteMessage(deliveryReceipt) =>
deleteMessage(deliveryReceipt).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination) => moveMessage(message, destination).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination, sourceQueueName) =>
moveMessage(message, destination, sourceQueueName).send()
case DeduplicationIdsCleanup =>
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
DoNotReply()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ}
trait MoveMessageOps extends Logging {
this: QueueActorStorage =>

def moveMessage(message: InternalMessage, destination: MoveDestination): ResultWithEvents[Unit] = {
def moveMessage(
message: InternalMessage,
destination: MoveDestination,
sourceQueueName: String
): ResultWithEvents[Unit] = {

copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) }
val messageWithSourceQueueName = message.copy(deadLetterSourceQueueName = Some(sourceQueueName))
copyMessagesToActorRef.foreach { _ ! SendMessage(messageWithSourceQueueName.toNewMessageData) }

destination match {
case MoveToDLQ =>
if (queueData.isFifo) {
CommonOperations.wasRegistered(message.toNewMessageData, fifoMessagesHistory) match {
CommonOperations.wasRegistered(messageWithSourceQueueName.toNewMessageData, fifoMessagesHistory) match {
case Some(_) => ResultWithEvents.empty
case None =>
logger.debug(s"Moved message (${message.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(message))
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(messageWithSourceQueueName))
}
} else {
logger.debug(s"Moved message (${message.id}) to ${queueData.name}")
moveMessageToQueue(message)
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) to ${queueData.name}")
moveMessageToQueue(messageWithSourceQueueName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait ReceiveMessageOps extends Logging {
messageQueue.dequeue(count, deliveryTime).map { internalMessage =>
if (queueData.deadLettersQueue.map(_.maxReceiveCount).exists(_ <= internalMessage.receiveCount)) {
logger.debug(s"${queueData.name}: send message $internalMessage to dead letters actor $deadLettersActorRef")
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ))
deadLettersActorRef.foreach(_ ! MoveMessage(internalMessage, MoveToDLQ, queueData.name))
MessageToDelete(internalMessage)
} else {
MessageToReturn(internalMessage)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/elasticmq/msg/QueueMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class GetQueueStatistics(deliveryTime: Long) extends QueueQueueMsg[QueueSta
case class ClearQueue() extends QueueQueueMsg[Unit]

case class SendMessage(message: NewMessageData) extends QueueMessageMsg[MessageData]
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination) extends QueueMessageMsg[Unit]
case class MoveMessage(message: InternalMessage, moveDestination: MoveDestination, sourceQueueName: String) extends QueueMessageMsg[Unit]
case class UpdateVisibilityTimeout(deliveryReceipt: DeliveryReceipt, visibilityTimeout: VisibilityTimeout)
extends QueueMessageMsg[Either[InvalidReceiptHandle, Unit]]
case class ReceiveMessages(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
nextDelivery = 100L,
content = "",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = created,
orderIndex = 0,
firstReceive = NeverReceived,
Expand All @@ -116,7 +115,8 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = maybeDeduplicationId,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -27,7 +26,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand All @@ -46,7 +46,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = Map.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -55,7 +54,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
Map.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -32,7 +31,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val msg3 = msg1.copy(id = "id-3")
Expand Down Expand Up @@ -74,7 +74,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
Map.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -83,7 +82,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val messageQueue = MessageQueue(isFifo = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ trait DataCreationHelpers {
messageGroupId,
messageDeduplicationId,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(
Expand All @@ -68,26 +69,26 @@ trait DataCreationHelpers {
Some(MessageId(id)),
content,
messageAttributes,
Map.empty,
nextDelivery,
messageGroupId,
messageDeduplicationId,
orderIndex = 0,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(messageData: MessageData) =
NewMessageData(
Some(messageData.id),
messageData.content,
messageData.messageAttributes,
Map.empty,
messageData.nextDelivery,
messageData.messageGroupId,
messageData.messageDeduplicationId,
orderIndex = 0,
messageData.tracingId,
messageData.sequenceNumber
messageData.sequenceNumber,
messageData.deadLetterSourceQueueName
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ case class DBMessage(
groupId: Option[String],
deduplicationId: Option[String],
tracingId: Option[String],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) {

def toInternalMessage: InternalMessage = {
Expand Down Expand Up @@ -48,7 +49,6 @@ case class DBMessage(
nextDelivery,
new String(content),
serializedAttrs,
Map.empty,
OffsetDateTimeUtil.ofEpochMilli(created),
orderIndex = 0,
firstReceive,
Expand All @@ -57,7 +57,8 @@ case class DBMessage(
groupId,
deduplicationId.map(id => DeduplicationId(id)),
tracingId.map(TracingId.apply),
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)
}
}
Expand All @@ -76,7 +77,8 @@ object DBMessage {
rs.stringOpt("group_id"),
rs.stringOpt("deduplication_id"),
rs.stringOpt("tracing_id"),
rs.stringOpt("sequence_number")
rs.stringOpt("sequence_number"),
rs.stringOpt("dead_letter_source_queue_name")
)

def from(message: InternalMessage): DBMessage = {
Expand Down Expand Up @@ -111,7 +113,8 @@ object DBMessage {
message.messageGroupId,
deduplicationId,
message.tracingId.map(_.id),
message.sequenceNumber
message.sequenceNumber,
message.deadLetterSourceQueueName
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
group_id varchar,
deduplication_id varchar,
tracing_id varchar,
sequence_number varchar
sequence_number varchar,
dead_letter_source_queue_name varchar
)""".execute.apply()

def drop(): Unit = {
Expand All @@ -50,7 +51,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
def add(internalMessage: InternalMessage): Int = {
val message = DBMessage.from(internalMessage)
sql"""insert into $tableName
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number)
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number, dead_letter_source_queue_name)
values (${message.messageId},
${message.deliveryReceipts},
${message.nextDelivery},
Expand All @@ -62,7 +63,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
${message.groupId},
${message.deduplicationId},
${message.tracingId},
${message.sequenceNumber})""".update.apply()
${message.sequenceNumber},
${message.deadLetterSourceQueueName})""".update.apply()
}

def update(internalMessage: InternalMessage): Int = {
Expand All @@ -74,7 +76,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
received = ${message.received},
receive_count = ${message.receiveCount},
tracing_id = ${message.tracingId},
sequence_number = ${message.sequenceNumber}
sequence_number = ${message.sequenceNumber},
dead_letter_source_queue_name = ${message.deadLetterSourceQueueName}
where message_id = ${message.messageId}""".update.apply()
}

Expand Down
Loading
Loading