Skip to content

Commit

Permalink
Merge pull request #622 from xmtp/rygine/stream-all-messages
Browse files Browse the repository at this point in the history
Add streamAllMessages to Conversations
  • Loading branch information
rygine authored Jun 12, 2024
2 parents 385ca91 + 6dd6a0e commit fcf2b2a
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/hip-plums-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@xmtp/mls-client": patch
---

Add streamAllMessages to Conversations
2 changes: 1 addition & 1 deletion packages/mls-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"typecheck": "tsc"
},
"dependencies": {
"@xmtp/mls-client-bindings-node": "^0.0.3",
"@xmtp/mls-client-bindings-node": "^0.0.4",
"@xmtp/proto": "^3.61.1",
"@xmtp/xmtp-js": "^11.6.2"
},
Expand Down
4 changes: 2 additions & 2 deletions packages/mls-client/src/Conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class Conversation {

stream() {
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, this, message)
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#group.stream(asyncStream.callback)
asyncStream.stopCallback = stream.end.bind(stream)
Expand Down Expand Up @@ -126,6 +126,6 @@ export class Conversation {
messages(options?: NapiListMessagesOptions): DecodedMessage[] {
return this.#group
.findMessages(options)
.map((message) => new DecodedMessage(this.#client, this, message))
.map((message) => new DecodedMessage(this.#client, message))
}
}
13 changes: 13 additions & 0 deletions packages/mls-client/src/Conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ import type {
NapiConversations,
NapiGroup,
NapiListMessagesOptions,
NapiMessage,
} from '@xmtp/mls-client-bindings-node'
import { AsyncStream } from '@/AsyncStream'
import type { Client } from '@/Client'
import { Conversation } from '@/Conversation'
import { DecodedMessage } from '@/DecodedMessage'

export class Conversations {
#client: Client
Expand Down Expand Up @@ -45,4 +47,15 @@ export class Conversations {
asyncStream.stopCallback = stream.end.bind(stream)
return asyncStream
}

async streamAllMessages() {
// sync conversations first
await this.sync()
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#conversations.streamAllMessages(asyncStream.callback)
asyncStream.stopCallback = stream.end.bind(stream)
return asyncStream
}
}
9 changes: 1 addition & 8 deletions packages/mls-client/src/DecodedMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
} from '@xmtp/mls-client-bindings-node'
import { ContentTypeId } from '@xmtp/xmtp-js'
import type { Client } from '@/Client'
import type { Conversation } from '@/Conversation'
import { nsToDate } from '@/helpers/date'

export type MessageKind = 'application' | 'membership_change'
Expand All @@ -15,7 +14,6 @@ export class DecodedMessage {
#client: Client
content: any
contentType: ContentTypeId
conversation: Conversation
conversationId: string
deliveryStatus: MessageDeliveryStatus
fallback?: string
Expand All @@ -27,13 +25,8 @@ export class DecodedMessage {
sentAt: Date
sentAtNs: number

constructor(
client: Client,
conversation: Conversation,
message: NapiMessage
) {
constructor(client: Client, message: NapiMessage) {
this.#client = client
this.conversation = conversation
this.id = message.id
this.sentAtNs = message.sentAtNs
this.sentAt = nsToDate(message.sentAtNs)
Expand Down
38 changes: 38 additions & 0 deletions packages/mls-client/test/Conversations.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ContentTypeText } from '@xmtp/xmtp-js'
import { describe, expect, it } from 'vitest'
import { createRegisteredClient, createUser } from '@test/helpers'

Expand Down Expand Up @@ -75,4 +76,41 @@ describe('Conversations', () => {
}
stream.stop()
})

it('should stream all messages', async () => {
const user1 = createUser()
const user2 = createUser()
const user3 = createUser()
const client1 = await createRegisteredClient(user1)
const client2 = await createRegisteredClient(user2)
const client3 = await createRegisteredClient(user3)
await client1.conversations.newConversation([user2.account.address])
await client1.conversations.newConversation([user3.account.address])

const stream = await client1.conversations.streamAllMessages()

await client2.conversations.sync()
const groups2 = await client2.conversations.list()

await client3.conversations.sync()
const groups3 = await client3.conversations.list()

await groups2[0].send('gm!', ContentTypeText)
await groups3[0].send('gm2!', ContentTypeText)

let count = 0

for await (const message of stream) {
count++
expect(message).toBeDefined()
if (count === 1) {
expect(message!.senderInboxId).toBe(client2.inboxId)
}
if (count === 2) {
expect(message!.senderInboxId).toBe(client3.inboxId)
break
}
}
stream.stop()
})
})
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2863,10 +2863,10 @@ __metadata:
languageName: node
linkType: hard

"@xmtp/mls-client-bindings-node@npm:^0.0.3":
version: 0.0.3
resolution: "@xmtp/mls-client-bindings-node@npm:0.0.3"
checksum: 10/3e1d578e462965d992b23020c69619231af5022b2f015ec67187f0fabb2b657ace68f013df65ed057b2716801099aa6f7679cd86818b90168de7ef51abb72be1
"@xmtp/mls-client-bindings-node@npm:^0.0.4":
version: 0.0.4
resolution: "@xmtp/mls-client-bindings-node@npm:0.0.4"
checksum: 10/508839e57a7126f8f2d9898c62c117cd2626279c244b57c2a257bb7681755e19bde985df4a666c61b665b7bb23b9aa8d784527c601797845271c1cf61af26808
languageName: node
linkType: hard

Expand All @@ -2881,7 +2881,7 @@ __metadata:
"@typescript-eslint/eslint-plugin": "npm:^7.8.0"
"@typescript-eslint/parser": "npm:^7.8.0"
"@vitest/coverage-v8": "npm:^1.6.0"
"@xmtp/mls-client-bindings-node": "npm:^0.0.3"
"@xmtp/mls-client-bindings-node": "npm:^0.0.4"
"@xmtp/proto": "npm:^3.61.1"
"@xmtp/xmtp-js": "npm:^11.6.2"
eslint: "npm:^8.57.0"
Expand Down

0 comments on commit fcf2b2a

Please sign in to comment.