Skip to content

Commit

Permalink
Merge branch 'feat/composit_messages/epic' into feat/composit_message…
Browse files Browse the repository at this point in the history
…s/map_from_to_proto
  • Loading branch information
MohamadJaara authored Jul 16, 2023
2 parents 326bb08 + 2c1b8fa commit 6bd12d1
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class OnIncomingCall(
status = status,
callerId = qualifiedIdMapper.fromStringToQualifiedID(userId).toString(),
isMuted = isMuted,
isCameraOn = isVideoCall,
isCameraOn = false,
type = mappedConversationType,
isCbrEnabled = kaliumConfigs.forceConstantBitrateCalls
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ interface ConversationRepository {
suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean = false
originatedFromEvent: Boolean = false,
invalidateMembers: Boolean = false
): Either<CoreFailure, Unit>

/**
Expand Down Expand Up @@ -250,7 +251,12 @@ internal class ConversationDataSource internal constructor(
kaliumLogger.withFeatureId(CONVERSATIONS)
.d("Skipping ${conversations.conversationsNotFound.size} conversations not found")
}
persistConversations(conversations.conversationsFound, selfTeamIdProvider().getOrNull()?.value)
persistConversations(
conversations = conversations.conversationsFound,
selfUserTeamId = selfTeamIdProvider().getOrNull()?.value,
invalidateMembers = true
)

}.onFailure {
kaliumLogger.withFeatureId(CONVERSATIONS).e("Error fetching conversation details $it")
}
Expand Down Expand Up @@ -291,6 +297,7 @@ internal class ConversationDataSource internal constructor(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean,
invalidateMembers: Boolean
) = wrapStorageRequest {
val conversationEntities = conversations
.map { conversationResponse ->
Expand All @@ -307,9 +314,19 @@ internal class ConversationDataSource internal constructor(
}
conversationDAO.insertConversations(conversationEntities)
conversations.forEach { conversationsResponse ->
memberDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members), idMapper.fromApiToDao(conversationsResponse.id)
)
// do the cleanup of members from conversation in case when self user rejoined conversation
// and may not received any member remove or leave events
if (invalidateMembers) {
memberDAO.updateFullMemberList(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
)
} else {
memberDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
)
}
}
}

Expand Down Expand Up @@ -392,7 +409,7 @@ internal class ConversationDataSource internal constructor(
conversationApi.fetchConversationDetails(conversationID.toApi())
}.flatMap {
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(listOf(it), selfUserTeamId?.value)
persistConversations(listOf(it), selfUserTeamId?.value, invalidateMembers = true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ import com.wire.kalium.logic.sync.receiver.conversation.message.ApplicationMessa
import com.wire.kalium.logic.sync.receiver.conversation.message.ApplicationMessageHandlerImpl
import com.wire.kalium.logic.sync.receiver.conversation.message.MLSMessageUnpacker
import com.wire.kalium.logic.sync.receiver.conversation.message.MLSMessageUnpackerImpl
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandler
import com.wire.kalium.logic.sync.receiver.conversation.message.MLSWrongEpochHandler
import com.wire.kalium.logic.sync.receiver.conversation.message.MLSWrongEpochHandlerImpl
import com.wire.kalium.logic.sync.receiver.conversation.message.NewMessageEventHandlerImpl
Expand Down Expand Up @@ -991,9 +992,13 @@ class UserSessionScope internal constructor(
joinExistingMLSConversation = joinExistingMLSConversationUseCase
)

private val newMessageHandler: NewMessageEventHandlerImpl
private val newMessageHandler: NewMessageEventHandler
get() = NewMessageEventHandlerImpl(
proteusUnpacker, mlsUnpacker, applicationMessageHandler, mlsWrongEpochHandler
proteusUnpacker, mlsUnpacker, applicationMessageHandler,
{ conversationId, messageId ->
messages.ephemeralMessageDeletionHandler.startSelfDeletion(conversationId, messageId)
}, userId,
mlsWrongEpochHandler
)

private val newConversationHandler: NewConversationEventHandler
Expand All @@ -1009,7 +1014,7 @@ class UserSessionScope internal constructor(
)
private val memberJoinHandler: MemberJoinEventHandler
get() = MemberJoinEventHandlerImpl(
conversationRepository, userRepository, persistMessage
conversationRepository, userRepository, persistMessage, userId
)
private val memberLeaveHandler: MemberLeaveEventHandler
get() = MemberLeaveEventHandlerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageFor
import com.wire.kalium.logic.feature.message.ephemeral.DeleteEphemeralMessageForSelfUserAsSenderUseCaseImpl
import com.wire.kalium.logic.feature.message.ephemeral.EnqueueMessageSelfDeletionUseCase
import com.wire.kalium.logic.feature.message.ephemeral.EnqueueMessageSelfDeletionUseCaseImpl
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandler
import com.wire.kalium.logic.feature.message.ephemeral.EphemeralMessageDeletionHandlerImpl
import com.wire.kalium.logic.feature.selfDeletingMessages.ObserveSelfDeletionTimerSettingsForConversationUseCase
import com.wire.kalium.logic.feature.sessionreset.ResetSessionUseCase
Expand Down Expand Up @@ -113,7 +114,7 @@ class MessageScope internal constructor(
private val messageSendingInterceptor: MessageSendingInterceptor
get() = MessageSendingInterceptorImpl(messageContentEncoder, messageRepository)

internal val ephemeralMessageDeletionHandler =
internal val ephemeralMessageDeletionHandler: EphemeralMessageDeletionHandler =
EphemeralMessageDeletionHandlerImpl(
userSessionCoroutineScope = scope,
messageRepository = messageRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ internal class ConversationEventReceiverImpl(
}

is Event.Conversation.AccessUpdate -> {
TODO()
/* no-op */
Either.Right(Unit)
}

