diff --git a/dist/client/client.js b/dist/client/client.js index 5c70e59..8362235 100644 --- a/dist/client/client.js +++ b/dist/client/client.js @@ -10,6 +10,7 @@ function generateClientId(prefix) { } /** * Implements exponential backoff sleep with optional randomization + * based on https://dthain.blogspot.com/2009/02/exponential-backoff-in-distributed.html * @param random - Whether to add randomization to the delay * @param attempt - The attempt number (used to calculate delay) * @returns Promise that resolves after the calculated delay @@ -67,6 +68,8 @@ export class Client { * @returns Promise resolving to a SockConn connection */ createConn(protocol, _hostname, _port, _caCerts, _cert, _key) { + // if you need to support alternative connection types just + // overload this method in your subclass throw `Unsupported protocol: ${protocol}`; } /** @@ -85,6 +88,7 @@ export class Client { logger.debug(`${isReconnect ? "re" : ""}connecting`); try { const conn = await this.createConn(this.url.protocol, this.url.hostname, Number(this.url.port) ?? undefined, this.caCerts, this.cert, this.key); + // if we get this far we have a connection tryConnect = (await this.ctx.handleConnection(conn, this.connectPacket)) && this.autoReconnect; diff --git a/dist/server/handlers/handleConnect.js b/dist/server/handlers/handleConnect.js index dbd43ef..7ce810d 100644 --- a/dist/server/handlers/handleConnect.js +++ b/dist/server/handlers/handleConnect.js @@ -1,38 +1,63 @@ import { AuthenticationResult, logger, PacketType, Timer, } from "../deps.js"; +/** + * Checks if the client is authenticated based on the provided credentials + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet + * @returns Authentication result indicating if the client is authenticated + */ function isAuthenticated(ctx, packet) { if (ctx.handlers.isAuthenticated) { return ctx.handlers.isAuthenticated(ctx, packet.clientId || "", packet.username || "", packet.password || new Uint8Array(0)); } return AuthenticationResult.ok; } +/** + * Validates the CONNECT packet + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet to validate + * @returns Authentication result indicating if the CONNECT packet is valid + */ function validateConnect(ctx, packet) { if (packet.protocolLevel !== 4) { return AuthenticationResult.unacceptableProtocol; } return isAuthenticated(ctx, packet); } +/** + * Processes the validated CONNECT packet + * @param packet - The MQTT CONNECT packet + * @param ctx - The connection context + * @param clientId - The client ID + */ +function processValidatedConnect(packet, ctx, clientId) { + if (packet.will) { + ctx.will = { + type: PacketType.publish, + qos: packet.will.qos, + retain: packet.will.retain, + topic: packet.will.topic, + payload: packet.will.payload, + }; + } + ctx.connect(clientId, packet.clean || false); + const keepAlive = packet.keepAlive || 0; + if (keepAlive > 0) { + logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`); + ctx.timer = new Timer(() => { + ctx.close(); + }, Math.floor(keepAlive * 1500)); + } +} +/** + * Handles the MQTT CONNECT packet + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet to handle + */ export function handleConnect(ctx, packet) { const clientId = packet.clientId || `Opifex-${crypto.randomUUID()}`; const returnCode = validateConnect(ctx, packet); - // connect is ok if (returnCode === AuthenticationResult.ok) { - if (packet.will) { - ctx.will = { - type: PacketType.publish, - qos: packet.will.qos, - retain: packet.will.retain, - topic: packet.will.topic, - payload: packet.will.payload, - }; - } - ctx.connect(clientId, packet.clean || false); - const keepAlive = packet.keepAlive || 0; - if (keepAlive > 0) { - logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`); - ctx.timer = new Timer(() => { - ctx.close(); - }, Math.floor(keepAlive * 1500)); - } + processValidatedConnect(packet, ctx, clientId); } const sessionPresent = false; ctx.send({ diff --git a/dist/server/handlers/handleDisconnect.js b/dist/server/handlers/handleDisconnect.js index d80de8d..b7f895e 100644 --- a/dist/server/handlers/handleDisconnect.js +++ b/dist/server/handlers/handleDisconnect.js @@ -1,3 +1,8 @@ +/** + * Handles client disconnection by clearing the will message and closing the connection + * @param {Context} ctx - The connection context object + * @returns {void} + */ export function handleDisconnect(ctx) { ctx.will = undefined; ctx.close(); diff --git a/dist/server/handlers/handlePacket.js b/dist/server/handlers/handlePacket.js index 942687d..689400f 100644 --- a/dist/server/handlers/handlePacket.js +++ b/dist/server/handlers/handlePacket.js @@ -10,6 +10,13 @@ import { handleSubscribe } from "./handleSubscribe.js"; import { handleUnsubscribe } from "./handleUnsubscribe.js"; import { handleDisconnect } from "./handleDisconnect.js"; import { logger } from "../deps.js"; +/** + * Handles incoming MQTT packets based on their type and connection state + * @param ctx - The connection context containing client state and configuration + * @param packet - The MQTT packet to handle + * @throws Error if receiving unexpected packet types or packets before connect + * @returns Promise that resolves when packet handling is complete + */ export async function handlePacket(ctx, packet) { logger.debug("handling", PacketNameByType[packet.type]); logger.debug(JSON.stringify(packet, null, 2)); diff --git a/dist/server/handlers/handlePingreq.js b/dist/server/handlers/handlePingreq.js index 083ef74..dd46789 100644 --- a/dist/server/handlers/handlePingreq.js +++ b/dist/server/handlers/handlePingreq.js @@ -1,4 +1,9 @@ import { PacketType } from "../deps.js"; +/** + * Handles PINGREQ packet by responding with a PINGRESP packet + * @param ctx - The connection context containing send method + * @returns Promise that resolves when PINGRESP is sent + */ export async function handlePingreq(ctx) { await ctx.send({ type: PacketType.pingres, diff --git a/dist/server/handlers/handlePuback.js b/dist/server/handlers/handlePuback.js index 724bd7e..187a5a8 100644 --- a/dist/server/handlers/handlePuback.js +++ b/dist/server/handlers/handlePuback.js @@ -1,4 +1,12 @@ -// A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1. +/** + * Handles PUBACK (Publish Acknowledgment) packets in MQTT protocol + * @param ctx - The connection context containing the client's state and configuration + * @param packet - The PUBACK packet received from the client + * @description + * PUBACK packets are sent in response to PUBLISH packets with QoS level 1. + * This function removes the original PUBLISH packet from the pending outgoing messages store + * once acknowledgment is received. + */ export function handlePuback(ctx, packet) { // qos 1 only const id = packet.id; diff --git a/dist/server/handlers/handlePubcomp.js b/dist/server/handlers/handlePubcomp.js index ac79b3a..009169e 100644 --- a/dist/server/handlers/handlePubcomp.js +++ b/dist/server/handlers/handlePubcomp.js @@ -1,5 +1,11 @@ -// The PUBCOMP Packet is the response to a PUBREL Packet. -// It is the fourth and final packet of the QoS 2 protocol exchange. +/** + * Handles PUBCOMP packets which are the response to PUBREL packets in QoS 2 flow + * @param ctx - The connection context containing the client state and configuration + * @param packet - The PUBCOMP packet received from the client + * @description + * This is the fourth and final packet of the QoS 2 protocol exchange. + * When received, it removes the message from the pendingAckOutgoing store. + */ export function handlePubcomp(ctx, packet) { const id = packet.id; if (ctx.store?.pendingAckOutgoing.has(id)) { diff --git a/dist/server/handlers/handlePublish.js b/dist/server/handlers/handlePublish.js index a95df01..b5e9f81 100644 --- a/dist/server/handlers/handlePublish.js +++ b/dist/server/handlers/handlePublish.js @@ -1,5 +1,11 @@ import { SysPrefix } from "../context.js"; import { PacketType } from "../deps.js"; +/** + * Checks if a client is authorized to publish to a given topic + * @param ctx - The connection context + * @param topic - The topic to check authorization for + * @returns boolean indicating if client is authorized to publish + */ function authorizedToPublish(ctx, topic) { if (topic.startsWith(SysPrefix)) { return false; @@ -9,6 +15,13 @@ function authorizedToPublish(ctx, topic) { } return true; } +/** + * Handles MQTT PUBLISH packets + * @param ctx - The connection context + * @param packet - The PUBLISH packet to process + * @returns Promise that resolves when packet is processed + * @throws Error if packet processing fails + */ export async function handlePublish(ctx, packet) { if (!authorizedToPublish(ctx, packet.topic)) { return; diff --git a/dist/server/handlers/handlePubrec.js b/dist/server/handlers/handlePubrec.js index 041473f..828f833 100644 --- a/dist/server/handlers/handlePubrec.js +++ b/dist/server/handlers/handlePubrec.js @@ -1,7 +1,15 @@ import { PacketType } from "../deps.js"; -// qos 2 -// Discard message, Store PUBREC received -// send PUBREL +/** + * Handles PUBREC (QoS 2 Publish Received) packets + * @param ctx - The connection context + * @param packet - The PUBREC packet received from the client + * @returns Promise that resolves when handling is complete + * @description + * For QoS 2 message flow: + * 1. Discards the original publish message + * 2. Stores that PUBREC was received for the packet ID + * 3. Sends PUBREL packet in response + */ export async function handlePubrec(ctx, packet) { const id = packet.id; if (ctx.store?.pendingOutgoing.has(id)) { diff --git a/dist/server/handlers/handlePubrel.js b/dist/server/handlers/handlePubrel.js index 9a4d4cb..bfcd268 100644 --- a/dist/server/handlers/handlePubrel.js +++ b/dist/server/handlers/handlePubrel.js @@ -1,7 +1,16 @@ import { PacketType } from "../deps.js"; -// qos 2 only -// Method A, Initiate onward delivery of the Application Message1 then discard message -// Send PUBCOMP +/** + * Handles PUBREL (QoS 2 publish release) packets + * + * @param ctx - The connection context + * @param packet - The PUBREL packet received from the client + * @returns Promise that resolves when handling is complete + * @description + * For QoS 2 message delivery: + * 1. Initiates onward delivery of the Application Message + * 2. Discards the stored message + * 3. Sends PUBCOMP packet with the Packet Identifier + */ export async function handlePubrel(ctx, packet) { const id = packet.id; if (ctx.store?.pendingIncoming.has(id)) { diff --git a/dist/server/handlers/handleSubscribe.js b/dist/server/handlers/handleSubscribe.js index a374faf..24e6fe5 100644 --- a/dist/server/handlers/handleSubscribe.js +++ b/dist/server/handlers/handleSubscribe.js @@ -1,14 +1,35 @@ import { PacketType, } from "../deps.js"; +/** + * @constant {number} SubscriptionFailure + * @description Code indicating a failed subscription attempt + */ const SubscriptionFailure = 0x80; +/** + * Checks if a client is authorized to subscribe to a topic + * @param ctx - The connection context + * @param topicFilter - The topic filter to check authorization for + * @returns True if authorized, false otherwise + */ function authorizedToSubscribe(ctx, topicFilter) { if (ctx.handlers.isAuthorizedToSubscribe) { return ctx.handlers.isAuthorizedToSubscribe(ctx, topicFilter); } return true; } +/** + * @function handleSubscribe + * @description Processes an MQTT SUBSCRIBE packet + * @param {Context} ctx - The connection context + * @param {SubscribePacket} packet - The SUBSCRIBE packet received from the client + * @returns {Promise} + * @throws {Error} If subscription processing fails + * @remarks The order of return codes in the SUBACK Packet MUST match the order of Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1] + */ export async function handleSubscribe(ctx, packet) { - // The order of return codes in the SUBACK Packet MUST match the order of - // Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1]. + /* + * The order of return codes in the SUBACK Packet MUST match the order of + * Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1]. + */ const validSubscriptions = []; const returnCodes = packet.subscriptions.map((sub) => { if (ctx.store) { @@ -19,14 +40,16 @@ export async function handleSubscribe(ctx, packet) { validSubscriptions.push(sub); return sub.qos; } - return SubscriptionFailure; // failure + return SubscriptionFailure; }); await ctx.send({ type: PacketType.suback, id: packet.id, returnCodes: returnCodes, }); - // send any retained messages that match these subscriptions + /* + * send any retained messages that match these subscriptions + */ if (ctx.store) { ctx.persistence.handleRetained(ctx.store.clientId); } diff --git a/dist/server/handlers/handleUnsubscribe.js b/dist/server/handlers/handleUnsubscribe.js index afb4e27..0ab160f 100644 --- a/dist/server/handlers/handleUnsubscribe.js +++ b/dist/server/handlers/handleUnsubscribe.js @@ -1,4 +1,10 @@ import { PacketType } from "../deps.js"; +/** + * Handles MQTT unsubscribe packets by removing subscriptions and sending acknowledgement + * @param ctx - The connection context containing client information and methods + * @param packet - The MQTT unsubscribe packet containing topics to unsubscribe from + * @returns Promise that resolves when unsubscribe is complete and acknowledged + */ export async function handleUnsubscribe(ctx, packet) { for (const topic of packet.topicFilters) { if (ctx.store) { diff --git a/server/handlers/handleConnect.ts b/server/handlers/handleConnect.ts index ad23974..f651058 100644 --- a/server/handlers/handleConnect.ts +++ b/server/handlers/handleConnect.ts @@ -8,6 +8,12 @@ import { Timer, } from "../deps.ts"; +/** + * Checks if the client is authenticated based on the provided credentials + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet + * @returns Authentication result indicating if the client is authenticated + */ function isAuthenticated( ctx: Context, packet: ConnectPacket, @@ -23,6 +29,12 @@ function isAuthenticated( return AuthenticationResult.ok; } +/** + * Validates the CONNECT packet + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet to validate + * @returns Authentication result indicating if the CONNECT packet is valid + */ function validateConnect( ctx: Context, packet: ConnectPacket, @@ -33,34 +45,50 @@ function validateConnect( return isAuthenticated(ctx, packet); } +/** + * Processes the validated CONNECT packet + * @param packet - The MQTT CONNECT packet + * @param ctx - The connection context + * @param clientId - The client ID + */ +function processValidatedConnect( + packet: ConnectPacket, + ctx: Context, + clientId: string, +) { + if (packet.will) { + ctx.will = { + type: PacketType.publish, + qos: packet.will.qos, + retain: packet.will.retain, + topic: packet.will.topic, + payload: packet.will.payload, + }; + } + + ctx.connect(clientId, packet.clean || false); + + const keepAlive = packet.keepAlive || 0; + if (keepAlive > 0) { + logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`); + ctx.timer = new Timer(() => { + ctx.close(); + }, Math.floor(keepAlive * 1500)); + } +} + +/** + * Handles the MQTT CONNECT packet + * @param ctx - The connection context + * @param packet - The MQTT CONNECT packet to handle + */ export function handleConnect(ctx: Context, packet: ConnectPacket): void { const clientId = packet.clientId || `Opifex-${crypto.randomUUID()}`; const returnCode = validateConnect(ctx, packet); - // connect is ok if (returnCode === AuthenticationResult.ok) { - if (packet.will) { - ctx.will = { - type: PacketType.publish, - qos: packet.will.qos, - retain: packet.will.retain, - topic: packet.will.topic, - payload: packet.will.payload, - }; - } - - ctx.connect(clientId, packet.clean || false); - - const keepAlive = packet.keepAlive || 0; - if (keepAlive > 0) { - logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`); - ctx.timer = new Timer(() => { - ctx.close(); - }, Math.floor(keepAlive * 1500)); - } + processValidatedConnect(packet, ctx, clientId); } - const sessionPresent = false; - ctx.send({ type: PacketType.connack, sessionPresent, diff --git a/server/handlers/handleDisconnect.ts b/server/handlers/handleDisconnect.ts index 333ac8d..8811958 100644 --- a/server/handlers/handleDisconnect.ts +++ b/server/handlers/handleDisconnect.ts @@ -1,5 +1,10 @@ import type { Context } from "../context.ts"; +/** + * Handles client disconnection by clearing the will message and closing the connection + * @param {Context} ctx - The connection context object + * @returns {void} + */ export function handleDisconnect(ctx: Context): void { ctx.will = undefined; ctx.close(); diff --git a/server/handlers/handlePacket.ts b/server/handlers/handlePacket.ts index 103b01d..eaf00a8 100644 --- a/server/handlers/handlePacket.ts +++ b/server/handlers/handlePacket.ts @@ -23,6 +23,13 @@ import { handleUnsubscribe } from "./handleUnsubscribe.ts"; import { handleDisconnect } from "./handleDisconnect.ts"; import { logger } from "../deps.ts"; +/** + * Handles incoming MQTT packets based on their type and connection state + * @param ctx - The connection context containing client state and configuration + * @param packet - The MQTT packet to handle + * @throws Error if receiving unexpected packet types or packets before connect + * @returns Promise that resolves when packet handling is complete + */ export async function handlePacket( ctx: Context, packet: AnyPacket, diff --git a/server/handlers/handlePingreq.ts b/server/handlers/handlePingreq.ts index 55c333c..d2ab92e 100644 --- a/server/handlers/handlePingreq.ts +++ b/server/handlers/handlePingreq.ts @@ -1,6 +1,12 @@ import type { Context } from "../context.ts"; import { PacketType } from "../deps.ts"; +/** + * Handles PINGREQ packet by responding with a PINGRESP packet + * @param ctx - The connection context containing send method + * @returns Promise that resolves when PINGRESP is sent + */ + export async function handlePingreq(ctx: Context): Promise { await ctx.send({ type: PacketType.pingres, diff --git a/server/handlers/handlePuback.ts b/server/handlers/handlePuback.ts index 6a627b7..571094c 100644 --- a/server/handlers/handlePuback.ts +++ b/server/handlers/handlePuback.ts @@ -1,8 +1,15 @@ import type { Context } from "../context.ts"; import type { PubackPacket } from "../deps.ts"; -// A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1. - +/** + * Handles PUBACK (Publish Acknowledgment) packets in MQTT protocol + * @param ctx - The connection context containing the client's state and configuration + * @param packet - The PUBACK packet received from the client + * @description + * PUBACK packets are sent in response to PUBLISH packets with QoS level 1. + * This function removes the original PUBLISH packet from the pending outgoing messages store + * once acknowledgment is received. + */ export function handlePuback(ctx: Context, packet: PubackPacket) { // qos 1 only const id = packet.id; diff --git a/server/handlers/handlePubcomp.ts b/server/handlers/handlePubcomp.ts index 1e3efea..3974391 100644 --- a/server/handlers/handlePubcomp.ts +++ b/server/handlers/handlePubcomp.ts @@ -1,9 +1,14 @@ import type { Context } from "../context.ts"; import type { PubcompPacket } from "../deps.ts"; -// The PUBCOMP Packet is the response to a PUBREL Packet. -// It is the fourth and final packet of the QoS 2 protocol exchange. - +/** + * Handles PUBCOMP packets which are the response to PUBREL packets in QoS 2 flow + * @param ctx - The connection context containing the client state and configuration + * @param packet - The PUBCOMP packet received from the client + * @description + * This is the fourth and final packet of the QoS 2 protocol exchange. + * When received, it removes the message from the pendingAckOutgoing store. + */ export function handlePubcomp(ctx: Context, packet: PubcompPacket): void { const id = packet.id; if (ctx.store?.pendingAckOutgoing.has(id)) { diff --git a/server/handlers/handlePublish.ts b/server/handlers/handlePublish.ts index 3e80d40..8d459b8 100644 --- a/server/handlers/handlePublish.ts +++ b/server/handlers/handlePublish.ts @@ -1,6 +1,12 @@ import { type Context, SysPrefix } from "../context.ts"; import { PacketType, type PublishPacket, type Topic } from "../deps.ts"; +/** + * Checks if a client is authorized to publish to a given topic + * @param ctx - The connection context + * @param topic - The topic to check authorization for + * @returns boolean indicating if client is authorized to publish + */ function authorizedToPublish(ctx: Context, topic: Topic) { if (topic.startsWith(SysPrefix)) { return false; @@ -11,6 +17,13 @@ function authorizedToPublish(ctx: Context, topic: Topic) { return true; } +/** + * Handles MQTT PUBLISH packets + * @param ctx - The connection context + * @param packet - The PUBLISH packet to process + * @returns Promise that resolves when packet is processed + * @throws Error if packet processing fails + */ export async function handlePublish( ctx: Context, packet: PublishPacket, diff --git a/server/handlers/handlePubrec.ts b/server/handlers/handlePubrec.ts index 080e295..5ff8398 100644 --- a/server/handlers/handlePubrec.ts +++ b/server/handlers/handlePubrec.ts @@ -1,9 +1,17 @@ import type { Context } from "../context.ts"; import { PacketType, type PubrecPacket } from "../deps.ts"; -// qos 2 -// Discard message, Store PUBREC received -// send PUBREL +/** + * Handles PUBREC (QoS 2 Publish Received) packets + * @param ctx - The connection context + * @param packet - The PUBREC packet received from the client + * @returns Promise that resolves when handling is complete + * @description + * For QoS 2 message flow: + * 1. Discards the original publish message + * 2. Stores that PUBREC was received for the packet ID + * 3. Sends PUBREL packet in response + */ export async function handlePubrec( ctx: Context, packet: PubrecPacket, diff --git a/server/handlers/handlePubrel.ts b/server/handlers/handlePubrel.ts index 3a9b178..40ca3c8 100644 --- a/server/handlers/handlePubrel.ts +++ b/server/handlers/handlePubrel.ts @@ -1,10 +1,18 @@ import type { Context } from "../context.ts"; import { PacketType, type PubrelPacket } from "../deps.ts"; -// qos 2 only -// Method A, Initiate onward delivery of the Application Message1 then discard message -// Send PUBCOMP - +/** + * Handles PUBREL (QoS 2 publish release) packets + * + * @param ctx - The connection context + * @param packet - The PUBREL packet received from the client + * @returns Promise that resolves when handling is complete + * @description + * For QoS 2 message delivery: + * 1. Initiates onward delivery of the Application Message + * 2. Discards the stored message + * 3. Sends PUBCOMP packet with the Packet Identifier + */ export async function handlePubrel( ctx: Context, packet: PubrelPacket, diff --git a/server/handlers/handleSubscribe.ts b/server/handlers/handleSubscribe.ts index 0ab1e80..d4e0a25 100644 --- a/server/handlers/handleSubscribe.ts +++ b/server/handlers/handleSubscribe.ts @@ -6,7 +6,18 @@ import { type Topic, } from "../deps.ts"; +/** + * @constant {number} SubscriptionFailure + * @description Code indicating a failed subscription attempt + */ const SubscriptionFailure = 0x80; + +/** + * Checks if a client is authorized to subscribe to a topic + * @param ctx - The connection context + * @param topicFilter - The topic filter to check authorization for + * @returns True if authorized, false otherwise + */ function authorizedToSubscribe(ctx: Context, topicFilter: Topic) { if (ctx.handlers.isAuthorizedToSubscribe) { return ctx.handlers.isAuthorizedToSubscribe(ctx, topicFilter); @@ -14,13 +25,23 @@ function authorizedToSubscribe(ctx: Context, topicFilter: Topic) { return true; } +/** + * @function handleSubscribe + * @description Processes an MQTT SUBSCRIBE packet + * @param {Context} ctx - The connection context + * @param {SubscribePacket} packet - The SUBSCRIBE packet received from the client + * @returns {Promise} + * @throws {Error} If subscription processing fails + * @remarks The order of return codes in the SUBACK Packet MUST match the order of Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1] + */ export async function handleSubscribe( ctx: Context, packet: SubscribePacket, ): Promise { - // The order of return codes in the SUBACK Packet MUST match the order of - // Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1]. - + /* + * The order of return codes in the SUBACK Packet MUST match the order of + * Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1]. + */ const validSubscriptions: Subscription[] = []; const returnCodes = packet.subscriptions.map((sub) => { if (ctx.store) { @@ -31,7 +52,7 @@ export async function handleSubscribe( validSubscriptions.push(sub); return sub.qos; } - return SubscriptionFailure; // failure + return SubscriptionFailure; }); await ctx.send({ @@ -40,7 +61,9 @@ export async function handleSubscribe( returnCodes: returnCodes, }); - // send any retained messages that match these subscriptions + /* + * send any retained messages that match these subscriptions + */ if (ctx.store) { ctx.persistence.handleRetained(ctx.store.clientId); } diff --git a/server/handlers/handleUnsubscribe.ts b/server/handlers/handleUnsubscribe.ts index 1b3b8a8..6c32475 100644 --- a/server/handlers/handleUnsubscribe.ts +++ b/server/handlers/handleUnsubscribe.ts @@ -1,6 +1,12 @@ import type { Context } from "../context.ts"; import { PacketType, type UnsubscribePacket } from "../deps.ts"; +/** + * Handles MQTT unsubscribe packets by removing subscriptions and sending acknowledgement + * @param ctx - The connection context containing client information and methods + * @param packet - The MQTT unsubscribe packet containing topics to unsubscribe from + * @returns Promise that resolves when unsubscribe is complete and acknowledged + */ export async function handleUnsubscribe( ctx: Context, packet: UnsubscribePacket,