diff --git a/Sources/XMTPiOS/Conversations.swift b/Sources/XMTPiOS/Conversations.swift index ad721e68..91c47095 100644 --- a/Sources/XMTPiOS/Conversations.swift +++ b/Sources/XMTPiOS/Conversations.swift @@ -87,11 +87,22 @@ class StreamManager { } } +actor FfiStreamActor { + private var ffiStream: FfiStreamCloser? + + func setFfiStream(_ stream: FfiStreamCloser?) { + ffiStream = stream + } + + func endStream() { + ffiStream?.end() + } +} + /// Handles listing and creating Conversations. public actor Conversations { var client: Client var conversationsByTopic: [String: Conversation] = [:] - let streamHolder = StreamHolder() init(client: Client) { self.client = client @@ -130,7 +141,8 @@ public actor Conversations { public func streamGroups() async throws -> AsyncThrowingStream { AsyncThrowingStream { continuation in - let task = Task { + let ffiStreamActor = FfiStreamActor() + let task = Task { let groupCallback = GroupStreamCallback(client: self.client) { group in guard !Task.isCancelled else { continuation.finish() @@ -142,24 +154,28 @@ public actor Conversations { continuation.finish(throwing: GroupError.streamingFailure) return } - - self.streamHolder.stream = stream + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - stream.end() + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - self.streamHolder.stream?.end() + Task { + await ffiStreamActor.endStream() + } } } } private func streamGroupConversations() -> AsyncThrowingStream { AsyncThrowingStream { continuation in + let ffiStreamActor = FfiStreamActor() let task = Task { - self.streamHolder.stream = await self.client.v3Client?.conversations().stream( + let stream = await self.client.v3Client?.conversations().stream( callback: GroupStreamCallback(client: self.client) { group in guard !Task.isCancelled else { continuation.finish() @@ -168,14 +184,19 @@ public actor Conversations { continuation.yield(Conversation.group(group)) } ) + await ffiStreamActor.setFfiStream(stream) continuation.onTermination = { @Sendable reason in - self.streamHolder.stream?.end() + Task { + await ffiStreamActor.endStream() + } } } continuation.onTermination = { @Sendable reason in task.cancel() - self.streamHolder.stream?.end() + Task { + await ffiStreamActor.endStream() + } } } } @@ -412,12 +433,15 @@ public actor Conversations { public func streamAllGroupMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in + let ffiStreamActor = FfiStreamActor() let task = Task { - self.streamHolder.stream = await self.client.v3Client?.conversations().streamAllMessages( + let stream = await self.client.v3Client?.conversations().streamAllMessages( messageCallback: MessageCallback(client: self.client) { message in guard !Task.isCancelled else { continuation.finish() - self.streamHolder.stream?.end() // End the stream upon cancellation + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } return } do { @@ -427,24 +451,26 @@ public actor Conversations { } } ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - self.streamHolder.stream?.end() + Task { + await ffiStreamActor.endStream() + } } } } public func streamAllMessages(includeGroups: Bool = false) -> AsyncThrowingStream { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { guard !Task.isCancelled else { continuation.finish() - self.streamHolder.stream?.end() return } continuation.yield(element) @@ -456,30 +482,31 @@ public actor Conversations { } let task = Task { - await forwardStreamToMerged(stream: streamAllV2Messages()) - } - - if includeGroups { - Task { - await forwardStreamToMerged(stream: streamAllGroupMessages()) - } + await forwardStreamToMerged(stream: streamAllV2Messages()) } + + let groupTask = includeGroups ? Task { + await forwardStreamToMerged(stream: streamAllGroupMessages()) + } : nil continuation.onTermination = { _ in task.cancel() - self.streamHolder.stream?.end() + groupTask?.cancel() } } } public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in + let ffiStreamActor = FfiStreamActor() let task = Task { - self.streamHolder.stream = await self.client.v3Client?.conversations().streamAllMessages( + let stream = await self.client.v3Client?.conversations().streamAllMessages( messageCallback: MessageCallback(client: self.client) { message in guard !Task.isCancelled else { continuation.finish() - self.streamHolder.stream?.end() // End the stream upon cancellation + Task { + await ffiStreamActor.endStream() // End the stream upon cancellation + } return } do { @@ -489,24 +516,26 @@ public actor Conversations { } } ) + await ffiStreamActor.setFfiStream(stream) } continuation.onTermination = { _ in task.cancel() - self.streamHolder.stream?.end() + Task { + await ffiStreamActor.endStream() + } } } } public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream { AsyncThrowingStream { continuation in - @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { + @Sendable func forwardStreamToMerged(stream: AsyncThrowingStream) async { do { var iterator = stream.makeAsyncIterator() while let element = try await iterator.next() { guard !Task.isCancelled else { continuation.finish() - self.streamHolder.stream?.end() return } continuation.yield(element) @@ -520,16 +549,14 @@ public actor Conversations { let task = Task { await forwardStreamToMerged(stream: streamAllV2DecryptedMessages()) } - - if includeGroups { - Task { - await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages()) - } - } + + let groupTask = includeGroups ? Task { + await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages()) + } : nil continuation.onTermination = { _ in - task.cancel() - self.streamHolder.stream?.end() + task.cancel() + groupTask?.cancel() } } } diff --git a/Tests/XMTPTests/GroupTests.swift b/Tests/XMTPTests/GroupTests.swift index 6dfb9c61..ba43bdbd 100644 --- a/Tests/XMTPTests/GroupTests.swift +++ b/Tests/XMTPTests/GroupTests.swift @@ -225,7 +225,7 @@ class GroupTests: XCTestCase { try await group.sync() let members = try await group.members.map(\.inboxId).sorted() - let peerMembers = try Conversation.group(group).peerAddresses.sorted() + let peerMembers = try await group.peerInboxIds.sorted() XCTAssertEqual([fixtures.bobClient.inboxID, fixtures.aliceClient.inboxID].sorted(), members) XCTAssertEqual([fixtures.bobClient.inboxID].sorted(), peerMembers) diff --git a/XMTP.podspec b/XMTP.podspec index 99b4f1ce..52039b35 100644 --- a/XMTP.podspec +++ b/XMTP.podspec @@ -16,7 +16,7 @@ Pod::Spec.new do |spec| # spec.name = "XMTP" - spec.version = "0.14.17" + spec.version = "0.14.18" spec.summary = "XMTP SDK Cocoapod" # This description is used to generate tags and improve search results.