Skip to content

Commit

Permalink
Added more JSdoc
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousme committed Jan 19, 2025
1 parent 3e5c6d7 commit 3d10632
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 70 deletions.
4 changes: 4 additions & 0 deletions dist/client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ function generateClientId(prefix) {
}
/**
* Implements exponential backoff sleep with optional randomization
* based on https://dthain.blogspot.com/2009/02/exponential-backoff-in-distributed.html
* @param random - Whether to add randomization to the delay
* @param attempt - The attempt number (used to calculate delay)
* @returns Promise that resolves after the calculated delay
Expand Down Expand Up @@ -67,6 +68,8 @@ export class Client {
* @returns Promise resolving to a SockConn connection
*/
createConn(protocol, _hostname, _port, _caCerts, _cert, _key) {
// if you need to support alternative connection types just
// overload this method in your subclass
throw `Unsupported protocol: ${protocol}`;
}
/**
Expand All @@ -85,6 +88,7 @@ export class Client {
logger.debug(`${isReconnect ? "re" : ""}connecting`);
try {
const conn = await this.createConn(this.url.protocol, this.url.hostname, Number(this.url.port) ?? undefined, this.caCerts, this.cert, this.key);
// if we get this far we have a connection
tryConnect =
(await this.ctx.handleConnection(conn, this.connectPacket)) &&
this.autoReconnect;
Expand Down
61 changes: 43 additions & 18 deletions dist/server/handlers/handleConnect.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,63 @@
import { AuthenticationResult, logger, PacketType, Timer, } from "../deps.js";
/**
* Checks if the client is authenticated based on the provided credentials
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet
* @returns Authentication result indicating if the client is authenticated
*/
function isAuthenticated(ctx, packet) {
if (ctx.handlers.isAuthenticated) {
return ctx.handlers.isAuthenticated(ctx, packet.clientId || "", packet.username || "", packet.password || new Uint8Array(0));
}
return AuthenticationResult.ok;
}
/**
* Validates the CONNECT packet
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet to validate
* @returns Authentication result indicating if the CONNECT packet is valid
*/
function validateConnect(ctx, packet) {
if (packet.protocolLevel !== 4) {
return AuthenticationResult.unacceptableProtocol;
}
return isAuthenticated(ctx, packet);
}
/**
* Processes the validated CONNECT packet
* @param packet - The MQTT CONNECT packet
* @param ctx - The connection context
* @param clientId - The client ID
*/
function processValidatedConnect(packet, ctx, clientId) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}
ctx.connect(clientId, packet.clean || false);
const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
}
}
/**
* Handles the MQTT CONNECT packet
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet to handle
*/
export function handleConnect(ctx, packet) {
const clientId = packet.clientId || `Opifex-${crypto.randomUUID()}`;
const returnCode = validateConnect(ctx, packet);
// connect is ok
if (returnCode === AuthenticationResult.ok) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}
ctx.connect(clientId, packet.clean || false);
const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
}
processValidatedConnect(packet, ctx, clientId);
}
const sessionPresent = false;
ctx.send({
Expand Down
5 changes: 5 additions & 0 deletions dist/server/handlers/handleDisconnect.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/**
* Handles client disconnection by clearing the will message and closing the connection
* @param {Context} ctx - The connection context object
* @returns {void}
*/
export function handleDisconnect(ctx) {
ctx.will = undefined;
ctx.close();
Expand Down
7 changes: 7 additions & 0 deletions dist/server/handlers/handlePacket.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import { handleSubscribe } from "./handleSubscribe.js";
import { handleUnsubscribe } from "./handleUnsubscribe.js";
import { handleDisconnect } from "./handleDisconnect.js";
import { logger } from "../deps.js";
/**
* Handles incoming MQTT packets based on their type and connection state
* @param ctx - The connection context containing client state and configuration
* @param packet - The MQTT packet to handle
* @throws Error if receiving unexpected packet types or packets before connect
* @returns Promise that resolves when packet handling is complete
*/
export async function handlePacket(ctx, packet) {
logger.debug("handling", PacketNameByType[packet.type]);
logger.debug(JSON.stringify(packet, null, 2));
Expand Down
5 changes: 5 additions & 0 deletions dist/server/handlers/handlePingreq.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { PacketType } from "../deps.js";
/**
* Handles PINGREQ packet by responding with a PINGRESP packet
* @param ctx - The connection context containing send method
* @returns Promise that resolves when PINGRESP is sent
*/
export async function handlePingreq(ctx) {
await ctx.send({
type: PacketType.pingres,
Expand Down
10 changes: 9 additions & 1 deletion dist/server/handlers/handlePuback.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
// A PUBACK Packet is the response to a PUBLISH Packet with QoS level 1.
/**
* Handles PUBACK (Publish Acknowledgment) packets in MQTT protocol
* @param ctx - The connection context containing the client's state and configuration
* @param packet - The PUBACK packet received from the client
* @description
* PUBACK packets are sent in response to PUBLISH packets with QoS level 1.
* This function removes the original PUBLISH packet from the pending outgoing messages store
* once acknowledgment is received.
*/
export function handlePuback(ctx, packet) {
// qos 1 only
const id = packet.id;
Expand Down
10 changes: 8 additions & 2 deletions dist/server/handlers/handlePubcomp.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
// The PUBCOMP Packet is the response to a PUBREL Packet.
// It is the fourth and final packet of the QoS 2 protocol exchange.
/**
* Handles PUBCOMP packets which are the response to PUBREL packets in QoS 2 flow
* @param ctx - The connection context containing the client state and configuration
* @param packet - The PUBCOMP packet received from the client
* @description
* This is the fourth and final packet of the QoS 2 protocol exchange.
* When received, it removes the message from the pendingAckOutgoing store.
*/
export function handlePubcomp(ctx, packet) {
const id = packet.id;
if (ctx.store?.pendingAckOutgoing.has(id)) {
Expand Down
13 changes: 13 additions & 0 deletions dist/server/handlers/handlePublish.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { SysPrefix } from "../context.js";
import { PacketType } from "../deps.js";
/**
* Checks if a client is authorized to publish to a given topic
* @param ctx - The connection context
* @param topic - The topic to check authorization for
* @returns boolean indicating if client is authorized to publish
*/
function authorizedToPublish(ctx, topic) {
if (topic.startsWith(SysPrefix)) {
return false;
Expand All @@ -9,6 +15,13 @@ function authorizedToPublish(ctx, topic) {
}
return true;
}
/**
* Handles MQTT PUBLISH packets
* @param ctx - The connection context
* @param packet - The PUBLISH packet to process
* @returns Promise that resolves when packet is processed
* @throws Error if packet processing fails
*/
export async function handlePublish(ctx, packet) {
if (!authorizedToPublish(ctx, packet.topic)) {
return;
Expand Down
14 changes: 11 additions & 3 deletions dist/server/handlers/handlePubrec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { PacketType } from "../deps.js";
// qos 2
// Discard message, Store PUBREC received <Packet Identifier>
// send PUBREL <Packet Identifier>
/**
* Handles PUBREC (QoS 2 Publish Received) packets
* @param ctx - The connection context
* @param packet - The PUBREC packet received from the client
* @returns Promise that resolves when handling is complete
* @description
* For QoS 2 message flow:
* 1. Discards the original publish message
* 2. Stores that PUBREC was received for the packet ID
* 3. Sends PUBREL packet in response
*/
export async function handlePubrec(ctx, packet) {
const id = packet.id;
if (ctx.store?.pendingOutgoing.has(id)) {
Expand Down
15 changes: 12 additions & 3 deletions dist/server/handlers/handlePubrel.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import { PacketType } from "../deps.js";
// qos 2 only
// Method A, Initiate onward delivery of the Application Message1 then discard message
// Send PUBCOMP <Packet Identifier>
/**
* Handles PUBREL (QoS 2 publish release) packets
*
* @param ctx - The connection context
* @param packet - The PUBREL packet received from the client
* @returns Promise that resolves when handling is complete
* @description
* For QoS 2 message delivery:
* 1. Initiates onward delivery of the Application Message
* 2. Discards the stored message
* 3. Sends PUBCOMP packet with the Packet Identifier
*/
export async function handlePubrel(ctx, packet) {
const id = packet.id;
if (ctx.store?.pendingIncoming.has(id)) {
Expand Down
31 changes: 27 additions & 4 deletions dist/server/handlers/handleSubscribe.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
import { PacketType, } from "../deps.js";
/**
* @constant {number} SubscriptionFailure
* @description Code indicating a failed subscription attempt
*/
const SubscriptionFailure = 0x80;
/**
* Checks if a client is authorized to subscribe to a topic
* @param ctx - The connection context
* @param topicFilter - The topic filter to check authorization for
* @returns True if authorized, false otherwise
*/
function authorizedToSubscribe(ctx, topicFilter) {
if (ctx.handlers.isAuthorizedToSubscribe) {
return ctx.handlers.isAuthorizedToSubscribe(ctx, topicFilter);
}
return true;
}
/**
* @function handleSubscribe
* @description Processes an MQTT SUBSCRIBE packet
* @param {Context} ctx - The connection context
* @param {SubscribePacket} packet - The SUBSCRIBE packet received from the client
* @returns {Promise<void>}
* @throws {Error} If subscription processing fails
* @remarks The order of return codes in the SUBACK Packet MUST match the order of Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1]
*/
export async function handleSubscribe(ctx, packet) {
// The order of return codes in the SUBACK Packet MUST match the order of
// Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1].
/*
* The order of return codes in the SUBACK Packet MUST match the order of
* Topic Filters in the SUBSCRIBE Packet [MQTT-3.9.3-1].
*/
const validSubscriptions = [];
const returnCodes = packet.subscriptions.map((sub) => {
if (ctx.store) {
Expand All @@ -19,14 +40,16 @@ export async function handleSubscribe(ctx, packet) {
validSubscriptions.push(sub);
return sub.qos;
}
return SubscriptionFailure; // failure
return SubscriptionFailure;
});
await ctx.send({
type: PacketType.suback,
id: packet.id,
returnCodes: returnCodes,
});
// send any retained messages that match these subscriptions
/*
* send any retained messages that match these subscriptions
*/
if (ctx.store) {
ctx.persistence.handleRetained(ctx.store.clientId);
}
Expand Down
6 changes: 6 additions & 0 deletions dist/server/handlers/handleUnsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { PacketType } from "../deps.js";
/**
* Handles MQTT unsubscribe packets by removing subscriptions and sending acknowledgement
* @param ctx - The connection context containing client information and methods
* @param packet - The MQTT unsubscribe packet containing topics to unsubscribe from
* @returns Promise that resolves when unsubscribe is complete and acknowledged
*/
export async function handleUnsubscribe(ctx, packet) {
for (const topic of packet.topicFilters) {
if (ctx.store) {
Expand Down
72 changes: 50 additions & 22 deletions server/handlers/handleConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import {
Timer,
} from "../deps.ts";

/**
* Checks if the client is authenticated based on the provided credentials
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet
* @returns Authentication result indicating if the client is authenticated
*/
function isAuthenticated(
ctx: Context,
packet: ConnectPacket,
Expand All @@ -23,6 +29,12 @@ function isAuthenticated(
return AuthenticationResult.ok;
}

/**
* Validates the CONNECT packet
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet to validate
* @returns Authentication result indicating if the CONNECT packet is valid
*/
function validateConnect(
ctx: Context,
packet: ConnectPacket,
Expand All @@ -33,34 +45,50 @@ function validateConnect(
return isAuthenticated(ctx, packet);
}

/**
* Processes the validated CONNECT packet
* @param packet - The MQTT CONNECT packet
* @param ctx - The connection context
* @param clientId - The client ID
*/
function processValidatedConnect(
packet: ConnectPacket,
ctx: Context,
clientId: string,
) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}

ctx.connect(clientId, packet.clean || false);

const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
}
}

/**
* Handles the MQTT CONNECT packet
* @param ctx - The connection context
* @param packet - The MQTT CONNECT packet to handle
*/
export function handleConnect(ctx: Context, packet: ConnectPacket): void {
const clientId = packet.clientId || `Opifex-${crypto.randomUUID()}`;
const returnCode = validateConnect(ctx, packet);
// connect is ok
if (returnCode === AuthenticationResult.ok) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}

ctx.connect(clientId, packet.clean || false);

const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
}
processValidatedConnect(packet, ctx, clientId);
}

const sessionPresent = false;

ctx.send({
type: PacketType.connack,
sessionPresent,
Expand Down
Loading

0 comments on commit 3d10632

Please sign in to comment.