Skip to content

Commit

Permalink
Move from getMessages to get before/after/around
Browse files Browse the repository at this point in the history
  • Loading branch information
singpolyma committed Sep 25, 2024
1 parent 9d708b4 commit 2bcc37e
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 59 deletions.
142 changes: 102 additions & 40 deletions snikket/Chat.hx
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,48 @@ abstract class Chat {
String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00
@param handler takes one argument, an array of ChatMessage that are found
**/
abstract public function getMessages(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void;
abstract public function getMessagesBefore(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void;

/**
Fetch a page of messages after some point
@param afterId id of the message to look after
@param afterTime timestamp of the message to look after,
String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00
@param handler takes one argument, an array of ChatMessage that are found
**/
abstract public function getMessagesAfter(afterId:Null<String>, afterTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void;

/**
Fetch a page of messages around (before, including, and after) some point
@param aroundId id of the message to look around
@param aroundTime timestamp of the message to look around,
String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00
@param handler takes one argument, an array of ChatMessage that are found
**/
abstract public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void;

private function fetchFromSync(sync: MessageSync, callback: (Array<ChatMessage>)->Void) {
final chatMessages = [];
sync.onMessages((messageList) -> {
final chatMessages = [];
for (m in messageList.messages) {
switch (m) {
case ChatMessageStanza(message):
final chatMessage = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() }));
persistence.storeMessage(client.accountId(), chatMessage, (m)->{});
if (message.chatId() == chatId) chatMessages.push(message);
case ReactionUpdateStanza(update):
persistence.storeReaction(client.accountId(), update, (m)->{});
default:
// ignore
}
}
callback(chatMessages);
});
sync.fetchNext();
}

/**
Send a ChatMessage to this Chat
Expand Down Expand Up @@ -470,7 +511,7 @@ abstract class Chat {
readUpToId = upTo;
readUpToBy = upToBy;
persistence.storeChat(client.accountId(), this);
persistence.getMessages(client.accountId(), chatId, null, null, (messages) -> {
persistence.getMessagesBefore(client.accountId(), chatId, null, null, (messages) -> {
var i = messages.length;
while (--i >= 0) {
if (messages[i].serverId == readUpToId) break;
Expand Down Expand Up @@ -549,30 +590,41 @@ class DirectChat extends Chat {
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessages(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessages(client.accountId(), chatId, beforeId, beforeTime, (messages) -> {
public function getMessagesBefore(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
var filter:MAMQueryParams = { with: this.chatId };
if (beforeId != null) filter.page = { before: beforeId };
var sync = new MessageSync(this.client, this.stream, filter);
sync.onMessages((messageList) -> {
final chatMessages = [];
for (m in messageList.messages) {
switch (m) {
case ChatMessageStanza(message):
persistence.storeMessage(client.accountId(), message, (m)->{});
if (message.chatId() == chatId) chatMessages.push(message);
case ReactionUpdateStanza(update):
persistence.storeReaction(client.accountId(), update, (m)->{});
default:
// ignore
}
}
handler(chatMessages);
});
sync.fetchNext();
var sync = new MessageSync(this.client, this.stream, filter);
fetchFromSync(sync, handler);
}
});
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessagesAfter(afterId:Null<String>, afterTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
var filter:MAMQueryParams = { with: this.chatId };
if (afterId != null) filter.page = { after: afterId };
var sync = new MessageSync(this.client, this.stream, filter);
fetchFromSync(sync, handler);
}
});
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
// TODO
handler([]);
}
});
}
Expand Down Expand Up @@ -951,31 +1003,41 @@ class Channel extends Chat {
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessages(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessages(client.accountId(), chatId, beforeId, beforeTime, (messages) -> {
public function getMessagesBefore(beforeId:Null<String>, beforeTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
var filter:MAMQueryParams = {};
if (beforeId != null) filter.page = { before: beforeId };
var sync = new MessageSync(this.client, this.stream, filter, chatId);
sync.onMessages((messageList) -> {
final chatMessages = [];
for (m in messageList.messages) {
switch (m) {
case ChatMessageStanza(message):
final chatMessage = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() }));
persistence.storeMessage(client.accountId(), chatMessage, (m)->{});
if (message.chatId() == chatId) chatMessages.push(message);
case ReactionUpdateStanza(update):
persistence.storeReaction(client.accountId(), update, (m)->{});
default:
// ignore
}
}
handler(chatMessages);
});
sync.fetchNext();
fetchFromSync(sync, handler);
}
});
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessagesAfter(afterId:Null<String>, afterTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
var filter:MAMQueryParams = {};
if (afterId != null) filter.page = { after: afterId };
var sync = new MessageSync(this.client, this.stream, filter, chatId);
fetchFromSync(sync, handler);
}
});
}

@HaxeCBridge.noemit // on superclass as abstract
public function getMessagesAround(aroundId:Null<String>, aroundTime:Null<String>, handler:(Array<ChatMessage>)->Void):Void {
persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> {
if (messages.length > 0) {
handler(messages);
} else {
// TODO
handler([]);
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion snikket/MessageSync.hx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MessageSync {
}
final messages:Array<MessageStanza> = [];
if (lastPage == null) {
if (newestPageFirst == true && (filter.page == null || filter.page.before == null)) {
if (newestPageFirst == true && (filter.page == null || (filter.page.before == null && filter.page.after == null))) {
if (filter.page == null) filter.page = {};
filter.page.before = ""; // Request last page of results
}
Expand Down
4 changes: 3 additions & 1 deletion snikket/Persistence.hx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ interface Persistence {
public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null<ChatMessage>)->Void):Void;
public function storeMessage(accountId: String, message: ChatMessage, callback: (ChatMessage)->Void):Void;
public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void;
public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void;
public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void;
public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void;
public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (messages:Array<ChatMessage>)->Void):Void;
public function getMediaUri(hashAlgorithm:String, hash:BytesData, callback: (uri:Null<String>)->Void):Void;
public function storeMedia(mime:String, bytes:BytesData, callback: ()->Void):Void;
public function storeCaps(caps:Caps):Void;
Expand Down
20 changes: 15 additions & 5 deletions snikket/persistence/Custom.hx
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,18 @@ class Custom implements Persistence {
}

@HaxeCBridge.noemit
public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
backing.getMessages(accountId, chatId, beforeId, beforeTime, callback);
public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
backing.getMessagesBefore(accountId, chatId, beforeId, beforeTime, callback);
}

@HaxeCBridge.noemit
public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
backing.getMessagesAfter(accountId, chatId, afterId, afterTime, callback);
}

@HaxeCBridge.noemit
public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
backing.getMessagesAround(accountId, chatId, aroundId, aroundTime, callback);
}

@HaxeCBridge.noemit
Expand Down Expand Up @@ -109,12 +119,12 @@ class Custom implements Persistence {
}

@HaxeCBridge.noemit
public function storeStreamManagement(accountId:String, smId:String, outboundCount:Int, inboundCount:Int, outboundQueue:Array<String>) {
backing.storeStreamManagement(accountId, smId, outboundCount, inboundCount, outboundQueue);
public function storeStreamManagement(accountId:String, sm:BytesData) {
backing.storeStreamManagement(accountId, sm);
}

@HaxeCBridge.noemit
public function getStreamManagement(accountId:String, callback: (Null<String>, Int, Int, Array<String>)->Void) {
public function getStreamManagement(accountId:String, callback: (BytesData)->Void) {
backing.getStreamManagement(accountId, callback);
}

Expand Down
12 changes: 11 additions & 1 deletion snikket/persistence/Dummy.hx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ class Dummy implements Persistence {
}

@HaxeCBridge.noemit
public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
callback([]);
}

@HaxeCBridge.noemit
public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
callback([]);
}

@HaxeCBridge.noemit
public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
callback([]);
}

Expand Down
30 changes: 24 additions & 6 deletions snikket/persistence/Sqlite.hx
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,41 @@ class Sqlite implements Persistence {
callback(message);
}

@HaxeCBridge.noemit
public function getMessages(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
private function getMessages(accountId: String, chatId: String, time: String, op: String) {
final q = new StringBuf();
q.add("SELECT stanza FROM messages WHERE account_id=");
db.addValue(q, accountId);
q.add(" AND chat_id=");
db.addValue(q, chatId);
if (beforeTime != null) {
q.add(" AND created_at <");
db.addValue(q, DateTime.fromString(beforeTime).getTime());
if (time != null) {
q.add(" AND created_at " + op);
db.addValue(q, DateTime.fromString(time).getTime());
}
q.add("LIMIT 50");
final result = db.request(q.toString());
final messages = [];
for (row in result) {
messages.push(ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId))); // TODO
}
callback(messages);
return messages;
}

@HaxeCBridge.noemit
public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null<String>, beforeTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
callback(getMessages(accountId, chatId, beforeTime, "<"));
}

@HaxeCBridge.noemit
public function getMessagesAfter(accountId: String, chatId: String, afterId: Null<String>, afterTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
callback(getMessages(accountId, chatId, afterTime, ">"));
}

@HaxeCBridge.noemit
public function getMessagesAround(accountId: String, chatId: String, aroundId: Null<String>, aroundTime: Null<String>, callback: (Array<ChatMessage>)->Void) {
if (aroundTime == null) throw "Around what?";
final before = getMessages(accountId, chatId, aroundTime, "<");
final aroundAndAfter = getMessages(accountId, chatId, aroundTime, ">=");
callback(before.concat(aroundAndAfter));
}

@HaxeCBridge.noemit
Expand Down
49 changes: 44 additions & 5 deletions snikket/persistence/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -375,24 +375,63 @@ const browser = (dbname, tokenize, stemmer) => {
});
},

getMessages: function(account, chatId, beforeId, beforeTime, callback) {
const beforeDate = beforeTime ? new Date(beforeTime) : [];
getMessagesBefore: function(account, chatId, beforeId, beforeTime, callback) {
// TODO: if beforeId is present but beforeTime is null, lookup time
const bound = beforeTime ? new Date(beforeTime) : [];
const tx = db.transaction(["messages"], "readonly");
const store = tx.objectStore("messages");
const cursor = store.index("chats").openCursor(
IDBKeyRange.bound([account, chatId], [account, chatId, beforeDate]),
IDBKeyRange.bound([account, chatId], [account, chatId, bound]),
"prev"
);
this.getMessagesFromCursor(cursor, beforeId, bound, (messages) => callback(messages.reverse()));
},

getMessagesAfter: function(account, chatId, afterId, afterTime, callback) {
// TODO: if afterId is present but afterTime is null, lookup time
const bound = afterTime ? [new Date(afterTime)] : [];
const tx = db.transaction(["messages"], "readonly");
const store = tx.objectStore("messages");
const cursor = store.index("chats").openCursor(
IDBKeyRange.bound([account, chatId].concat(bound), [account, chatId, []]),
"next"
);
this.getMessagesFromCursor(cursor, afterId, bound[0], callback);
},

getMessagesAround: function(account, chatId, id, time, callback) {
// TODO: if id is present but time is null, lookup time
if (!id && !time) throw "Around what?";
const before = new Promise((resolve, reject) =>
this.getMessagesBefore(account, chatId, id, time, resolve)
);

const tx = db.transaction(["messages"], "readonly");
const store = tx.objectStore("messages");
const cursor = store.index("chats").openCursor(
IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]),
"next"
);
const aroundAndAfter = new Promise((resolve, reject) =>
this.getMessagesFromCursor(cursor, null, null, resolve)
);

Promise.all([before, aroundAndAfter]).then((result) => {
callback(result.flat());
});
},

getMessagesFromCursor: function(cursor, id, bound, callback) {
const result = [];
cursor.onsuccess = (event) => {
if (event.target.result && result.length < 50) {
const value = event.target.result.value;
if (value.serverId === beforeId || (value.timestamp && value.timestamp.getTime() === (beforeDate instanceof Date && beforeDate.getTime()))) {
if (value.serverId === id || value.localId === id || (value.timestamp && value.timestamp.getTime() === (bound instanceof Date && bound.getTime()))) {
event.target.result.continue();
return;
}

result.unshift(hydrateMessage(value));
result.push(hydrateMessage(value));
event.target.result.continue();
} else {
Promise.all(result).then(callback);
Expand Down

0 comments on commit 2bcc37e

Please sign in to comment.