Skip to content

Commit

Permalink
fix: invalidate conversation members [WPB-3047] (#1878)
Browse files Browse the repository at this point in the history
* fix: invalidate conversation member when user rejoins conversation

* detekt fix

* added tests

* index fix

---------

Co-authored-by: Mohamad Jaara <[email protected]>
  • Loading branch information
Garzas and MohamadJaara authored Jul 14, 2023
1 parent 211ae8a commit 7f3c540
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ interface ConversationRepository {
suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean = false
originatedFromEvent: Boolean = false,
invalidateMembers: Boolean = false
): Either<CoreFailure, Unit>

suspend fun getConversationList(): Either<StorageFailure, Flow<List<Conversation>>>
Expand Down Expand Up @@ -204,7 +205,7 @@ internal class ConversationDataSource internal constructor(
private val conversationRoleMapper: ConversationRoleMapper = MapperProvider.conversationRoleMapper(),
private val protocolInfoMapper: ProtocolInfoMapper = MapperProvider.protocolInfoMapper(),
private val messageMapper: MessageMapper = MapperProvider.messageMapper(selfUserId),
private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper()
private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper(),
) : ConversationRepository {

// TODO:I would suggest preparing another suspend func getSelfUser to get nullable self user,
Expand Down Expand Up @@ -247,7 +248,12 @@ internal class ConversationDataSource internal constructor(
kaliumLogger.withFeatureId(CONVERSATIONS)
.d("Skipping ${conversations.conversationsNotFound.size} conversations not found")
}
persistConversations(conversations.conversationsFound, selfTeamIdProvider().getOrNull()?.value)
persistConversations(
conversations = conversations.conversationsFound,
selfUserTeamId = selfTeamIdProvider().getOrNull()?.value,
invalidateMembers = true
)

}.onFailure {
kaliumLogger.withFeatureId(CONVERSATIONS).e("Error fetching conversation details $it")
}
Expand All @@ -267,6 +273,7 @@ internal class ConversationDataSource internal constructor(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean,
invalidateMembers: Boolean
) = wrapStorageRequest {
val conversationEntities = conversations
// TODO work-around for a bug in the backend. Can be removed when fixed: https://wearezeta.atlassian.net/browse/FS-1262
Expand All @@ -285,9 +292,19 @@ internal class ConversationDataSource internal constructor(
}
conversationDAO.insertConversations(conversationEntities)
conversations.forEach { conversationsResponse ->
conversationDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members), idMapper.fromApiToDao(conversationsResponse.id)
)
// do the cleanup of members from conversation in case when self user rejoined conversation
// and may not received any member remove or leave events
if (invalidateMembers) {
conversationDAO.updateFullMemberList(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
)
} else {
conversationDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
)
}
}
}

