Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream management requesting ACKs #1005

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"bundlesize": [
{
"path": "./packages/client/dist/xmpp.min.js",
"maxSize": "16 KB"
"maxSize": "17 KB"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed if we're gonna make use of clone

}
],
"lint-staged": {
Expand Down
77 changes: 74 additions & 3 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

const xml = require("@xmpp/xml");
const time = require("@xmpp/time");

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -46,24 +47,34 @@ module.exports = function streamManagement({
middleware,
}) {
let address = null;
let timeoutTimeout = null;

const sm = {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
timeout: 60000,
};

entity.on("online", (jid) => {
address = jid;
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty during online";
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -72,35 +83,92 @@ module.exports = function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
}

return next();
});

let requestAckTimeout = null;
function requestAck() {
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, 300000);
}

middleware.filter((context, next) => {
const { stanza } = context;
if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) {
let qStanza = stanza;
if (
qStanza.name === "message" &&
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(stanza);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of not cloning the stanza? I think we can document that we don't guarantee immutability past send()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, any mutation of arguments to a library that is visible by the caller makes my uncomfortable, and I'm not sure avoiding it just to lose 1KB is worth it IMO. But I don't think it would actually break anything I'm using it for, so if you feel strongly we should drop the clone I could I think.

qStanza.c("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: time.datetime(),
});
}
sm.outbound_q.push(qStanza);
// Debounce requests so we send only one after a big run of stanza together
if (requestAckTimeout) clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, 100);
}
return next();
});

// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session

streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
await resume(entity, sm.inbound, sm.id);
let resumed = await resume(entity, sm.inbound, sm.id);
sm.enabled = true;
entity.jid = address;
if (address) entity.jid = address;
entity.status = "online";
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
entity.send(item); // This will trigger the middleware and re-add to the queue
}
entity.emit("stream-management/resumed");
return true;
// If resumption fails, continue with session establishment
} catch {
sm.id = "";
sm.enabled = false;
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
}
}
Expand All @@ -113,6 +181,9 @@ module.exports = function streamManagement({
const promiseEnable = enable(entity, sm.allowResume, sm.preferredMaximum);

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty after enable";
}
sm.outbound = 0;

try {
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"management"
],
"dependencies": {
"@xmpp/xml": "^0.13.2"
"@xmpp/xml": "^0.13.2",
"@xmpp/time": "^0.13.2"
},
"engines": {
"node": ">= 14"
Expand Down
2 changes: 2 additions & 0 deletions packages/xml/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Element = require("ltx/lib/Element");
const createElement = require("ltx/lib/createElement");
const clone = require("ltx/lib/clone");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clone is not exported on purpose from @xmpp/xml to keep the size to a minimum

const Parser = require("./lib/Parser");
const {
escapeXML,
Expand All @@ -19,6 +20,7 @@ module.exports = xml;

Object.assign(module.exports, {
Element,
clone,
createElement,
Parser,
escapeXML,
Expand Down
Loading