-
Notifications
You must be signed in to change notification settings - Fork 376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement stream management requesting ACKs #1005
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
"use strict"; | ||
|
||
const xml = require("@xmpp/xml"); | ||
const time = require("@xmpp/time"); | ||
|
||
// https://xmpp.org/extensions/xep-0198.html | ||
|
||
|
@@ -46,24 +47,34 @@ module.exports = function streamManagement({ | |
middleware, | ||
}) { | ||
let address = null; | ||
let timeoutTimeout = null; | ||
|
||
const sm = { | ||
allowResume: true, | ||
preferredMaximum: null, | ||
enabled: false, | ||
id: "", | ||
outbound_q: [], | ||
outbound: 0, | ||
inbound: 0, | ||
max: null, | ||
timeout: 60000, | ||
}; | ||
|
||
entity.on("online", (jid) => { | ||
address = jid; | ||
if (sm.outbound_q.length > 0) { | ||
throw "Stream Management assertion failure, queue should be empty during online"; | ||
} | ||
sm.outbound = 0; | ||
sm.inbound = 0; | ||
}); | ||
|
||
entity.on("offline", () => { | ||
let stanza; | ||
while ((stanza = sm.outbound_q.shift())) { | ||
entity.emit("stream-management/fail", stanza); | ||
} | ||
sm.outbound = 0; | ||
sm.inbound = 0; | ||
sm.enabled = false; | ||
|
@@ -72,35 +83,92 @@ module.exports = function streamManagement({ | |
|
||
middleware.use((context, next) => { | ||
const { stanza } = context; | ||
if (timeoutTimeout) clearTimeout(timeoutTimeout); | ||
if (["presence", "message", "iq"].includes(stanza.name)) { | ||
sm.inbound += 1; | ||
} else if (stanza.is("r", NS)) { | ||
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element. | ||
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); | ||
} else if (stanza.is("a", NS)) { | ||
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). | ||
sm.outbound = stanza.attrs.h; | ||
const oldOutbound = sm.outbound; | ||
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { | ||
let stanza = sm.outbound_q.shift(); | ||
sm.outbound++; | ||
entity.emit("stream-management/ack", stanza); | ||
} | ||
} | ||
|
||
return next(); | ||
}); | ||
|
||
let requestAckTimeout = null; | ||
function requestAck() { | ||
if (timeoutTimeout) clearTimeout(timeoutTimeout); | ||
if (sm.timeout) { | ||
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout); | ||
} | ||
entity.send(xml("r", { xmlns: NS })).catch(() => {}); | ||
// Periodically send r to check the connection | ||
// If a stanza goes out it will cancel this and set a sooner timer | ||
requestAckTimeout = setTimeout(requestAck, 300000); | ||
} | ||
|
||
middleware.filter((context, next) => { | ||
const { stanza } = context; | ||
if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) { | ||
let qStanza = stanza; | ||
if ( | ||
qStanza.name === "message" && | ||
!qStanza.getChild("delay", "urn:xmpp:delay") | ||
) { | ||
qStanza = xml.clone(stanza); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wdyt of not cloning the stanza? I think we can document that we don't guarantee immutability past There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, any mutation of arguments to a library that is visible by the caller makes my uncomfortable, and I'm not sure avoiding it just to lose 1KB is worth it IMO. But I don't think it would actually break anything I'm using it for, so if you feel strongly we should drop the clone I could I think. |
||
qStanza.c("delay", { | ||
xmlns: "urn:xmpp:delay", | ||
from: entity.jid.toString(), | ||
stamp: time.datetime(), | ||
}); | ||
} | ||
sm.outbound_q.push(qStanza); | ||
// Debounce requests so we send only one after a big run of stanza together | ||
if (requestAckTimeout) clearTimeout(requestAckTimeout); | ||
requestAckTimeout = setTimeout(requestAck, 100); | ||
} | ||
return next(); | ||
}); | ||
|
||
// https://xmpp.org/extensions/xep-0198.html#enable | ||
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session | ||
|
||
streamFeatures.use("sm", NS, async (context, next) => { | ||
// Resuming | ||
if (sm.id) { | ||
try { | ||
await resume(entity, sm.inbound, sm.id); | ||
let resumed = await resume(entity, sm.inbound, sm.id); | ||
sm.enabled = true; | ||
entity.jid = address; | ||
if (address) entity.jid = address; | ||
entity.status = "online"; | ||
const oldOutbound = sm.outbound; | ||
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { | ||
let stanza = sm.outbound_q.shift(); | ||
sm.outbound++; | ||
entity.emit("stream-management/ack", stanza); | ||
} | ||
let q = sm.outbound_q; | ||
sm.outbound_q = []; | ||
for (const item of q) { | ||
entity.send(item); // This will trigger the middleware and re-add to the queue | ||
} | ||
entity.emit("stream-management/resumed"); | ||
return true; | ||
// If resumption fails, continue with session establishment | ||
} catch { | ||
sm.id = ""; | ||
sm.enabled = false; | ||
let stanza; | ||
while ((stanza = sm.outbound_q.shift())) { | ||
entity.emit("stream-management/fail", stanza); | ||
} | ||
sm.outbound = 0; | ||
} | ||
} | ||
|
@@ -113,6 +181,9 @@ module.exports = function streamManagement({ | |
const promiseEnable = enable(entity, sm.allowResume, sm.preferredMaximum); | ||
|
||
// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>. | ||
if (sm.outbound_q.length > 0) { | ||
throw "Stream Management assertion failure, queue should be empty after enable"; | ||
} | ||
sm.outbound = 0; | ||
|
||
try { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
const Element = require("ltx/lib/Element"); | ||
const createElement = require("ltx/lib/createElement"); | ||
const clone = require("ltx/lib/clone"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. clone is not exported on purpose from |
||
const Parser = require("./lib/Parser"); | ||
const { | ||
escapeXML, | ||
|
@@ -19,6 +20,7 @@ module.exports = xml; | |
|
||
Object.assign(module.exports, { | ||
Element, | ||
clone, | ||
createElement, | ||
Parser, | ||
escapeXML, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needed if we're gonna make use of
clone