Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: invalidate conversation members [WPB-3047] #1878

Merged
merged 6 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.id.toModel
import com.wire.kalium.logic.data.message.MessageMapper
import com.wire.kalium.logic.data.message.UnreadEventType
import com.wire.kalium.logic.data.sync.SlowSyncRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.feature.SelfTeamIdProvider
Expand All @@ -48,6 +49,7 @@ import com.wire.kalium.logic.functional.mapRight
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
import com.wire.kalium.logic.sync.slow.CURRENT_VERSION
import com.wire.kalium.logic.wrapApiRequest
import com.wire.kalium.logic.wrapCryptoRequest
import com.wire.kalium.logic.wrapMLSRequest
Expand Down Expand Up @@ -95,7 +97,8 @@ interface ConversationRepository {
suspend fun persistConversations(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean = false
originatedFromEvent: Boolean = false,
invalidateMembers: Boolean = false
): Either<CoreFailure, Unit>

suspend fun getConversationList(): Either<StorageFailure, Flow<List<Conversation>>>
Expand Down Expand Up @@ -201,14 +204,15 @@ internal class ConversationDataSource internal constructor(
private val messageDAO: MessageDAO,
private val clientDAO: ClientDAO,
private val clientApi: ClientApi,
private val slowSyncRepository: SlowSyncRepository, // temporary to resolve issue https://wearezeta.atlassian.net/browse/WPB-3047
private val idMapper: IdMapper = MapperProvider.idMapper(),
private val conversationMapper: ConversationMapper = MapperProvider.conversationMapper(),
private val memberMapper: MemberMapper = MapperProvider.memberMapper(),
private val conversationStatusMapper: ConversationStatusMapper = MapperProvider.conversationStatusMapper(),
private val conversationRoleMapper: ConversationRoleMapper = MapperProvider.conversationRoleMapper(),
private val protocolInfoMapper: ProtocolInfoMapper = MapperProvider.protocolInfoMapper(),
private val messageMapper: MessageMapper = MapperProvider.messageMapper(selfUserId),
private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper()
private val receiptModeMapper: ReceiptModeMapper = MapperProvider.receiptModeMapper(),
) : ConversationRepository {

// TODO:I would suggest preparing another suspend func getSelfUser to get nullable self user,
Expand Down Expand Up @@ -251,7 +255,18 @@ internal class ConversationDataSource internal constructor(
kaliumLogger.withFeatureId(CONVERSATIONS)
.d("Skipping ${conversations.conversationsNotFound.size} conversations not found")
}
persistConversations(conversations.conversationsFound, selfTeamIdProvider().getOrNull()?.value)
// temporary solution to force invalidating all removed members
val lastVersion = slowSyncRepository.getSlowSyncVersion()

persistConversations(
conversations = conversations.conversationsFound,
selfUserTeamId = selfTeamIdProvider().getOrNull()?.value,
invalidateMembers = CURRENT_VERSION > lastVersion
)
if (CURRENT_VERSION > lastVersion) {
slowSyncRepository.setSlowSyncVersion(CURRENT_VERSION)
}

}.onFailure {
kaliumLogger.withFeatureId(CONVERSATIONS).e("Error fetching conversation details $it")
}
Expand All @@ -271,6 +286,7 @@ internal class ConversationDataSource internal constructor(
conversations: List<ConversationResponse>,
selfUserTeamId: String?,
originatedFromEvent: Boolean,
invalidateMembers: Boolean
) = wrapStorageRequest {
val conversationEntities = conversations
// TODO work-around for a bug in the backend. Can be removed when fixed: https://wearezeta.atlassian.net/browse/FS-1262
Expand All @@ -289,8 +305,14 @@ internal class ConversationDataSource internal constructor(
}
conversationDAO.insertConversations(conversationEntities)
conversations.forEach { conversationsResponse ->
// do the cleanup of members from conversation in case when self user rejoined conversation
// and may not received any member remove or leave events
if (invalidateMembers) {
conversationDAO.deleteMembersFromConversation(idMapper.fromApiToDao(conversationsResponse.id))
}
conversationDAO.insertMembersWithQualifiedId(
memberMapper.fromApiModelToDaoModel(conversationsResponse.members), idMapper.fromApiToDao(conversationsResponse.id)
memberMapper.fromApiModelToDaoModel(conversationsResponse.members),
idMapper.fromApiToDao(conversationsResponse.id)
)
}
}
Expand Down Expand Up @@ -374,7 +396,7 @@ internal class ConversationDataSource internal constructor(
conversationApi.fetchConversationDetails(conversationID.toApi())
}.flatMap {
val selfUserTeamId = selfTeamIdProvider().getOrNull()
persistConversations(listOf(it), selfUserTeamId?.value)
persistConversations(listOf(it), selfUserTeamId?.value, invalidateMembers = true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,8 @@ class UserSessionScope internal constructor(
authenticatedNetworkContainer.conversationApi,
userStorage.database.messageDAO,
userStorage.database.clientDAO,
authenticatedNetworkContainer.clientApi
authenticatedNetworkContainer.clientApi,
slowSyncRepository
)

private val conversationGroupRepository: ConversationGroupRepository
Expand Down Expand Up @@ -988,7 +989,7 @@ class UserSessionScope internal constructor(
)
private val memberJoinHandler: MemberJoinEventHandler
get() = MemberJoinEventHandlerImpl(
conversationRepository, userRepository, persistMessage
conversationRepository, userRepository, persistMessage, userId
)
private val memberLeaveHandler: MemberLeaveEventHandler
get() = MemberLeaveEventHandlerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import com.wire.kalium.logic.data.event.logEventProcessing
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
Expand All @@ -42,14 +44,23 @@ interface MemberJoinEventHandler {
internal class MemberJoinEventHandlerImpl(
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val persistMessage: PersistMessageUseCase
private val persistMessage: PersistMessageUseCase,
private val selfUserId: UserId
) : MemberJoinEventHandler {
private val logger by lazy { kaliumLogger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.EVENT_RECEIVER) }

override suspend fun handle(event: Event.Conversation.MemberJoin) =
// Attempt to fetch conversation details if needed, as this might be an unknown conversation
conversationRepository.fetchConversationIfUnknown(event.conversationId)
.run {
conversationRepository.getConversationMembers(event.conversationId)
.map {
// we need to force fetching conversation when self user rejoined to conversation,
// because he may not received member change events
if (it.contains(selfUserId)) {
conversationRepository.fetchConversation(event.conversationId)
} else {
conversationRepository.fetchConversationIfUnknown(event.conversationId)
}
}.run {
onSuccess {
val logMap = mapOf(
"event" to event.toLogMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ internal class SlowSyncManager(
val MIN_RETRY_DELAY = 1.seconds
val MAX_RETRY_DELAY = 10.minutes
val MIN_TIME_BETWEEN_SLOW_SYNCS = 7.days
const val CURRENT_VERSION = 2 // bump this version to perform slow sync when some new feature flag was added
}
}

const val CURRENT_VERSION = 3 // bump this version to perform slow sync when some new feature flag was added
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ VALUES (?, ?, ?);
deleteMember:
DELETE FROM Member WHERE conversation = ? AND user = ?;

deleteMembersFromConversation:
DELETE FROM Member WHERE conversation = ?;
Garzas marked this conversation as resolved.
Show resolved Hide resolved

Garzas marked this conversation as resolved.
Show resolved Hide resolved
selectAllMembersByConversation:
SELECT * FROM Member WHERE conversation LIKE ('%' || :searchQuery || '%');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ interface ConversationDAO {
suspend fun insertMember(member: Member, conversationID: QualifiedIDEntity)
suspend fun updateMember(member: Member, conversationID: QualifiedIDEntity)
suspend fun insertMembersWithQualifiedId(memberList: List<Member>, conversationID: QualifiedIDEntity)
suspend fun insertMembers(memberList: List<Member>, groupId: String)
suspend fun deleteMemberByQualifiedID(userID: QualifiedIDEntity, conversationID: QualifiedIDEntity)
suspend fun deleteMembersByQualifiedID(userIDList: List<QualifiedIDEntity>, conversationID: QualifiedIDEntity)
suspend fun deleteMembersByQualifiedID(userIDList: List<QualifiedIDEntity>, groupId: String)
Expand Down Expand Up @@ -210,4 +209,5 @@ interface ConversationDAO {
suspend fun updateMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?)
suspend fun updateUserMessageTimer(conversationId: QualifiedIDEntity, messageTimer: Long?)
suspend fun clearContent(conversationId: QualifiedIDEntity)
suspend fun deleteMembersFromConversation(conversationID: QualifiedIDEntity)
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ class ConversationDAOImpl(
nonSuspendInsertMembersWithQualifiedId(memberList, conversationID)
}

override suspend fun deleteMembersFromConversation(conversationID: QualifiedIDEntity) =
withContext(coroutineContext) {
memberQueries.deleteMembersFromConversation(conversationID)
}

private fun nonSuspendInsertMembersWithQualifiedId(memberList: List<Member>, conversationID: QualifiedIDEntity) =
memberQueries.transaction {
for (member: Member in memberList) {
Expand All @@ -409,14 +414,6 @@ class ConversationDAOImpl(
}
}

override suspend fun insertMembers(memberList: List<Member>, groupId: String) {
withContext(coroutineContext) {
getConversationByGroupID(groupId).firstOrNull()?.let {
nonSuspendInsertMembersWithQualifiedId(memberList, it.id)
}
}
}

override suspend fun updateOrInsertOneOnOneMemberWithConnectionStatus(
member: Member,
status: ConnectionEntity.State,
Expand Down
Loading