is Event.Conversation.ConversationMessageTimer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.wire.kalium.logic.data.event.logEventProcessing
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.onFailure
Expand All @@ -42,13 +43,19 @@ interface MemberJoinEventHandler {
internal class MemberJoinEventHandlerImpl(
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val persistMessage: PersistMessageUseCase
private val persistMessage: PersistMessageUseCase,
private val selfUserId: UserId
) : MemberJoinEventHandler {
private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.EVENT_RECEIVER) }

override suspend fun handle(event: Event.Conversation.MemberJoin) =
// Attempt to fetch conversation details if needed, as this might be an unknown conversation
conversationRepository.fetchConversationIfUnknown(event.conversationId)
// we need to force fetching conversation when self user rejoined to conversation,
// because he may not received member change events
if (event.members.map { it.id }.contains(selfUserId)) {
conversationRepository.fetchConversation(event.conversationId)
} else {
conversationRepository.fetchConversationIfUnknown(event.conversationId)
}
.run {
onSuccess {
val logMap = mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.event.EventLoggingStatus
import com.wire.kalium.logic.data.event.logEventProcessing
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
Expand All @@ -41,6 +43,8 @@ internal class NewMessageEventHandlerImpl(
private val proteusMessageUnpacker: ProteusMessageUnpacker,
private val mlsMessageUnpacker: MLSMessageUnpacker,
private val applicationMessageHandler: ApplicationMessageHandler,
private val enqueueSelfDeletion: (conversationId: ConversationId, messageId: String) -> Unit,
private val selfUserId: UserId,
private val mlsWrongEpochHandler: MLSWrongEpochHandler
) : NewMessageEventHandler {

Expand Down Expand Up @@ -76,7 +80,10 @@ internal class NewMessageEventHandlerImpl(
)
)
}.onSuccess {
handleSuccessfulResult(it)
if (it is MessageUnpackResult.ApplicationMessage) {
handleSuccessfulResult(it)
onMessageInserted(it)
}
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
Expand Down Expand Up @@ -113,7 +120,10 @@ internal class NewMessageEventHandlerImpl(
)
)
}.onSuccess {
handleSuccessfulResult(it)
if (it is MessageUnpackResult.ApplicationMessage) {
handleSuccessfulResult(it)
onMessageInserted(it)
}
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
Expand All @@ -122,17 +132,22 @@ internal class NewMessageEventHandlerImpl(
}
}

