diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/ConversationRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/ConversationRepository.kt index 0d3d7119a2e..bcdcb2b2701 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/ConversationRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/conversation/ConversationRepository.kt @@ -92,7 +92,8 @@ interface ConversationRepository { suspend fun persistConversations( conversations: List, selfUserTeamId: String?, - originatedFromEvent: Boolean = false + originatedFromEvent: Boolean = false, + invalidateMembers: Boolean = false ): Either suspend fun getConversationList(): Either>> @@ -204,7 +205,7 @@ internal class ConversationDataSource internal constructor( private val conversationRoleMapper: ConversationRoleMapper = MapperProvider.conversationRoleMapper(), private val protocolInfoMapper: ProtocolInfoMapper = MapperProvider.protocolInfoMapper(), private val messageMapper: MessageMapper = MapperProvider.messageMapper(selfUserId), - private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper() + private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper(), ) : ConversationRepository { // TODO:I would suggest preparing another suspend func getSelfUser to get nullable self user, @@ -247,7 +248,12 @@ internal class ConversationDataSource internal constructor( kaliumLogger.withFeatureId(CONVERSATIONS) .d("Skipping ${conversations.conversationsNotFound.size} conversations not found") } - persistConversations(conversations.conversationsFound, selfTeamIdProvider().getOrNull()?.value) + persistConversations( + conversations = conversations.conversationsFound, + selfUserTeamId = selfTeamIdProvider().getOrNull()?.value, + invalidateMembers = true + ) + }.onFailure { kaliumLogger.withFeatureId(CONVERSATIONS).e("Error fetching conversation details $it") } @@ -267,6 +273,7 @@ internal class ConversationDataSource internal constructor( conversations: List, selfUserTeamId: String?, originatedFromEvent: Boolean, + invalidateMembers: Boolean ) = wrapStorageRequest { val conversationEntities = conversations // TODO work-around for a bug in the backend. Can be removed when fixed: https://wearezeta.atlassian.net/browse/FS-1262 @@ -285,9 +292,19 @@ internal class ConversationDataSource internal constructor( } conversationDAO.insertConversations(conversationEntities) conversations.forEach { conversationsResponse -> - conversationDAO.insertMembersWithQualifiedId( - memberMapper.fromApiModelToDaoModel(conversationsResponse.members), idMapper.fromApiToDao(conversationsResponse.id) - ) + // do the cleanup of members from conversation in case when self user rejoined conversation + // and may not received any member remove or leave events + if (invalidateMembers) { + conversationDAO.updateFullMemberList( + memberMapper.fromApiModelToDaoModel(conversationsResponse.members), + idMapper.fromApiToDao(conversationsResponse.id) + ) + } else { + conversationDAO.insertMembersWithQualifiedId( + memberMapper.fromApiModelToDaoModel(conversationsResponse.members), + idMapper.fromApiToDao(conversationsResponse.id) + ) + } } } @@ -370,7 +387,7 @@ internal class ConversationDataSource internal constructor( conversationApi.fetchConversationDetails(conversationID.toApi()) }.flatMap { val selfUserTeamId = selfTeamIdProvider().getOrNull() - persistConversations(listOf(it), selfUserTeamId?.value) + persistConversations(listOf(it), selfUserTeamId?.value, invalidateMembers = true) } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index bb86f62569f..f20890e3df3 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -988,7 +988,7 @@ class UserSessionScope internal constructor( ) private val memberJoinHandler: MemberJoinEventHandler get() = MemberJoinEventHandlerImpl( - conversationRepository, userRepository, persistMessage + conversationRepository, userRepository, persistMessage, userId ) private val memberLeaveHandler: MemberLeaveEventHandler get() = MemberLeaveEventHandlerImpl( diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandler.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandler.kt index 4be64320bc4..a624fbb6aec 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandler.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandler.kt @@ -28,6 +28,7 @@ import com.wire.kalium.logic.data.event.logEventProcessing import com.wire.kalium.logic.data.message.Message import com.wire.kalium.logic.data.message.MessageContent import com.wire.kalium.logic.data.message.PersistMessageUseCase +import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.data.user.UserRepository import com.wire.kalium.logic.functional.Either import com.wire.kalium.logic.functional.onFailure @@ -42,13 +43,19 @@ interface MemberJoinEventHandler { internal class MemberJoinEventHandlerImpl( private val conversationRepository: ConversationRepository, private val userRepository: UserRepository, - private val persistMessage: PersistMessageUseCase + private val persistMessage: PersistMessageUseCase, + private val selfUserId: UserId ) : MemberJoinEventHandler { private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.EVENT_RECEIVER) } override suspend fun handle(event: Event.Conversation.MemberJoin) = - // Attempt to fetch conversation details if needed, as this might be an unknown conversation - conversationRepository.fetchConversationIfUnknown(event.conversationId) + // we need to force fetching conversation when self user rejoined to conversation, + // because he may not received member change events + if (event.members.map { it.id }.contains(selfUserId)) { + conversationRepository.fetchConversation(event.conversationId) + } else { + conversationRepository.fetchConversationIfUnknown(event.conversationId) + } .run { onSuccess { val logMap = mapOf( diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt index 623a1a3eb2c..97fbe3e5324 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt @@ -155,6 +155,7 @@ internal class SlowSyncManager( val MIN_RETRY_DELAY = 1.seconds val MAX_RETRY_DELAY = 10.minutes val MIN_TIME_BETWEEN_SLOW_SYNCS = 7.days - const val CURRENT_VERSION = 2 // bump this version to perform slow sync when some new feature flag was added } } + +const val CURRENT_VERSION = 3 // bump this version to perform slow sync when some new feature flag was added diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/framework/TestUser.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/framework/TestUser.kt index 01105e7396b..a4bc0453e0f 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/framework/TestUser.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/framework/TestUser.kt @@ -45,6 +45,7 @@ object TestUser { val USER_ID = UserId(value, domain) val OTHER_USER_ID = USER_ID.copy(value = "otherValue") val OTHER_USER_ID_2 = USER_ID.copy(value = "otherValue2") + val OTHER_FEDERATED_USER_ID = USER_ID.copy(value = "otherValue", "otherDomain") val ENTITY_ID = QualifiedIDEntity(value, domain) val NETWORK_ID = com.wire.kalium.network.api.base.model.UserId( value = value, diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandlerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandlerTest.kt index 5c43e690ece..23d13d177d3 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandlerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/receiver/conversation/MemberJoinEventHandlerTest.kt @@ -47,8 +47,8 @@ import kotlin.test.Test class MemberJoinEventHandlerTest { @Test - fun givenMemberJoinEvent_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest { - val newMembers = listOf(Member(TestUser.USER_ID, Member.Role.Member)) + fun givenMemberJoinEventWithoutSelfUser_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest { + val newMembers = listOf(Member(TestUser.OTHER_FEDERATED_USER_ID, Member.Role.Member)) val event = TestEvent.memberJoin(members = newMembers) val (arrangement, eventHandler) = Arrangement() @@ -64,6 +64,36 @@ class MemberJoinEventHandlerTest { .suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown) .with(eq(event.conversationId)) .wasInvoked(exactly = once) + + verify(arrangement.conversationRepository) + .suspendFunction(arrangement.conversationRepository::fetchConversation) + .with(eq(event.conversationId)) + .wasNotInvoked() + } + + @Test + fun givenMemberJoinEventWithSelfUser_whenHandlingIt_thenShouldFetchConversation() = runTest { + val newMembers = listOf(Member(TestUser.SELF.id, Member.Role.Member)) + val event = TestEvent.memberJoin(members = newMembers) + + val (arrangement, eventHandler) = Arrangement() + .withPersistingMessageReturning(Either.Right(Unit)) + .withFetchConversationIfUnknownSucceeding() + .withPersistMembersSucceeding() + .withFetchUsersIfUnknownByIdsReturning(Either.Right(Unit)) + .arrange() + + eventHandler.handle(event) + + verify(arrangement.conversationRepository) + .suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown) + .with(eq(event.conversationId)) + .wasNotInvoked() + + verify(arrangement.conversationRepository) + .suspendFunction(arrangement.conversationRepository::fetchConversation) + .with(eq(event.conversationId)) + .wasInvoked(exactly = once) } @Test @@ -167,7 +197,8 @@ class MemberJoinEventHandlerTest { private val memberJoinEventHandler: MemberJoinEventHandler = MemberJoinEventHandlerImpl( conversationRepository, userRepository, - persistMessage + persistMessage, + TestUser.SELF.id ) fun withPersistingMessageReturning(result: Either) = apply { @@ -182,6 +213,10 @@ class MemberJoinEventHandlerTest { .suspendFunction(conversationRepository::fetchConversationIfUnknown) .whenInvokedWith(any()) .thenReturn(Either.Right(Unit)) + given(conversationRepository) + .suspendFunction(conversationRepository::fetchConversation) + .whenInvokedWith(any()) + .thenReturn(Either.Right(Unit)) } fun withFetchConversationIfUnknownFailing(coreFailure: CoreFailure) = apply { @@ -189,6 +224,10 @@ class MemberJoinEventHandlerTest { .suspendFunction(conversationRepository::fetchConversationIfUnknown) .whenInvokedWith(any()) .thenReturn(Either.Left(coreFailure)) + given(conversationRepository) + .suspendFunction(conversationRepository::fetchConversation) + .whenInvokedWith(any()) + .thenReturn(Either.Left(coreFailure)) } fun withPersistMembersSucceeding() = apply { diff --git a/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Members.sq b/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Members.sq index baf11f7fcdc..89442da2dcc 100644 --- a/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Members.sq +++ b/persistence/src/commonMain/db_user/com/wire/kalium/persistence/Members.sq @@ -10,6 +10,8 @@ CREATE TABLE Member ( FOREIGN KEY (user) REFERENCES User(qualified_id) ON DELETE CASCADE ); +CREATE INDEX member_conversation_index ON Member(conversation); + insertMember: INSERT OR IGNORE INTO Member(user, conversation, role) VALUES (?, ?, ?); @@ -17,6 +19,9 @@ VALUES (?, ?, ?); deleteMember: DELETE FROM Member WHERE conversation = ? AND user = ?; +deleteMembersFromConversation: +DELETE FROM Member WHERE conversation = ?; + selectAllMembersByConversation: SELECT * FROM Member WHERE conversation LIKE ('%' || :searchQuery || '%'); diff --git a/persistence/src/commonMain/db_user/migrations/43.sqm b/persistence/src/commonMain/db_user/migrations/43.sqm new file mode 100644 index 00000000000..bbabf9b5df8 --- /dev/null +++ b/persistence/src/commonMain/db_user/migrations/43.sqm @@ -0,0 +1 @@ +CREATE INDEX member_conversation_index ON Member(conversation); diff --git a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAO.kt b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAO.kt index f5a56c276f3..e782dfa8708 100644 --- a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAO.kt +++ b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAO.kt @@ -210,4 +210,5 @@ interface ConversationDAO { suspend fun updateMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?) suspend fun updateUserMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?) suspend fun clearContent(conversationId: QualifiedIDEntity) + suspend fun updateFullMemberList(memberList: List, conversationID: QualifiedIDEntity) } diff --git a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAOImpl.kt b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAOImpl.kt index 7223adde945..0607863499e 100644 --- a/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAOImpl.kt +++ b/persistence/src/commonMain/kotlin/com/wire/kalium/persistence/dao/ConversationDAOImpl.kt @@ -401,6 +401,17 @@ class ConversationDAOImpl( nonSuspendInsertMembersWithQualifiedId(memberList, conversationID) } + override suspend fun updateFullMemberList(memberList: List, conversationID: QualifiedIDEntity) = + withContext(coroutineContext) { + memberQueries.transaction { + memberQueries.deleteMembersFromConversation(conversationID) + for (member: Member in memberList) { + userQueries.insertOrIgnoreUserId(member.user) + memberQueries.insertMember(member.user, conversationID, member.role) + } + } + } + private fun nonSuspendInsertMembersWithQualifiedId(memberList: List, conversationID: QualifiedIDEntity) = memberQueries.transaction { for (member: Member in memberList) { diff --git a/persistence/src/commonTest/kotlin/com/wire/kalium/persistence/dao/ConversationDAOTest.kt b/persistence/src/commonTest/kotlin/com/wire/kalium/persistence/dao/ConversationDAOTest.kt index 5c0e5b5eae8..ccd052d35e6 100644 --- a/persistence/src/commonTest/kotlin/com/wire/kalium/persistence/dao/ConversationDAOTest.kt +++ b/persistence/src/commonTest/kotlin/com/wire/kalium/persistence/dao/ConversationDAOTest.kt @@ -43,6 +43,7 @@ import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertContentEquals import kotlin.test.assertEquals +import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertNull import kotlin.test.assertTrue @@ -952,6 +953,40 @@ class ConversationDAOTest : BaseDatabaseTest() { } } + @Test + fun givenConversationWithExistingMember_whenUpdateFullMemberListIsCalled_thenExistingMemberIsRemovedAndNewMembersAreAdded() = runTest { + // Given + val conversationID = conversationEntity1.id + val memberList = listOf( + Member(user1.id, Member.Role.Admin), + Member(user2.id, Member.Role.Member) + ) + + // Insert a conversation, user, and a member into the conversation to test the deletion operation + val oldMember = Member(user3.id, Member.Role.Member) + userDAO.insertUser(user3) + conversationDAO.insertConversation(conversationEntity1) + conversationDAO.insertMember(oldMember, conversationID) + + // Ensure all new users are inserted before calling updateFullMemberList + memberList.forEach { member -> + userDAO.insertUser(user1.copy(id = member.user)) + } + + // When + conversationDAO.updateFullMemberList(memberList, conversationID) + + // Then + // Fetch the members of the conversation + val fetchedMembers = conversationDAO.getAllMembers(conversationID).first() + + // Assert that the old member was deleted + assertFalse(fetchedMembers.any { it.user == oldMember.user }) + + // Assert that the new members were inserted + assertTrue(fetchedMembers.containsAll(memberList)) + } + private suspend fun insertTeamUserAndMember(team: TeamEntity, user: UserEntity, conversationId: QualifiedIDEntity) { teamDAO.insertTeam(team) userDAO.insertUser(user)