Skip to content

Commit

Permalink
Fixes:
Browse files Browse the repository at this point in the history
- Properly account for *received* fetch messages/bytes
- Fetch status set to NATS_CONNECTION_CLOSED if fetch is terminated by a disconnect
- Auto-Unsubscribe when the (pull async) subscription reaches the end of life(*)
- Handle errors from sending fetch requests(*)
- Disallow MaxBytes and KeepAhead simultaneously so we can set MaxBytes on subsequent requests accurately
- Unrelated: _unsubscribe should not stop timers for max>0
  • Loading branch information
levb committed Aug 23, 2024
1 parent 5683c02 commit 74dce5c
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 93 deletions.
111 changes: 85 additions & 26 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,23 @@ natsSub_enqueueUserMessage(natsSubscription *sub, natsMsg *msg)
if (newBytes > sub->bytesMax)
sub->bytesMax = newBytes;

if ((sub->jsi != NULL) && sub->jsi->ackNone)
natsMsg_setAcked(msg);
if (sub->jsi != NULL)
{
if (sub->jsi->ackNone)
natsMsg_setAcked(msg);

if (sub->jsi->fetch != NULL)
{
// Just a quick check to see if this is a user message, ignore everything else.
bool isUserMessage = false;
js_checkFetchedMsg(sub, msg, 0, false, &isUserMessage);
if (isUserMessage)
{
sub->jsi->fetch->receivedMsgs++;
sub->jsi->fetch->receivedBytes += natsMsg_dataAndHdrLen(msg);
}
}
}

// Update the subscription stats if separate, the queue stats will be
// updated below.
Expand Down Expand Up @@ -148,7 +163,6 @@ _preProcessUserMessage(
}
}
*overLimit = (*overLimit || overMaxFetch || overMaxBytes);
*lastMessageInSub = (*lastMessageInSub || *lastMessageInFetch);
}

if (!*overLimit)
Expand Down Expand Up @@ -209,6 +223,8 @@ nats_dispatchThreadPool(void *arg)
natsOnCompleteCB completeCB = sub->onCompleteCB;
void *completeCBClosure = sub->onCompleteCBClosure;
natsSubscriptionControlMessages *ctrl = sub->control;
bool draining = sub->draining;
bool connClosed = sub->connClosed;

