Skip to content

Commit

Permalink
add back the dual sending parts
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Oct 20, 2024
1 parent 4c42f42 commit 0a8c40e
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,16 @@ class V3ClientTest {
val group =
runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) }
assertEquals(runBlocking { boV3Client.conversations.listConversations().size }, 2)
assertEquals(runBlocking { boV3Client.conversations.list(includeGroups = true).size }, 2)
assertEquals(runBlocking { boV3Client.conversations.listDms().size }, 1)
assertEquals(runBlocking { boV3Client.conversations.listGroups().size }, 1)

runBlocking { caroV2V3Client.conversations.syncConversations() }
assertEquals(
runBlocking { caroV2V3Client.conversations.list(includeGroups = true).size },
1
2
)
assertEquals(runBlocking { caroV2V3Client.conversations.listDms().size }, 1)
assertEquals(runBlocking { caroV2V3Client.conversations.listGroups().size }, 1)
}

Expand Down Expand Up @@ -170,8 +172,8 @@ class V3ClientTest {

@Test
fun testsCanSendMessagesToDm() {
var boDm =
runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) }
val boDm =
runBlocking { boV3Client.conversations.newConversation(caroV2V3.walletAddress) }
runBlocking { boDm.send("howdy") }
var messageId = runBlocking { boDm.send("gm") }
var boDmMessage = runBlocking { boDm.messages() }
Expand All @@ -181,24 +183,27 @@ class V3ClientTest {
assertEquals(boDmMessage.size, 3)

runBlocking { caroV2V3Client.conversations.syncConversations() }
val caroDm = runBlocking { caroV2V3Client.findDm(boV3.walletAddress) }
runBlocking { caroDm!!.sync() }
var caroDmMessage = runBlocking { caroDm!!.messages() }
var sameDm = runBlocking { caroV2V3Client.conversations.list().last() }
runBlocking { sameDm.sync() }
var caroDmMessage = runBlocking { sameDm.messages() }
assertEquals(caroDmMessage.size, 2)
assertEquals(caroDmMessage.first().body, "gm")

runBlocking { caroDm!!.send("howdy") }
messageId = runBlocking { caroDm!!.send("gm") }
caroDmMessage = runBlocking { caroDm!!.messages() }
// Do the inverse
val caroDm =
runBlocking { caroV2V3Client.conversations.newConversation(boV3.walletAddress) }
runBlocking { caroDm.send("howdy") }
messageId = runBlocking { caroDm.send("gm") }
caroDmMessage = runBlocking { caroDm.messages() }
assertEquals(caroDmMessage.first().body, "gm")
assertEquals(caroDmMessage.first().id, messageId)
assertEquals(caroDmMessage.first().deliveryStatus, MessageDeliveryStatus.PUBLISHED)
assertEquals(caroDmMessage.size, 4)

runBlocking { boV3Client.conversations.syncConversations() }
boDm = runBlocking { boV3Client.findDm(caroV2V3.walletAddress)!! }
runBlocking { boDm.sync() }
boDmMessage = runBlocking { boDm.messages() }
sameDm = runBlocking { boV3Client.conversations.list().last() }
runBlocking { sameDm.sync() }
boDmMessage = runBlocking { sameDm.messages() }
assertEquals(boDmMessage.size, 5)
assertEquals(boDmMessage.first().body, "gm")
}
Expand Down Expand Up @@ -255,7 +260,7 @@ class V3ClientTest {
val group =
runBlocking { caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress)) }
val conversation =
runBlocking { boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress) }
runBlocking { boV3Client.conversations.newConversation(caroV2V3.walletAddress) }
runBlocking { boV3Client.conversations.syncConversations() }

