Skip to content

Commit

Permalink
Merge pull request #123 from danditomaso/fix/ble-read-loop-issue
Browse files Browse the repository at this point in the history
fix: prevent infinite loop in readFromRadio function
  • Loading branch information
Hunter275 authored Jan 22, 2025
2 parents 715e35d + 9ae03a1 commit 5e5385c
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 83 deletions.
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
{
"name": "@meshtastic/js",
"version": "2.5.9-2",
"version": "2.5.9-3",
"description": "Browser library for interfacing with meshtastic devices",
"license": "GPL-3.0-only",
"scripts": {
"build": "tsup && pnpm biome format .",
"lint": "pnpm biome check",
"lint:fix": "pnpm biome check --fix",
"format": "pnpm biome format .",
"format:fix": "pnpm biome format . --fix",
"generate:docs": "typedoc src/index.ts"
},
"keywords": [
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 25 additions & 26 deletions src/adapters/bleConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,34 +213,33 @@ export class BleConnection extends MeshDevice {
return await Promise.resolve(true);
}

/** Short description */
/**
* Reads data packets from the radio until empty
* @throws Error if reading fails
*/
protected async readFromRadio(): Promise<void> {
// if (this.pendingRead) {
// return Promise.resolve();
// }
// this.pendingRead = true;
let readBuffer = new ArrayBuffer(1);

while (readBuffer.byteLength > 0 && this.fromRadioCharacteristic) {
await this.fromRadioCharacteristic
.readValue()
.then((value) => {
readBuffer = value.buffer;

if (value.byteLength > 0) {
this.handleFromRadio(new Uint8Array(readBuffer));
}
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceConnected);
})
.catch((e: Error) => {
readBuffer = new ArrayBuffer(0);
this.log.error(
Types.Emitter[Types.Emitter.ReadFromRadio],
`❌ ${e.message}`,
);
});
try {
let hasMoreData = true;
while (hasMoreData && this.fromRadioCharacteristic) {
const value = await this.fromRadioCharacteristic.readValue();

if (value.byteLength === 0) {
hasMoreData = false;
continue;
}

await this.handleFromRadio(new Uint8Array(value.buffer));
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceConnected);
}
} catch (error) {
this.log.error(
Types.Emitter[Types.Emitter.ReadFromRadio],
`❌ ${error instanceof Error ? error.message : "Unknown error"}`,
);
throw error; // Re-throw to let caller handle
} finally {
// this.pendingRead = false;
}
// this.pendingRead = false;
}

/**
Expand Down
234 changes: 190 additions & 44 deletions src/adapters/httpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ export class HttpConnection extends MeshDevice {

private abortController: AbortController;

private readonly defaultRetryConfig: Types.HttpRetryConfig = {
maxRetries: 3,
initialDelayMs: 1000,
maxDelayMs: 10000,
backoffFactor: 2,
};

constructor(configId?: number) {
super(configId);

Expand All @@ -37,6 +44,105 @@ export class HttpConnection extends MeshDevice {
);
}

/**
* Checks if the error should trigger a retry attempt
* @param response - The fetch response
* @returns boolean indicating if should retry
*/
private shouldRetry(response: Response): boolean {
if (response.status >= 500 && response.status <= 599) {
return true;
}

if (!response.ok && response.status < 400) {
return true;
}

return false;
}

/**
* Implements exponential backoff retry logic for HTTP operations
* @param operation - The async operation to retry
* @param retryConfig - Configuration for retry behavior
* @param operationName - Name of the operation for logging
*/
private async withRetry(
operation: () => Promise<Response>,
retryConfig: Types.HttpRetryConfig,
operationName: string,
): Promise<Response> {
let delay = retryConfig.initialDelayMs;

for (let attempt = 1; attempt <= retryConfig.maxRetries; attempt++) {
try {
const response = await operation();

// If the response is success or a non-retryable error, return it
if (!this.shouldRetry(response)) {
return response;
}

const error = new Error(
`HTTP ${response.status}: ${response.statusText}`,
);

if (attempt === retryConfig.maxRetries) {
throw error;
}

this.log.warn(
`${operationName} failed (attempt ${attempt}/${retryConfig.maxRetries}): ${error.message}`,
);

await new Promise((resolve) => setTimeout(resolve, delay));
delay = Math.min(
delay * retryConfig.backoffFactor,
retryConfig.maxDelayMs,
);
} catch (error) {
// If it's not a Response error (e.g., network error), don't retry
if (!(error instanceof Error) || !error.message.startsWith("HTTP")) {
throw error;
}

if (attempt === retryConfig.maxRetries) {
throw error;
}

this.log.warn(
`${operationName} failed (attempt ${attempt}/${retryConfig.maxRetries}): ${error.message}`,
);
}
}

// This line should never be reached due to the error handling above,
throw new Error("Unexpected end of retry loop");
}

/**
* Attempts a single connection to the device
*/
private async attemptConnection(
params: Types.HttpConnectionParameters,
): Promise<Response> {
const { address, tls = false } = params;
this.portId = `${tls ? "https://" : "http://"}${address}`;

// We create a dummy request here just to have a Response object to work with
// The actual connection check is done via ping()
const response = await fetch(`${this.portId}/hotspot-detect.html`, {
signal: this.abortController.signal,
mode: "no-cors",
});

if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

return response;
}

/**
* Initiates the connect process to a Meshtastic device via HTTP(S)
*/
Expand All @@ -46,45 +152,73 @@ export class HttpConnection extends MeshDevice {
receiveBatchRequests = false,
tls = false,
}: Types.HttpConnectionParameters): Promise<void> {
// Set initial state
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceConnecting);

