diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index 68848d19..6c80b2be 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -102,7 +102,7 @@ class MainActivity : AppCompatActivity() { } subscribe.setOnClickListener { - courierService.subscribe(topic = topic.text.toString()) + courierService.subscribe(topic = topic.text.toString()).subscribe() } unsubscribe.setOnClickListener { diff --git a/courier-core/api/courier-core.api b/courier-core/api/courier-core.api index 11bf9339..21f4bfd6 100644 --- a/courier-core/api/courier-core.api +++ b/courier-core/api/courier-core.api @@ -72,6 +72,9 @@ public final class com/gojek/courier/extensions/CollectionExtensionsKt { public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set; } +public final class com/gojek/courier/extensions/StringExtensionsKt { +} + public final class com/gojek/courier/extensions/TimeUnitExtensionsKt { } diff --git a/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt b/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt new file mode 100644 index 00000000..042fe65a --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt @@ -0,0 +1,8 @@ +package com.gojek.courier.extensions + +import androidx.annotation.RestrictTo + +@RestrictTo(RestrictTo.Scope.LIBRARY) +fun String.isWildCardTopic(): Boolean { + return startsWith("+/") || contains("/+/") || endsWith("/+") || endsWith("/#") +} diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt index d36a1688..8c1797a1 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client import com.gojek.courier.extensions.fromSecondsToNanos +import com.gojek.courier.extensions.isWildCardTopic import com.gojek.courier.logging.ILogger import com.gojek.courier.utils.Clock import com.gojek.mqtt.client.listener.MessageListener @@ -47,6 +48,8 @@ internal class IncomingMsgControllerImpl( private val listenerMap = ConcurrentHashMap>() + private val wildcardTopicListenerMap = ConcurrentHashMap>() + private var cleanupFuture: ScheduledFuture<*>? = null override fun triggerHandleMessage() { @@ -64,40 +67,61 @@ internal class IncomingMsgControllerImpl( @Synchronized override fun registerListener(topic: String, listener: MessageListener) { - listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener + if (topic.isWildCardTopic()) { + wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) + listener + } else { + listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener + } triggerHandleMessage() } @Synchronized override fun unregisterListener(topic: String, listener: MessageListener) { - listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener - if (listenerMap[topic]!!.isEmpty()) { - listenerMap.remove(topic) + if (topic.isWildCardTopic()) { + wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) - listener + if (wildcardTopicListenerMap[topic]!!.isEmpty()) { + wildcardTopicListenerMap.remove(topic) + } + } else { + listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener + if (listenerMap[topic]!!.isEmpty()) { + listenerMap.remove(topic) + } } } private inner class HandleMessage : Runnable { override fun run() { try { - if (listenerMap.keys.isEmpty()) { + if (listenerMap.keys.isEmpty() && wildcardTopicListenerMap.isEmpty()) { logger.d(TAG, "No listeners registered") return } val messages: List = mqttReceivePersistence.getAllIncomingMessagesWithTopicFilter(listenerMap.keys) - if (mqttUtils.isEmpty(messages)) { - logger.d(TAG, "No Messages in Table") - return - } val deletedMsgIds = mutableListOf() for (message in messages) { logger.d(TAG, "Going to process ${message.messageId}") - val listenersNotified = notifyListeners(message) + val listenersNotified = notifyListeners(message, listenerMap[message.topic]!!) if (listenersNotified) { deletedMsgIds.add(message.messageId) } logger.d(TAG, "Successfully Processed Message ${message.messageId}") } + // processing messages for wildcard topic subscription + for (wildCardTopic in wildcardTopicListenerMap.keys()) { + val topic = parseWildCardTopic(wildCardTopic) + val wildcardMessages: List = + mqttReceivePersistence.getAllIncomingMessagesForWildCardTopic(topic) + for (message in wildcardMessages) { + logger.d(TAG, "Going to process ${message.messageId}") + val listenersNotified = notifyListeners(message, wildcardTopicListenerMap[wildCardTopic]!!) + if (listenersNotified) { + deletedMsgIds.add(message.messageId) + } + logger.d(TAG, "Successfully Processed Message ${message.messageId}") + } + } if (deletedMsgIds.isNotEmpty()) { val deletedMessagesCount = deleteMessages(deletedMsgIds) logger.d(TAG, "Deleted $deletedMessagesCount messages") @@ -112,6 +136,12 @@ internal class IncomingMsgControllerImpl( } } + private fun parseWildCardTopic(topic: String): String { + var updatedTopic: String = topic.replace("+", "_%") + updatedTopic = updatedTopic.replace("#", "_%") + return updatedTopic + } + private inner class CleanupExpiredMessages : Runnable { override fun run() { logger.d(TAG, "Deleting expired messages") @@ -123,10 +153,10 @@ internal class IncomingMsgControllerImpl( } } - private fun notifyListeners(message: MqttReceivePacket): Boolean { + private fun notifyListeners(message: MqttReceivePacket, listeners: List): Boolean { var notified = false try { - listenerMap[message.topic]!!.forEach { + listeners.forEach { notified = true it.onMessageReceived(message.toMqttMessage()) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt index e7dcb280..c0a8a605 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt @@ -7,4 +7,5 @@ internal interface IMqttReceivePersistence { fun getAllIncomingMessagesWithTopicFilter(topics: Set): List fun removeReceivedMessages(messageIds: List): Int fun removeMessagesWithOlderTimestamp(timestampNanos: Long): Int + fun getAllIncomingMessagesForWildCardTopic(topic: String): List } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt index 5aedd4cb..8f8783a9 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt @@ -13,6 +13,9 @@ internal interface IncomingMessagesDao { @Query("SELECT * from incoming_messages where topic in (:topics)") fun getAllMessagesWithTopicFilter(topics: Set): List + @Query("SELECT * from incoming_messages where topic LIKE :topic") + fun getAllIncomingMessagesForWildCardTopic(topic: String): List + @Query("DELETE from incoming_messages") fun clearAllMessages() diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt index caf29d31..c0492a66 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt @@ -86,6 +86,10 @@ internal class PahoPersistence(private val context: Context) : return incomingMessagesDao.getAllMessagesWithTopicFilter(topics) } + override fun getAllIncomingMessagesForWildCardTopic(topic: String): List { + return incomingMessagesDao.getAllIncomingMessagesForWildCardTopic(topic) + } + override fun removeReceivedMessages(messageIds: List): Int { return incomingMessagesDao.removeMessagesById(messageIds) }