Skip to content

Commit

Permalink
Fixing group streams (#406)
Browse files Browse the repository at this point in the history
* Fixing group streams

* bump the pod

* get all the tests passing

---------

Co-authored-by: Naomi Plasterer <[email protected]>
  • Loading branch information
nmalzieu and nplasterer authored Sep 26, 2024
1 parent 7729103 commit 70502e2
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 37 deletions.
97 changes: 62 additions & 35 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,22 @@ class StreamManager {
}
}

actor FfiStreamActor {
private var ffiStream: FfiStreamCloser?

func setFfiStream(_ stream: FfiStreamCloser?) {
ffiStream = stream
}

func endStream() {
ffiStream?.end()
}
}

/// Handles listing and creating Conversations.
public actor Conversations {
var client: Client
var conversationsByTopic: [String: Conversation] = [:]
let streamHolder = StreamHolder()

init(client: Client) {
self.client = client
Expand Down Expand Up @@ -130,7 +141,8 @@ public actor Conversations {

public func streamGroups() async throws -> AsyncThrowingStream<Group, Error> {
AsyncThrowingStream { continuation in
let task = Task {
let ffiStreamActor = FfiStreamActor()
let task = Task {
let groupCallback = GroupStreamCallback(client: self.client) { group in
guard !Task.isCancelled else {
continuation.finish()
Expand All @@ -142,24 +154,28 @@ public actor Conversations {
continuation.finish(throwing: GroupError.streamingFailure)
return
}

self.streamHolder.stream = stream
await ffiStreamActor.setFfiStream(stream)
continuation.onTermination = { @Sendable reason in
stream.end()
Task {
await ffiStreamActor.endStream()
}
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
self.streamHolder.stream?.end()
Task {
await ffiStreamActor.endStream()
}
}
}
}

private func streamGroupConversations() -> AsyncThrowingStream<Conversation, Error> {
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let task = Task {
self.streamHolder.stream = await self.client.v3Client?.conversations().stream(
let stream = await self.client.v3Client?.conversations().stream(
callback: GroupStreamCallback(client: self.client) { group in
guard !Task.isCancelled else {
continuation.finish()
Expand All @@ -168,14 +184,19 @@ public actor Conversations {
continuation.yield(Conversation.group(group))
}
)
await ffiStreamActor.setFfiStream(stream)
continuation.onTermination = { @Sendable reason in
self.streamHolder.stream?.end()
Task {
await ffiStreamActor.endStream()
}
}
}

continuation.onTermination = { @Sendable reason in
task.cancel()
self.streamHolder.stream?.end()
Task {
await ffiStreamActor.endStream()
}
}
}
}
Expand Down Expand Up @@ -412,12 +433,15 @@ public actor Conversations {

public func streamAllGroupMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let task = Task {
self.streamHolder.stream = await self.client.v3Client?.conversations().streamAllMessages(
let stream = await self.client.v3Client?.conversations().streamAllMessages(
messageCallback: MessageCallback(client: self.client) { message in
guard !Task.isCancelled else {
continuation.finish()
self.streamHolder.stream?.end() // End the stream upon cancellation
Task {
await ffiStreamActor.endStream() // End the stream upon cancellation
}
return
}
do {
Expand All @@ -427,24 +451,26 @@ public actor Conversations {
}
}
)
await ffiStreamActor.setFfiStream(stream)
}

continuation.onTermination = { _ in
task.cancel()
self.streamHolder.stream?.end()
Task {
await ffiStreamActor.endStream()
}
}
}
}

public func streamAllMessages(includeGroups: Bool = false) -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream<DecodedMessage, Error> { continuation in
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecodedMessage, Error>) async {
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecodedMessage, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
guard !Task.isCancelled else {
continuation.finish()
self.streamHolder.stream?.end()
return
}
continuation.yield(element)
Expand All @@ -456,30 +482,31 @@ public actor Conversations {
}

let task = Task {
await forwardStreamToMerged(stream: streamAllV2Messages())
}

if includeGroups {
Task {
await forwardStreamToMerged(stream: streamAllGroupMessages())
}
await forwardStreamToMerged(stream: streamAllV2Messages())
}

let groupTask = includeGroups ? Task {
await forwardStreamToMerged(stream: streamAllGroupMessages())
} : nil

continuation.onTermination = { _ in
task.cancel()
self.streamHolder.stream?.end()
groupTask?.cancel()
}
}
}

public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
let ffiStreamActor = FfiStreamActor()
let task = Task {
self.streamHolder.stream = await self.client.v3Client?.conversations().streamAllMessages(
let stream = await self.client.v3Client?.conversations().streamAllMessages(
messageCallback: MessageCallback(client: self.client) { message in
guard !Task.isCancelled else {
continuation.finish()
self.streamHolder.stream?.end() // End the stream upon cancellation
Task {
await ffiStreamActor.endStream() // End the stream upon cancellation
}
return
}
do {
Expand All @@ -489,24 +516,26 @@ public actor Conversations {
}
}
)
await ffiStreamActor.setFfiStream(stream)
}

continuation.onTermination = { _ in
task.cancel()
self.streamHolder.stream?.end()
Task {
await ffiStreamActor.endStream()
}
}
}
}

public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream<DecryptedMessage, Error> { continuation in
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecryptedMessage, Error>) async {
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecryptedMessage, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
guard !Task.isCancelled else {
continuation.finish()
self.streamHolder.stream?.end()
return
}
continuation.yield(element)
Expand All @@ -520,16 +549,14 @@ public actor Conversations {
let task = Task {
await forwardStreamToMerged(stream: streamAllV2DecryptedMessages())
}

if includeGroups {
Task {
await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages())
}
}

let groupTask = includeGroups ? Task {
await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages())
} : nil

continuation.onTermination = { _ in
task.cancel()
self.streamHolder.stream?.end()
task.cancel()
groupTask?.cancel()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/XMTPTests/GroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class GroupTests: XCTestCase {

try await group.sync()
let members = try await group.members.map(\.inboxId).sorted()
let peerMembers = try Conversation.group(group).peerAddresses.sorted()
let peerMembers = try await group.peerInboxIds.sorted()

XCTAssertEqual([fixtures.bobClient.inboxID, fixtures.aliceClient.inboxID].sorted(), members)
XCTAssertEqual([fixtures.bobClient.inboxID].sorted(), peerMembers)
Expand Down
2 changes: 1 addition & 1 deletion XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.14.17"
spec.version = "0.14.18"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down

0 comments on commit 70502e2

Please sign in to comment.