Skip to content

Commit

Permalink
Merge branch 'main' into file_rename
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 authored Sep 13, 2023
2 parents 0223798 + 8f852ce commit f10c6bd
Show file tree
Hide file tree
Showing 70 changed files with 901 additions and 235 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.gojek.courier.app.data.network

import com.gojek.courier.QoS
import com.gojek.courier.annotation.Callback
import com.gojek.courier.annotation.Data
import com.gojek.courier.annotation.Path
import com.gojek.courier.annotation.Send
import com.gojek.courier.annotation.Subscribe
import com.gojek.courier.annotation.Unsubscribe
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
import io.reactivex.Observable

interface CourierService {
@Send(topic = "{topic}", qos = QoS.ONE)
fun publish(@Path("topic") topic: String, @Data message: Message)
fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback)

@Subscribe(topic = "{topic}")
fun subscribe(@Path("topic") topic: String): Observable<Message>
Expand Down
27 changes: 24 additions & 3 deletions app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.courier.app.ui

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import com.gojek.chuckmqtt.external.MqttChuckConfig
import com.gojek.chuckmqtt.external.MqttChuckInterceptor
Expand All @@ -9,6 +10,7 @@ import com.gojek.courier.Courier
import com.gojek.courier.app.R
import com.gojek.courier.app.data.network.CourierService
import com.gojek.courier.app.data.network.model.Message
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.courier.logging.ILogger
import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory
import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory
Expand Down Expand Up @@ -78,7 +80,24 @@ class MainActivity : AppCompatActivity() {
send.setOnClickListener {
courierService.publish(
topic = topic.text.toString(),
message = Message(123, message.text.toString())
message = Message(123, message.text.toString()),
callback = object : SendMessageCallback {
override fun onMessageSendTrigger() {
Log.d("Courier", "onMessageSendTrigger")
}

override fun onMessageWrittenOnSocket() {
Log.d("Courier", "onMessageWrittenOnSocket")
}

override fun onMessageSendSuccess() {
Log.d("Courier", "onMessageSendSuccess")
}

override fun onMessageSendFailure(error: Throwable) {
Log.d("Courier", "onMessageSendFailure")
}
}
)
}

Expand Down Expand Up @@ -107,7 +126,6 @@ class MainActivity : AppCompatActivity() {
private fun initialiseCourier() {
val mqttConfig = MqttV3Configuration(
logger = getLogger(),
eventHandler = eventHandler,
authenticator = object : Authenticator {
override fun authenticate(
connectOptions: MqttConnectOptions,
Expand All @@ -130,12 +148,15 @@ class MainActivity : AppCompatActivity() {
),
inactivityTimeoutSeconds = 45,
activityCheckIntervalSeconds = 30,
connectPacketTimeoutSeconds = 5,
incomingMessagesTTLSecs = 60,
incomingMessagesCleanupIntervalSecs = 10,
maxInflightMessagesLimit = 1000,
),
pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig())
pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig(sendForcePing = true))
)
mqttClient = MqttClientFactory.create(this, mqttConfig)
mqttClient.addEventHandler(eventHandler)

val configuration = Courier.Configuration(
client = mqttClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ internal class MqttTransactionUiModelMapper : Mapper<MqttTransactionDomainModel,
is MqttPublish -> {
sb.append("PUBLISH \n")
sb.append("Qos : ${mqttWireMessage.message.qos} \n")
if (mqttWireMessage.message.qos > 0) {
if (mqttWireMessage.message.qos > 0 || mqttWireMessage.message.type > 2) {
sb.append("MsgId : ${mqttWireMessage.messageId} \n")
}
sb.append("Retained : ${mqttWireMessage.message.isRetained} \n")
Expand Down
18 changes: 18 additions & 0 deletions courier-core/api/courier-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ public abstract interface class com/gojek/courier/MessageAdapter$Factory {

public final class com/gojek/courier/QoS : java/lang/Enum {
public static final field ONE Lcom/gojek/courier/QoS;
public static final field ONE_WITHOUT_PERSISTENCE_AND_NO_RETRY Lcom/gojek/courier/QoS;
public static final field ONE_WITHOUT_PERSISTENCE_AND_RETRY Lcom/gojek/courier/QoS;
public static final field TWO Lcom/gojek/courier/QoS;
public static final field ZERO Lcom/gojek/courier/QoS;
public final fun getType ()I
public final fun getValue ()I
public static fun valueOf (Ljava/lang/String;)Lcom/gojek/courier/QoS;
public static fun values ()[Lcom/gojek/courier/QoS;
Expand Down Expand Up @@ -49,6 +52,21 @@ public abstract interface class com/gojek/courier/StreamAdapter$Factory {
public abstract fun create (Ljava/lang/reflect/Type;)Lcom/gojek/courier/StreamAdapter;
}

public final class com/gojek/courier/callback/NoOpSendMessageCallback : com/gojek/courier/callback/SendMessageCallback {
public static final field INSTANCE Lcom/gojek/courier/callback/NoOpSendMessageCallback;
public fun onMessageSendFailure (Ljava/lang/Throwable;)V
public fun onMessageSendSuccess ()V
public fun onMessageSendTrigger ()V
public fun onMessageWrittenOnSocket ()V
}

public abstract interface class com/gojek/courier/callback/SendMessageCallback {
public abstract fun onMessageSendFailure (Ljava/lang/Throwable;)V
public abstract fun onMessageSendSuccess ()V
public abstract fun onMessageSendTrigger ()V
public abstract fun onMessageWrittenOnSocket ()V
}

public final class com/gojek/courier/extensions/CollectionExtensionsKt {
public static final fun toImmutableMap (Ljava/util/Map;)Ljava/util/Map;
public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set;
Expand Down
22 changes: 18 additions & 4 deletions courier-core/src/main/java/com/gojek/courier/QoS.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
package com.gojek.courier

enum class QoS(val value: Int) {
ZERO(0),
ONE(1),
TWO(2)
enum class QoS(val value: Int, val type: Int) {
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718100
ZERO(0, 0),

// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718100
ONE(1, 1),

// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718100
TWO(2, 2),

/** Like QoS1, Message delivery is acknowledged with PUBACK, but unlike QoS1 messages are
neither persisted nor retried at send after one attempt.
The message arrives at the receiver either once or not at all **/
ONE_WITHOUT_PERSISTENCE_AND_NO_RETRY(0, 3),

/** Like QoS1, Message delivery is acknowledged with PUBACK, but unlike QoS1 messages are
not persisted. The messages are retried within active connection if delivery is not acknowledged.**/
ONE_WITHOUT_PERSISTENCE_AND_RETRY(0, 4)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.gojek.courier.callback

object NoOpSendMessageCallback : SendMessageCallback {
override fun onMessageSendTrigger() {
// no-op
}

override fun onMessageWrittenOnSocket() {
// no-op
}

override fun onMessageSendSuccess() {
// no-op
}

override fun onMessageSendFailure(error: Throwable) {
// no-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.gojek.courier.callback

interface SendMessageCallback {
fun onMessageSendTrigger()
fun onMessageWrittenOnSocket()
fun onMessageSendSuccess()
fun onMessageSendFailure(error: Throwable)
}
13 changes: 13 additions & 0 deletions courier/api/courier.api
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
public final class com/gojek/courier/Courier {
public fun <init> (Lcom/gojek/courier/Courier$Configuration;)V
public final fun create (Ljava/lang/Class;)Ljava/lang/Object;
public final fun getConnectionState ()Lcom/gojek/mqtt/client/model/ConnectionState;
public final fun getEventStream ()Lcom/gojek/courier/Stream;
public final fun newBuilder ()Lcom/gojek/courier/Courier$Builder;
}

public final class com/gojek/courier/Courier$Builder {
public fun <init> (Lcom/gojek/courier/Courier$Configuration;)V
public final fun addMessageAdapterFactories (Ljava/util/List;)Lcom/gojek/courier/Courier$Builder;
public final fun addStreamAdapterFactories (Ljava/util/List;)Lcom/gojek/courier/Courier$Builder;
public final fun build ()Lcom/gojek/courier/Courier;
}

public final class com/gojek/courier/Courier$Configuration {
Expand All @@ -21,6 +31,9 @@ public final class com/gojek/courier/Courier$Configuration {
public fun toString ()Ljava/lang/String;
}

public abstract interface annotation class com/gojek/courier/annotation/Callback : java/lang/annotation/Annotation {
}

public abstract interface annotation class com/gojek/courier/annotation/Data : java/lang/annotation/Annotation {
}

Expand Down
37 changes: 36 additions & 1 deletion courier/src/main/java/com/gojek/courier/Courier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import com.gojek.courier.utils.MessageAdapterResolver
import com.gojek.courier.utils.RuntimePlatform
import com.gojek.courier.utils.StreamAdapterResolver
import com.gojek.mqtt.client.MqttClient
import com.gojek.mqtt.client.model.ConnectionState
import com.gojek.mqtt.event.MqttEvent

class Courier(configuration: Configuration) {
class Courier(private val configuration: Configuration) {
private val stubInterfaceFactory: StubInterface.Factory
private val proxyFactory: ProxyFactory
private val coordinator: Coordinator
Expand Down Expand Up @@ -42,6 +44,18 @@ class Courier(configuration: Configuration) {
*/
inline fun <reified T : Any> create(): T = create(T::class.java)

fun getEventStream(): Stream<MqttEvent> {
return coordinator.getEventStream()
}

fun getConnectionState(): ConnectionState {
return coordinator.getConnectionState()
}

fun newBuilder(): Builder {
return Builder(configuration)
}

data class Configuration(
val client: MqttClient,
val streamAdapterFactories: List<StreamAdapter.Factory> = emptyList(),
Expand All @@ -56,4 +70,25 @@ class Courier(configuration: Configuration) {
private fun Configuration.createMessageAdapterResolver(): MessageAdapterResolver {
return MessageAdapterResolver(messageAdapterFactories)
}

class Builder(private var configuration: Configuration) {

fun addMessageAdapterFactories(messageAdapterFactories: List<MessageAdapter.Factory>): Builder {
configuration = configuration.copy(
messageAdapterFactories = configuration.messageAdapterFactories + messageAdapterFactories
)
return this
}

fun addStreamAdapterFactories(streamAdapterFactories: List<StreamAdapter.Factory>): Builder {
configuration = configuration.copy(
streamAdapterFactories = configuration.streamAdapterFactories + streamAdapterFactories
)
return this
}

fun build(): Courier {
return Courier(configuration)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gojek.courier.annotation

@MustBeDocumented
@Target(AnnotationTarget.VALUE_PARAMETER)
@kotlin.annotation.Retention(AnnotationRetention.RUNTIME)
annotation class Callback
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gojek.courier.annotation.parser

import com.gojek.courier.QoS
import com.gojek.courier.annotation.Callback
import com.gojek.courier.annotation.Data
import com.gojek.courier.annotation.Path
import com.gojek.courier.annotation.Receive
Expand All @@ -13,6 +14,7 @@ import com.gojek.courier.argument.processor.ReceiveArgumentProcessor
import com.gojek.courier.argument.processor.SendArgumentProcessor
import com.gojek.courier.argument.processor.SubscriptionArgumentProcessor
import com.gojek.courier.argument.processor.UnsubscriptionArgumentProcessor
import com.gojek.courier.callback.SendMessageCallback
import com.gojek.courier.stub.StubMethod
import com.gojek.courier.utils.MessageAdapterResolver
import com.gojek.courier.utils.StreamAdapterResolver
Expand Down Expand Up @@ -87,7 +89,8 @@ internal class MethodAnnotationsParser(
val messageType = method.getDataParameterType(dataParameterIndex)
val annotations = method.getDataParameterAnnotations(dataParameterIndex)
val adapter = messageAdapterResolver.resolve(messageType, annotations)
val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex)
val callbackIndex = method.getCallbackParameterIndex()
val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex, callbackIndex)
stubMethod = StubMethod.Send(adapter, qos, argumentProcessor)
}

Expand Down Expand Up @@ -226,7 +229,7 @@ internal class MethodAnnotationsParser(
private val PARAM_NAME_REGEX = Pattern.compile(PARAM)

private fun Annotation.isParameterAnnotation(): Boolean {
return this is Path || this is Data || this is TopicMap
return this is Path || this is Data || this is TopicMap || this is Callback
}

private fun Annotation.isStubMethodAnnotation(): Boolean {
Expand Down Expand Up @@ -268,6 +271,29 @@ internal class MethodAnnotationsParser(
return index
}

private fun Method.getCallbackParameterIndex(): Int {
var index = -1
for (parameterIndex in parameterAnnotations.indices) {
val parameterAnnotations = parameterAnnotations[parameterIndex]
val annotations = parameterAnnotations.filter { it.isParameterAnnotation() }
require(annotations.size == 1) {
"A parameter must have one and only one parameter annotation: $parameterIndex"
}
if (annotations.first() is Callback) {
if (index == -1) {
index = parameterIndex
break
} else {
throw IllegalArgumentException("Multiple parameters found with @Callback annotation")
}
}
}
if (index != -1 && parameterTypes[index] != SendMessageCallback::class.java) {
throw IllegalArgumentException("Parameter annotated with @Callback should be of type SendMessageCallback: ${parameterTypes[index]}")
}
return index
}

private fun Method.getDataParameterType(index: Int): Type {
return parameterTypes[index]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.gojek.courier.argument.processor

import com.gojek.courier.callback.NoOpSendMessageCallback
import com.gojek.courier.callback.SendMessageCallback

internal class SendArgumentProcessor(
private val pathMap: Map<String, Int>,
private val topic: String,
private val dataParameterIndex: Int
private val dataParameterIndex: Int,
private val callbackIndex: Int
) : ArgumentProcessor() {
private var parsedTopic = topic

Expand All @@ -21,4 +25,12 @@ internal class SendArgumentProcessor(
fun getDataArgument(args: Array<Any>): Any {
return args[dataParameterIndex]
}

fun getCallbackArgument(args: Array<Any>): SendMessageCallback {
return if (callbackIndex == -1) {
NoOpSendMessageCallback
} else {
args[callbackIndex] as SendMessageCallback
}
}
}
Loading

0 comments on commit f10c6bd

Please sign in to comment.