Skip to content

Commit

Permalink
fix: use server time for calling (WPB-10979) (#3017)
Browse files Browse the repository at this point in the history
* fix: use server time for calling

* chore: address comments

* fix: get serverTime after websocket is connected

* chore: cleanup

* chore: broken unit test
  • Loading branch information
ohassine authored Oct 8, 2024
1 parent 8be3015 commit fc3f3e3
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ import com.wire.kalium.logic.feature.call.usecase.GetCallConversationTypeProvide
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.featureFlags.KaliumConfigs
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.ServerTimeHandlerImpl
import com.wire.kalium.logic.util.toInt
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -119,6 +121,7 @@ class CallManagerImpl internal constructor(
private val flowManagerService: FlowManagerService,
private val json: Json = Json { ignoreUnknownKeys = true },
private val shouldRemoteMuteChecker: ShouldRemoteMuteChecker = ShouldRemoteMuteCheckerImpl(),
private val serverTimeHandler: ServerTimeHandler = ServerTimeHandlerImpl(),
kaliumDispatchers: KaliumDispatcher = KaliumDispatcherImpl
) : CallManager {

Expand Down Expand Up @@ -255,8 +258,6 @@ class CallManagerImpl internal constructor(
)

if (callingValue.type != REMOTE_MUTE_TYPE || shouldRemoteMute) {
val currTime = System.currentTimeMillis()

val targetConversationId = if (message.isSelfMessage) {
content.conversationId ?: message.conversationId
} else {
Expand All @@ -270,7 +271,7 @@ class CallManagerImpl internal constructor(
inst = deferredHandle.await(),
msg = msg,
len = msg.size,
curr_time = Uint32_t(value = currTime / 1000),
curr_time = Uint32_t(value = serverTimeHandler.toServerTimestamp()),
msg_time = Uint32_t(value = message.date.epochSeconds),
convId = federatedIdMapper.parseToFederatedId(targetConversationId),
userId = federatedIdMapper.parseToFederatedId(message.senderUserId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import kotlinx.serialization.json.JsonNull
*/
data class EventEnvelope(
val event: Event,
val deliveryInfo: EventDeliveryInfo,
val deliveryInfo: EventDeliveryInfo
) {
override fun toString(): String {
return super.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ interface EventRepository {
* @return Either containing a [CoreFailure] or the oldest available event ID as a String.
*/
suspend fun fetchOldestAvailableEventId(): Either<CoreFailure, String>
suspend fun fetchServerTime(): String?
}

class EventDataSource(
Expand Down Expand Up @@ -193,6 +194,15 @@ class EventDataSource(
}
}.map { it.id }

override suspend fun fetchServerTime(): String? {
val result = notificationApi.getServerTime(NOTIFICATIONS_QUERY_SIZE)
return if (result.isSuccessful()) {
result.value
} else {
null
}
}

private companion object {
const val NOTIFICATIONS_QUERY_SIZE = 100
const val LAST_PROCESSED_EVENT_ID_KEY = "last_processed_event_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,24 +918,23 @@ class UserSessionScope internal constructor(
kaliumFileSystem = kaliumFileSystem
)

private val eventGatherer: EventGatherer
get() = EventGathererImpl(
eventRepository,
incrementalSyncRepository,
userScopedLogger,
)
private val eventGatherer: EventGatherer get() = EventGathererImpl(
eventRepository = eventRepository,
incrementalSyncRepository = incrementalSyncRepository,
logger = userScopedLogger
)

private val eventProcessor: EventProcessor by lazy {
EventProcessorImpl(
eventRepository,
conversationEventReceiver,
userEventReceiver,
teamEventReceiver,
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver,
this@UserSessionScope,
userScopedLogger,
eventRepository = eventRepository,
conversationEventReceiver = conversationEventReceiver,
userEventReceiver = userEventReceiver,
teamEventReceiver = teamEventReceiver,
featureConfigEventReceiver = featureConfigEventReceiver,
userPropertiesEventReceiver = userPropertiesEventReceiver,
federationEventReceiver = federationEventReceiver,
processingScope = this@UserSessionScope,
logger = userScopedLogger,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.KaliumSyncException
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.ServerTimeHandlerImpl
import com.wire.kalium.network.api.base.authenticated.notification.WebSocketEvent
import com.wire.kalium.network.exceptions.KaliumException
import io.ktor.http.HttpStatusCode
Expand All @@ -52,6 +54,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.transformWhile
import kotlinx.datetime.toInstant

/**
* Responsible for fetching events from a remote source, orchestrating between events missed since
Expand All @@ -77,6 +80,7 @@ internal interface EventGatherer {
internal class EventGathererImpl(
private val eventRepository: EventRepository,
private val incrementalSyncRepository: IncrementalSyncRepository,
private val serverTimeHandler: ServerTimeHandler = ServerTimeHandlerImpl(),
logger: KaliumLogger = kaliumLogger,
) : EventGatherer {

Expand Down Expand Up @@ -165,6 +169,7 @@ internal class EventGathererImpl(

private suspend fun FlowCollector<EventEnvelope>.onWebSocketOpen() {
logger.i("Websocket Open")
handleTimeDrift()
eventRepository
.pendingEvents()
.onEach { result ->
Expand All @@ -181,6 +186,12 @@ internal class EventGathererImpl(
_currentSource.value = EventSource.LIVE
}

private suspend fun handleTimeDrift() {
eventRepository.fetchServerTime()?.let {
serverTimeHandler.computeTimeOffset(it.toInstant().epochSeconds)
}
}

private fun throwPendingEventException(failure: CoreFailure) {
val networkCause = (failure as? NetworkFailure.ServerMiscommunication)?.rootCause
val isEventNotFound = networkCause is KaliumException.InvalidRequestError
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.util

import com.wire.kalium.logic.kaliumLogger
import kotlinx.datetime.Clock

internal interface ServerTimeHandler {
fun computeTimeOffset(serverTime: Long)
fun toServerTimestamp(localTimestamp: Long = Clock.System.now().epochSeconds): Long
}

internal class ServerTimeHandlerImpl : ServerTimeHandler {

/**
* Used to store the difference (offset) between the server time and the local client time.
* And it will be used to adjust timestamps between server and client times.
*/
private var timeOffset: Long = 0

/**
* Compute the time offset between the server and the client
* @param serverTime the server time to compute the offset
*/
override fun computeTimeOffset(serverTime: Long) {
kaliumLogger.i("ServerTimeHandler: computing time offset between server and client..")
val offset = Clock.System.now().epochSeconds - serverTime
timeOffset = offset
}

/**
* Convert local timestamp to server timestamp
* @param localTimestamp timestamp from client to convert
* @return the timestamp adjusted with the client/server time shift
*/
override fun toServerTimestamp(localTimestamp: Long): Long {
return localTimestamp - timeOffset
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.test.assertNull

class EventRepositoryTest {

Expand Down Expand Up @@ -136,6 +138,34 @@ class EventRepositoryTest {
}
}

@Test
fun givenAPIFailure_whenFetchingServerTime_thenReturnNull() = runTest {
val (_, eventRepository) = Arrangement()
.withGetServerTimeReturning(NetworkResponse.Error(KaliumException.NoNetwork()))
.arrange()

val result = eventRepository.fetchServerTime()

assertNull(result)
}


@Test
fun givenAPISucceeds_whenFetchingServerTime_thenReturnTime() = runTest {
val result = NetworkResponse.Success(
value = "123434545",
headers = mapOf(),
httpCode = HttpStatusCode.OK.value
)
val (_, eventRepository) = Arrangement()
.withGetServerTimeReturning(result)
.arrange()

val time = eventRepository.fetchServerTime()

assertNotNull(time)
}

private companion object {
const val LAST_PROCESSED_EVENT_ID_KEY = "last_processed_event_id"
}
Expand All @@ -158,12 +188,6 @@ class EventRepositoryTest {
}
}

suspend fun withDeleteMetadataSucceeding() = apply {
coEvery {
metaDAO.deleteValue(any())
}.returns(Unit)
}

suspend fun withLastStoredEventId(value: String?) = apply {
coEvery {
metaDAO.valueByKey(LAST_PROCESSED_EVENT_ID_KEY)
Expand All @@ -176,15 +200,15 @@ class EventRepositoryTest {
}.returns(result)
}

suspend fun withLastNotificationRemote(result: NetworkResponse<EventResponse>) = apply {
suspend fun withOldestNotificationReturning(result: NetworkResponse<EventResponse>) = apply {
coEvery {
notificationApi.mostRecentNotification(any())
notificationApi.oldestNotification(any())
}.returns(result)
}

suspend fun withOldestNotificationReturning(result: NetworkResponse<EventResponse>) = apply {
suspend fun withGetServerTimeReturning(result: NetworkResponse<String>) = apply {
coEvery {
notificationApi.oldestNotification(any())
notificationApi.getServerTime(any())
}.returns(result)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ object TestEvent {
id = "eventId",
)

fun Event.wrapInEnvelope(isTransient: Boolean = false, source: EventSource = EventSource.LIVE): EventEnvelope {
fun Event.wrapInEnvelope(
isTransient: Boolean = false,
source: EventSource = EventSource.LIVE
): EventEnvelope {
return EventEnvelope(this, EventDeliveryInfo(isTransient, source))
}

Expand Down
Loading

0 comments on commit fc3f3e3

Please sign in to comment.