fetchStatus = _preProcessUserMessage(
sub, jsi, fetch, msg,
Expand Down Expand Up @@ -244,11 +260,20 @@ nats_dispatchThreadPool(void *arg)
else if (msg == ctrl->sub.close)
{
nats_unlockDispatcher(d);

// Call this in case the subscription was draining.
natsSub_setDrainCompleteState(sub);

// It's ok to access fetch->status without locking since it's only
// modified in this thread. completeCB and completeCBClosure are
// also safe to access.
if ((fetch != NULL) && (fetch->completeCB != NULL))
fetch->completeCB(nc, sub, fetch->status, fetch->completeCBClosure);
{
natsStatus fetchStatus = fetch->status;
if ((fetchStatus == NATS_OK) && connClosed)
fetchStatus = NATS_CONNECTION_CLOSED;
(*fetch->completeCB)(nc, sub, fetchStatus, fetch->completeCBClosure);
}

if (completeCB != NULL)
(*completeCB)(completeCBClosure);
Expand Down Expand Up @@ -283,7 +308,9 @@ nats_dispatchThreadPool(void *arg)
else if ((fetchStatus != NATS_OK) && !lastMessageInFetch)
{
// Finalize the fetch and the sub now. Need to store the fetch
// status, will call the user callback on close message.
// status, will call the user callback on close message. Override
// any prior (fetch request error?) value, since this is an explicit
// termination event.
fetch->status = fetchStatus;

// TODO: future: options for handling missed heartbeat, for now
Expand All @@ -292,8 +319,7 @@ nats_dispatchThreadPool(void *arg)

// Call this blindly, it will be a no-op if the subscription
// was not draining.
natsSub_setDrainCompleteState(sub);
natsConn_removeSubscription(nc, sub);
natsSubscription_Unsubscribe(sub);
natsMsg_Destroy(msg); // may be an actual headers-only message
nats_lockDispatcher(d);
continue;
Expand Down Expand Up @@ -340,29 +366,41 @@ nats_dispatchThreadPool(void *arg)

nats_unlockDispatcher(d);

// If we are fetching, see if we need to ask the server for more.
if (fetch != NULL)
js_maybeFetchMore(sub, fetch);

if (!overLimit)
(*messageCB)(nc, sub, msg, messageClosure);
else
natsMsg_Destroy(msg);

Check warning on line 372 in src/dispatch.c

View check run for this annotation

Codecov / codecov/patch

src/dispatch.c#L372

Added line #L372 was not covered by tests

if ((fetch != NULL) && !lastMessageInFetch && !draining)
{
fetch->status = js_maybeFetchMore(sub, fetch);
// If we failed to request more during a fetch, deliver whatever is
// already received.
if (fetch->status != NATS_OK)
natsSubscription_Drain(sub);

Check warning on line 380 in src/dispatch.c

View check run for this annotation

Codecov / codecov/patch

src/dispatch.c#L380

Added line #L380 was not covered by tests
}


if (fcReply != NULL)
{
natsConnection_Publish(nc, fcReply, NULL, 0);
NATS_FREE(fcReply);
}

// If we have reached the sub's message max, we need to remove
// the sub.
if (lastMessageInSub)
if (lastMessageInFetch || lastMessageInSub)
{
// Call this blindly, it will be a no-op if the subscription
// was not draining.
natsSub_setDrainCompleteState(sub);
natsConn_removeSubscription(nc, sub);

// If we have reached the fetch limit, we need to send an
// unsubscribe to the server. Conversely, for the sub limit it has
// already been sent, so we just need to remove the sub from the
// connection's hash.
if (lastMessageInFetch)
natsSubscription_Unsubscribe(sub);
else
natsConn_removeSubscription(nc, sub);
}

nats_lockDispatcher(d);
Expand Down Expand Up @@ -394,8 +432,10 @@ nats_dispatchThreadPool(void *arg)
void
nats_dispatchThreadOwn(void *arg)
{
natsSubscription *sub = (natsSubscription *)arg;
bool rmSub = false;
natsSubscription *sub = (natsSubscription *)arg;
bool rmSub = false;
bool unsub = false;
bool connClosed = false;

// These are set at sub creation time and never change, no need to lock.
natsConnection *nc = sub->conn;
Expand Down Expand Up @@ -433,6 +473,7 @@ nats_dispatchThreadOwn(void *arg)
completeCB = sub->onCompleteCB;
completeCBClosure = sub->onCompleteCBClosure;
jsSub *jsi = sub->jsi;
connClosed = sub->connClosed;

fetch = (jsi != NULL) ? jsi->fetch : NULL;
if (sub->closed)
Expand Down Expand Up @@ -472,7 +513,7 @@ nats_dispatchThreadOwn(void *arg)
fetch->status = fetchStatus;
natsSub_Unlock(sub);
natsMsg_Destroy(msg); // may be an actual headers-only message
rmSub = true;
unsub = true;
break;
}
else if ((fetch != NULL) && (fetchStatus == NATS_OK) && !userMsg)
Expand All @@ -490,38 +531,56 @@ nats_dispatchThreadOwn(void *arg)

natsSub_Unlock(sub);

// If we are fetching, see if we need to ask the server for more.
if (fetch != NULL)
js_maybeFetchMore(sub, fetch);

if (!overLimit)
(*messageCB)(nc, sub, msg, messageClosure);
else
natsMsg_Destroy(msg);

Check warning on line 537 in src/dispatch.c

View check run for this annotation

Codecov / codecov/patch

src/dispatch.c#L537

Added line #L537 was not covered by tests

if ((fetch != NULL) && !lastMessageInFetch && !draining)
{
fetch->status = js_maybeFetchMore(sub, fetch);
// If we failed to request more during a fetch, deliver whatever is
// already received.
if (fetch->status != NATS_OK)
natsSubscription_Drain(sub);

Check warning on line 545 in src/dispatch.c

View check run for this annotation

Codecov / codecov/patch

src/dispatch.c#L545

Added line #L545 was not covered by tests
}

if (fcReply != NULL)
{
natsConnection_Publish(nc, fcReply, NULL, 0);
NATS_FREE(fcReply);
}

if (lastMessageInSub)
if (lastMessageInFetch)
{
// If we have hit the max for delivered msgs, remove sub.
// If we hit the fetch limit, send unsubscribe to the server.
unsub = true;
break;
}
if (lastMessageInSub)
{
// If we have hit the max for delivered msgs, just remove sub.
rmSub = true;
break;
}
}

natsSub_setDrainCompleteState(sub);

if (rmSub)
if (unsub)
natsSubscription_Unsubscribe(sub);
else if (rmSub)
natsConn_removeSubscription(nc, sub);

// It's ok to access fetch->status without locking since it's only modified
// in this thread. completeCB and completeCBClosure are also safe to access.
if ((fetch != NULL) && (fetch->completeCB != NULL))
(*fetch->completeCB)(nc, sub, fetch->status, fetch->completeCBClosure);
{
natsStatus fetchStatus = fetch->status;
if ((fetchStatus == NATS_OK) && connClosed)
fetchStatus = NATS_CONNECTION_CLOSED;
(*fetch->completeCB)(nc, sub, fetchStatus, fetch->completeCBClosure);
}

if (completeCB != NULL)
(*completeCB)(completeCBClosure);
Expand Down
Loading

0 comments on commit 74dce5c

Please sign in to comment.