Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-management: Implement requesting ACKs #1005

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions packages/client-core/src/bind2/bind2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
test("with function resource throwing", async () => {
const error = new Error("foo");


function resource() {
throw error;
}
Expand Down Expand Up @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
test("with function resource returning rejected promise", async () => {
const error = new Error("foo");


async function resource() {
throw error;
}
Expand Down
15 changes: 15 additions & 0 deletions packages/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ class Connection extends EventEmitter {
await promise(this.socket, "close", "error", timeout);
}

/**
* Forcibly disconnects the socket
* https://xmpp.org/rfcs/rfc6120.html#streams-close
* https://tools.ietf.org/html/rfc7395#section-3.6
*/
async forceDisconnect(timeout = this.timeout) {
if (!this.socket) return;

this._status("disconnecting");
this.socket.destroy();

// The 'disconnect' status is set by the socket 'close' listener
await promise(this.socket, "close", "error", timeout);
}

/**
* Opens the stream
*/
Expand Down
8 changes: 7 additions & 1 deletion packages/stream-management/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ When the session is resumed the `online` event is not emitted as session resumpt
However `entity.status` is set to `online`.
If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted.

Automatically responds to acks but does not support requesting acks yet.
Automatically responds to acks and requests them. Also requests periodically even if you haven't sent anything. If server fails to respond to a request, the module triggers a reconnect.

## Events

**resumed**: Indicates that the connection was resumed (so online with no online event)
**fail**: Indicates that a stanza failed to send to the server and will not be retried
**ack**: Indicates that a stanza has been acknowledged by the server

## References

Expand Down
109 changes: 101 additions & 8 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import XMPPError from "@xmpp/error";
import { procedure } from "@xmpp/events";
import { EventEmitter, procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
import { datetime } from "@xmpp/time";

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -45,24 +46,68 @@
bind2,
sasl2,
}) {
const sm = {
let timeoutTimeout = null;
let requestAckTimeout = null;

const sm = new EventEmitter();
Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
};
timeout: 60_000,
requestAckInterval: 300_000,
debounceAckRequest: 100,
});

entity.on("disconnect", () => {
clearTimeout(timeoutTimeout);
clearTimeout(requestAckTimeout);
});

function queueToStanza({ stanza, stamp }) {
if (
stanza.name === "message" &&
!stanza.getChild("delay", "urn:xmpp:delay")
) {
stanza.append(
xml("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: stamp,
}),
);
}
return stanza;
}

function resumed() {
async function resumed(resumed) {
sm.enabled = true;
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
Comment on lines +91 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of something like that instead

Suggested change
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
const items = sm.outbound_q.slice(-resumed.attrs.h);
sm.outbound += resumed.attrs.h;
sm.emit("ack", items);

let q = sm.outbound_q;
sm.outbound_q = [];
// This will trigger the middleware and re-add to the queue
await entity.sendMany(q.map((item) => queueToStanza(item)));
sm.emit("resumed");
entity._ready(true);
}

function failed() {
sm.enabled = false;
sm.id = "";
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
Comment on lines +107 to +110
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt of

Suggested change
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.emit("fail", sm.outbound_q);
sm.outbound_q = [];

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That feels like it would make the library internals very slightly simpler in exchange for making the API slightly more complex? Is there a reason you're interested in this change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands xmpp.js is a fairly low level XMPP library. I prefer abstractions to reflect the protocols to leave room for optimizations.

If that changes then we can think of what the API should be like and deal with persistence, tab reload etc

sm.outbound = 0;
}

Expand All @@ -73,11 +118,20 @@
}

entity.on("online", () => {
if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty during online",
);
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let item;
while ((item = sm.outbound_q.shift())) {
sm.emit("fail", item.stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -86,14 +140,20 @@

middleware.use((context, next) => {
const { stanza } = context;
clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let item = sm.outbound_q.shift();
sm.outbound++;
sm.emit("ack", item.stanza);
}
}

return next();
Expand All @@ -105,6 +165,33 @@
if (sasl2) {
setupSasl2({ sasl2, sm, failed, resumed });
}

function requestAck() {
clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(
() => entity.forceDisconnect().catch(),

Check warning on line 173 in packages/stream-management/index.js

View workflow job for this annotation

GitHub Actions / test (20)

Promise.catch() requires 1 argument, but received 0

Check warning on line 173 in packages/stream-management/index.js

View workflow job for this annotation

GitHub Actions / test (22)

Promise.catch() requires 1 argument, but received 0

Check warning on line 173 in packages/stream-management/index.js

View workflow job for this annotation

GitHub Actions / test (23)

Promise.catch() requires 1 argument, but received 0
sm.timeout,
);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);
}

middleware.filter((context, next) => {
if (!sm.enabled) return next();
const { stanza } = context;
if (!["presence", "message", "iq"].includes(stanza.name)) return next();

sm.outbound_q.push({ stanza, stamp: datetime() });
// Debounce requests so we send only one after a big run of stanza together
clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
Comment on lines +189 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

xmpp.js has a sendMany method, could we hook to that instead (if not in this PR then later)?

return next();
});

if (streamFeatures) {
setupStreamFeature({
streamFeatures,
Expand Down Expand Up @@ -133,8 +220,8 @@
// Resuming
if (sm.id) {
try {
await resume(entity, sm);
resumed();
const element = await resume(entity, sm);
await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
Expand All @@ -149,6 +236,12 @@

const promiseEnable = enable(entity, sm);

if (sm.outbound_q.length > 0) {
throw new Error(
"Stream Management assertion failure, queue should be empty after enable",
);
}
sonnyp marked this conversation as resolved.
Show resolved Hide resolved

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
sm.outbound = 0;

Expand All @@ -172,7 +265,7 @@
},
(element) => {
if (element.is("resumed")) {
resumed();
resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
"@xmpp/xml": "^0.14.0"
"@xmpp/xml": "^0.14.0",
"@xmpp/time": "^0.14.0"
},
"engines": {
"node": ">= 20"
Expand Down
Loading
Loading