diff --git a/snikket/Chat.hx b/snikket/Chat.hx index 5fa0926..742fb3c 100644 --- a/snikket/Chat.hx +++ b/snikket/Chat.hx @@ -92,7 +92,48 @@ abstract class Chat { String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00 @param handler takes one argument, an array of ChatMessage that are found **/ - abstract public function getMessages(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void; + abstract public function getMessagesBefore(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void; + + /** + Fetch a page of messages after some point + + @param afterId id of the message to look after + @param afterTime timestamp of the message to look after, + String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00 + @param handler takes one argument, an array of ChatMessage that are found + **/ + abstract public function getMessagesAfter(afterId:Null, afterTime:Null, handler:(Array)->Void):Void; + + /** + Fetch a page of messages around (before, including, and after) some point + + @param aroundId id of the message to look around + @param aroundTime timestamp of the message to look around, + String in format YYYY-MM-DDThh:mm:ss[.sss]+00:00 + @param handler takes one argument, an array of ChatMessage that are found + **/ + abstract public function getMessagesAround(aroundId:Null, aroundTime:Null, handler:(Array)->Void):Void; + + private function fetchFromSync(sync: MessageSync, callback: (Array)->Void) { + final chatMessages = []; + sync.onMessages((messageList) -> { + final chatMessages = []; + for (m in messageList.messages) { + switch (m) { + case ChatMessageStanza(message): + final chatMessage = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() })); + persistence.storeMessage(client.accountId(), chatMessage, (m)->{}); + if (message.chatId() == chatId) chatMessages.push(message); + case ReactionUpdateStanza(update): + persistence.storeReaction(client.accountId(), update, (m)->{}); + default: + // ignore + } + } + callback(chatMessages); + }); + sync.fetchNext(); + } /** Send a ChatMessage to this Chat @@ -470,7 +511,7 @@ abstract class Chat { readUpToId = upTo; readUpToBy = upToBy; persistence.storeChat(client.accountId(), this); - persistence.getMessages(client.accountId(), chatId, null, null, (messages) -> { + persistence.getMessagesBefore(client.accountId(), chatId, null, null, (messages) -> { var i = messages.length; while (--i >= 0) { if (messages[i].serverId == readUpToId) break; @@ -549,30 +590,41 @@ class DirectChat extends Chat { } @HaxeCBridge.noemit // on superclass as abstract - public function getMessages(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void { - persistence.getMessages(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { + public function getMessagesBefore(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { if (messages.length > 0) { handler(messages); } else { var filter:MAMQueryParams = { with: this.chatId }; if (beforeId != null) filter.page = { before: beforeId }; - var sync = new MessageSync(this.client, this.stream, filter); - sync.onMessages((messageList) -> { - final chatMessages = []; - for (m in messageList.messages) { - switch (m) { - case ChatMessageStanza(message): - persistence.storeMessage(client.accountId(), message, (m)->{}); - if (message.chatId() == chatId) chatMessages.push(message); - case ReactionUpdateStanza(update): - persistence.storeReaction(client.accountId(), update, (m)->{}); - default: - // ignore - } - } - handler(chatMessages); - }); - sync.fetchNext(); + var sync = new MessageSync(this.client, this.stream, filter); + fetchFromSync(sync, handler); + } + }); + } + + @HaxeCBridge.noemit // on superclass as abstract + public function getMessagesAfter(afterId:Null, afterTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> { + if (messages.length > 0) { + handler(messages); + } else { + var filter:MAMQueryParams = { with: this.chatId }; + if (afterId != null) filter.page = { after: afterId }; + var sync = new MessageSync(this.client, this.stream, filter); + fetchFromSync(sync, handler); + } + }); + } + + @HaxeCBridge.noemit // on superclass as abstract + public function getMessagesAround(aroundId:Null, aroundTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> { + if (messages.length > 0) { + handler(messages); + } else { + // TODO + handler([]); } }); } @@ -951,31 +1003,41 @@ class Channel extends Chat { } @HaxeCBridge.noemit // on superclass as abstract - public function getMessages(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void { - persistence.getMessages(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { + public function getMessagesBefore(beforeId:Null, beforeTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesBefore(client.accountId(), chatId, beforeId, beforeTime, (messages) -> { if (messages.length > 0) { handler(messages); } else { var filter:MAMQueryParams = {}; if (beforeId != null) filter.page = { before: beforeId }; var sync = new MessageSync(this.client, this.stream, filter, chatId); - sync.onMessages((messageList) -> { - final chatMessages = []; - for (m in messageList.messages) { - switch (m) { - case ChatMessageStanza(message): - final chatMessage = prepareIncomingMessage(message, new Stanza("message", { from: message.senderId() })); - persistence.storeMessage(client.accountId(), chatMessage, (m)->{}); - if (message.chatId() == chatId) chatMessages.push(message); - case ReactionUpdateStanza(update): - persistence.storeReaction(client.accountId(), update, (m)->{}); - default: - // ignore - } - } - handler(chatMessages); - }); - sync.fetchNext(); + fetchFromSync(sync, handler); + } + }); + } + + @HaxeCBridge.noemit // on superclass as abstract + public function getMessagesAfter(afterId:Null, afterTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesAfter(client.accountId(), chatId, afterId, afterTime, (messages) -> { + if (messages.length > 0) { + handler(messages); + } else { + var filter:MAMQueryParams = {}; + if (afterId != null) filter.page = { after: afterId }; + var sync = new MessageSync(this.client, this.stream, filter, chatId); + fetchFromSync(sync, handler); + } + }); + } + + @HaxeCBridge.noemit // on superclass as abstract + public function getMessagesAround(aroundId:Null, aroundTime:Null, handler:(Array)->Void):Void { + persistence.getMessagesAround(client.accountId(), chatId, aroundId, aroundTime, (messages) -> { + if (messages.length > 0) { + handler(messages); + } else { + // TODO + handler([]); } }); } diff --git a/snikket/MessageSync.hx b/snikket/MessageSync.hx index dde65c5..2a2c325 100644 --- a/snikket/MessageSync.hx +++ b/snikket/MessageSync.hx @@ -46,7 +46,7 @@ class MessageSync { } final messages:Array = []; if (lastPage == null) { - if (newestPageFirst == true && (filter.page == null || filter.page.before == null)) { + if (newestPageFirst == true && (filter.page == null || (filter.page.before == null && filter.page.after == null))) { if (filter.page == null) filter.page = {}; filter.page.before = ""; // Request last page of results } diff --git a/snikket/Persistence.hx b/snikket/Persistence.hx index 66edb53..e8ab6ad 100644 --- a/snikket/Persistence.hx +++ b/snikket/Persistence.hx @@ -17,7 +17,9 @@ interface Persistence { public function storeReaction(accountId: String, update: ReactionUpdate, callback: (Null)->Void):Void; public function storeMessage(accountId: String, message: ChatMessage, callback: (ChatMessage)->Void):Void; public function updateMessageStatus(accountId: String, localId: String, status:MessageStatus, callback: (ChatMessage)->Void):Void; - public function getMessages(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (messages:Array)->Void):Void; + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (messages:Array)->Void):Void; + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null, afterTime: Null, callback: (messages:Array)->Void):Void; + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null, aroundTime: Null, callback: (messages:Array)->Void):Void; public function getMediaUri(hashAlgorithm:String, hash:BytesData, callback: (uri:Null)->Void):Void; public function storeMedia(mime:String, bytes:BytesData, callback: ()->Void):Void; public function storeCaps(caps:Caps):Void; diff --git a/snikket/persistence/Custom.hx b/snikket/persistence/Custom.hx index 1448bfb..469e490 100644 --- a/snikket/persistence/Custom.hx +++ b/snikket/persistence/Custom.hx @@ -59,8 +59,18 @@ class Custom implements Persistence { } @HaxeCBridge.noemit - public function getMessages(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { - backing.getMessages(accountId, chatId, beforeId, beforeTime, callback); + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { + backing.getMessagesBefore(accountId, chatId, beforeId, beforeTime, callback); + } + + @HaxeCBridge.noemit + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null, afterTime: Null, callback: (Array)->Void) { + backing.getMessagesAfter(accountId, chatId, afterId, afterTime, callback); + } + + @HaxeCBridge.noemit + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null, aroundTime: Null, callback: (Array)->Void) { + backing.getMessagesAround(accountId, chatId, aroundId, aroundTime, callback); } @HaxeCBridge.noemit @@ -109,12 +119,12 @@ class Custom implements Persistence { } @HaxeCBridge.noemit - public function storeStreamManagement(accountId:String, smId:String, outboundCount:Int, inboundCount:Int, outboundQueue:Array) { - backing.storeStreamManagement(accountId, smId, outboundCount, inboundCount, outboundQueue); + public function storeStreamManagement(accountId:String, sm:BytesData) { + backing.storeStreamManagement(accountId, sm); } @HaxeCBridge.noemit - public function getStreamManagement(accountId:String, callback: (Null, Int, Int, Array)->Void) { + public function getStreamManagement(accountId:String, callback: (BytesData)->Void) { backing.getStreamManagement(accountId, callback); } diff --git a/snikket/persistence/Dummy.hx b/snikket/persistence/Dummy.hx index 6d91ffa..eddfd9f 100644 --- a/snikket/persistence/Dummy.hx +++ b/snikket/persistence/Dummy.hx @@ -42,7 +42,17 @@ class Dummy implements Persistence { } @HaxeCBridge.noemit - public function getMessages(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { + callback([]); + } + + @HaxeCBridge.noemit + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null, afterTime: Null, callback: (Array)->Void) { + callback([]); + } + + @HaxeCBridge.noemit + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null, aroundTime: Null, callback: (Array)->Void) { callback([]); } diff --git a/snikket/persistence/Sqlite.hx b/snikket/persistence/Sqlite.hx index bd70b66..66b5f95 100644 --- a/snikket/persistence/Sqlite.hx +++ b/snikket/persistence/Sqlite.hx @@ -187,23 +187,41 @@ class Sqlite implements Persistence { callback(message); } - @HaxeCBridge.noemit - public function getMessages(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { + private function getMessages(accountId: String, chatId: String, time: String, op: String) { final q = new StringBuf(); q.add("SELECT stanza FROM messages WHERE account_id="); db.addValue(q, accountId); q.add(" AND chat_id="); db.addValue(q, chatId); - if (beforeTime != null) { - q.add(" AND created_at <"); - db.addValue(q, DateTime.fromString(beforeTime).getTime()); + if (time != null) { + q.add(" AND created_at " + op); + db.addValue(q, DateTime.fromString(time).getTime()); } + q.add("LIMIT 50"); final result = db.request(q.toString()); final messages = []; for (row in result) { messages.push(ChatMessage.fromStanza(Stanza.parse(row.stanza), JID.parse(accountId))); // TODO } - callback(messages); + return messages; + } + + @HaxeCBridge.noemit + public function getMessagesBefore(accountId: String, chatId: String, beforeId: Null, beforeTime: Null, callback: (Array)->Void) { + callback(getMessages(accountId, chatId, beforeTime, "<")); + } + + @HaxeCBridge.noemit + public function getMessagesAfter(accountId: String, chatId: String, afterId: Null, afterTime: Null, callback: (Array)->Void) { + callback(getMessages(accountId, chatId, afterTime, ">")); + } + + @HaxeCBridge.noemit + public function getMessagesAround(accountId: String, chatId: String, aroundId: Null, aroundTime: Null, callback: (Array)->Void) { + if (aroundTime == null) throw "Around what?"; + final before = getMessages(accountId, chatId, aroundTime, "<"); + final aroundAndAfter = getMessages(accountId, chatId, aroundTime, ">="); + callback(before.concat(aroundAndAfter)); } @HaxeCBridge.noemit diff --git a/snikket/persistence/browser.js b/snikket/persistence/browser.js index ce34f11..63c9be4 100644 --- a/snikket/persistence/browser.js +++ b/snikket/persistence/browser.js @@ -375,24 +375,63 @@ const browser = (dbname, tokenize, stemmer) => { }); }, - getMessages: function(account, chatId, beforeId, beforeTime, callback) { - const beforeDate = beforeTime ? new Date(beforeTime) : []; + getMessagesBefore: function(account, chatId, beforeId, beforeTime, callback) { + // TODO: if beforeId is present but beforeTime is null, lookup time + const bound = beforeTime ? new Date(beforeTime) : []; const tx = db.transaction(["messages"], "readonly"); const store = tx.objectStore("messages"); const cursor = store.index("chats").openCursor( - IDBKeyRange.bound([account, chatId], [account, chatId, beforeDate]), + IDBKeyRange.bound([account, chatId], [account, chatId, bound]), "prev" ); + this.getMessagesFromCursor(cursor, beforeId, bound, (messages) => callback(messages.reverse())); + }, + + getMessagesAfter: function(account, chatId, afterId, afterTime, callback) { + // TODO: if afterId is present but afterTime is null, lookup time + const bound = afterTime ? [new Date(afterTime)] : []; + const tx = db.transaction(["messages"], "readonly"); + const store = tx.objectStore("messages"); + const cursor = store.index("chats").openCursor( + IDBKeyRange.bound([account, chatId].concat(bound), [account, chatId, []]), + "next" + ); + this.getMessagesFromCursor(cursor, afterId, bound[0], callback); + }, + + getMessagesAround: function(account, chatId, id, time, callback) { + // TODO: if id is present but time is null, lookup time + if (!id && !time) throw "Around what?"; + const before = new Promise((resolve, reject) => + this.getMessagesBefore(account, chatId, id, time, resolve) + ); + + const tx = db.transaction(["messages"], "readonly"); + const store = tx.objectStore("messages"); + const cursor = store.index("chats").openCursor( + IDBKeyRange.bound([account, chatId, new Date(time)], [account, chatId, []]), + "next" + ); + const aroundAndAfter = new Promise((resolve, reject) => + this.getMessagesFromCursor(cursor, null, null, resolve) + ); + + Promise.all([before, aroundAndAfter]).then((result) => { + callback(result.flat()); + }); + }, + + getMessagesFromCursor: function(cursor, id, bound, callback) { const result = []; cursor.onsuccess = (event) => { if (event.target.result && result.length < 50) { const value = event.target.result.value; - if (value.serverId === beforeId || (value.timestamp && value.timestamp.getTime() === (beforeDate instanceof Date && beforeDate.getTime()))) { + if (value.serverId === id || value.localId === id || (value.timestamp && value.timestamp.getTime() === (bound instanceof Date && bound.getTime()))) { event.target.result.continue(); return; } - result.unshift(hydrateMessage(value)); + result.push(hydrateMessage(value)); event.target.result.continue(); } else { Promise.all(result).then(callback);