-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Added] natsConnection_Reconnect #756
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,8 +81,14 @@ | |
static void | ||
_close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doCBs); | ||
|
||
static bool | ||
_processOpError(natsConnection *nc, natsStatus s, bool initialConnect); | ||
static natsStatus | ||
_tryReconnect(natsConnection *nc, natsStatus s, bool forcedReconnect, bool *started); | ||
|
||
static void | ||
_maybeReconnect(natsConnection *nc, natsStatus s) { _tryReconnect(nc, s, false, NULL); } | ||
|
||
static natsStatus | ||
_forceReconnect(natsConnection *nc, natsStatus s, bool *started) { return _tryReconnect(nc, s, true, started); } | ||
|
||
static natsStatus | ||
_flushTimeout(natsConnection *nc, int64_t timeout); | ||
|
@@ -2102,7 +2108,9 @@ | |
{ | ||
natsConn_Unlock(nc); | ||
|
||
if (_processOpError(nc, retSts, true)) | ||
bool reconnectStarted = false; | ||
s = _forceReconnect(nc, retSts, &reconnectStarted); | ||
if ((s == NATS_OK) && reconnectStarted) | ||
{ | ||
nats_clearLastError(); | ||
return NATS_NOT_YET_CONNECTED; | ||
|
@@ -2134,29 +2142,30 @@ | |
return s; | ||
} | ||
|
||
// _processOpError handles errors from reading or parsing the protocol. | ||
// _tryReconnect handles errors from reading or parsing the protocol, or forced | ||
// reconnection. It will fire off a doReconnect thread if needed. | ||
// The lock should not be held entering this function. | ||
static bool | ||
_processOpError(natsConnection *nc, natsStatus s, bool initialConnect) | ||
static natsStatus | ||
_tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool *started) | ||
{ | ||
natsStatus s = NATS_OK; | ||
|
||
natsConn_Lock(nc); | ||
|
||
if (!initialConnect) | ||
if (!forcedReconnect) | ||
{ | ||
if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) | ||
{ | ||
natsConn_Unlock(nc); | ||
|
||
return false; | ||
return NATS_OK; | ||
} | ||
} | ||
|
||
// Do reconnect only if allowed and we were actually connected | ||
// or if we are retrying on initial failed connect. | ||
if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) | ||
if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) | ||
{ | ||
natsStatus ls = NATS_OK; | ||
|
||
// Set our new status | ||
nc->status = NATS_CONN_STATUS_RECONNECTING; | ||
|
||
|
@@ -2176,7 +2185,7 @@ | |
// on the socket since we are going to reconnect. | ||
if (nc->el.attached) | ||
{ | ||
ls = _evStopPolling(nc); | ||
s = _evStopPolling(nc); | ||
natsSock_Close(nc->sockCtx.fd); | ||
nc->sockCtx.fd = NATS_SOCK_INVALID; | ||
|
||
|
@@ -2185,46 +2194,50 @@ | |
} | ||
|
||
// Fail pending flush requests. | ||
if (ls == NATS_OK) | ||
if (s == NATS_OK) | ||
_clearPendingFlushRequests(nc); | ||
// If option set, also fail pending requests. | ||
if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) | ||
if ((s == NATS_OK) && nc->opts->failRequestsOnDisconnect) | ||
_clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); | ||
|
||
// Create the pending buffer to hold all write requests while we try | ||
// to reconnect. | ||
if (ls == NATS_OK) | ||
ls = natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize); | ||
if (ls == NATS_OK) | ||
IFOK (s, natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize)); | ||
if (s == NATS_OK) | ||
{ | ||
nc->usePending = true; | ||
|
||
// Start the reconnect thread | ||
ls = natsThread_Create(&(nc->reconnectThread), | ||
s = natsThread_Create(&(nc->reconnectThread), | ||
_doReconnect, (void*) nc); | ||
} | ||
if (ls == NATS_OK) | ||
if (s == NATS_OK) | ||
{ | ||
// We created the reconnect thread successfully, so retain | ||
// the connection. | ||
_retain(nc); | ||
nc->inReconnect++; | ||
natsConn_Unlock(nc); | ||
|
||
return true; | ||
if (started != NULL) | ||
*started = true; | ||
|
||
return NATS_OK; | ||
} | ||
} | ||
|
||
// reconnect not allowed or we failed to setup the reconnect code. | ||
|
||
if (started != NULL) | ||
*started = false; | ||
nc->status = NATS_CONN_STATUS_DISCONNECTED; | ||
nc->err = s; | ||
nc->err = newErr; | ||
|
||
natsConn_Unlock(nc); | ||
|
||
_close(nc, NATS_CONN_STATUS_CLOSED, false, true); | ||
|
||
return false; | ||
return NATS_UPDATE_ERR_STACK(s); | ||
} | ||
|
||
static void | ||
|
@@ -2267,7 +2280,7 @@ | |
s = natsParser_Parse(nc, buffer, n); | ||
|
||
if (s != NATS_OK) | ||
_processOpError(nc, s, false); | ||
_maybeReconnect(nc, s); | ||
|
||
natsConn_Lock(nc); | ||
} | ||
|
@@ -2396,7 +2409,7 @@ | |
if (++(nc->pout) > nc->opts->maxPingsOut) | ||
{ | ||
natsConn_Unlock(nc); | ||
_processOpError(nc, NATS_STALE_CONNECTION, false); | ||
_maybeReconnect(nc, NATS_STALE_CONNECTION); | ||
return; | ||
} | ||
|
||
|
@@ -2921,7 +2934,7 @@ | |
|
||
if (strcasecmp(error, STALE_CONNECTION) == 0) | ||
{ | ||
_processOpError(nc, NATS_STALE_CONNECTION, false); | ||
_maybeReconnect(nc, NATS_STALE_CONNECTION); | ||
} | ||
else if (nats_strcasestr(error, PERMISSIONS_ERR) != NULL) | ||
{ | ||
|
@@ -3333,6 +3346,18 @@ | |
return NATS_UPDATE_ERR_STACK(s); | ||
} | ||
|
||
natsStatus | ||
natsConnection_Reconnect(natsConnection *nc) | ||
{ | ||
natsStatus s = NATS_OK; | ||
|
||
if (natsConnection_IsClosed(nc)) | ||
return nats_setDefaultError(NATS_INVALID_ARG); | ||
|
||
IFOK(s, _forceReconnect(nc, NATS_OK, NULL)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for the But overall, I think these changes are way more complicated than they need to be (or I am missing something). If we want to cause a reconnect, locking the connection and, if the socket is valid, closing it should be enough, no? Of course, verification that the connection is set in such a way that it allows for reconnection (I don't recall if there are options that would totally prevent a reconnect) before doing so, and if not, returning an error indicating that reconnect() failed because the connection would not reconnect because of its option settings, would be best. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@kozlovic you're likely right, I was modeling after the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See #757 |
||
return NATS_UPDATE_ERR_STACK(s); | ||
} | ||
|
||
static natsStatus | ||
_processUrlString(natsOptions *opts, const char *urls) | ||
{ | ||
|
@@ -4117,7 +4142,7 @@ | |
s = natsParser_Parse(nc, buffer, n); | ||
|
||
if (s != NATS_OK) | ||
_processOpError(nc, s, false); | ||
_maybeReconnect(nc, s); | ||
|
||
natsConn_release(nc); | ||
} | ||
|
@@ -4166,7 +4191,7 @@ | |
natsConn_Unlock(nc); | ||
|
||
if (s != NATS_OK) | ||
_processOpError(nc, s, false); | ||
_maybeReconnect(nc, s); | ||
|
||
(void) NATS_UPDATE_ERR_STACK(s); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a reason we used a different status here. What we wanted to report as the error was the error that was given to processOpError(), not an internal one that we may encounter while trying to set up the reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, and I think my code still does that (`newErr`), but in the case of natsConnection_Reconnect failing to fire off doReconnect, we want to return the actual failure, right? (Like out of memory, for instance.)