diff --git a/src/protocol/protocol.ts b/src/protocol/protocol.ts index 9462f36..fe55144 100644 --- a/src/protocol/protocol.ts +++ b/src/protocol/protocol.ts @@ -107,17 +107,37 @@ export class WireProtocol { if (this.#isPendingResponse) return; this.#isPendingResponse = true; while (this.#pendingResponses.size > 0) { - const headerBuffer = await this.#reader.readFull(new Uint8Array(16)); - if (!headerBuffer) throw new MongoDriverError("Invalid response header"); - const header = parseHeader(headerBuffer); - const bodyBuffer = await this.#reader.readFull( - new Uint8Array(header.messageLength - 16), - ); - if (!bodyBuffer) throw new MongoDriverError("Invalid response body"); - const reply = deserializeMessage(header, bodyBuffer); - const pendingMessage = this.#pendingResponses.get(header.responseTo); - this.#pendingResponses.delete(header.responseTo); - pendingMessage?.resolve(reply); + try { + const headerBuffer = await this.#reader.readFull(new Uint8Array(16)); + if (!headerBuffer) { + throw new MongoDriverError("Invalid response header"); + } + const header = parseHeader(headerBuffer); + const pendingMessage = this.#pendingResponses.get(header.responseTo); + try { + const bodyBuffer = await this.#reader.readFull( + new Uint8Array(header.messageLength - 16), + ); + if (!bodyBuffer) throw new MongoDriverError("Invalid response body"); + const reply = deserializeMessage(header, bodyBuffer); + pendingMessage?.resolve(reply); + } catch (error) { + pendingMessage?.reject(error); + } + this.#pendingResponses.delete(header.responseTo); + } catch (error) { + // If an error occurred in the above block, we won't be able to know for + // sure which specific message triggered the error. + // Though since the state appears to be so broken that we can't even + // read the header anymore, it's likely that the connection has + // simply closed. + // We'll just reject all pending messages so that the user can + // handle these themselves. + for (const pendingMessage of this.#pendingResponses.values()) { + pendingMessage.reject(error); + } + this.#pendingResponses.clear(); + } } this.#isPendingResponse = false; }