diff --git a/src/dispatch.c b/src/dispatch.c index b54de0807..f9eef0b51 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -70,8 +70,23 @@ natsSub_enqueueUserMessage(natsSubscription *sub, natsMsg *msg) if (newBytes > sub->bytesMax) sub->bytesMax = newBytes; - if ((sub->jsi != NULL) && sub->jsi->ackNone) - natsMsg_setAcked(msg); + if (sub->jsi != NULL) + { + if (sub->jsi->ackNone) + natsMsg_setAcked(msg); + + if (sub->jsi->fetch != NULL) + { + // Just a quick check to see if this is a user message, ignore everything else. + bool isUserMessage = false; + js_checkFetchedMsg(sub, msg, 0, false, &isUserMessage); + if (isUserMessage) + { + sub->jsi->fetch->receivedMsgs++; + sub->jsi->fetch->receivedBytes += natsMsg_dataAndHdrLen(msg); + } + } + } // Update the subscription stats if separate, the queue stats will be // updated below. @@ -148,7 +163,6 @@ _preProcessUserMessage( } } *overLimit = (*overLimit || overMaxFetch || overMaxBytes); - *lastMessageInSub = (*lastMessageInSub || *lastMessageInFetch); } if (!*overLimit) @@ -209,6 +223,8 @@ nats_dispatchThreadPool(void *arg) natsOnCompleteCB completeCB = sub->onCompleteCB; void *completeCBClosure = sub->onCompleteCBClosure; natsSubscriptionControlMessages *ctrl = sub->control; + bool draining = sub->draining; + bool connClosed = sub->connClosed; fetchStatus = _preProcessUserMessage( sub, jsi, fetch, msg, @@ -244,11 +260,20 @@ nats_dispatchThreadPool(void *arg) else if (msg == ctrl->sub.close) { nats_unlockDispatcher(d); + // Call this in case the subscription was draining. natsSub_setDrainCompleteState(sub); + // It's ok to access fetch->status without locking since it's only + // modified in this thread. completeCB and completeCBClosure are + // also safe to access. if ((fetch != NULL) && (fetch->completeCB != NULL)) - fetch->completeCB(nc, sub, fetch->status, fetch->completeCBClosure); + { + natsStatus fetchStatus = fetch->status; + if ((fetchStatus == NATS_OK) && connClosed) + fetchStatus = NATS_CONNECTION_CLOSED; + (*fetch->completeCB)(nc, sub, fetchStatus, fetch->completeCBClosure); + } if (completeCB != NULL) (*completeCB)(completeCBClosure); @@ -283,7 +308,9 @@ nats_dispatchThreadPool(void *arg) else if ((fetchStatus != NATS_OK) && !lastMessageInFetch) { // Finalize the fetch and the sub now. Need to store the fetch - // status, will call the user callback on close message. + // status, will call the user callback on close message. Override + // any prior (fetch request error?) value, since this is an explicit + // termination event. fetch->status = fetchStatus; // TODO: future: options for handling missed heartbeat, for now @@ -292,8 +319,7 @@ nats_dispatchThreadPool(void *arg) // Call this blindly, it will be a no-op if the subscription // was not draining. - natsSub_setDrainCompleteState(sub); - natsConn_removeSubscription(nc, sub); + natsSubscription_Unsubscribe(sub); natsMsg_Destroy(msg); // may be an actual headers-only message nats_lockDispatcher(d); continue; @@ -340,29 +366,41 @@ nats_dispatchThreadPool(void *arg) nats_unlockDispatcher(d); - // If we are fetching, see if we need to ask the server for more. - if (fetch != NULL) - js_maybeFetchMore(sub, fetch); - if (!overLimit) (*messageCB)(nc, sub, msg, messageClosure); else natsMsg_Destroy(msg); + if ((fetch != NULL) && !lastMessageInFetch && !draining) + { + fetch->status = js_maybeFetchMore(sub, fetch); + // If we failed to request more during a fetch, deliver whatever is + // already received. + if (fetch->status != NATS_OK) + natsSubscription_Drain(sub); + } + + if (fcReply != NULL) { natsConnection_Publish(nc, fcReply, NULL, 0); NATS_FREE(fcReply); } - // If we have reached the sub's message max, we need to remove - // the sub. - if (lastMessageInSub) + if (lastMessageInFetch || lastMessageInSub) { // Call this blindly, it will be a no-op if the subscription // was not draining. natsSub_setDrainCompleteState(sub); - natsConn_removeSubscription(nc, sub); + + // If we have reached the fetch limit, we need to send an + // unsubscribe to the server. Conversely, for the sub limit it has + // already been sent, so we just need to remove the sub from the + // connection's hash. + if (lastMessageInFetch) + natsSubscription_Unsubscribe(sub); + else + natsConn_removeSubscription(nc, sub); } nats_lockDispatcher(d); @@ -394,8 +432,10 @@ nats_dispatchThreadPool(void *arg) void nats_dispatchThreadOwn(void *arg) { - natsSubscription *sub = (natsSubscription *)arg; - bool rmSub = false; + natsSubscription *sub = (natsSubscription *)arg; + bool rmSub = false; + bool unsub = false; + bool connClosed = false; // These are set at sub creation time and never change, no need to lock. natsConnection *nc = sub->conn; @@ -433,6 +473,7 @@ nats_dispatchThreadOwn(void *arg) completeCB = sub->onCompleteCB; completeCBClosure = sub->onCompleteCBClosure; jsSub *jsi = sub->jsi; + connClosed = sub->connClosed; fetch = (jsi != NULL) ? jsi->fetch : NULL; if (sub->closed) @@ -472,7 +513,7 @@ nats_dispatchThreadOwn(void *arg) fetch->status = fetchStatus; natsSub_Unlock(sub); natsMsg_Destroy(msg); // may be an actual headers-only message - rmSub = true; + unsub = true; break; } else if ((fetch != NULL) && (fetchStatus == NATS_OK) && !userMsg) @@ -490,24 +531,35 @@ nats_dispatchThreadOwn(void *arg) natsSub_Unlock(sub); - // If we are fetching, see if we need to ask the server for more. - if (fetch != NULL) - js_maybeFetchMore(sub, fetch); - if (!overLimit) (*messageCB)(nc, sub, msg, messageClosure); else natsMsg_Destroy(msg); + if ((fetch != NULL) && !lastMessageInFetch && !draining) + { + fetch->status = js_maybeFetchMore(sub, fetch); + // If we failed to request more during a fetch, deliver whatever is + // already received. + if (fetch->status != NATS_OK) + natsSubscription_Drain(sub); + } + if (fcReply != NULL) { natsConnection_Publish(nc, fcReply, NULL, 0); NATS_FREE(fcReply); } - if (lastMessageInSub) + if (lastMessageInFetch) { - // If we have hit the max for delivered msgs, remove sub. + // If we hit the fetch limit, send unsubscribe to the server. + unsub = true; + break; + } + if (lastMessageInSub) + { + // If we have hit the max for delivered msgs, just remove sub. rmSub = true; break; } @@ -515,13 +567,20 @@ nats_dispatchThreadOwn(void *arg) natsSub_setDrainCompleteState(sub); - if (rmSub) + if (unsub) + natsSubscription_Unsubscribe(sub); + else if (rmSub) natsConn_removeSubscription(nc, sub); // It's ok to access fetch->status without locking since it's only modified // in this thread. completeCB and completeCBClosure are also safe to access. if ((fetch != NULL) && (fetch->completeCB != NULL)) - (*fetch->completeCB)(nc, sub, fetch->status, fetch->completeCBClosure); + { + natsStatus fetchStatus = fetch->status; + if ((fetchStatus == NATS_OK) && connClosed) + fetchStatus = NATS_CONNECTION_CLOSED; + (*fetch->completeCB)(nc, sub, fetchStatus, fetch->completeCBClosure); + } if (completeCB != NULL) (*completeCB)(completeCBClosure); diff --git a/src/js.c b/src/js.c index bf2444647..14105917b 100644 --- a/src/js.c +++ b/src/js.c @@ -2903,16 +2903,16 @@ js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const c return NATS_UPDATE_ERR_STACK(s); } -// sub->mu must NOT be held. +// Neither sub's nor dispatcher's lock must be held. natsStatus js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch) { - jsFetchRequest req; + jsFetchRequest req = {.Expires = 0}; if (fetch->nextf == NULL) return NATS_OK; // Prepare the next fetch request - if (!fetch->nextf(&req, sub, fetch->nextClosure)) + if (!fetch->nextf(&req.Batch, &req.MaxBytes, sub, fetch->nextClosure)) return NATS_OK; // These are not changeable by the callback, only Batch and MaxBytes can be updated. @@ -2926,7 +2926,7 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch) natsBuffer buf; natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); - natsSub_Lock(sub); + nats_lockSubAndDispatcher(sub); jsSub *jsi = sub->jsi; jsi->inFetch = true; @@ -2934,13 +2934,13 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch) snprintf(fetch->replySubject, sizeof(fetch->replySubject), "%.*s%" PRIu64, (int)strlen(sub->subject) - 1, sub->subject, // exclude the last '*' jsi->fetchID); - natsStatus s = _sendPullRequest(sub->conn, jsi->nxtMsgSubj, fetch->replySubject, &buf, &req); if (s == NATS_OK) { fetch->requestedMsgs += req.Batch; } - natsSub_Unlock(sub); + + nats_unlockSubAndDispatcher(sub); natsBuf_Destroy(&buf); return NATS_UPDATE_ERR_STACK(s); @@ -2948,7 +2948,7 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch) // Sets Batch and MaxBytes for the next fetch request. static bool -_autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure) +_autoNextFetchRequest(int *messages, int64_t *maxBytes, natsSubscription *sub, void *closure) { jsFetch *fetch = (jsFetch *)closure; int remainingUnrequested = 0; @@ -2956,7 +2956,7 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure) int want = 0; bool maybeMore = true; - natsSub_Lock(sub); + nats_lockSubAndDispatcher(sub); int isAhead = fetch->requestedMsgs - fetch->deliveredMsgs; int wantAhead = fetch->keepAhead; @@ -2987,16 +2987,15 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure) maybeMore = (want > 0); } - natsSub_Unlock(sub); + nats_unlockSubAndDispatcher(sub); if (!maybeMore) return false; - req->Batch = want; - // FIXME discuss in PR - this seems wrong, we don't know how many bytes we will have - // received from what is already requested. Still, can serve as a safe - // upper boundary. - req->MaxBytes = remainingBytes; + // Since we do not allow keepAhead with MaxBytes, this is an accurate count + // of how many more bytes we expect. + *maxBytes = remainingBytes; + *messages = want; return true; } @@ -3013,6 +3012,9 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, if ((newsub == NULL) || (msgCB == NULL)) return nats_setDefaultError(NATS_INVALID_ARG); + if ((jsOpts != NULL) && (jsOpts->PullSubscribeAsync.MaxBytes > 0) && (jsOpts->PullSubscribeAsync.KeepAhead > 0)) + return nats_setError(NATS_INVALID_ARG, "%s", "Can not use MaxBytes and KeepAhead together"); + if (errCode != NULL) *errCode = 0; @@ -3026,72 +3028,70 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, if (fetch == NULL) s = nats_setDefaultError(NATS_NO_MEMORY); } + if (s != NATS_OK) + { + natsSubscription_Destroy(sub); + return NATS_UPDATE_ERR_STACK(s); + } // Initialize fetch parameters. - if (s == NATS_OK) - { - fetch->status = NATS_OK; - fetch->startTimeMillis = nats_Now(); + fetch->status = NATS_OK; + fetch->startTimeMillis = nats_Now(); #define _set(_f, _v, _nil, _def) fetch->_f = ((jsOpts != NULL) && (jsOpts->PullSubscribeAsync._v != _nil)) ? jsOpts->PullSubscribeAsync._v : _def - _set(completeCB, CompleteHandler, NULL, NULL); - _set(completeCBClosure, CompleteHandlerClosure, NULL, NULL); - _set(fetchSize, FetchSize, 0, NATS_DEFAULT_ASYNC_FETCH_SIZE); - _set(heartbeatMillis, HeartbeatMillis, 0, 0); - _set(keepAhead, KeepAhead, 0, 0); - _set(maxBytes, MaxBytes, 0, 0); - _set(maxMessages, MaxMessages, 0, INT_MAX); - _set(nextClosure, NextHandlerClosure, NULL, fetch); - _set(nextf, NextHandler, NULL, _autoNextFetchRequest); - _set(noWait, NoWait, false, false); - _set(timeoutMillis, TimeoutMillis, 0, INT64_MAX); + _set(completeCB, CompleteHandler, NULL, NULL); + _set(completeCBClosure, CompleteHandlerClosure, NULL, NULL); + _set(fetchSize, FetchSize, 0, NATS_DEFAULT_ASYNC_FETCH_SIZE); + _set(heartbeatMillis, HeartbeatMillis, 0, 0); + _set(keepAhead, KeepAhead, 0, 0); + _set(maxBytes, MaxBytes, 0, 0); + _set(maxMessages, MaxMessages, 0, INT_MAX); + _set(nextClosure, NextHandlerClosure, NULL, fetch); + _set(nextf, NextHandler, NULL, _autoNextFetchRequest); + _set(noWait, NoWait, false, false); + _set(timeoutMillis, TimeoutMillis, 0, INT64_MAX); #undef _set - } - // Set up the sub to process fetch results. - if (s == NATS_OK) - { - natsSub_Lock(sub); - jsi = sub->jsi; + nats_lockSubAndDispatcher(sub); + jsi = sub->jsi; - // Set up the fetch options - jsi->fetch = fetch; - jsi->inFetch = true; + // Set up the fetch options + jsi->fetch = fetch; + jsi->inFetch = true; + + // Start the timers. They will live for the entire length of the + // subscription (the missed heartbeat timer may be reset as needed). + if (fetch->timeoutMillis > 0) + { + sub->refs++; + s = natsTimer_Create(&fetch->expiresTimer, _fetchExpiredFired, _releaseSubWhenStopped, + fetch->timeoutMillis, (void *)sub); + if (s != NATS_OK) + sub->refs--; + } - // Start the timers. They will live for the entire length of the - // subscription (the missed heartbeat timer may be reset as needed). - if (fetch->timeoutMillis > 0) + if ((s == NATS_OK) && (fetch->heartbeatMillis > 0)) + { + int64_t dur = fetch->heartbeatMillis * 2; + sub->refs++; + if (jsi->hbTimer == NULL) { - sub->refs++; - s = natsTimer_Create(&fetch->expiresTimer, _fetchExpiredFired, _releaseSubWhenStopped, - fetch->timeoutMillis, (void *)sub); + s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, dur, (void *)sub); if (s != NATS_OK) sub->refs--; } - - if ((s == NATS_OK) && (fetch->heartbeatMillis > 0)) - { - int64_t dur = fetch->heartbeatMillis * 2; - sub->refs++; - if (jsi->hbTimer == NULL) - { - s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, dur, (void *)sub); - if (s != NATS_OK) - sub->refs--; - } - else - natsTimer_Reset(jsi->hbTimer, dur); - } - - natsSub_Unlock(sub); + else + natsTimer_Reset(jsi->hbTimer, dur); } if (s == NATS_OK) { - // Send the first fetch request + // Send the first fetch request. s = js_maybeFetchMore(sub, fetch); } + nats_unlockSubAndDispatcher(sub); + if (s != NATS_OK) { natsSubscription_Destroy(sub); diff --git a/src/nats.h b/src/nats.h index cae4ca887..7ed25dd69 100644 --- a/src/nats.h +++ b/src/nats.h @@ -1234,13 +1234,12 @@ typedef void (*natsFetchCompleteHandler)(natsConnection *nc, natsSubscription *s * The library will invoke this callback when it may be time to request more * messages from the server. * - * @return true to fetch more, false to skip. If true, req's attributes can be - * overridden as needed. + * @return true to fetch more, false to skip. If true, @messages and @maxBytes + * should be set to the number of messages and max bytes to fetch. * * @see js_PullSubscribeAsync */ -typedef bool (*natsFetchNextHandler)(jsFetchRequest *req, - natsSubscription *sub, void *closure); +typedef bool (*natsFetchNextHandler)(int *messages, int64_t *bytes, natsSubscription *sub, void *closure); /** * JetStream context options. diff --git a/src/sub.c b/src/sub.c index 8f1f0fb98..d5ffe33e9 100644 --- a/src/sub.c +++ b/src/sub.c @@ -744,7 +744,7 @@ _unsubscribe(natsSubscription *sub, int max, bool drainMode, int64_t timeout) nc = sub->conn; _retain(sub); - if ((jsi = sub->jsi) != NULL) + if ((max == 0) && (jsi = sub->jsi) != NULL) { if (jsi->hbTimer != NULL) natsTimer_Stop(jsi->hbTimer); diff --git a/test/test.c b/test/test.c index e3448369c..908507073 100644 --- a/test/test.c +++ b/test/test.c @@ -29525,7 +29525,7 @@ void test_JetStreamSubscribePullAsync_Disconnect(void) testCond(s == NATS_OK); test("Check fetch completion, expect NATS_OK: "); - testCond(_testBatchCompleted(&args, sub, 500, NATS_OK, 1, false)); + testCond(_testBatchCompleted(&args, sub, 500, NATS_CONNECTION_CLOSED, 1, false)); natsSubscription_Destroy(sub); JS_TEARDOWN;