Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Added] natsConnection_Reconnect #756

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 52 additions & 27 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,14 @@
static void
_close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doCBs);

static bool
_processOpError(natsConnection *nc, natsStatus s, bool initialConnect);
static natsStatus
_tryReconnect(natsConnection *nc, natsStatus s, bool forcedReconnect, bool *started);

static void
_maybeReconnect(natsConnection *nc, natsStatus s) { _tryReconnect(nc, s, false, NULL); }

static natsStatus
_forceReconnect(natsConnection *nc, natsStatus s, bool *started) { return _tryReconnect(nc, s, true, started); }

static natsStatus
_flushTimeout(natsConnection *nc, int64_t timeout);
Expand Down Expand Up @@ -2102,7 +2108,9 @@
{
natsConn_Unlock(nc);

if (_processOpError(nc, retSts, true))
bool reconnectStarted = false;
s = _forceReconnect(nc, retSts, &reconnectStarted);
if ((s == NATS_OK) && reconnectStarted)
{
nats_clearLastError();
return NATS_NOT_YET_CONNECTED;
Expand Down Expand Up @@ -2134,29 +2142,30 @@
return s;
}

// _processOpError handles errors from reading or parsing the protocol.
// _tryReconnect handles errors from reading or parsing the protocol, or forced
// reconnection. It will fire off a doReconnect thread if needed.
// The lock should not be held entering this function.
static bool
_processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
static natsStatus
_tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool *started)
{
natsStatus s = NATS_OK;

natsConn_Lock(nc);

if (!initialConnect)
if (!forcedReconnect)
{
if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0))
{
natsConn_Unlock(nc);

return false;
return NATS_OK;
}
}

// Do reconnect only if allowed and we were actually connected
// or if we are retrying on initial failed connect.
if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED)))
if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED)))
{
natsStatus ls = NATS_OK;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a reason we used a different status here. What we wanted to report as the error was the error that was given to processOpError(), not an internal one that we may encounter trying to setup the reconnect.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, and I think my code still does that (newErr), but in the case of a natsConnection_Reconnect failing on a to fire off doReconnect we want to return the actual failure, right? (like, out of memory for instance)


// Set our new status
nc->status = NATS_CONN_STATUS_RECONNECTING;

Expand All @@ -2176,7 +2185,7 @@
// on the socket since we are going to reconnect.
if (nc->el.attached)
{
ls = _evStopPolling(nc);
s = _evStopPolling(nc);
natsSock_Close(nc->sockCtx.fd);
nc->sockCtx.fd = NATS_SOCK_INVALID;

Expand All @@ -2185,46 +2194,50 @@
}

// Fail pending flush requests.
if (ls == NATS_OK)
if (s == NATS_OK)
_clearPendingFlushRequests(nc);
// If option set, also fail pending requests.
if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect)
if ((s == NATS_OK) && nc->opts->failRequestsOnDisconnect)
_clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED);

// Create the pending buffer to hold all write requests while we try
// to reconnect.
if (ls == NATS_OK)
ls = natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize);
if (ls == NATS_OK)
IFOK (s, natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize));
if (s == NATS_OK)
{
nc->usePending = true;

// Start the reconnect thread
ls = natsThread_Create(&(nc->reconnectThread),
s = natsThread_Create(&(nc->reconnectThread),
_doReconnect, (void*) nc);
}
if (ls == NATS_OK)
if (s == NATS_OK)
{
// We created the reconnect thread successfully, so retain
// the connection.
_retain(nc);
nc->inReconnect++;
natsConn_Unlock(nc);

return true;
if (started != NULL)
*started = true;

return NATS_OK;
}
}

// reconnect not allowed or we failed to setup the reconnect code.

if (started != NULL)
*started = false;

Check warning on line 2232 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L2232

Added line #L2232 was not covered by tests
nc->status = NATS_CONN_STATUS_DISCONNECTED;
nc->err = s;
nc->err = newErr;

natsConn_Unlock(nc);

_close(nc, NATS_CONN_STATUS_CLOSED, false, true);

return false;
return NATS_UPDATE_ERR_STACK(s);

Check warning on line 2240 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L2240

Added line #L2240 was not covered by tests
}

static void
Expand Down Expand Up @@ -2267,7 +2280,7 @@
s = natsParser_Parse(nc, buffer, n);

if (s != NATS_OK)
_processOpError(nc, s, false);
_maybeReconnect(nc, s);

natsConn_Lock(nc);
}
Expand Down Expand Up @@ -2396,7 +2409,7 @@
if (++(nc->pout) > nc->opts->maxPingsOut)
{
natsConn_Unlock(nc);
_processOpError(nc, NATS_STALE_CONNECTION, false);
_maybeReconnect(nc, NATS_STALE_CONNECTION);
return;
}

