diff --git a/package-lock.json b/package-lock.json
index 13d18a4d..de35cabd 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -14407,6 +14407,7 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
+ "@xmpp/time": "^0.14.0",
"@xmpp/xml": "^0.14.0"
},
"engines": {
diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js
index b0aac358..23c5fddb 100644
--- a/packages/client-core/src/bind2/bind2.test.js
+++ b/packages/client-core/src/bind2/bind2.test.js
@@ -66,7 +66,6 @@ test("with function resource returning string", async () => {
test("with function resource throwing", async () => {
const error = new Error("foo");
-
function resource() {
throw error;
}
@@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => {
test("with function resource returning rejected promise", async () => {
const error = new Error("foo");
-
async function resource() {
throw error;
}
diff --git a/packages/connection/index.js b/packages/connection/index.js
index 77f53d2e..3f631860 100644
--- a/packages/connection/index.js
+++ b/packages/connection/index.js
@@ -252,6 +252,21 @@ class Connection extends EventEmitter {
await promise(this.socket, "close", "error", timeout);
}
+ /**
+ * Forcibly disconnects the socket
+ * https://xmpp.org/rfcs/rfc6120.html#streams-close
+ * https://tools.ietf.org/html/rfc7395#section-3.6
+ */
+ async forceDisconnect(timeout = this.timeout) {
+ if (!this.socket) return;
+
+ this._status("disconnecting");
+ this.socket.destroy();
+
+ // The 'disconnect' status is set by the socket 'close' listener
+ await promise(this.socket, "close", "error", timeout);
+ }
+
/**
* Opens the stream
*/
diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md
index 296b8ad4..d28596e5 100644
--- a/packages/stream-management/README.md
+++ b/packages/stream-management/README.md
@@ -10,7 +10,13 @@ When the session is resumed the `online` event is not emitted as session resumpt
However `entity.status` is set to `online`.
If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted.
-Automatically responds to acks but does not support requesting acks yet.
+Automatically responds to acks and requests them. Also requests periodically even if you haven't sent anything. If server fails to respond to a request, the module triggers a reconnect.
+
+## Events
+
+**resumed**: Indicates that the connection was resumed (so online with no online event)
+**fail**: Indicates that a stanza failed to send to the server and will not be retried
+**ack**: Indicates that a stanza has been acknowledged by the server
## References
diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js
index 9f162ce3..0cc7fa8b 100644
--- a/packages/stream-management/index.js
+++ b/packages/stream-management/index.js
@@ -1,6 +1,7 @@
import XMPPError from "@xmpp/error";
-import { procedure } from "@xmpp/events";
+import { EventEmitter, procedure } from "@xmpp/events";
import xml from "@xmpp/xml";
+import { datetime } from "@xmpp/time";
// https://xmpp.org/extensions/xep-0198.html
@@ -45,24 +46,68 @@ export default function streamManagement({
bind2,
sasl2,
}) {
- const sm = {
+ let timeoutTimeout = null;
+ let requestAckTimeout = null;
+
+ const sm = new EventEmitter();
+ Object.assign(sm, {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
+ outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
- };
+ timeout: 60_000,
+ requestAckInterval: 300_000,
+ debounceAckRequest: 100,
+ });
+
+ entity.on("disconnect", () => {
+ clearTimeout(timeoutTimeout);
+ clearTimeout(requestAckTimeout);
+ });
+
+ function queueToStanza({ stanza, stamp }) {
+ if (
+ stanza.name === "message" &&
+ !stanza.getChild("delay", "urn:xmpp:delay")
+ ) {
+ stanza.append(
+ xml("delay", {
+ xmlns: "urn:xmpp:delay",
+ from: entity.jid.toString(),
+ stamp: stamp,
+ }),
+ );
+ }
+ return stanza;
+ }
- function resumed() {
+ async function resumed(resumed) {
sm.enabled = true;
+ const oldOutbound = sm.outbound;
+ for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
+ let item = sm.outbound_q.shift();
+ sm.outbound++;
+ sm.emit("ack", item.stanza);
+ }
+ let q = sm.outbound_q;
+ sm.outbound_q = [];
+ // This will trigger the middleware and re-add to the queue
+ await entity.sendMany(q.map((item) => queueToStanza(item)));
+ sm.emit("resumed");
entity._ready(true);
}
function failed() {
sm.enabled = false;
sm.id = "";
+ let item;
+ while ((item = sm.outbound_q.shift())) {
+ sm.emit("fail", item.stanza);
+ }
sm.outbound = 0;
}
@@ -73,11 +118,20 @@ export default function streamManagement({
}
entity.on("online", () => {
+ if (sm.outbound_q.length > 0) {
+ throw new Error(
+ "Stream Management assertion failure, queue should be empty during online",
+ );
+ }
sm.outbound = 0;
sm.inbound = 0;
});
entity.on("offline", () => {
+ let item;
+ while ((item = sm.outbound_q.shift())) {
+ sm.emit("fail", item.stanza);
+ }
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
@@ -86,6 +140,7 @@ export default function streamManagement({
middleware.use((context, next) => {
const { stanza } = context;
+ clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
@@ -93,7 +148,12 @@ export default function streamManagement({
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
- sm.outbound = stanza.attrs.h;
+ const oldOutbound = sm.outbound;
+ for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
+ let item = sm.outbound_q.shift();
+ sm.outbound++;
+ sm.emit("ack", item.stanza);
+ }
}
return next();
@@ -105,6 +165,33 @@ export default function streamManagement({
if (sasl2) {
setupSasl2({ sasl2, sm, failed, resumed });
}
+
+ function requestAck() {
+ clearTimeout(timeoutTimeout);
+ if (sm.timeout) {
+ timeoutTimeout = setTimeout(
+ () => entity.forceDisconnect().catch(),
+ sm.timeout,
+ );
+ }
+ entity.send(xml("r", { xmlns: NS })).catch(() => {});
+ // Periodically send r to check the connection
+ // If a stanza goes out it will cancel this and set a sooner timer
+ requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval);
+ }
+
+ middleware.filter((context, next) => {
+ if (!sm.enabled) return next();
+ const { stanza } = context;
+ if (!["presence", "message", "iq"].includes(stanza.name)) return next();
+
+ sm.outbound_q.push({ stanza, stamp: datetime() });
+ // Debounce requests so we send only one after a big run of stanza together
+ clearTimeout(requestAckTimeout);
+ requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest);
+ return next();
+ });
+
if (streamFeatures) {
setupStreamFeature({
streamFeatures,
@@ -133,8 +220,8 @@ function setupStreamFeature({
// Resuming
if (sm.id) {
try {
- await resume(entity, sm);
- resumed();
+ const element = await resume(entity, sm);
+ await resumed(element);
return;
// If resumption fails, continue with session establishment
} catch {
@@ -149,6 +236,12 @@ function setupStreamFeature({
const promiseEnable = enable(entity, sm);
+ if (sm.outbound_q.length > 0) {
+ throw new Error(
+ "Stream Management assertion failure, queue should be empty after enable",
+ );
+ }
+
// > The counter for an entity's own sent stanzas is set to zero and started after sending either or .
sm.outbound = 0;
@@ -172,7 +265,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) {
},
(element) => {
if (element.is("resumed")) {
- resumed();
+ resumed(element);
} else if (element.is(failed)) {
// const error = StreamError.fromElement(element)
failed();
diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json
index e9818cbb..72503e38 100644
--- a/packages/stream-management/package.json
+++ b/packages/stream-management/package.json
@@ -16,7 +16,8 @@
"dependencies": {
"@xmpp/error": "^0.14.0",
"@xmpp/events": "^0.14.0",
- "@xmpp/xml": "^0.14.0"
+ "@xmpp/xml": "^0.14.0",
+ "@xmpp/time": "^0.14.0"
},
"engines": {
"node": ">= 20"
diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js
index a8edc437..3eefbbe9 100644
--- a/packages/stream-management/stream-features.test.js
+++ b/packages/stream-management/stream-features.test.js
@@ -22,6 +22,7 @@ test("enable - enabled", async () => {
);
expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");
@@ -73,6 +74,7 @@ test("enable - message - enabled", async () => {
);
expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.id).toBe("");
@@ -112,6 +114,7 @@ test("enable - failed", async () => {
);
expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
entity.streamManagement.enabled = true;
entity.mockInput(
@@ -125,6 +128,53 @@ test("enable - failed", async () => {
expect(entity.streamManagement.enabled).toBe(false);
});
+test("stanza ack", async () => {
+ const { entity } = mockClient();
+
+ entity.mockInput(
+
+
+ ,
+ );
+
+ expect(await entity.catchOutgoing()).toEqual(
+ ,
+ );
+
+ entity.mockInput(
+ ,
+ );
+
+ await tick();
+
+ expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
+ expect(entity.streamManagement.enabled).toBe(true);
+
+ await entity.send();
+
+ expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toHaveLength(1);
+
+ let acks = 0;
+ entity.streamManagement.on("ack", (stanza) => {
+ expect(stanza.attrs.id).toBe("a");
+ acks++;
+ });
+
+ entity.mockInput();
+ await tick();
+
+ expect(acks).toBe(1);
+ expect(entity.streamManagement.outbound).toBe(1);
+ expect(entity.streamManagement.outbound_q).toHaveLength(0);
+});
+
test("resume - resumed", async () => {
const { entity } = mockClient();
@@ -138,6 +188,10 @@ test("resume - resumed", async () => {
);
entity.streamManagement.outbound = 45;
+ entity.streamManagement.outbound_q = [
+ { stanza: , stamp: "1990-01-01T00:00:00Z" },
+ { stanza: , stamp: "1990-01-01T00:00:00Z" },
+ ];
expect(await entity.catchOutgoing()).toEqual(
,
@@ -147,11 +201,87 @@ test("resume - resumed", async () => {
expect(entity.status).toBe("offline");
- entity.mockInput();
+ entity.mockInput();
+
+ let acks = 0;
+ entity.streamManagement.on("ack", (stanza) => {
+ expect(stanza.attrs.id).toBe("a");
+ acks++;
+ });
+
+ expect(await entity.catchOutgoing()).toEqual(
+
+
+ ,
+ );
await tick();
- expect(entity.streamManagement.outbound).toBe(45);
+ expect(acks).toBe(1);
+ expect(entity.streamManagement.outbound).toBe(46);
+ expect(entity.streamManagement.outbound_q).toHaveLength(1);
+ expect(entity.status).toBe("online");
+});
+
+test("resumed event", async () => {
+ const { entity } = mockClient();
+
+ entity.status = "offline";
+ entity.streamManagement.id = "bar";
+
+ entity.mockInput(
+
+
+ ,
+ );
+
+ entity.streamManagement.outbound = 45;
+ entity.streamManagement.outbound_q = [
+ { stanza: , stamp: "1990-01-01T00:00:00Z" },
+ { stanza: , stamp: "1990-01-01T00:00:00Z" },
+ ];
+
+ expect(await entity.catchOutgoing()).toEqual(
+ ,
+ );
+
+ expect(entity.streamManagement.enabled).toBe(false);
+
+ expect(entity.status).toBe("offline");
+
+ entity.mockInput();
+
+ let acks = 0;
+ entity.streamManagement.on("ack", (stanza) => {
+ expect(stanza.attrs.id).toBe("a");
+ acks++;
+ });
+
+ expect(await entity.catchOutgoing()).toEqual(
+
+
+ ,
+ );
+
+ let resumed = false;
+ entity.streamManagement.on("resumed", () => {
+ resumed = true;
+ });
+
+ await tick();
+
+ expect(resumed).toBe(true);
+ expect(acks).toBe(1);
+ expect(entity.streamManagement.outbound).toBe(46);
+ expect(entity.streamManagement.outbound_q).toHaveLength(1);
expect(entity.status).toBe("online");
});
@@ -162,6 +292,7 @@ test("resume - failed", async () => {
entity.streamManagement.id = "bar";
entity.streamManagement.enabled = true;
entity.streamManagement.outbound = 45;
+ entity.streamManagement.outbound_q = [];
entity.mockInput(
@@ -185,4 +316,46 @@ test("resume - failed", async () => {
expect(entity.streamManagement.id).toBe("");
expect(entity.streamManagement.enabled).toBe(false);
expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
+});
+
+test("resume - failed with something in queue", async () => {
+ const { entity } = mockClient();
+
+ entity.status = "bar";
+ entity.streamManagement.id = "bar";
+ entity.streamManagement.enabled = true;
+ entity.streamManagement.outbound = 45;
+ entity.streamManagement.outbound_q = [{ stanza: "hai" }];
+
+ entity.mockInput(
+
+
+ ,
+ );
+
+ expect(await entity.catchOutgoing()).toEqual(
+ ,
+ );
+
+ entity.mockInput(
+
+
+ ,
+ );
+
+ let failures = 0;
+ entity.streamManagement.on("fail", (failed) => {
+ failures++;
+ expect(failed).toBe("hai");
+ });
+
+ await tick();
+
+ expect(failures).toBe(1);
+ expect(entity.status).toBe("bar");
+ expect(entity.streamManagement.id).toBe("");
+ expect(entity.streamManagement.enabled).toBe(false);
+ expect(entity.streamManagement.outbound).toBe(0);
+ expect(entity.streamManagement.outbound_q).toBeEmpty();
});
diff --git a/packages/test/mockClient.js b/packages/test/mockClient.js
index c7025edd..45eff046 100644
--- a/packages/test/mockClient.js
+++ b/packages/test/mockClient.js
@@ -5,6 +5,11 @@ import context from "./context.js";
export default function mockClient(options) {
const xmpp = client(options);
xmpp.send = Connection.prototype.send;
+ xmpp.sendMany = async (stanzas) => {
+ for (const stanza of stanzas) {
+ await xmpp.send(stanza);
+ }
+ };
const ctx = context(xmpp);
return Object.assign(xmpp, ctx);
}
diff --git a/packages/tls/lib/Socket.js b/packages/tls/lib/Socket.js
index e0bbfa27..9b60506c 100644
--- a/packages/tls/lib/Socket.js
+++ b/packages/tls/lib/Socket.js
@@ -63,6 +63,10 @@ class Socket extends EventEmitter {
this.socket.end();
}
+ destroy() {
+ this.socket.destroy();
+ }
+
write(data, fn) {
this.socket.write(data, fn);
}
diff --git a/packages/websocket/lib/Socket.js b/packages/websocket/lib/Socket.js
index 3227a11b..d48e5fa4 100644
--- a/packages/websocket/lib/Socket.js
+++ b/packages/websocket/lib/Socket.js
@@ -75,6 +75,10 @@ export default class Socket extends EventEmitter {
this.socket.close();
}
+ destroy() {
+ this.socket.close();
+ }
+
write(data, fn) {
if (WebSocket === WS) {
this.socket.send(data, fn);
diff --git a/test/stream-management.js b/test/stream-management.js
new file mode 100644
index 00000000..5d8aacca
--- /dev/null
+++ b/test/stream-management.js
@@ -0,0 +1,102 @@
+import { client } from "../packages/client/index.js";
+import { promise } from "../packages/events/index.js";
+import { datetime } from "../packages/time/index.js";
+import debug from "../packages/debug/index.js";
+import server from "../server/index.js";
+
+const username = "client";
+const password = "foobar";
+const credentials = { username, password };
+const domain = "localhost";
+
+let xmpp;
+
+afterEach(async () => {
+ await xmpp?.stop();
+ await server.reset();
+});
+
+test("client ack stanzas", async () => {
+ await server.enableModules(["smacks"]);
+ await server.restart();
+
+ xmpp = client({ credentials, service: domain });
+ debug(xmpp);
+
+ const elP = promise(xmpp.streamManagement, "ack");
+ await xmpp.start();
+ await xmpp.send(
+
+
+ ,
+ );
+
+ const el = await elP;
+ expect(el.attrs.id).toEqual("ping");
+});
+
+test("client fail stanzas", async () => {
+ await server.enableModules(["smacks"]);
+ await server.restart();
+
+ xmpp = client({ credentials, service: domain });
+ debug(xmpp);
+
+ const elP = promise(xmpp.streamManagement, "fail");
+ await xmpp.start();
+ // Expect send but don't actually send to server, so it will fail
+ await xmpp.streamManagement.outbound_q.push({
+ stanza:
+
+ ,
+ stamp: datetime()
+ });
+ await xmpp.stop();
+
+ const el = await elP;
+ expect(el.attrs.id).toEqual("ping");
+});
+
+test("client retry stanzas", async () => {
+ await server.enableModules(["smacks"]);
+ await server.restart();
+
+ xmpp = client({ credentials, service: domain });
+ debug(xmpp);
+
+ const elP = promise(xmpp.streamManagement, "ack");
+ await xmpp.start();
+ // Add to queue but don't actually send so it can retry after disconnect
+ await xmpp.streamManagement.outbound_q.push({
+ stanza:
+
+ ,
+ stamp: datetime()
+ });
+ await xmpp.disconnect();
+
+ const el = await elP;
+ expect(el.attrs.id).toEqual("ping");
+});
+
+test("client reconnect automatically", async () => {
+ await server.enableModules(["smacks"]);
+ await server.restart();
+
+ xmpp = client({ credentials, service: domain });
+ xmpp.streamManagement.timeout = 10;
+ xmpp.streamManagement.debounceAckRequest = 1;
+ debug(xmpp);
+
+ const resumedP = promise(xmpp.streamManagement, "resumed");
+ await xmpp.start();
+ await xmpp.send(
+
+
+ ,
+ );
+ xmpp.socket.socket.pause();
+
+ await resumedP;
+ expect().pass();
+});