private suspend fun handleSuccessfulResult(result: MessageUnpackResult) {
if (result is MessageUnpackResult.ApplicationMessage) {
applicationMessageHandler.handleContent(
conversationId = result.conversationId,
timestampIso = result.timestampIso,
senderUserId = result.senderUserId,
senderClientId = result.senderClientId,
content = result.content
private fun onMessageInserted(result: MessageUnpackResult.ApplicationMessage) {
if (result.senderUserId == selfUserId && result.content.expiresAfterMillis != null) {
enqueueSelfDeletion(
result.conversationId,
result.content.messageUid
)
} else {
// NO-OP. Pure Protocol messages are handled by the unpackers
}
}

private suspend fun handleSuccessfulResult(result: MessageUnpackResult.ApplicationMessage) {
applicationMessageHandler.handleContent(
conversationId = result.conversationId,
timestampIso = result.timestampIso,
senderUserId = result.senderUserId,
senderClientId = result.senderClientId,
content = result.content
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ internal class SlowSyncManager(
val MIN_RETRY_DELAY = 1.seconds
val MAX_RETRY_DELAY = 10.minutes
val MIN_TIME_BETWEEN_SLOW_SYNCS = 7.days
const val CURRENT_VERSION = 2 // bump this version to perform slow sync when some new feature flag was added
}
}

const val CURRENT_VERSION = 3 // bump this version to perform slow sync when some new feature flag was added
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object TestUser {
val USER_ID = UserId(value, domain)
val OTHER_USER_ID = USER_ID.copy(value = "otherValue")
val OTHER_USER_ID_2 = USER_ID.copy(value = "otherValue2")
val OTHER_FEDERATED_USER_ID = USER_ID.copy(value = "otherValue", "otherDomain")
val ENTITY_ID = QualifiedIDEntity(value, domain)
val NETWORK_ID = com.wire.kalium.network.api.base.model.UserId(
value = value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import kotlin.test.Test
class MemberJoinEventHandlerTest {

@Test
fun givenMemberJoinEvent_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest {
val newMembers = listOf(Member(TestUser.USER_ID, Member.Role.Member))
fun givenMemberJoinEventWithoutSelfUser_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest {
val newMembers = listOf(Member(TestUser.OTHER_FEDERATED_USER_ID, Member.Role.Member))
val event = TestEvent.memberJoin(members = newMembers)

val (arrangement, eventHandler) = Arrangement()
Expand All @@ -64,6 +64,36 @@ class MemberJoinEventHandlerTest {
.suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown)
.with(eq(event.conversationId))
.wasInvoked(exactly = once)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(eq(event.conversationId))
.wasNotInvoked()
}

@Test
fun givenMemberJoinEventWithSelfUser_whenHandlingIt_thenShouldFetchConversation() = runTest {
val newMembers = listOf(Member(TestUser.SELF.id, Member.Role.Member))
val event = TestEvent.memberJoin(members = newMembers)

val (arrangement, eventHandler) = Arrangement()
.withPersistingMessageReturning(Either.Right(Unit))
.withFetchConversationIfUnknownSucceeding()
.withPersistMembersSucceeding()
.withFetchUsersIfUnknownByIdsReturning(Either.Right(Unit))
.arrange()

eventHandler.handle(event)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown)
.with(eq(event.conversationId))
.wasNotInvoked()

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(eq(event.conversationId))
.wasInvoked(exactly = once)
}

@Test
Expand Down Expand Up @@ -167,7 +197,8 @@ class MemberJoinEventHandlerTest {
private val memberJoinEventHandler: MemberJoinEventHandler = MemberJoinEventHandlerImpl(
conversationRepository,
userRepository,
persistMessage
persistMessage,
TestUser.SELF.id
)

fun withPersistingMessageReturning(result: Either<CoreFailure, Unit>) = apply {
Expand All @@ -182,13 +213,21 @@ class MemberJoinEventHandlerTest {
.suspendFunction(conversationRepository::fetchConversationIfUnknown)
.whenInvokedWith(any())
.thenReturn(Either.Right(Unit))
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversation)
.whenInvokedWith(any())
.thenReturn(Either.Right(Unit))
}

fun withFetchConversationIfUnknownFailing(coreFailure: CoreFailure) = apply {
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversationIfUnknown)
.whenInvokedWith(any())
.thenReturn(Either.Left(coreFailure))
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversation)
.whenInvokedWith(any())
.thenReturn(Either.Left(coreFailure))
}

fun withPersistMembersSucceeding() = apply {
Expand Down
Loading

0 comments on commit 6bd12d1

Please sign in to comment.