Expand Down Expand Up @@ -370,7 +387,7 @@ internal class ConversationDataSource internal constructor(
conversationApi.fetchConversationDetails(conversationID.toApi())
}.flatMap {
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(listOf(it), selfUserTeamId?.value)
persistConversations(listOf(it), selfUserTeamId?.value, invalidateMembers = true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ class UserSessionScope internal constructor(
)
private val memberJoinHandler: MemberJoinEventHandler
get() = MemberJoinEventHandlerImpl(
conversationRepository, userRepository, persistMessage
conversationRepository, userRepository, persistMessage, userId
)
private val memberLeaveHandler: MemberLeaveEventHandler
get() = MemberLeaveEventHandlerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.wire.kalium.logic.data.event.logEventProcessing
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.onFailure
Expand All @@ -42,13 +43,19 @@ interface MemberJoinEventHandler {
internal class MemberJoinEventHandlerImpl(
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val persistMessage: PersistMessageUseCase
private val persistMessage: PersistMessageUseCase,
private val selfUserId: UserId
) : MemberJoinEventHandler {
private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.EVENT_RECEIVER) }

override suspend fun handle(event: Event.Conversation.MemberJoin) =
// Attempt to fetch conversation details if needed, as this might be an unknown conversation
conversationRepository.fetchConversationIfUnknown(event.conversationId)
// we need to force fetching conversation when self user rejoined to conversation,
// because he may not received member change events
if (event.members.map { it.id }.contains(selfUserId)) {
conversationRepository.fetchConversation(event.conversationId)
} else {
conversationRepository.fetchConversationIfUnknown(event.conversationId)
}
.run {
onSuccess {
val logMap = mapOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ internal class SlowSyncManager(
val MIN_RETRY_DELAY = 1.seconds
val MAX_RETRY_DELAY = 10.minutes
val MIN_TIME_BETWEEN_SLOW_SYNCS = 7.days
const val CURRENT_VERSION = 2 // bump this version to perform slow sync when some new feature flag was added
}
}

const val CURRENT_VERSION = 3 // bump this version to perform slow sync when some new feature flag was added
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ object TestUser {
val USER_ID = UserId(value, domain)
val OTHER_USER_ID = USER_ID.copy(value = "otherValue")
val OTHER_USER_ID_2 = USER_ID.copy(value = "otherValue2")
val OTHER_FEDERATED_USER_ID = USER_ID.copy(value = "otherValue", "otherDomain")
val ENTITY_ID = QualifiedIDEntity(value, domain)
val NETWORK_ID = com.wire.kalium.network.api.base.model.UserId(
value = value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import kotlin.test.Test
class MemberJoinEventHandlerTest {

@Test
fun givenMemberJoinEvent_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest {
val newMembers = listOf(Member(TestUser.USER_ID, Member.Role.Member))
fun givenMemberJoinEventWithoutSelfUser_whenHandlingIt_thenShouldFetchConversationIfUnknown() = runTest {
val newMembers = listOf(Member(TestUser.OTHER_FEDERATED_USER_ID, Member.Role.Member))
val event = TestEvent.memberJoin(members = newMembers)

val (arrangement, eventHandler) = Arrangement()
Expand All @@ -64,6 +64,36 @@ class MemberJoinEventHandlerTest {
.suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown)
.with(eq(event.conversationId))
.wasInvoked(exactly = once)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(eq(event.conversationId))
.wasNotInvoked()
}

@Test
fun givenMemberJoinEventWithSelfUser_whenHandlingIt_thenShouldFetchConversation() = runTest {
val newMembers = listOf(Member(TestUser.SELF.id, Member.Role.Member))
val event = TestEvent.memberJoin(members = newMembers)

val (arrangement, eventHandler) = Arrangement()
.withPersistingMessageReturning(Either.Right(Unit))
.withFetchConversationIfUnknownSucceeding()
.withPersistMembersSucceeding()
.withFetchUsersIfUnknownByIdsReturning(Either.Right(Unit))
.arrange()

eventHandler.handle(event)

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversationIfUnknown)
.with(eq(event.conversationId))
.wasNotInvoked()

verify(arrangement.conversationRepository)
.suspendFunction(arrangement.conversationRepository::fetchConversation)
.with(eq(event.conversationId))
.wasInvoked(exactly = once)
}

@Test
Expand Down Expand Up @@ -167,7 +197,8 @@ class MemberJoinEventHandlerTest {
private val memberJoinEventHandler: MemberJoinEventHandler = MemberJoinEventHandlerImpl(
conversationRepository,
userRepository,
persistMessage
persistMessage,
TestUser.SELF.id
)

fun withPersistingMessageReturning(result: Either<CoreFailure, Unit>) = apply {
Expand All @@ -182,13 +213,21 @@ class MemberJoinEventHandlerTest {
.suspendFunction(conversationRepository::fetchConversationIfUnknown)
.whenInvokedWith(any())
.thenReturn(Either.Right(Unit))
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversation)
.whenInvokedWith(any())
.thenReturn(Either.Right(Unit))
}

fun withFetchConversationIfUnknownFailing(coreFailure: CoreFailure) = apply {
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversationIfUnknown)
.whenInvokedWith(any())
.thenReturn(Either.Left(coreFailure))
given(conversationRepository)
.suspendFunction(conversationRepository::fetchConversation)
.whenInvokedWith(any())
.thenReturn(Either.Left(coreFailure))
}

fun withPersistMembersSucceeding() = apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ CREATE TABLE Member (
FOREIGN KEY (user) REFERENCES User(qualified_id) ON DELETE CASCADE
);

CREATE INDEX member_conversation_index ON Member(conversation);

insertMember:
INSERT OR IGNORE INTO Member(user, conversation, role)
VALUES (?, ?, ?);

deleteMember:
DELETE FROM Member WHERE conversation = ? AND user = ?;

deleteMembersFromConversation:
DELETE FROM Member WHERE conversation = ?;

selectAllMembersByConversation:
SELECT * FROM Member WHERE conversation LIKE ('%' || :searchQuery || '%');

Expand Down
1 change: 1 addition & 0 deletions persistence/src/commonMain/db_user/migrations/43.sqm
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX member_conversation_index ON Member(conversation);
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,5 @@ interface ConversationDAO {
suspend fun updateMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?)
suspend fun updateUserMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?)
suspend fun clearContent(conversationId: QualifiedIDEntity)
suspend fun updateFullMemberList(memberList: List<Member>, conversationID: QualifiedIDEntity)
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,17 @@ class ConversationDAOImpl(
nonSuspendInsertMembersWithQualifiedId(memberList, conversationID)
}

override suspend fun updateFullMemberList(memberList: List<Member>, conversationID: QualifiedIDEntity) =
withContext(coroutineContext) {
memberQueries.transaction {
memberQueries.deleteMembersFromConversation(conversationID)
for (member: Member in memberList) {
userQueries.insertOrIgnoreUserId(member.user)
memberQueries.insertMember(member.user, conversationID, member.role)
}
}
}

private fun nonSuspendInsertMembersWithQualifiedId(memberList: List<Member>, conversationID: QualifiedIDEntity) =
memberQueries.transaction {
for (member: Member in memberList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
Expand Down Expand Up @@ -952,6 +953,40 @@ class ConversationDAOTest : BaseDatabaseTest() {
}
}

@Test
fun givenConversationWithExistingMember_whenUpdateFullMemberListIsCalled_thenExistingMemberIsRemovedAndNewMembersAreAdded() = runTest {
// Given
val conversationID = conversationEntity1.id
val memberList = listOf(
Member(user1.id, Member.Role.Admin),
Member(user2.id, Member.Role.Member)
)

// Insert a conversation, user, and a member into the conversation to test the deletion operation
val oldMember = Member(user3.id, Member.Role.Member)
userDAO.insertUser(user3)
conversationDAO.insertConversation(conversationEntity1)
conversationDAO.insertMember(oldMember, conversationID)

// Ensure all new users are inserted before calling updateFullMemberList
memberList.forEach { member ->
userDAO.insertUser(user1.copy(id = member.user))
}

// When
conversationDAO.updateFullMemberList(memberList, conversationID)

// Then
// Fetch the members of the conversation
val fetchedMembers = conversationDAO.getAllMembers(conversationID).first()

// Assert that the old member was deleted
assertFalse(fetchedMembers.any { it.user == oldMember.user })

// Assert that the new members were inserted
assertTrue(fetchedMembers.containsAll(memberList))
}

private suspend fun insertTeamUserAndMember(team: TeamEntity, user: UserEntity, conversationId: QualifiedIDEntity) {
teamDAO.insertTeam(team)
userDAO.insertUser(user)
Expand Down

0 comments on commit 7f3c540

Please sign in to comment.