this.receiveBatchRequests = receiveBatchRequests;

this.portId = `${tls ? "https://" : "http://"}${address}`;

if (
this.deviceStatus === Types.DeviceStatusEnum.DeviceConnecting &&
(await this.ping())
) {
this.log.debug(
Types.Emitter[Types.Emitter.Connect],
"Ping succeeded, starting configuration and request timer.",
try {
// Attempt connection with retries
await this.withRetry(
() => this.attemptConnection({ address, tls, fetchInterval }),
{
...this.defaultRetryConfig,
maxRetries: 5, // More retries for initial connection
maxDelayMs: 10000, // Max 10s between retries
},
"Connect",
);
this.configure().catch(() => {
// TODO: FIX, workaround for `wantConfigId` not getting acks.
});
this.readLoop = setInterval(() => {
this.readFromRadio().catch((e: Error) => {
this.log.error(

// If connection successful, set up device
if (this.deviceStatus === Types.DeviceStatusEnum.DeviceConnecting) {
this.log.debug(
Types.Emitter[Types.Emitter.Connect],
"Connection succeeded, starting configuration and request timer.",
);

// Start device configuration
await this.configure().catch((error) => {
this.log.warn(
Types.Emitter[Types.Emitter.Connect],
`${e.message}`,
`Configuration warning: ${error.message}`,
);
});
}, fetchInterval);
} else if (
this.deviceStatus !== Types.DeviceStatusEnum.DeviceDisconnected
) {
setTimeout(() => {

if (!this.readLoop) {
this.readLoop = setInterval(async () => {
try {
await this.readFromRadio();
} catch (error) {
if (error instanceof Error) {
this.log.error(
Types.Emitter[Types.Emitter.Connect],
`❌ Read loop error: ${error.message}`,
);
}
}
}, fetchInterval);
}
}
} catch (error) {
if (error instanceof Error) {
this.log.error(
Types.Emitter[Types.Emitter.Connect],
`❌ Connection failed: ${error.message}`,
);
}

// Only attempt reconnection if we haven't been disconnected
if (this.deviceStatus !== Types.DeviceStatusEnum.DeviceDisconnected) {
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceReconnecting);

this.connect({
address: address,
fetchInterval: fetchInterval,
receiveBatchRequests: receiveBatchRequests,
tls: tls,
address,
fetchInterval,
receiveBatchRequests,
tls,
});
}, 10000);
}
}
}

/** Disconnects from the Meshtastic device */
public disconnect(): void {
this.abortController.abort();
Expand All @@ -95,7 +229,7 @@ export class HttpConnection extends MeshDevice {
}
}

/** Pings device to check if it is avaliable */
/** Pings device to check if it is available with retry logic */
public async ping(): Promise<boolean> {
this.log.debug(
Types.Emitter[Types.Emitter.Ping],
Expand All @@ -104,22 +238,34 @@ export class HttpConnection extends MeshDevice {

const { signal } = this.abortController;

let pingSuccessful = false;
try {
const response = await this.withRetry(
async () => {
return await fetch(`${this.portId}/hotspot-detect.html`, {
signal,
mode: "no-cors",
});
},
{ ...this.defaultRetryConfig },
"Ping",
);

await fetch(`${this.portId}/hotspot-detect.html`, {
signal,
mode: "no-cors",
})
.then(() => {
pingSuccessful = true;
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceConnected);
})
.catch((e: Error) => {
pingSuccessful = false;
this.log.error(Types.Emitter[Types.Emitter.Ping], `❌ ${e.message}`);
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceReconnecting);
});
return pingSuccessful;
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceConnected);
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error(
Types.Emitter[Types.Emitter.Ping],
`❌ ${error.message}`,
);
}
this.updateDeviceStatus(Types.DeviceStatusEnum.DeviceReconnecting);
return false;
}
}

/** Reads any avaliable protobuf messages from the radio */
Expand Down
Loading

0 comments on commit 5e5385c

Please sign in to comment.