Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use server time for calling (WPB-10979) #3017

Merged
merged 9 commits into from
Oct 8, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import com.wire.kalium.logic.feature.call.usecase.GetCallConversationTypeProvide
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.featureFlags.KaliumConfigs
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.toInt
import com.wire.kalium.network.NetworkStateObserver
import com.wire.kalium.util.KaliumDispatcher
Expand Down Expand Up @@ -258,8 +259,6 @@ class CallManagerImpl internal constructor(
)

if (callingValue.type != REMOTE_MUTE_TYPE || shouldRemoteMute) {
val currTime = System.currentTimeMillis()

val targetConversationId = if (message.isSelfMessage) {
content.conversationId ?: message.conversationId
} else {
Expand All @@ -273,7 +272,7 @@ class CallManagerImpl internal constructor(
inst = deferredHandle.await(),
msg = msg,
len = msg.size,
curr_time = Uint32_t(value = currTime / 1000),
curr_time = Uint32_t(value = ServerTimeHandler.toServerTimestamp()),
msg_time = Uint32_t(value = message.date.epochSeconds),
convId = federatedIdMapper.parseToFederatedId(targetConversationId),
userId = federatedIdMapper.parseToFederatedId(message.senderUserId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import kotlinx.serialization.json.JsonNull
data class EventEnvelope(
val event: Event,
val deliveryInfo: EventDeliveryInfo,
val time: String?
) {
override fun toString(): String {
return super.toString()
Expand All @@ -67,7 +68,8 @@ data class EventEnvelope(

fun toLogMap(): Map<String, Any?> = mapOf(
"event" to event.toLogMap(),
"deliveryInfo" to deliveryInfo.toLogMap()
"deliveryInfo" to deliveryInfo.toLogMap(),
"time" to time
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class EventMapper(
private val qualifiedIdMapper: QualifiedIdMapper = MapperProvider.qualifiedIdMapper(selfUserId),
private val conversationMapper: ConversationMapper = MapperProvider.conversationMapper(selfUserId)
) {
fun fromDTO(eventResponse: EventResponse, isLive: Boolean): List<EventEnvelope> {
fun fromDTO(eventResponse: EventResponse, isLive: Boolean, time: String?): List<EventEnvelope> {
// TODO(edge-case): Multiple payloads in the same event have the same ID, is this an issue when marking lastProcessedEventId?
val id = eventResponse.id
val source = if (isLive) EventSource.LIVE else EventSource.PENDING
return eventResponse.payload?.map { eventContentDTO ->
EventEnvelope(fromEventContentDTO(id, eventContentDTO), EventDeliveryInfo(eventResponse.transient, source))
EventEnvelope(fromEventContentDTO(id, eventContentDTO), EventDeliveryInfo(eventResponse.transient, source), time)
} ?: listOf()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class EventDataSource(
}

is WebSocketEvent.BinaryPayloadReceived -> {
eventMapper.fromDTO(webSocketEvent.payload, true).asFlow().map { WebSocketEvent.BinaryPayloadReceived(it) }
eventMapper.fromDTO(webSocketEvent.payload, true, null).asFlow().map { WebSocketEvent.BinaryPayloadReceived(it) }
}
}
}.flattenConcat()
Expand All @@ -140,7 +140,7 @@ class EventDataSource(
lastFetchedNotificationId = notificationsPageResult.value.notifications.lastOrNull()?.id

notificationsPageResult.value.notifications.flatMap {
eventMapper.fromDTO(it, isLive = false)
eventMapper.fromDTO(it, isLive = false, notificationsPageResult.value.time)
}.forEach { event ->
if (!coroutineContext.isActive) {
return@flow
Expand All @@ -157,7 +157,7 @@ class EventDataSource(
override fun parseExternalEvents(data: String): List<EventEnvelope> {
val notificationResponse = Json.decodeFromString<NotificationResponse>(data)
return notificationResponse.notifications.flatMap {
eventMapper.fromDTO(it, isLive = false)
eventMapper.fromDTO(it, isLive = false, notificationResponse.time)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,15 +924,15 @@ class UserSessionScope internal constructor(

private val eventProcessor: EventProcessor by lazy {
EventProcessorImpl(
eventRepository,
conversationEventReceiver,
userEventReceiver,
teamEventReceiver,
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver,
this@UserSessionScope,
userScopedLogger,
eventRepository = eventRepository,
conversationEventReceiver = conversationEventReceiver,
userEventReceiver = userEventReceiver,
teamEventReceiver = teamEventReceiver,
featureConfigEventReceiver = featureConfigEventReceiver,
userPropertiesEventReceiver = userPropertiesEventReceiver,
federationEventReceiver = federationEventReceiver,
processingScope = this@UserSessionScope,
logger = userScopedLogger,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import com.wire.kalium.logic.sync.receiver.TeamEventReceiver
import com.wire.kalium.logic.sync.receiver.UserEventReceiver
import com.wire.kalium.logic.sync.receiver.UserPropertiesEventReceiver
import com.wire.kalium.logic.util.EventLoggingStatus
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.createEventProcessingLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.withContext
import kotlinx.datetime.toInstant

/**
* Handles incoming events from remote.
Expand Down Expand Up @@ -76,6 +78,7 @@ internal class EventProcessorImpl(
private val userPropertiesEventReceiver: UserPropertiesEventReceiver,
private val federationEventReceiver: FederationEventReceiver,
private val processingScope: CoroutineScope,
private val serverTimeHandler: ServerTimeHandler = ServerTimeHandler,
logger: KaliumLogger = kaliumLogger,
) : EventProcessor {

Expand All @@ -86,7 +89,10 @@ internal class EventProcessorImpl(
override var disableEventProcessing: Boolean = false

override suspend fun processEvent(eventEnvelope: EventEnvelope): Either<CoreFailure, Unit> = processingScope.async {
val (event, deliveryInfo) = eventEnvelope
val (event, deliveryInfo, time) = eventEnvelope
time?.let {
serverTimeHandler.computeTimeOffset(it.toInstant().epochSeconds)
}
if (disableEventProcessing) {
logger.w("Skipping processing of ${event.toLogString()} due to debug option")
Either.Right(Unit)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.util

import kotlinx.datetime.Clock

object ServerTimeHandler {
private var timeOffset: Long = 0
ohassine marked this conversation as resolved.
Show resolved Hide resolved

ohassine marked this conversation as resolved.
Show resolved Hide resolved
/**
* Compute the time offset between the server and the client
* @param serverTime the server time to compute the offset
*/
fun computeTimeOffset(serverTime: Long) {
val offset = Clock.System.now().epochSeconds - serverTime
timeOffset = offset
}

fun getTimeOffset(): Long {
return timeOffset
}

/**
* Convert local timestamp to server timestamp
* @param localTimestamp timestamp from client to convert
* @return the timestamp adjusted with the client/server time shift
*/
fun toServerTimestamp(localTimestamp: Long = Clock.System.now().epochSeconds): Long {
return localTimestamp - timeOffset
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,12 @@ object TestEvent {
id = "eventId",
)

fun Event.wrapInEnvelope(isTransient: Boolean = false, source: EventSource = EventSource.LIVE): EventEnvelope {
return EventEnvelope(this, EventDeliveryInfo(isTransient, source))
fun Event.wrapInEnvelope(
isTransient: Boolean = false,
source: EventSource = EventSource.LIVE,
time: String? = null
): EventEnvelope {
return EventEnvelope(this, EventDeliveryInfo(isTransient, source), time)
}

val liveDeliveryInfo = EventDeliveryInfo(false, EventSource.LIVE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.wire.kalium.logic.sync.receiver.FederationEventReceiver
import com.wire.kalium.logic.sync.receiver.TeamEventReceiver
import com.wire.kalium.logic.sync.receiver.UserEventReceiver
import com.wire.kalium.logic.sync.receiver.UserPropertiesEventReceiver
import com.wire.kalium.logic.util.ServerTimeHandler
import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventReceiverArrangement
import com.wire.kalium.logic.util.arrangement.eventHandler.FeatureConfigEventReceiverArrangementImpl
import com.wire.kalium.logic.util.shouldFail
Expand All @@ -40,6 +41,7 @@ import io.mockative.eq
import io.mockative.mock
import io.mockative.once
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
Expand All @@ -51,7 +53,9 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue

@OptIn(ExperimentalCoroutinesApi::class)
class EventProcessorTest {

@Test
Expand Down Expand Up @@ -300,6 +304,19 @@ class EventProcessorTest {
}.wasNotInvoked()
}

@Test
fun givenEventWithTime_whenProcessingEvent_thenTimeOffsetIsComputed() = runTest {
val time = "2024-03-30T15:36:00.000Z"
val event = TestEvent.newConversationEvent().wrapInEnvelope(time = time)

val (arrangement, eventProcessor) = Arrangement(this).arrange {
withUpdateLastProcessedEventId(event.event.id, Either.Right(Unit))
}
eventProcessor.processEvent(event)

assertTrue { arrangement.serverTimeHandler.getTimeOffset() != 0L }
}

private class Arrangement(
val processingScope: CoroutineScope
) : FeatureConfigEventReceiverArrangement by FeatureConfigEventReceiverArrangementImpl() {
Expand All @@ -319,6 +336,8 @@ class EventProcessorTest {
@Mock
val userPropertiesEventReceiver = mock(UserPropertiesEventReceiver::class)

val serverTimeHandler = ServerTimeHandler

@Mock
val federationEventReceiver = mock(FederationEventReceiver::class)

Expand Down Expand Up @@ -392,7 +411,8 @@ class EventProcessorTest {
featureConfigEventReceiver,
userPropertiesEventReceiver,
federationEventReceiver,
processingScope
processingScope,
serverTimeHandler
)
}
}
Expand Down
Loading