diff --git a/package-lock.json b/package-lock.json index 13d18a4d..de35cabd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14407,6 +14407,7 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", + "@xmpp/time": "^0.14.0", "@xmpp/xml": "^0.14.0" }, "engines": { diff --git a/packages/client-core/src/bind2/bind2.test.js b/packages/client-core/src/bind2/bind2.test.js index b0aac358..23c5fddb 100644 --- a/packages/client-core/src/bind2/bind2.test.js +++ b/packages/client-core/src/bind2/bind2.test.js @@ -66,7 +66,6 @@ test("with function resource returning string", async () => { test("with function resource throwing", async () => { const error = new Error("foo"); - function resource() { throw error; } @@ -102,7 +101,6 @@ test("with function resource returning resolved promise", async () => { test("with function resource returning rejected promise", async () => { const error = new Error("foo"); - async function resource() { throw error; } diff --git a/packages/connection/index.js b/packages/connection/index.js index 77f53d2e..3f631860 100644 --- a/packages/connection/index.js +++ b/packages/connection/index.js @@ -252,6 +252,21 @@ class Connection extends EventEmitter { await promise(this.socket, "close", "error", timeout); } + /** + * Forcibly disconnects the socket + * https://xmpp.org/rfcs/rfc6120.html#streams-close + * https://tools.ietf.org/html/rfc7395#section-3.6 + */ + async forceDisconnect(timeout = this.timeout) { + if (!this.socket) return; + + this._status("disconnecting"); + this.socket.destroy(); + + // The 'disconnect' status is set by the socket 'close' listener + await promise(this.socket, "close", "error", timeout); + } + /** * Opens the stream */ diff --git a/packages/stream-management/README.md b/packages/stream-management/README.md index 296b8ad4..d28596e5 100644 --- a/packages/stream-management/README.md +++ b/packages/stream-management/README.md @@ -10,7 +10,13 @@ When the session is resumed the `online` event is not emitted as session resumpt However `entity.status` is set to `online`. If the session fails to resume, entity will fallback to regular session establishment in which case `online` event will be emitted. -Automatically responds to acks but does not support requesting acks yet. +Automatically responds to acks and requests them. Also requests periodically even if you haven't sent anything. If server fails to respond to a request, the module triggers a reconnect. + +## Events + +**resumed**: Indicates that the connection was resumed (so online with no online event) +**fail**: Indicates that a stanza failed to send to the server and will not be retried +**ack**: Indicates that a stanza has been acknowledged by the server ## References diff --git a/packages/stream-management/index.js b/packages/stream-management/index.js index 9f162ce3..0cc7fa8b 100644 --- a/packages/stream-management/index.js +++ b/packages/stream-management/index.js @@ -1,6 +1,7 @@ import XMPPError from "@xmpp/error"; -import { procedure } from "@xmpp/events"; +import { EventEmitter, procedure } from "@xmpp/events"; import xml from "@xmpp/xml"; +import { datetime } from "@xmpp/time"; // https://xmpp.org/extensions/xep-0198.html @@ -45,24 +46,68 @@ export default function streamManagement({ bind2, sasl2, }) { - const sm = { + let timeoutTimeout = null; + let requestAckTimeout = null; + + const sm = new EventEmitter(); + Object.assign(sm, { allowResume: true, preferredMaximum: null, enabled: false, id: "", + outbound_q: [], outbound: 0, inbound: 0, max: null, - }; + timeout: 60_000, + requestAckInterval: 300_000, + debounceAckRequest: 100, + }); + + entity.on("disconnect", () => { + clearTimeout(timeoutTimeout); + clearTimeout(requestAckTimeout); + }); + + function queueToStanza({ stanza, stamp }) { + if ( + stanza.name === "message" && + !stanza.getChild("delay", "urn:xmpp:delay") + ) { + stanza.append( + xml("delay", { + xmlns: "urn:xmpp:delay", + from: entity.jid.toString(), + stamp: stamp, + }), + ); + } + return stanza; + } - function resumed() { + async function resumed(resumed) { sm.enabled = true; + const oldOutbound = sm.outbound; + for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) { + let item = sm.outbound_q.shift(); + sm.outbound++; + sm.emit("ack", item.stanza); + } + let q = sm.outbound_q; + sm.outbound_q = []; + // This will trigger the middleware and re-add to the queue + await entity.sendMany(q.map((item) => queueToStanza(item))); + sm.emit("resumed"); entity._ready(true); } function failed() { sm.enabled = false; sm.id = ""; + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); + } sm.outbound = 0; } @@ -73,11 +118,20 @@ export default function streamManagement({ } entity.on("online", () => { + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty during online", + ); + } sm.outbound = 0; sm.inbound = 0; }); entity.on("offline", () => { + let item; + while ((item = sm.outbound_q.shift())) { + sm.emit("fail", item.stanza); + } sm.outbound = 0; sm.inbound = 0; sm.enabled = false; @@ -86,6 +140,7 @@ export default function streamManagement({ middleware.use((context, next) => { const { stanza } = context; + clearTimeout(timeoutTimeout); if (["presence", "message", "iq"].includes(stanza.name)) { sm.inbound += 1; } else if (stanza.is("r", NS)) { @@ -93,7 +148,12 @@ export default function streamManagement({ entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {}); } else if (stanza.is("a", NS)) { // > When a party receives an element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value). - sm.outbound = stanza.attrs.h; + const oldOutbound = sm.outbound; + for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) { + let item = sm.outbound_q.shift(); + sm.outbound++; + sm.emit("ack", item.stanza); + } } return next(); @@ -105,6 +165,33 @@ export default function streamManagement({ if (sasl2) { setupSasl2({ sasl2, sm, failed, resumed }); } + + function requestAck() { + clearTimeout(timeoutTimeout); + if (sm.timeout) { + timeoutTimeout = setTimeout( + () => entity.forceDisconnect().catch(), + sm.timeout, + ); + } + entity.send(xml("r", { xmlns: NS })).catch(() => {}); + // Periodically send r to check the connection + // If a stanza goes out it will cancel this and set a sooner timer + requestAckTimeout = setTimeout(requestAck, sm.requestAckInterval); + } + + middleware.filter((context, next) => { + if (!sm.enabled) return next(); + const { stanza } = context; + if (!["presence", "message", "iq"].includes(stanza.name)) return next(); + + sm.outbound_q.push({ stanza, stamp: datetime() }); + // Debounce requests so we send only one after a big run of stanza together + clearTimeout(requestAckTimeout); + requestAckTimeout = setTimeout(requestAck, sm.debounceAckRequest); + return next(); + }); + if (streamFeatures) { setupStreamFeature({ streamFeatures, @@ -133,8 +220,8 @@ function setupStreamFeature({ // Resuming if (sm.id) { try { - await resume(entity, sm); - resumed(); + const element = await resume(entity, sm); + await resumed(element); return; // If resumption fails, continue with session establishment } catch { @@ -149,6 +236,12 @@ function setupStreamFeature({ const promiseEnable = enable(entity, sm); + if (sm.outbound_q.length > 0) { + throw new Error( + "Stream Management assertion failure, queue should be empty after enable", + ); + } + // > The counter for an entity's own sent stanzas is set to zero and started after sending either or . sm.outbound = 0; @@ -172,7 +265,7 @@ function setupSasl2({ sasl2, sm, failed, resumed }) { }, (element) => { if (element.is("resumed")) { - resumed(); + resumed(element); } else if (element.is(failed)) { // const error = StreamError.fromElement(element) failed(); diff --git a/packages/stream-management/package.json b/packages/stream-management/package.json index e9818cbb..72503e38 100644 --- a/packages/stream-management/package.json +++ b/packages/stream-management/package.json @@ -16,7 +16,8 @@ "dependencies": { "@xmpp/error": "^0.14.0", "@xmpp/events": "^0.14.0", - "@xmpp/xml": "^0.14.0" + "@xmpp/xml": "^0.14.0", + "@xmpp/time": "^0.14.0" }, "engines": { "node": ">= 20" diff --git a/packages/stream-management/stream-features.test.js b/packages/stream-management/stream-features.test.js index a8edc437..3eefbbe9 100644 --- a/packages/stream-management/stream-features.test.js +++ b/packages/stream-management/stream-features.test.js @@ -22,6 +22,7 @@ test("enable - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -73,6 +74,7 @@ test("enable - message - enabled", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.id).toBe(""); @@ -112,6 +114,7 @@ test("enable - failed", async () => { ); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); entity.streamManagement.enabled = true; entity.mockInput( @@ -125,6 +128,53 @@ test("enable - failed", async () => { expect(entity.streamManagement.enabled).toBe(false); }); +test("stanza ack", async () => { + const { entity } = mockClient(); + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + , + ); + + await tick(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); + expect(entity.streamManagement.enabled).toBe(true); + + await entity.send(); + + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + entity.mockInput(); + await tick(); + + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(1); + expect(entity.streamManagement.outbound_q).toHaveLength(0); +}); + test("resume - resumed", async () => { const { entity } = mockClient(); @@ -138,6 +188,10 @@ test("resume - resumed", async () => { ); entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; expect(await entity.catchOutgoing()).toEqual( , @@ -147,11 +201,87 @@ test("resume - resumed", async () => { expect(entity.status).toBe("offline"); - entity.mockInput(); + entity.mockInput(); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual( + + + , + ); await tick(); - expect(entity.streamManagement.outbound).toBe(45); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); + expect(entity.status).toBe("online"); +}); + +test("resumed event", async () => { + const { entity } = mockClient(); + + entity.status = "offline"; + entity.streamManagement.id = "bar"; + + entity.mockInput( + + + , + ); + + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [ + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + { stanza: , stamp: "1990-01-01T00:00:00Z" }, + ]; + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + expect(entity.streamManagement.enabled).toBe(false); + + expect(entity.status).toBe("offline"); + + entity.mockInput(); + + let acks = 0; + entity.streamManagement.on("ack", (stanza) => { + expect(stanza.attrs.id).toBe("a"); + acks++; + }); + + expect(await entity.catchOutgoing()).toEqual( + + + , + ); + + let resumed = false; + entity.streamManagement.on("resumed", () => { + resumed = true; + }); + + await tick(); + + expect(resumed).toBe(true); + expect(acks).toBe(1); + expect(entity.streamManagement.outbound).toBe(46); + expect(entity.streamManagement.outbound_q).toHaveLength(1); expect(entity.status).toBe("online"); }); @@ -162,6 +292,7 @@ test("resume - failed", async () => { entity.streamManagement.id = "bar"; entity.streamManagement.enabled = true; entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = []; entity.mockInput( @@ -185,4 +316,46 @@ test("resume - failed", async () => { expect(entity.streamManagement.id).toBe(""); expect(entity.streamManagement.enabled).toBe(false); expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); +}); + +test("resume - failed with something in queue", async () => { + const { entity } = mockClient(); + + entity.status = "bar"; + entity.streamManagement.id = "bar"; + entity.streamManagement.enabled = true; + entity.streamManagement.outbound = 45; + entity.streamManagement.outbound_q = [{ stanza: "hai" }]; + + entity.mockInput( + + + , + ); + + expect(await entity.catchOutgoing()).toEqual( + , + ); + + entity.mockInput( + + + , + ); + + let failures = 0; + entity.streamManagement.on("fail", (failed) => { + failures++; + expect(failed).toBe("hai"); + }); + + await tick(); + + expect(failures).toBe(1); + expect(entity.status).toBe("bar"); + expect(entity.streamManagement.id).toBe(""); + expect(entity.streamManagement.enabled).toBe(false); + expect(entity.streamManagement.outbound).toBe(0); + expect(entity.streamManagement.outbound_q).toBeEmpty(); }); diff --git a/packages/test/mockClient.js b/packages/test/mockClient.js index c7025edd..45eff046 100644 --- a/packages/test/mockClient.js +++ b/packages/test/mockClient.js @@ -5,6 +5,11 @@ import context from "./context.js"; export default function mockClient(options) { const xmpp = client(options); xmpp.send = Connection.prototype.send; + xmpp.sendMany = async (stanzas) => { + for (const stanza of stanzas) { + await xmpp.send(stanza); + } + }; const ctx = context(xmpp); return Object.assign(xmpp, ctx); } diff --git a/packages/tls/lib/Socket.js b/packages/tls/lib/Socket.js index e0bbfa27..9b60506c 100644 --- a/packages/tls/lib/Socket.js +++ b/packages/tls/lib/Socket.js @@ -63,6 +63,10 @@ class Socket extends EventEmitter { this.socket.end(); } + destroy() { + this.socket.destroy(); + } + write(data, fn) { this.socket.write(data, fn); } diff --git a/packages/websocket/lib/Socket.js b/packages/websocket/lib/Socket.js index 3227a11b..d48e5fa4 100644 --- a/packages/websocket/lib/Socket.js +++ b/packages/websocket/lib/Socket.js @@ -75,6 +75,10 @@ export default class Socket extends EventEmitter { this.socket.close(); } + destroy() { + this.socket.close(); + } + write(data, fn) { if (WebSocket === WS) { this.socket.send(data, fn); diff --git a/test/stream-management.js b/test/stream-management.js new file mode 100644 index 00000000..5d8aacca --- /dev/null +++ b/test/stream-management.js @@ -0,0 +1,102 @@ +import { client } from "../packages/client/index.js"; +import { promise } from "../packages/events/index.js"; +import { datetime } from "../packages/time/index.js"; +import debug from "../packages/debug/index.js"; +import server from "../server/index.js"; + +const username = "client"; +const password = "foobar"; +const credentials = { username, password }; +const domain = "localhost"; + +let xmpp; + +afterEach(async () => { + await xmpp?.stop(); + await server.reset(); +}); + +test("client ack stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + await xmpp.send( + + + , + ); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client fail stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "fail"); + await xmpp.start(); + // Expect send but don't actually send to server, so it will fail + await xmpp.streamManagement.outbound_q.push({ + stanza: + + , + stamp: datetime() + }); + await xmpp.stop(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client retry stanzas", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + debug(xmpp); + + const elP = promise(xmpp.streamManagement, "ack"); + await xmpp.start(); + // Add to queue but don't actually send so it can retry after disconnect + await xmpp.streamManagement.outbound_q.push({ + stanza: + + , + stamp: datetime() + }); + await xmpp.disconnect(); + + const el = await elP; + expect(el.attrs.id).toEqual("ping"); +}); + +test("client reconnect automatically", async () => { + await server.enableModules(["smacks"]); + await server.restart(); + + xmpp = client({ credentials, service: domain }); + xmpp.streamManagement.timeout = 10; + xmpp.streamManagement.debounceAckRequest = 1; + debug(xmpp); + + const resumedP = promise(xmpp.streamManagement, "resumed"); + await xmpp.start(); + await xmpp.send( + + + , + ); + xmpp.socket.socket.pause(); + + await resumedP; + expect().pass(); +});