Skip to content

Commit

Permalink
Merge pull request #625 from xmtp/rygine/mls-client-updates
Browse files Browse the repository at this point in the history
Add streaming callbacks
  • Loading branch information
rygine authored Jun 13, 2024
2 parents 58e0fae + 961a013 commit 6a90268
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 52 deletions.
5 changes: 5 additions & 0 deletions .changeset/moody-dots-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@xmtp/mls-client": patch
---

Add streaming callbacks
5 changes: 3 additions & 2 deletions packages/mls-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
"typecheck": "tsc"
},
"dependencies": {
"@xmtp/content-type-primitives": "^1.0.1",
"@xmtp/mls-client-bindings-node": "^0.0.4",
"@xmtp/proto": "^3.61.1",
"@xmtp/xmtp-js": "^11.6.2"
"@xmtp/proto": "^3.61.1"
},
"devDependencies": {
"@ianvs/prettier-plugin-sort-imports": "^4.2.1",
Expand All @@ -64,6 +64,7 @@
"@typescript-eslint/eslint-plugin": "^7.8.0",
"@typescript-eslint/parser": "^7.8.0",
"@vitest/coverage-v8": "^1.6.0",
"@xmtp/xmtp-js": "workspace:^",
"eslint": "^8.57.0",
"eslint-config-prettier": "^9.1.0",
"eslint-config-standard": "^17.1.0",
Expand Down
40 changes: 16 additions & 24 deletions packages/mls-client/src/AsyncStream.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
type Value<T, V> = V extends undefined ? T : V

type ResolveValue<T, V> = {
value: Value<T, V> | undefined
type ResolveValue<T> = {
value: T | undefined
done: boolean
}

type ResolveNext<T, V> = (resolveValue: ResolveValue<T, V>) => void
type ResolveNext<T> = (resolveValue: ResolveValue<T>) => void

type TransformValue<T, V> = (value: T) => Value<T, V>
export type StreamCallback<T> = (err: Error | null, value: T) => void

export class AsyncStream<T, V = undefined> {
export class AsyncStream<T> {
#done = false
#resolveNext: ResolveNext<T, V> | null
#queue: Value<T, V>[]
#transformValue?: TransformValue<T, V>
#resolveNext: ResolveNext<T> | null
#queue: T[]

stopCallback: (() => void) | undefined = undefined

constructor(
transformValue: V extends undefined ? undefined : TransformValue<T, V>
) {
constructor() {
this.#queue = []
this.#resolveNext = null
this.#done = false
this.#transformValue = transformValue
}

callback = (err: Error | null, value: T) => {
get isDone() {
return this.#done
}

callback: StreamCallback<T> = (err, value) => {
if (err) {
console.error('stream error', err)
this.stop()
Expand All @@ -37,17 +35,11 @@ export class AsyncStream<T, V = undefined> {
return
}

const newValue = this.#transformValue
? this.#transformValue(value)
: // must assert type because TypeScript can't infer that T is assignable
// to Value<T, V> when this.#transformValue is undefined
(value as unknown as Value<T, V>)

if (this.#resolveNext) {
this.#resolveNext({ value: newValue, done: false })
this.#resolveNext({ value, done: false })
this.#resolveNext = null
} else {
this.#queue.push(newValue)
this.#queue.push(value)
}
}

Expand All @@ -59,7 +51,7 @@ export class AsyncStream<T, V = undefined> {
this.stopCallback?.()
}

next = (): Promise<ResolveValue<T, V>> => {
next = (): Promise<ResolveValue<T>> => {
if (this.#queue.length > 0) {
return Promise.resolve({ value: this.#queue.shift(), done: false })
} else if (this.#done) {
Expand Down
18 changes: 11 additions & 7 deletions packages/mls-client/src/Conversation.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import type {
NapiGroup,
NapiListMessagesOptions,
NapiMessage,
} from '@xmtp/mls-client-bindings-node'
import type { ContentTypeId } from '@xmtp/xmtp-js'
import { AsyncStream } from '@/AsyncStream'
import { AsyncStream, type StreamCallback } from '@/AsyncStream'
import type { Client } from '@/Client'
import { DecodedMessage } from '@/DecodedMessage'
import { nsToDate } from '@/helpers/date'
Expand Down Expand Up @@ -78,12 +77,17 @@ export class Conversation {
return this.#group.sync()
}

stream() {
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#group.stream(asyncStream.callback)
stream(callback?: StreamCallback<DecodedMessage>) {
const asyncStream = new AsyncStream<DecodedMessage>()

const stream = this.#group.stream((err, message) => {
const decodedMessage = new DecodedMessage(this.#client, message)
asyncStream.callback(err, decodedMessage)
callback?.(err, decodedMessage)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}

Expand Down
52 changes: 37 additions & 15 deletions packages/mls-client/src/Conversations.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import type {
GroupPermissions,
NapiConversations,
NapiGroup,
NapiListMessagesOptions,
NapiMessage,
} from '@xmtp/mls-client-bindings-node'
import { AsyncStream } from '@/AsyncStream'
import { AsyncStream, type StreamCallback } from '@/AsyncStream'
import type { Client } from '@/Client'
import { Conversation } from '@/Conversation'
import { DecodedMessage } from '@/DecodedMessage'

export class Conversations {
#client: Client
#conversations: NapiConversations
#map: Map<string, Conversation>

constructor(client: Client, conversations: NapiConversations) {
this.#client = client
this.#conversations = conversations
this.#map = new Map()
}

get(id: string) {
return this.#map.get(id)
}

async newConversation(
Expand All @@ -27,35 +31,53 @@ export class Conversations {
accountAddresses,
permissions
)
return new Conversation(this.#client, group)
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
return conversation
}

async list(options?: NapiListMessagesOptions) {
const groups = await this.#conversations.list(options)
return groups.map((group) => new Conversation(this.#client, group))
return groups.map((group) => {
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
return conversation
})
}

async sync() {
return this.#conversations.sync()
}

stream() {
const asyncStream = new AsyncStream<NapiGroup, Conversation>(
(group) => new Conversation(this.#client, group)
)
const stream = this.#conversations.stream(asyncStream.callback)
stream(callback?: StreamCallback<Conversation>) {
const asyncStream = new AsyncStream<Conversation>()

const stream = this.#conversations.stream((err, group) => {
const conversation = new Conversation(this.#client, group)
this.#map.set(conversation.id, conversation)
asyncStream.callback(err, conversation)
callback?.(err, conversation)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}

async streamAllMessages() {
async streamAllMessages(callback?: StreamCallback<DecodedMessage>) {
// sync conversations first
await this.sync()
const asyncStream = new AsyncStream<NapiMessage, DecodedMessage>(
(message) => new DecodedMessage(this.#client, message)
)
const stream = this.#conversations.streamAllMessages(asyncStream.callback)

const asyncStream = new AsyncStream<DecodedMessage>()

const stream = this.#conversations.streamAllMessages((err, message) => {
const decodedMessage = new DecodedMessage(this.#client, message)
asyncStream.callback(err, decodedMessage)
callback?.(err, decodedMessage)
})

asyncStream.stopCallback = stream.end.bind(stream)

return asyncStream
}
}
4 changes: 2 additions & 2 deletions packages/mls-client/src/codecs/GroupUpdatedCodec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { mlsTranscriptMessages } from '@xmtp/proto'
import {
ContentTypeId,
type ContentCodec,
type EncodedContent,
} from '@xmtp/xmtp-js'
} from '@xmtp/content-type-primitives'
import { mlsTranscriptMessages } from '@xmtp/proto'

export const ContentTypeGroupUpdated = new ContentTypeId({
authorityId: 'xmtp.org',
Expand Down
1 change: 1 addition & 0 deletions packages/mls-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export {
ContentTypeGroupUpdated,
GroupUpdatedCodec,
} from './codecs/GroupUpdatedCodec'
export type { StreamCallback } from './AsyncStream'
7 changes: 7 additions & 0 deletions packages/mls-client/test/Conversations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ describe('Conversations', () => {
user2.account.address,
])
expect(conversation).toBeDefined()
expect(client1.conversations.get(conversation.id)?.id).toBe(conversation.id)
expect(conversation.id).toBeDefined()
expect(conversation.createdAt).toBeDefined()
expect(conversation.createdAtNs).toBeDefined()
Expand Down Expand Up @@ -75,6 +76,12 @@ describe('Conversations', () => {
}
}
stream.stop()
expect(client3.conversations.get(conversation1.id)?.id).toBe(
conversation1.id
)
expect(client3.conversations.get(conversation2.id)?.id).toBe(
conversation2.id
)
})

it('should stream all messages', async () => {
Expand Down
14 changes: 12 additions & 2 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2863,6 +2863,15 @@ __metadata:
languageName: node
linkType: hard

"@xmtp/content-type-primitives@npm:^1.0.1":
version: 1.0.1
resolution: "@xmtp/content-type-primitives@npm:1.0.1"
dependencies:
"@xmtp/proto": "npm:^3.61.1"
checksum: 10/656826cda74328e3079c7f5937eeb694260bd68a66090303fdf6abf4c54c8bbf924064eb6895b9e66addee1269779dfe1c3f0e836fcd8857f784c5645c7b7bf5
languageName: node
linkType: hard

"@xmtp/mls-client-bindings-node@npm:^0.0.4":
version: 0.0.4
resolution: "@xmtp/mls-client-bindings-node@npm:0.0.4"
Expand All @@ -2881,9 +2890,10 @@ __metadata:
"@typescript-eslint/eslint-plugin": "npm:^7.8.0"
"@typescript-eslint/parser": "npm:^7.8.0"
"@vitest/coverage-v8": "npm:^1.6.0"
"@xmtp/content-type-primitives": "npm:^1.0.1"
"@xmtp/mls-client-bindings-node": "npm:^0.0.4"
"@xmtp/proto": "npm:^3.61.1"
"@xmtp/xmtp-js": "npm:^11.6.2"
"@xmtp/xmtp-js": "workspace:^"
eslint: "npm:^8.57.0"
eslint-config-prettier: "npm:^9.1.0"
eslint-config-standard: "npm:^17.1.0"
Expand Down Expand Up @@ -2963,7 +2973,7 @@ __metadata:
languageName: node
linkType: hard

"@xmtp/xmtp-js@npm:^11.6.2, @xmtp/xmtp-js@workspace:packages/js-sdk":
"@xmtp/xmtp-js@workspace:^, @xmtp/xmtp-js@workspace:packages/js-sdk":
version: 0.0.0-use.local
resolution: "@xmtp/xmtp-js@workspace:packages/js-sdk"
dependencies:
Expand Down

0 comments on commit 6a90268

Please sign in to comment.