Skip to content

Commit

Permalink
Merge pull request #45 from xmtp/daniel-batch-message-listing
Browse files Browse the repository at this point in the history
feat: support listBatchMessages (placeholder sequential)
  • Loading branch information
dmccartney authored May 26, 2023
2 parents 1d0c147 + 194cf95 commit 92913e7
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class XMTPModule : Module() {
private var clients: MutableMap<String, Client> = mutableMapOf()
private var xmtpPush: XMTPPush? = null
private var signer: ReactNativeSigner? = null
private val isDebugEnabled = BuildConfig.DEBUG; // TODO: consider making this configurable
private val conversations: MutableMap<String, Conversation> = mutableMapOf()
private val subscriptions: MutableMap<String, Job> = mutableMapOf()

Expand All @@ -96,6 +97,7 @@ class XMTPModule : Module() {
Events("sign", "authed", "conversation", "message")

Function("address") { clientAddress: String ->
logV( "address");
val client = clients[clientAddress]
client?.address ?: "No Client."
}
Expand All @@ -104,6 +106,7 @@ class XMTPModule : Module() {
// Auth functions
//
AsyncFunction("auth") { address: String, environment: String ->
logV( "auth");
val reactSigner = ReactNativeSigner(module = this@XMTPModule, address = address)
signer = reactSigner
val options =
Expand All @@ -114,11 +117,13 @@ class XMTPModule : Module() {
}

Function("receiveSignature") { requestID: String, signature: String ->
logV( "receiveSignature");
signer?.handle(id = requestID, signature = signature)
}

// Generate a random wallet and set the client to that
AsyncFunction("createRandom") { environment: String ->
logV( "createRandom");
val privateKey = PrivateKeyBuilder()
val options =
ClientOptions(api = apiEnvironments[environment] ?: apiEnvironments["dev"]!!)
Expand All @@ -130,12 +135,14 @@ class XMTPModule : Module() {
//
// Client API
AsyncFunction("canMessage") { clientAddress: String, peerAddress: String ->
logV( "canMessage");
val client = clients[clientAddress] ?: throw XMTPException("No client")

client.canMessage(peerAddress)
}

AsyncFunction("listConversations") { clientAddress: String ->
logV( "listConversations");
val client = clients[clientAddress] ?: throw XMTPException("No client")
val conversationList = client.conversations.list()
conversationList.map { conversation ->
Expand All @@ -144,23 +151,29 @@ class XMTPModule : Module() {
}
}

AsyncFunction("loadMessages") { clientAddress: String, conversationTopic: String, conversationID: String?, limit: Int?, before: Long?, after: Long? ->
val conversation =
AsyncFunction("loadMessages") { clientAddress: String, topics: List<String>, conversationIDs: List<String?>, limit: Int?, before: Long?, after: Long? ->
logV( "loadMessages");
// TODO: use batchQuery instead of one-at-a-time (once Android SDK supports it).
val beforeDate = if (before != null) Date(before) else null
val afterDate = if (after != null) Date(after) else null
topics.zip(conversationIDs) { topic, conversationID ->
findConversation(
clientAddress = clientAddress,
topic = conversationTopic,
topic = topic,
conversationId = conversationID
)
?: throw XMTPException("no conversation found for $conversationTopic")
val beforeDate = if (before != null) Date(before) else null
val afterDate = if (after != null) Date(after) else null

conversation.messages(limit = limit, before = beforeDate, after = afterDate)
.map { DecodedMessageWrapper.encode(it) }
?: throw XMTPException("no conversation found for $topic")
}
.map {
it.messages(limit = limit, before = beforeDate, after = afterDate)
.map { DecodedMessageWrapper.encode(it) }
}
.flatten()
}

// TODO: Support content types
AsyncFunction("sendMessage") { clientAddress: String, conversationTopic: String, conversationID: String?, content: String ->
logV( "sendMessage");
val conversation =
findConversation(
clientAddress = clientAddress,
Expand All @@ -175,6 +188,7 @@ class XMTPModule : Module() {
}

AsyncFunction("createConversation") { clientAddress: String, peerAddress: String, conversationID: String? ->
logV( "createConversation");
val client = clients[clientAddress] ?: throw XMTPException("No client")

val conversation = client.conversations.newConversation(
Expand All @@ -187,10 +201,12 @@ class XMTPModule : Module() {
}

Function("subscribeToConversations") { clientAddress: String ->
logV( "subscribeToConversations");
subscribeToConversations(clientAddress = clientAddress)
}

AsyncFunction("subscribeToMessages") { clientAddress: String, topic: String, conversationID: String? ->
logV( "subscribeToMessages");
subscribeToMessages(
clientAddress = clientAddress,
topic = topic,
Expand All @@ -199,6 +215,7 @@ class XMTPModule : Module() {
}

AsyncFunction("unsubscribeFromMessages") { clientAddress: String, topic: String, conversationID: String? ->
logV( "unsubscribeFromMessages");
unsubscribeFromMessages(
clientAddress = clientAddress,
topic = topic,
Expand All @@ -207,11 +224,13 @@ class XMTPModule : Module() {
}

Function("registerPushToken") { pushServer: String, token: String ->
logV( "registerPushToken");
xmtpPush = XMTPPush(appContext.reactContext!!, pushServer)
xmtpPush?.register(token)
}

Function("subscribePushTopics") { topics: List<String> ->
logV( "subscribePushTopics");
if (topics.isNotEmpty()) {
if (xmtpPush == null) {
throw XMTPException("Push server not registered")
Expand All @@ -221,6 +240,7 @@ class XMTPModule : Module() {
}

AsyncFunction("decodeMessage") { clientAddress: String, topic: String, encryptedMessage: String, conversationID: String? ->
logV( "decodeMessage");
val encryptedMessageData = Base64.decode(encryptedMessage, Base64.NO_WRAP)
val envelope = EnvelopeBuilder.buildFromString(topic, Date(), encryptedMessageData)
val conversation =
Expand Down Expand Up @@ -328,6 +348,12 @@ class XMTPModule : Module() {
) ?: return
subscriptions[conversation.cacheKey(clientAddress)]?.cancel()
}

private fun logV(msg: String) {
if (isDebugEnabled) {
Log.v("XMTPModule", msg);
}
}
}


7 changes: 7 additions & 0 deletions example/src/ConversationListView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ export default function ConversationListView({
const client = route.params.client;

const [conversations, setConversations] = useState<Conversation[]>([]);
const [messageCount, setMessageCount] = useState<number>(0);

async function refreshConversations() {
const conversations = await client.conversations.list();
const allMessages = await client.listBatchMessages(
conversations.map((conversation) => conversation.topic),
conversations.map((conversation) => conversation.conversationID)
);
setConversations(conversations);
setMessageCount(allMessages.length);
}

useEffect(() => {
Expand Down Expand Up @@ -55,6 +61,7 @@ export default function ConversationListView({
}
>
<HomeHeaderView client={client} navigation={navigation} />
<Text style={{marginLeft: 12}}>{messageCount} messages in {conversations.length} conversations</Text>

{conversations.map((conversation) => {
return (
Expand Down
27 changes: 17 additions & 10 deletions ios/XMTPModule.swift
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,25 @@ public class XMTPModule: Module {
}
}

AsyncFunction("loadMessages") { (clientAddress: String, conversationTopic: String, conversationID: String?, limit: Int?, before: Double?, after: Double?) -> [String] in

guard let conversation = try await findConversation(clientAddress: clientAddress, topic: conversationTopic, conversationID: conversationID) else {
throw Error.conversationNotFound("no conversation found for \(conversationTopic)")
}
AsyncFunction("loadMessages") { (clientAddress: String, topics: [String], conversationIDs: [String?], limit: Int?, before: Double?, after: Double?) -> [String] in
let beforeDate = before != nil ? Date(timeIntervalSince1970: before!) : nil
let afterDate = after != nil ? Date(timeIntervalSince1970: after!) : nil

let messages = try await conversation.messages(limit: limit, before: beforeDate, after: afterDate).map { try DecodedMessageWrapper.encode($0) }

let client = clients[clientAddress]!

var messages:[String] = []
// TODO: use batchQuery instead of one-at-a-time (once iOS and libxmtp support it).
for (topic, conversationID) in zip(topics, conversationIDs) {
guard let conversation = try await findConversation(
clientAddress: clientAddress,
topic: topic,
conversationID: conversationID) else {
throw Error.conversationNotFound("no conversation found for \(topic)")
}
messages += try await conversation.messages(
limit: limit,
before: beforeDate,
after: afterDate)
.map { (msg) in try DecodedMessageWrapper.encode(msg) }
}
// print("found \(messages.count) messages from \(topics.count) conversations");
return messages
}

Expand Down
26 changes: 24 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,30 @@ export async function listMessages(
return (
await XMTPModule.loadMessages(
clientAddress,
conversationTopic,
conversationID,
[conversationTopic],
[conversationID],
limit,
before?.getTime,
after?.getTime
)
).map((json: string) => {
return JSON.parse(json);
});
}

export async function listBatchMessages(
clientAddress: string,
conversationTopics: string[],
conversationIDs: (string | undefined)[],
limit?: number | undefined,
before?: Date | undefined,
after?: Date | undefined
): Promise<DecodedMessage[]> {
return (
await XMTPModule.loadMessages(
clientAddress,
conversationTopics,
conversationIDs,
limit,
before?.getTime,
after?.getTime
Expand Down
23 changes: 22 additions & 1 deletion src/lib/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export class Client {
const address = await signer.getAddress();
resolve(new Client(address));
});

XMTPModule.auth(await signer.getAddress(), environment);
})();
});
Expand All @@ -59,4 +58,26 @@ export class Client {
this.address = address;
this.conversations = new Conversations(this);
}

async listBatchMessages(
topics: string[],
conversationIDs: (string | undefined)[],
limit?: number | undefined,
before?: Date | undefined,
after?: Date | undefined
): Promise<XMTPModule.DecodedMessage[]> {
try {
return await XMTPModule.listBatchMessages(
this.address,
topics,
conversationIDs,
limit,
before,
after
);
} catch (e) {
console.info("ERROR in listBatchMessages", e);
return [];
}
}
}
2 changes: 2 additions & 0 deletions src/lib/DecodedMessage.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export type DecodedMessage = {
id: string;
// TODO:
// topic: string;
content: any;
senderAddress: string;
sent: Date;
Expand Down

0 comments on commit 92913e7

Please sign in to comment.