Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use server time for calling (WPB-10979) #3017

Merged
merged 9 commits into from
Oct 8, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import com.wire.kalium.logic.feature.call.usecase.GetCallConversationTypeProvide
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.featureFlags.KaliumConfigs
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.toInt
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -119,6 +120,7 @@ class CallManagerImpl internal constructor(
private val flowManagerService: FlowManagerService,
private val json: Json = Json { ignoreUnknownKeys = true },
private val shouldRemoteMuteChecker: ShouldRemoteMuteChecker = ShouldRemoteMuteCheckerImpl(),
private val serverTimeHandler: ServerTimeHandler = ServerTimeHandler(),
kaliumDispatchers: KaliumDispatcher = KaliumDispatcherImpl
) : CallManager {

Expand Down Expand Up @@ -255,8 +257,6 @@ class CallManagerImpl internal constructor(
)

if (callingValue.type != REMOTE_MUTE_TYPE || shouldRemoteMute) {
val currTime = System.currentTimeMillis()

val targetConversationId = if (message.isSelfMessage) {
content.conversationId ?: message.conversationId
} else {
Expand All @@ -270,7 +270,7 @@ class CallManagerImpl internal constructor(
inst = deferredHandle.await(),
msg = msg,
len = msg.size,
curr_time = Uint32_t(value = currTime / 1000),
curr_time = Uint32_t(value = serverTimeHandler.toServerTimestamp()),
msg_time = Uint32_t(value = message.date.epochSeconds),
convId = federatedIdMapper.parseToFederatedId(targetConversationId),
userId = federatedIdMapper.parseToFederatedId(message.senderUserId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import kotlinx.serialization.json.JsonNull
*/
data class EventEnvelope(
val event: Event,
val deliveryInfo: EventDeliveryInfo,
val deliveryInfo: EventDeliveryInfo
) {
override fun toString(): String {
return super.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ interface EventRepository {
* @return Either containing a [CoreFailure] or the oldest available event ID as a String.
*/
suspend fun fetchOldestAvailableEventId(): Either<CoreFailure, String>
suspend fun fetchServerTime(): String?
}

class EventDataSource(
Expand Down Expand Up @@ -193,6 +194,15 @@ class EventDataSource(
}
}.map { it.id }

override suspend fun fetchServerTime(): String? {
val result = notificationApi.getServerTime(NOTIFICATIONS_QUERY_SIZE)
return if (result.isSuccessful()) {
result.value
} else {
null
}
}

private companion object {
const val NOTIFICATIONS_QUERY_SIZE = 100
const val LAST_PROCESSED_EVENT_ID_KEY = "last_processed_event_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,22 +917,22 @@ class UserSessionScope internal constructor(
)

private val eventGatherer: EventGatherer get() = EventGathererImpl(
eventRepository,
incrementalSyncRepository,
userScopedLogger,
eventRepository = eventRepository,
incrementalSyncRepository = incrementalSyncRepository,
logger = userScopedLogger
)

private val eventProcessor: EventProcessor by lazy {
EventProcessorImpl(
eventRepository,
conversationEventReceiver,
userEventReceiver,
teamEventReceiver,
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver,
this@UserSessionScope,
userScopedLogger,
eventRepository = eventRepository,
conversationEventReceiver = conversationEventReceiver,
userEventReceiver = userEventReceiver,
teamEventReceiver = teamEventReceiver,
featureConfigEventReceiver = featureConfigEventReceiver,
userPropertiesEventReceiver = userPropertiesEventReceiver,
federationEventReceiver = federationEventReceiver,
processingScope = this@UserSessionScope,
logger = userScopedLogger,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.KaliumSyncException
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.network.api.base.authenticated.notification.WebSocketEvent
import com.wire.kalium.network.exceptions.KaliumException
import io.ktor.http.HttpStatusCode
Expand All @@ -52,6 +53,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.transformWhile
import kotlinx.datetime.toInstant

/**
* Responsible for fetching events from a remote source, orchestrating between events missed since
Expand All @@ -77,6 +79,7 @@ internal interface EventGatherer {
internal class EventGathererImpl(
private val eventRepository: EventRepository,
private val incrementalSyncRepository: IncrementalSyncRepository,
private val serverTimeHandler: ServerTimeHandler = ServerTimeHandler(),
logger: KaliumLogger = kaliumLogger,
) : EventGatherer {

Expand Down Expand Up @@ -165,6 +168,7 @@ internal class EventGathererImpl(

private suspend fun FlowCollector<EventEnvelope>.onWebSocketOpen() {
logger.i("Websocket Open")
handleTimeDrift()
eventRepository
.pendingEvents()
.onEach { result ->
Expand All @@ -181,6 +185,12 @@ internal class EventGathererImpl(
_currentSource.value = EventSource.LIVE
}

private suspend fun handleTimeDrift() {
eventRepository.fetchServerTime()?.let {
serverTimeHandler.computeTimeOffset(it.toInstant().epochSeconds)
}
}

private fun throwPendingEventException(failure: CoreFailure) {
val networkCause = (failure as? NetworkFailure.ServerMiscommunication)?.rootCause
val isEventNotFound = networkCause is KaliumException.InvalidRequestError
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.util

import com.wire.kalium.logic.kaliumLogger
import kotlinx.datetime.Clock

class ServerTimeHandler {

/**
* Used to store the difference (offset) between the server time and the local client time.
* And it will be used to adjust timestamps between server and client times.
*/
private var timeOffset: Long = 0
ohassine marked this conversation as resolved.
Show resolved Hide resolved

/**
* Compute the time offset between the server and the client
* @param serverTime the server time to compute the offset
*/
fun computeTimeOffset(serverTime: Long) {
kaliumLogger.i("ServerTimeHandler: computing time offset between server and client..")
val offset = Clock.System.now().epochSeconds - serverTime
timeOffset = offset
}

fun getTimeOffset(): Long {
return timeOffset
}

/**
* Convert local timestamp to server timestamp
* @param localTimestamp timestamp from client to convert
* @return the timestamp adjusted with the client/server time shift
*/
fun toServerTimestamp(localTimestamp: Long = Clock.System.now().epochSeconds): Long {
return localTimestamp - timeOffset
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import kotlinx.datetime.Instant
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.test.assertNull

class EventRepositoryTest {

Expand Down Expand Up @@ -136,6 +138,34 @@ class EventRepositoryTest {
}
}

@Test
fun givenAPIFailure_whenFetchingServerTime_thenReturnNull() = runTest {
val (_, eventRepository) = Arrangement()
.withGetServerTimeReturning(NetworkResponse.Error(KaliumException.NoNetwork()))
.arrange()

val result = eventRepository.fetchServerTime()

assertNull(result)
}


@Test
fun givenAPISucceeds_whenFetchingServerTime_thenReturnTime() = runTest {
val result = NetworkResponse.Success(
value = "123434545",
headers = mapOf(),
httpCode = HttpStatusCode.OK.value
)
val (_, eventRepository) = Arrangement()
.withGetServerTimeReturning(result)
.arrange()

val time = eventRepository.fetchServerTime()

assertNotNull(time)
}

private companion object {
const val LAST_PROCESSED_EVENT_ID_KEY = "last_processed_event_id"
}
Expand All @@ -158,12 +188,6 @@ class EventRepositoryTest {
}
}

suspend fun withDeleteMetadataSucceeding() = apply {
coEvery {
metaDAO.deleteValue(any())
}.returns(Unit)
}

suspend fun withLastStoredEventId(value: String?) = apply {
coEvery {
metaDAO.valueByKey(LAST_PROCESSED_EVENT_ID_KEY)
Expand All @@ -176,15 +200,15 @@ class EventRepositoryTest {
}.returns(result)
}

suspend fun withLastNotificationRemote(result: NetworkResponse<EventResponse>) = apply {
suspend fun withOldestNotificationReturning(result: NetworkResponse<EventResponse>) = apply {
coEvery {
notificationApi.mostRecentNotification(any())
notificationApi.oldestNotification(any())
}.returns(result)
}

suspend fun withOldestNotificationReturning(result: NetworkResponse<EventResponse>) = apply {
suspend fun withGetServerTimeReturning(result: NetworkResponse<String>) = apply {
coEvery {
notificationApi.oldestNotification(any())
notificationApi.getServerTime(any())
}.returns(result)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ object TestEvent {
id = "eventId",
)

fun Event.wrapInEnvelope(isTransient: Boolean = false, source: EventSource = EventSource.LIVE): EventEnvelope {
fun Event.wrapInEnvelope(
isTransient: Boolean = false,
source: EventSource = EventSource.LIVE
): EventEnvelope {
return EventEnvelope(this, EventDeliveryInfo(isTransient, source))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.wire.kalium.logic.sync.receiver.FederationEventReceiver
import com.wire.kalium.logic.sync.receiver.TeamEventReceiver
import com.wire.kalium.logic.sync.receiver.UserEventReceiver
import com.wire.kalium.logic.sync.receiver.UserPropertiesEventReceiver
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventReceiverArrangement
import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventReceiverArrangementImpl
import com.wire.kalium.logic.util.shouldFail
Expand All @@ -40,6 +41,7 @@ import io.mockative.eq
import io.mockative.mock
import io.mockative.once
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
Expand All @@ -52,6 +54,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse

@OptIn(ExperimentalCoroutinesApi::class)
class EventProcessorTest {

@Test
Expand Down Expand Up @@ -319,6 +322,8 @@ class EventProcessorTest {
@Mock
val userPropertiesEventReceiver = mock(UserPropertiesEventReceiver::class)

val serverTimeHandler = ServerTimeHandler()

@Mock
val federationEventReceiver = mock(FederationEventReceiver::class)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ interface NotificationApi {
*/
suspend fun getAllNotifications(querySize: Int, queryClient: String): NetworkResponse<NotificationResponse>

suspend fun getServerTime(querySize: Int): NetworkResponse<String>
suspend fun listenToLiveEvents(clientId: String): NetworkResponse<Flow<WebSocketEvent<EventResponse>>>

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,12 @@ internal open class NotificationApiV0 internal constructor(
override suspend fun getAllNotifications(querySize: Int, queryClient: String): NetworkResponse<NotificationResponse> =
notificationsCall(querySize = querySize, queryClient = queryClient, querySince = null)

override suspend fun getServerTime(querySize: Int): NetworkResponse<String> =
notificationsCall(querySize = querySize, queryClient = null, querySince = null).mapSuccess { it.time }

protected open suspend fun notificationsCall(
querySize: Int,
queryClient: String,
queryClient: String?,
querySince: String?
): NetworkResponse<NotificationResponse> {
return wrapKaliumResponse({
Expand All @@ -104,7 +107,7 @@ internal open class NotificationApiV0 internal constructor(
}) {
httpClient.get(PATH_NOTIFICATIONS) {
parameter(SIZE_QUERY_KEY, querySize)
parameter(CLIENT_QUERY_KEY, queryClient)
queryClient?.let { parameter(CLIENT_QUERY_KEY, it) }
querySince?.let { parameter(SINCE_QUERY_KEY, it) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ internal open class NotificationApiV3 internal constructor(

override suspend fun notificationsCall(
querySize: Int,
queryClient: String,
queryClient: String?,
querySince: String?
): NetworkResponse<NotificationResponse> = wrapKaliumResponse {
// Pretty much the same V0 request, but without the 404 overwrite
httpClient.get(V0.PATH_NOTIFICATIONS) {
parameter(V0.SIZE_QUERY_KEY, querySize)
parameter(V0.CLIENT_QUERY_KEY, queryClient)
queryClient?.let { parameter(V0.CLIENT_QUERY_KEY, it) }
querySince?.let { parameter(V0.SINCE_QUERY_KEY, it) }
}
}
Expand Down
Loading