val allMessages = mutableListOf<DecodedMessage>()
Expand Down Expand Up @@ -326,7 +331,7 @@ class V3ClientTest {
runBlocking {
caroV2V3Client.conversations.newGroup(listOf(boV3.walletAddress))
Thread.sleep(1000)
boV3Client.conversations.findOrCreateDm(caroV2V3.walletAddress)
boV3Client.conversations.newConversation(caroV2V3.walletAddress)
}

Thread.sleep(2000)
Expand All @@ -340,6 +345,8 @@ class V3ClientTest {
runBlocking { boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress)) }
val conversation =
runBlocking { alixV2Client.conversations.newConversation(caroV2V3.walletAddress) }
val dm =
runBlocking { boV3Client.conversations.newConversation(caroV2V3.walletAddress) }
runBlocking { caroV2V3Client.conversations.syncConversations() }

val allMessages = mutableListOf<DecodedMessage>()
Expand All @@ -357,9 +364,10 @@ class V3ClientTest {
runBlocking {
group.send("hi")
conversation.send("hi")
dm.send("hi")
}
Thread.sleep(1000)
assertEquals(2, allMessages.size)
assertEquals(3, allMessages.size)
job.cancel()
}

Expand All @@ -380,12 +388,13 @@ class V3ClientTest {

runBlocking {
alixV2Client.conversations.newConversation(caroV2V3.walletAddress)
Thread.sleep(1000)
boV3Client.conversations.newGroup(listOf(caroV2V3.walletAddress))
boV3Client.conversations.newConversation(caroV2V3.walletAddress)
Thread.sleep(1000)
}

Thread.sleep(2000)
assertEquals(2, allMessages.size)
assertEquals(3, allMessages.size)
job.cancel()
}
}
136 changes: 100 additions & 36 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -373,39 +373,63 @@ data class Conversations(
* @return The list of [Conversation] that the current [Client] has.
*/
suspend fun list(includeGroups: Boolean = false): List<Conversation> {
val newConversations = mutableListOf<Conversation>()
val mostRecent = conversationsByTopic.values.maxOfOrNull { it.createdAt }
val pagination = Pagination(after = mostRecent)
val seenPeers = listIntroductionPeers(pagination = pagination)
for ((peerAddress, sentAt) in seenPeers) {
newConversations.add(
Conversation.V1(
ConversationV1(
client = client,
peerAddress = peerAddress,
sentAt = sentAt,
if (client.hasV2Client) {
val newConversations = mutableListOf<Conversation>()
val mostRecent = conversationsByTopic.values.maxOfOrNull { it.createdAt }
val pagination = Pagination(after = mostRecent)
val seenPeers = listIntroductionPeers(pagination = pagination)
for ((peerAddress, sentAt) in seenPeers) {
newConversations.add(
Conversation.V1(
ConversationV1(
client = client,
peerAddress = peerAddress,
sentAt = sentAt,
),
),
),
)
)
}
val invitations = listInvitations(pagination = pagination)
for (sealedInvitation in invitations) {
try {
val newConversation = Conversation.V2(conversation(sealedInvitation))
newConversations.add(newConversation)
val consentProof = newConversation.consentProof
if (consentProof != null) {
handleConsentProof(consentProof, newConversation.peerAddress)
}
} catch (e: Exception) {
Log.d(TAG, e.message.toString())
}
}

conversationsByTopic += newConversations.filter {
it.peerAddress != client.address && Topic.isValidTopic(it.topic)
}.map { Pair(it.topic, it) }
}
val invitations = listInvitations(pagination = pagination)
for (sealedInvitation in invitations) {
try {
val newConversation = Conversation.V2(conversation(sealedInvitation))
newConversations.add(newConversation)
val consentProof = newConversation.consentProof
if (consentProof != null) {
handleConsentProof(consentProof, newConversation.peerAddress)
if (client.v3Client != null) {
syncConversations()
val dms = listDms().map { Conversation.Dm(it) }
if (!client.hasV2Client) {
conversationsByTopic =
dms.associate { it.topic to it as Conversation }.toMutableMap()
} else {
// TODO: Handle when there is a V3 and V2 conversation (needs api)
try {
conversationsByTopic.putAll(
dms.filter { dm ->
conversationsByTopic.values.none { existing ->
val existingInboxId =
client.inboxIdFromAddress(existing.peerAddress)?.lowercase()
existingInboxId != null && existingInboxId == dm.peerAddress.lowercase()
}
}.associateBy { dm -> dm.topic }
)
} catch (e: Exception) {
}
} catch (e: Exception) {
Log.d(TAG, e.message.toString())
}
}

conversationsByTopic += newConversations.filter {
it.peerAddress != client.address && Topic.isValidTopic(it.topic)
}.map { Pair(it.topic, it) }

if (includeGroups) {
syncConversations()
val groups = listGroups()
Expand Down Expand Up @@ -506,15 +530,30 @@ data class Conversations(
}

fun streamAll(): Flow<Conversation> {
return merge(streamGroupConversations(), stream())
if (!client.hasV2Client) {
return streamConversations()
}
return merge(streamConversations(), stream())
}

fun streamConversations(): Flow<Conversation> = callbackFlow {
if (client.hasV2Client) throw XMTPException("Only supported for V3 only clients.")
val conversationCallback = object : FfiConversationCallback {
override fun onConversation(conversation: FfiConversation) {
if (conversation.groupMetadata().conversationType() == "dm") {
trySend(Conversation.Dm(Dm(client, conversation)))
// TODO: Need APIs to better match on existing V2 conversations
launch {
val dm = Dm(client, conversation)
val matchingConversations = conversationsByTopic.filter {
client.inboxIdFromAddress(it.value.peerAddress)?.lowercase() ==
dm.peerInboxId().lowercase()
}

// Only send the DM if no V2 DM exists
if (matchingConversations.none { it.value.version == Conversation.Version.V2 }) {
trySend(Conversation.Dm(dm)).isSuccess
}
}
} else {
trySend(Conversation.Group(Group(client, conversation)))
}
Expand All @@ -539,16 +578,20 @@ data class Conversations(
}

fun streamAllMessages(includeGroups: Boolean = false): Flow<DecodedMessage> {
return if (includeGroups) {
merge(streamAllV2Messages(), streamAllGroupMessages())
return if (includeGroups && !client.hasV2Client) {
streamAllConversationMessages()
} else if (includeGroups) {
merge(streamAllV2Messages(), streamAllConversationMessages())
} else {
streamAllV2Messages()
}
}

fun streamAllDecryptedMessages(includeGroups: Boolean = false): Flow<DecryptedMessage> {
return if (includeGroups) {
merge(streamAllV2DecryptedMessages(), streamAllGroupDecryptedMessages())
return if (includeGroups && !client.hasV2Client) {
streamAllConversationDecryptedMessages()
} else if (includeGroups) {
merge(streamAllV2DecryptedMessages(), streamAllConversationDecryptedMessages())
} else {
streamAllV2DecryptedMessages()
}
Expand Down Expand Up @@ -590,7 +633,18 @@ data class Conversations(
val decodedMessage = MessageV3(client, message).decodeOrNull()
when (conversation?.version) {
Conversation.Version.DM -> {
decodedMessage?.let { trySend(it) }
// TODO: Need api to better match on V2 conversations
launch {
val matchingConversations = conversationsByTopic.filter {
client.inboxIdFromAddress(it.value.peerAddress)?.lowercase() ==
conversation.peerAddress
}

// If there is no V2 conversation, send the decoded message
if (matchingConversations.none { it.value.version == Conversation.Version.V2 }) {
decodedMessage?.let { trySend(it) }
}
}
}

else -> {
Expand All @@ -615,8 +669,18 @@ data class Conversations(

when (conversation?.version) {
Conversation.Version.DM -> {
decryptedMessage?.let { trySend(it) }
}
// TODO: Need api to better match on V2 conversations
launch {
val matchingConversations = conversationsByTopic.filter {
client.inboxIdFromAddress(it.value.peerAddress)?.lowercase() ==
conversation.peerAddress
}

// If there is no V2 conversation, send the decoded message
if (matchingConversations.none { it.value.version == Conversation.Version.V2 }) {
decryptedMessage?.let { trySend(it) }
}
} }

else -> {
decryptedMessage?.let { trySend(it) }
Expand Down

0 comments on commit 0a8c40e

Please sign in to comment.