Skip to content

Commit

Permalink
Fix force ping request logic
Browse files Browse the repository at this point in the history
  • Loading branch information
deepanshu42 committed Jul 18, 2023
1 parent 7ae7f5b commit b7da0c7
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 36 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class MainActivity : AppCompatActivity() {
incomingMessagesCleanupIntervalSecs = 10,
maxInflightMessagesLimit = 1000,
),
pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig())
pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig(sendForcePing = true))
)
mqttClient = MqttClientFactory.create(this, mqttConfig)
mqttClient.addEventHandler(eventHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ public IMqttToken checkPing(Object userContext, IMqttActionListener callback) th
// @TRACE 117=>
logger.d(TAG, "checking for ping");

token = comms.checkForActivity();
token = comms.checkForActivity(false);
// @TRACE 118=<

return token;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void run()
{
// @Trace 660=Check schedule at {0}
logger.d(TAG, "in ping timer task run function");
comms.checkForActivity();
comms.checkForActivity(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,10 @@ public void run() {
/*
* Check and send a ping if needed and check for ping timeout. Need to send a ping if nothing has been sent or received in the last keepalive interval.
*/
public MqttToken checkForActivity() {
public MqttToken checkForActivity(Boolean forcePing) {
MqttToken token = null;
try {
token = clientState.checkForActivity();
token = clientState.checkForActivity(forcePing);
} catch (MqttException e) {
handleRunException(e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ protected void undo(MqttPublish message) throws MqttPersistenceException
*
* @return token of ping command, null if no ping command has been sent.
*/
public MqttToken checkForActivity() throws MqttException
public MqttToken checkForActivity(Boolean forcePing) throws MqttException
{
final String methodName = "checkForActivity";

Expand All @@ -672,7 +672,7 @@ public MqttToken checkForActivity() throws MqttException
long lastActivity = lastInboundActivity;

// Is a ping required?
if (time - lastActivity + keepAliveMargin >= this.keepAlive)
if (forcePing || (time - lastActivity + keepAliveMargin >= this.keepAlive))
{

// @TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,7 @@ internal class AlarmPingSender(
serverUri = comms.client.serverURI
}
pingSenderEvents.mqttPingInitiated(serverUri, comms.keepAlive.fromMillisToSeconds())
val token = if (alarmPingSenderConfig.sendForcePing) {
comms.sendPingRequest()
} else {
comms.checkForActivity()
}
val token = comms.checkForActivity(alarmPingSenderConfig.sendForcePing)

// No ping has been sent.
if (token == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ internal class TimerPingSender(
val serverUri = comms.client?.serverURI ?: ""
val keepAliveMillis = comms.keepAlive
pingSenderEvents.mqttPingInitiated(comms.client.serverURI, keepAliveMillis.fromMillisToSeconds())
val token = if (pingSenderConfig.sendForcePing) {
comms.sendPingRequest()
} else {
comms.checkForActivity()
}
val token = comms.checkForActivity(pingSenderConfig.sendForcePing)
if (token == null) {
logger.d(TAG, "Mqtt Ping Token null")
pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ class TimerPingSenderTest {
val mqttClient = mock<IMqttAsyncClient>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(null)
whenever(comms.checkForActivity(false)).thenReturn(null)

pingSender.PingTask().run()

Expand All @@ -92,10 +93,36 @@ class TimerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

pingSender.PingTask().run()

verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000)

val argumentCaptor = argumentCaptor<IMqttActionListener>()
verify(mqttToken).actionCallback = argumentCaptor.capture()
argumentCaptor.lastValue.onSuccess(mqttToken)
verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000)
}

@Test
fun `test sendPing when ping can be sent successfully with sendForcePing=true`() {
val mqttClient = mock<IMqttAsyncClient>()
val mqttToken = mock<MqttToken>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(true)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity(true)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

pingSender.PingTask().run()
Expand Down Expand Up @@ -141,10 +168,11 @@ class TimerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

pingSender.PingTask().run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ internal class WorkManagerPingSender(
val keepAliveMillis = comms.keepAlive
pingSenderEvents.mqttPingInitiated(serverUri, keepAliveMillis.fromMillisToSeconds())

val token = if (pingSenderConfig.sendForcePing) {
comms.sendPingRequest()
} else {
comms.checkForActivity()
}
val token = comms.checkForActivity(pingSenderConfig.sendForcePing)
if (token == null) {
logger.d(TAG, "Mqtt Ping Token null")
pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ class WorkManagerPingSenderTest {
val mqttClient = mock<IMqttAsyncClient>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(null)
whenever(comms.checkForActivity(false)).thenReturn(null)

pingSender.sendPing {
// do nothing
Expand All @@ -82,10 +83,40 @@ class WorkManagerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
pingSender.sendPing {
success = it
}

verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000)

val argumentCaptor = argumentCaptor<IMqttActionListener>()
verify(mqttToken).actionCallback = argumentCaptor.capture()
argumentCaptor.lastValue.onSuccess(mqttToken)
assertTrue(success!!)
verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000)
}

@Test
fun `test sendPing when ping can be sent successfully with sendForcePing=true`() {
val mqttClient = mock<IMqttAsyncClient>()
val mqttToken = mock<MqttToken>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(true)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity(true)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
Expand Down Expand Up @@ -139,10 +170,11 @@ class WorkManagerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ internal class WorkManagerPingSender(
val keepAliveMillis = comms.keepAlive
pingSenderEvents.mqttPingInitiated(serverUri, keepAliveMillis.fromMillisToSeconds())

val token = if (pingSenderConfig.sendForcePing) {
comms.sendPingRequest()
} else {
comms.checkForActivity()
}
val token = comms.checkForActivity(pingSenderConfig.sendForcePing)
if (token == null) {
logger.d(TAG, "Mqtt Ping Token null")
pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ class WorkManagerPingSenderTest {
val mqttClient = mock<IMqttAsyncClient>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(null)
whenever(comms.checkForActivity(false)).thenReturn(null)

pingSender.sendPing {
// do nothing
Expand All @@ -82,10 +83,40 @@ class WorkManagerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
pingSender.sendPing {
success = it
}

verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000)

val argumentCaptor = argumentCaptor<IMqttActionListener>()
verify(mqttToken).actionCallback = argumentCaptor.capture()
argumentCaptor.lastValue.onSuccess(mqttToken)
assertTrue(success!!)
verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000)
}

@Test
fun `test sendPing when ping can be sent successfully with sendForcePing=true`() {
val mqttClient = mock<IMqttAsyncClient>()
val mqttToken = mock<MqttToken>()
val testUri = "test-uri"
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(true)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity(true)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
Expand Down Expand Up @@ -139,10 +170,11 @@ class WorkManagerPingSenderTest {
val keepaliveMillis = 30000L
val startTime = TimeUnit.MILLISECONDS.toNanos(100)
val endTime = TimeUnit.MILLISECONDS.toNanos(110)
whenever(pingSenderConfig.sendForcePing).thenReturn(false)
whenever(comms.client).thenReturn(mqttClient)
whenever(mqttClient.serverURI).thenReturn(testUri)
whenever(comms.keepAlive).thenReturn(keepaliveMillis)
whenever(comms.checkForActivity()).thenReturn(mqttToken)
whenever(comms.checkForActivity(false)).thenReturn(mqttToken)
whenever(clock.nanoTime()).thenReturn(startTime, endTime)

var success: Boolean? = null
Expand Down

0 comments on commit b7da0c7

Please sign in to comment.