diff --git a/package.json b/package.json index 266039c..642ab4c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "nats", - "version": "2.9.0", + "version": "2.9.1", "description": "Node.js client for NATS, a lightweight, high-performance cloud native messaging system", "keywords": [ "nats", @@ -40,7 +40,7 @@ "build": "tsc", "cjs": "deno run --allow-all bin/cjs-fix-imports.ts -o nats-base-client/ ./.deps/nats.deno/nats-base-client/", "clean": "shx rm -Rf ./lib/* ./nats-base-client ./.deps", - "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.0 https://github.com/nats-io/nats.deno.git", + "clone-nbc": "shx mkdir -p ./.deps && cd ./.deps && git clone --branch v1.9.2 https://github.com/nats-io/nats.deno.git", "fmt": "deno fmt ./src/ ./examples/ ./test/", "prepack": "npm run clone-nbc && npm run cjs && npm run check-package && npm run build", "ava": "nyc ava --verbose -T 60000", diff --git a/src/node_transport.ts b/src/node_transport.ts index 2e2e98d..948064e 100644 --- a/src/node_transport.ts +++ b/src/node_transport.ts @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 The NATS Authors + * Copyright 2020-2022 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -34,7 +34,7 @@ const { resolve } = require("path"); const { readFile, existsSync } = require("fs"); const dns = require("dns"); -const VERSION = "2.9.0"; +const VERSION = "2.9.1"; const LANG = "nats.js"; export class NodeTransport implements Transport { diff --git a/test/auth.js b/test/auth.js index 9a5828d..93a0cc7 100644 --- a/test/auth.js +++ b/test/auth.js @@ -301,7 +301,7 @@ test("auth - custom error", async (t) => { ).then(() => { t.fail("shouldn't have connected"); }).catch((err) => { - t.is(err.code, ErrorCode.BadAuthentication); + t.is(err.message, "user code exploded"); }); await ns.stop(); }); diff --git a/test/basics.js b/test/basics.js index c332b5f..8b599ea 100644 --- a/test/basics.js +++ b/test/basics.js @@ -20,12 +20,15 @@ const { StringCodec, Empty, jwtAuthenticator, + AckPolicy, } = require( "../lib/src/mod", ); const net = require("net"); -const { deferred, delay } = require("../lib/nats-base-client/internal_mod"); +const { deferred, delay, nuid } = require( + "../lib/nats-base-client/internal_mod", +); const { Lock } = require("./helpers/lock"); const { NatsServer } = require("./helpers/launcher"); const { jetstreamServerConf } = require("./helpers/jsutil.js"); @@ -788,3 +791,65 @@ test("basics - resolve", async (t) => { t.true(srv.resolves && srv.resolves.length > 1); await nc.close(); }); + +test("basics - js fetch on stopped server doesn't close", async (t) => { + let ns = await NatsServer.start(jetstreamServerConf()); + const nc = await connect({ + port: ns.port, + maxReconnectAttempts: -1, + }); + const status = nc.status(); + (async () => { + let reconnects = 0; + for await (const s of status) { + switch (s.type) { + case "reconnecting": + reconnects++; + if (reconnects === 2) { + ns.restart().then((s) => { + ns = s; + }); + } + break; + case "reconnect": + setTimeout(() => { + loop = false; + }); + break; + default: + // nothing + } + } + })().then(); + + const jsm = await nc.jetstreamManager(); + const si = await jsm.streams.add({ name: nuid.next(), subjects: ["test"] }); + const { name: stream } = si.config; + await jsm.consumers.add(stream, { + durable_name: "dur", + ack_policy: AckPolicy.Explicit, + }); + + const js = nc.jetstream(); + setTimeout(() => { + ns.stop(); + }, 2000); + + let loop = true; + while (true) { + try { + const iter = js.fetch(stream, "dur", { batch: 1, expires: 500 }); + for await (const m of iter) { + m.ack(); + } + if (!loop) { + break; + } + } catch (err) { + t.fail(`shouldn't have errored: ${err.message}`); + } + } + t.pass(); + await nc.close(); + await ns.stop(); +});