Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix conversationsByTopic concurrency issues #160

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions Sources/XMTP/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public enum ConversationError: Error {
/// Handles listing and creating Conversations.
public class Conversations {
var client: Client
var conversationsByTopic: [String: Conversation] = [:]
@MainActor var conversationsByTopic: [String: Conversation] = [:]

init(client: Client) {
self.client = client
Expand All @@ -29,7 +29,9 @@ public class Conversations {
client: client
))
}
conversationsByTopic[conversation.topic] = conversation
async {
conversationsByTopic[conversation.topic] = conversation
}
return conversation
}

Expand All @@ -47,7 +49,7 @@ public class Conversations {
messages += try await client.apiClient.batchQuery(request: batch)
.responses.flatMap { (res) in
res.envelopes.compactMap { (envelope) in
let conversation = conversationsByTopic[envelope.contentTopic]
let conversation = await conversationsByTopic[envelope.contentTopic]
if conversation == nil {
print("discarding message, unknown conversation \(envelope)")
return nil
Expand Down Expand Up @@ -79,16 +81,20 @@ public class Conversations {

do {
for try await envelope in client.subscribe(topics: topics) {
if let conversation = conversationsByTopic[envelope.contentTopic] {
if let conversation = await conversationsByTopic[envelope.contentTopic] {
let decoded = try conversation.decode(envelope)
continuation.yield(decoded)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") {
let conversation = try fromInvite(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
async {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should be awaits, not async tasks. If you make a task here, the loop can break before the dictionary value is written, so it will miss the subscription.

conversationsByTopic[conversation.topic] = conversation
}
break // Break so we can resubscribe with the new conversation
} else if envelope.contentTopic.hasPrefix("/xmtp/0/intro-") {
let conversation = try fromIntro(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
async {
conversationsByTopic[conversation.topic] = conversation
}
let decoded = try conversation.decode(envelope)
continuation.yield(decoded)
break // Break so we can resubscribe with the new conversation
Expand Down Expand Up @@ -123,7 +129,7 @@ public class Conversations {
}

private func findExistingConversation(with peerAddress: String, conversationID: String?) -> Conversation? {
return conversationsByTopic.first(where: { $0.value.peerAddress == peerAddress &&
return await conversationsByTopic.first(where: { $0.value.peerAddress == peerAddress &&
(($0.value.conversationID ?? "") == (conversationID ?? ""))
})?.value
}
Expand Down Expand Up @@ -156,7 +162,9 @@ public class Conversations {
let conversationV2 = try ConversationV2.create(client: client, invitation: invitation, header: sealedInvitation.v1.header)

let conversation: Conversation = .v2(conversationV2)
conversationsByTopic[conversation.topic] = conversation
async {
conversationsByTopic[conversation.topic] = conversation
}
return conversation
}

Expand Down Expand Up @@ -202,7 +210,7 @@ public class Conversations {

public func list() async throws -> [Conversation] {
var newConversations: [Conversation] = []
let mostRecent = conversationsByTopic.values.max { a, b in
let mostRecent = await conversationsByTopic.values.max { a, b in
a.createdAt < b.createdAt
}
let pagination = Pagination(after: mostRecent?.createdAt)
Expand Down Expand Up @@ -235,10 +243,14 @@ public class Conversations {

newConversations
.filter { $0.peerAddress != client.address }
.forEach { conversationsByTopic[$0.topic] = $0 }
.forEach {
async {
conversationsByTopic[$0.topic] = $0
}
}

// TODO(perf): use DB to persist + sort
return conversationsByTopic.values.sorted { a, b in
return await conversationsByTopic.values.sorted { a, b in
a.createdAt < b.createdAt
}
}
Expand Down
2 changes: 1 addition & 1 deletion XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.5.7-alpha0"
spec.version = "0.5.8-alpha0"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down