Skip to content

Commit

Permalink
use dedicated field in Message instead of system attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow committed Jan 6, 2025
1 parent c2aa609 commit 742b436
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 72 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/elasticmq/MessageData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ case class MessageData(
deliveryReceipt: Option[DeliveryReceipt],
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
nextDelivery: MillisNextDelivery,
created: OffsetDateTime,
statistics: MessageStatistics,
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
6 changes: 2 additions & 4 deletions core/src/main/scala/org/elasticmq/NewMessageData.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.elasticmq

import scala.collection.mutable

case class NewMessageData(
id: Option[MessageId],
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
nextDelivery: NextDelivery,
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
orderIndex: Int,
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ case class InternalMessage(
var nextDelivery: Long,
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: mutable.HashMap[String, MessageAttribute],
created: OffsetDateTime,
orderIndex: Int,
var firstReceive: Received,
Expand All @@ -22,7 +21,8 @@ case class InternalMessage(
messageGroupId: Option[String],
messageDeduplicationId: Option[DeduplicationId],
tracingId: Option[TracingId],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) extends Comparable[InternalMessage] {

// Priority queues have biggest elements first
Expand Down Expand Up @@ -61,28 +61,28 @@ case class InternalMessage(
deliveryReceipts.lastOption.map(DeliveryReceipt(_)),
content,
messageAttributes,
messageSystemAttributes.to(Map),
MillisNextDelivery(nextDelivery),
created,
MessageStatistics(firstReceive, receiveCount),
messageGroupId,
messageDeduplicationId,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def toNewMessageData =
NewMessageData(
Some(MessageId(id)),
content,
messageAttributes,
messageSystemAttributes.to(Map),
MillisNextDelivery(nextDelivery),
messageGroupId,
messageDeduplicationId,
orderIndex,
tracingId,
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)

def deliverable(deliveryTime: Long): Boolean = nextDelivery <= deliveryTime
Expand All @@ -98,7 +98,6 @@ object InternalMessage {
newMessageData.nextDelivery.toMillis(now, queueData.delay.toMillis).millis,
newMessageData.content,
newMessageData.messageAttributes,
newMessageData.messageSystemAttributes.to(mutable.HashMap),
OffsetDateTime.now(),
newMessageData.orderIndex,
NeverReceived,
Expand All @@ -107,7 +106,8 @@ object InternalMessage {
newMessageData.messageGroupId,
newMessageData.messageDeduplicationId,
newMessageData.tracingId,
newMessageData.sequenceNumber
newMessageData.sequenceNumber,
newMessageData.deadLetterSourceQueueName
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ trait QueueActorMessageOps
receiveMessages(visibilityTimeout, count, receiveRequestAttemptId).send()
case DeleteMessage(deliveryReceipt) =>
deleteMessage(deliveryReceipt).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination, sourceQueueName) => moveMessage(message, destination, sourceQueueName).send()
case LookupMessage(messageId) => messageQueue.getById(messageId.id).map(_.toMessageData)
case MoveMessage(message, destination, sourceQueueName) =>
moveMessage(message, destination, sourceQueueName).send()
case DeduplicationIdsCleanup =>
fifoMessagesHistory = fifoMessagesHistory.cleanOutdatedMessages(nowProvider)
DoNotReply()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,32 @@ package org.elasticmq.actor.queue.operations
import org.elasticmq.actor.queue.{InternalMessage, QueueActorStorage, QueueEvent}
import org.elasticmq.msg.SendMessage
import org.elasticmq.util.Logging
import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ, StringMessageAttribute}
import org.elasticmq.{DeduplicationId, MoveDestination, MoveToDLQ}

trait MoveMessageOps extends Logging {
this: QueueActorStorage =>

def moveMessage(message: InternalMessage, destination: MoveDestination, sourceQueueName: String): ResultWithEvents[Unit] = {
def moveMessage(
message: InternalMessage,
destination: MoveDestination,
sourceQueueName: String
): ResultWithEvents[Unit] = {

message.messageSystemAttributes.put("sourceQueueName", StringMessageAttribute(sourceQueueName))
copyMessagesToActorRef.foreach { _ ! SendMessage(message.toNewMessageData) }
val messageWithSourceQueueName = message.copy(deadLetterSourceQueueName = Some(sourceQueueName))
copyMessagesToActorRef.foreach { _ ! SendMessage(messageWithSourceQueueName.toNewMessageData) }

destination match {
case MoveToDLQ =>
if (queueData.isFifo) {
CommonOperations.wasRegistered(message.toNewMessageData, fifoMessagesHistory) match {
CommonOperations.wasRegistered(messageWithSourceQueueName.toNewMessageData, fifoMessagesHistory) match {
case Some(_) => ResultWithEvents.empty
case None =>
logger.debug(s"Moved message (${message.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(message))
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) from FIFO queue to ${queueData.name}")
moveMessageToQueue(regenerateDeduplicationId(messageWithSourceQueueName))
}
} else {
logger.debug(s"Moved message (${message.id}) to ${queueData.name}")
moveMessageToQueue(message)
logger.debug(s"Moved message (${messageWithSourceQueueName.id}) to ${queueData.name}")
moveMessageToQueue(messageWithSourceQueueName)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
nextDelivery = 100L,
content = "",
messageAttributes = Map.empty,
messageSystemAttributes = mutable.HashMap.empty,
created = created,
orderIndex = 0,
firstReceive = NeverReceived,
Expand All @@ -116,7 +115,8 @@ class FifoDeduplicationIdsHistoryTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = maybeDeduplicationId,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = mutable.HashMap.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -27,7 +26,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand All @@ -46,7 +46,6 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
nextDelivery = 0L,
content = "content",
messageAttributes = Map.empty,
messageSystemAttributes = mutable.HashMap.empty,
created = freezedDateTime,
orderIndex = 100,
firstReceive = NeverReceived,
Expand All @@ -55,7 +54,8 @@ class InternalMessageSpec extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

val second = first.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
mutable.HashMap.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -32,7 +31,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val msg3 = msg1.copy(id = "id-3")
Expand Down Expand Up @@ -74,7 +74,6 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
1L,
"content",
Map.empty,
mutable.HashMap.empty,
nowProvider.now,
orderIndex = 0,
NeverReceived,
Expand All @@ -83,7 +82,8 @@ class ReceiveRequestAttemptCacheTest extends AnyFunSuite with Matchers {
messageGroupId = None,
messageDeduplicationId = None,
tracingId = None,
sequenceNumber = None
sequenceNumber = None,
deadLetterSourceQueueName = None
)
val msg2 = msg1.copy(id = "id-2")
val messageQueue = MessageQueue(isFifo = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ trait DataCreationHelpers {
id: String,
content: String,
messageAttributes: Map[String, MessageAttribute],
messageSystemAttributes: Map[String, MessageAttribute],
nextDelivery: MillisNextDelivery,
deliveryReceipt: Option[DeliveryReceipt] = None,
messageGroupId: Option[String] = None,
Expand All @@ -47,14 +46,14 @@ trait DataCreationHelpers {
deliveryReceipt,
content,
messageAttributes,
messageSystemAttributes,
nextDelivery,
OffsetDateTimeUtil.ofEpochMilli(0),
MessageStatistics(NeverReceived, 0),
messageGroupId,
messageDeduplicationId,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(
Expand All @@ -70,26 +69,26 @@ trait DataCreationHelpers {
Some(MessageId(id)),
content,
messageAttributes,
Map.empty,
nextDelivery,
messageGroupId,
messageDeduplicationId,
orderIndex = 0,
tracingId,
None
sequenceNumber = None,
deadLetterSourceQueueName = None
)

def createNewMessageData(messageData: MessageData) =
NewMessageData(
Some(messageData.id),
messageData.content,
messageData.messageAttributes,
Map.empty,
messageData.nextDelivery,
messageData.messageGroupId,
messageData.messageDeduplicationId,
orderIndex = 0,
messageData.tracingId,
messageData.sequenceNumber
messageData.sequenceNumber,
messageData.deadLetterSourceQueueName
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import org.elasticmq.util.OffsetDateTimeUtil
import scalikejdbc.WrappedResultSet
import spray.json._

import scala.collection.mutable

case class DBMessage(
messageId: String,
deliveryReceipts: Array[Byte],
Expand All @@ -21,7 +19,8 @@ case class DBMessage(
groupId: Option[String],
deduplicationId: Option[String],
tracingId: Option[String],
sequenceNumber: Option[String]
sequenceNumber: Option[String],
deadLetterSourceQueueName: Option[String]
) {

def toInternalMessage: InternalMessage = {
Expand Down Expand Up @@ -50,7 +49,6 @@ case class DBMessage(
nextDelivery,
new String(content),
serializedAttrs,
mutable.HashMap.empty,
OffsetDateTimeUtil.ofEpochMilli(created),
orderIndex = 0,
firstReceive,
Expand All @@ -59,7 +57,8 @@ case class DBMessage(
groupId,
deduplicationId.map(id => DeduplicationId(id)),
tracingId.map(TracingId.apply),
sequenceNumber
sequenceNumber,
deadLetterSourceQueueName
)
}
}
Expand All @@ -78,7 +77,8 @@ object DBMessage {
rs.stringOpt("group_id"),
rs.stringOpt("deduplication_id"),
rs.stringOpt("tracing_id"),
rs.stringOpt("sequence_number")
rs.stringOpt("sequence_number"),
rs.stringOpt("dead_letter_source_queue_name")
)

def from(message: InternalMessage): DBMessage = {
Expand Down Expand Up @@ -113,7 +113,8 @@ object DBMessage {
message.messageGroupId,
deduplicationId,
message.tracingId.map(_.id),
message.sequenceNumber
message.sequenceNumber,
message.deadLetterSourceQueueName
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
group_id varchar,
deduplication_id varchar,
tracing_id varchar,
sequence_number varchar
sequence_number varchar,
dead_letter_source_queue_name varchar
)""".execute.apply()

def drop(): Unit = {
Expand All @@ -50,7 +51,7 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
def add(internalMessage: InternalMessage): Int = {
val message = DBMessage.from(internalMessage)
sql"""insert into $tableName
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number)
(message_id, delivery_receipts, next_delivery, content, attributes, created, received, receive_count, group_id, deduplication_id, tracing_id, sequence_number, dead_letter_source_queue_name)
values (${message.messageId},
${message.deliveryReceipts},
${message.nextDelivery},
Expand All @@ -62,7 +63,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
${message.groupId},
${message.deduplicationId},
${message.tracingId},
${message.sequenceNumber})""".update.apply()
${message.sequenceNumber},
${message.deadLetterSourceQueueName})""".update.apply()
}

def update(internalMessage: InternalMessage): Int = {
Expand All @@ -74,7 +76,8 @@ class MessageRepository(queueName: String, db: DB) extends Logging {
received = ${message.received},
receive_count = ${message.receiveCount},
tracing_id = ${message.tracingId},
sequence_number = ${message.sequenceNumber}
sequence_number = ${message.sequenceNumber},
dead_letter_source_queue_name = ${message.deadLetterSourceQueueName}
where message_id = ${message.messageId}""".update.apply()
}

Expand Down
Loading

0 comments on commit 742b436

Please sign in to comment.