Skip to content

Commit

Permalink
[ISSUE#46] Add support for wildcard subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 committed Sep 4, 2023
1 parent 8f852ce commit 9188176
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 13 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class MainActivity : AppCompatActivity() {
}

subscribe.setOnClickListener {
courierService.subscribe(topic = topic.text.toString())
courierService.subscribe(topic = topic.text.toString()).subscribe()
}

unsubscribe.setOnClickListener {
Expand Down
3 changes: 3 additions & 0 deletions courier-core/api/courier-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public final class com/gojek/courier/extensions/CollectionExtensionsKt {
public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set;
}

public final class com/gojek/courier/extensions/StringExtensionsKt {
}

public final class com/gojek/courier/extensions/TimeUnitExtensionsKt {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.gojek.courier.extensions

import androidx.annotation.RestrictTo

@RestrictTo(RestrictTo.Scope.LIBRARY)
fun String.isWildCardTopic(): Boolean {
return startsWith("+/") || contains("/+/") || endsWith("/+") || endsWith("/#")
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.mqtt.client

import com.gojek.courier.extensions.fromSecondsToNanos
import com.gojek.courier.extensions.isWildCardTopic
import com.gojek.courier.logging.ILogger
import com.gojek.courier.utils.Clock
import com.gojek.mqtt.client.listener.MessageListener
Expand Down Expand Up @@ -47,6 +48,8 @@ internal class IncomingMsgControllerImpl(

private val listenerMap = ConcurrentHashMap<String, List<MessageListener>>()

private val wildcardTopicListenerMap = ConcurrentHashMap<String, List<MessageListener>>()

private var cleanupFuture: ScheduledFuture<*>? = null

override fun triggerHandleMessage() {
Expand All @@ -64,40 +67,61 @@ internal class IncomingMsgControllerImpl(

@Synchronized
override fun registerListener(topic: String, listener: MessageListener) {
listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener
if (topic.isWildCardTopic()) {
wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) + listener
} else {
listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener
}
triggerHandleMessage()
}

@Synchronized
override fun unregisterListener(topic: String, listener: MessageListener) {
listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener
if (listenerMap[topic]!!.isEmpty()) {
listenerMap.remove(topic)
if (topic.isWildCardTopic()) {
wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) - listener
if (wildcardTopicListenerMap[topic]!!.isEmpty()) {
wildcardTopicListenerMap.remove(topic)
}
} else {
listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener
if (listenerMap[topic]!!.isEmpty()) {
listenerMap.remove(topic)
}
}
}

private inner class HandleMessage : Runnable {
override fun run() {
try {
if (listenerMap.keys.isEmpty()) {
if (listenerMap.keys.isEmpty() && wildcardTopicListenerMap.isEmpty()) {
logger.d(TAG, "No listeners registered")
return
}
val messages: List<MqttReceivePacket> =
mqttReceivePersistence.getAllIncomingMessagesWithTopicFilter(listenerMap.keys)
if (mqttUtils.isEmpty(messages)) {
logger.d(TAG, "No Messages in Table")
return
}
val deletedMsgIds = mutableListOf<Long>()
for (message in messages) {
logger.d(TAG, "Going to process ${message.messageId}")
val listenersNotified = notifyListeners(message)
val listenersNotified = notifyListeners(message, listenerMap[message.topic]!!)
if (listenersNotified) {
deletedMsgIds.add(message.messageId)
}
logger.d(TAG, "Successfully Processed Message ${message.messageId}")
}
// processing messages for wildcard topic subscription
for (wildCardTopic in wildcardTopicListenerMap.keys()) {
val topic = parseWildCardTopic(wildCardTopic)
val wildcardMessages: List<MqttReceivePacket> =
mqttReceivePersistence.getAllIncomingMessagesForWildCardTopic(topic)
for (message in wildcardMessages) {
logger.d(TAG, "Going to process ${message.messageId}")
val listenersNotified = notifyListeners(message, wildcardTopicListenerMap[wildCardTopic]!!)
if (listenersNotified) {
deletedMsgIds.add(message.messageId)
}
logger.d(TAG, "Successfully Processed Message ${message.messageId}")
}
}
if (deletedMsgIds.isNotEmpty()) {
val deletedMessagesCount = deleteMessages(deletedMsgIds)
logger.d(TAG, "Deleted $deletedMessagesCount messages")
Expand All @@ -112,6 +136,12 @@ internal class IncomingMsgControllerImpl(
}
}

private fun parseWildCardTopic(topic: String): String {
var updatedTopic: String = topic.replace("+", "_%")
updatedTopic = updatedTopic.replace("#", "_%")
return updatedTopic
}

private inner class CleanupExpiredMessages : Runnable {
override fun run() {
logger.d(TAG, "Deleting expired messages")
Expand All @@ -123,10 +153,10 @@ internal class IncomingMsgControllerImpl(
}
}

private fun notifyListeners(message: MqttReceivePacket): Boolean {
private fun notifyListeners(message: MqttReceivePacket, listeners: List<MessageListener>): Boolean {
var notified = false
try {
listenerMap[message.topic]!!.forEach {
listeners.forEach {
notified = true
it.onMessageReceived(message.toMqttMessage())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ internal interface IMqttReceivePersistence {
fun getAllIncomingMessagesWithTopicFilter(topics: Set<String>): List<MqttReceivePacket>
fun removeReceivedMessages(messageIds: List<Long>): Int
fun removeMessagesWithOlderTimestamp(timestampNanos: Long): Int
fun getAllIncomingMessagesForWildCardTopic(topic: String): List<MqttReceivePacket>
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ internal interface IncomingMessagesDao {
@Query("SELECT * from incoming_messages where topic in (:topics)")
fun getAllMessagesWithTopicFilter(topics: Set<String>): List<MqttReceivePacket>

@Query("SELECT * from incoming_messages where topic LIKE :topic")
fun getAllIncomingMessagesForWildCardTopic(topic: String): List<MqttReceivePacket>

@Query("DELETE from incoming_messages")
fun clearAllMessages()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ internal class PahoPersistence(private val context: Context) :
return incomingMessagesDao.getAllMessagesWithTopicFilter(topics)
}

override fun getAllIncomingMessagesForWildCardTopic(topic: String): List<MqttReceivePacket> {
return incomingMessagesDao.getAllIncomingMessagesForWildCardTopic(topic)
}

override fun removeReceivedMessages(messageIds: List<Long>): Int {
return incomingMessagesDao.removeMessagesById(messageIds)
}
Expand Down

0 comments on commit 9188176

Please sign in to comment.