Expand Down Expand Up @@ -2921,7 +2934,7 @@

if (strcasecmp(error, STALE_CONNECTION) == 0)
{
_processOpError(nc, NATS_STALE_CONNECTION, false);
_maybeReconnect(nc, NATS_STALE_CONNECTION);
}
else if (nats_strcasestr(error, PERMISSIONS_ERR) != NULL)
{
Expand Down Expand Up @@ -3333,6 +3346,18 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsConnection_Reconnect(natsConnection *nc)
{
natsStatus s = NATS_OK;

if (natsConnection_IsClosed(nc))
return nats_setDefaultError(NATS_INVALID_ARG);

Check warning on line 3355 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L3355

Added line #L3355 was not covered by tests

IFOK(s, _forceReconnect(nc, NATS_OK, NULL));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for the IFOK here since there is no change of s in lines above.

But overall, I think these changes are way more complicated than it needs to be (or I am missing something). If we want to cause a reconnect, locking the connection and and if the socket is valid, closing it should be enough, no?

Of course, verification that the connection is set in such a way that it allows for reconnection (I don't recall if there are options that would totally prevent a reconnect) before doing so, and if not, returning an error indication that reconnect() failed because the connection would not reconnect because of options setting, would be best.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

way more complicated than it needs to be (or I am missing something). If we want to cause a reconnect, locking the connection and and if the socket is valid, closing it should be enough, no?

@kozlovic you're likely right, I was modeling after the go changes that call an existing re-connect method, and I figured this to be the closest. I'll try what you suggested, in a separate branch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #757

return NATS_UPDATE_ERR_STACK(s);

Check warning on line 3358 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L3358

Added line #L3358 was not covered by tests
}

static natsStatus
_processUrlString(natsOptions *opts, const char *urls)
{
Expand Down Expand Up @@ -4117,7 +4142,7 @@
s = natsParser_Parse(nc, buffer, n);

if (s != NATS_OK)
_processOpError(nc, s, false);
_maybeReconnect(nc, s);

natsConn_release(nc);
}
Expand Down Expand Up @@ -4166,7 +4191,7 @@
natsConn_Unlock(nc);

if (s != NATS_OK)
_processOpError(nc, s, false);
_maybeReconnect(nc, s);

Check warning on line 4194 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L4194

Added line #L4194 was not covered by tests

(void) NATS_UPDATE_ERR_STACK(s);
}
Expand Down
12 changes: 12 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg);
NATS_EXTERN natsStatus
natsConnection_Connect(natsConnection **nc, natsOptions *options);

/** \brief Causes the client to drop the connection to the current server and
* perform standard reconnection process.
*
* This means that all subscriptions and consumers should be resubscribed and
* their work resumed after successful reconnect where all reconnect options are
* respected.
*
* @param nc the pointer to the #natsConnection object.
*/
natsStatus
natsConnection_Reconnect(natsConnection *nc);

/** \brief Process a read event when using external event loop.
*
* When using an external event loop, and the callback indicating that
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ForcedReconnect
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
69 changes: 69 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -20202,6 +20202,74 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
test_ForcedReconnect(void)
{
natsStatus s;
struct threadArg arg;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid pid = NATS_INVALID_PID;

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("unable to setup test");

test("Start server, connect, subscribe: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
IFOK(s, natsOptions_Create(&opts));
IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg));
IFOK(s, natsConnection_Connect(&nc, opts));
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

test("Forced reconnect: ");
s = natsConnection_Reconnect(nc);
testCond(s == NATS_OK);

test("Waiting for reconnect: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && !arg.reconnected)
s = natsCondition_TimedWait(arg.c, arg.m, 5000);
arg.reconnected = false;
natsMutex_Unlock(arg.m);
testCond(s == NATS_OK);

test("Send a message to foo: ");
s = natsMsg_Create(&msg, "foo", NULL, "bar", 3);
IFOK(s, natsConnection_PublishMsg(nc, msg));
testCond(s == NATS_OK);
natsMsg_Destroy(msg);
msg = NULL;

test("Receive the message: ");
s = natsSubscription_NextMsg(&msg, sub, 1000);
testCond((s == NATS_OK) && (msg != NULL));
natsMsg_Destroy(msg);
msg = NULL;

natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);
_destroyDefaultThreadArgs(&arg);
}

static void
_stopServerInThread(void *closure)
{
Expand Down Expand Up @@ -36210,6 +36278,7 @@ static testInfo allTests[] =
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},
{"ForcedReconnect", test_ForcedReconnect},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down