Skip to content

Commit

Permalink
remove registry locks
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 19, 2023
1 parent d78d9f0 commit 94b98f0
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 297 deletions.
24 changes: 12 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ func (c *Client) close(disconnect Disconnect) error {
c.node.logger.log(newLogEntry(LogLevelDebug, "closing client connection", map[string]any{"client": c.uid, "user": c.user, "reason": disconnect.Reason}))
}
if disconnect.Code != DisconnectConnectionClosed.Code {
incServerDisconnect(disconnect.Code)
c.node.metrics.incServerDisconnect(disconnect.Code)
}
if c.eventHub.disconnectHandler != nil && prevStatus == statusConnected {
c.eventHub.disconnectHandler(DisconnectEvent{
Expand Down Expand Up @@ -1095,7 +1095,7 @@ func isPong(cmd *protocol.Command) bool {

func (c *Client) handleCommandFinished(cmd *protocol.Command, method commandMethodType, disconnect *Disconnect, reply *protocol.Reply, started time.Time) {
defer func() {
observeCommandDuration(method, time.Since(started))
c.node.metrics.observeCommandDuration(method, time.Since(started))
}()
if c.node.clientEvents.commandProcessedHandler != nil {
event := newCommandProcessedEvent(cmd, disconnect, reply, started)
Expand All @@ -1105,7 +1105,7 @@ func (c *Client) handleCommandFinished(cmd *protocol.Command, method commandMeth

func (c *Client) handleCommandDispatchError(cmd *protocol.Command, method commandMethodType, err error, started time.Time) (*Disconnect, bool) {
defer func() {
observeCommandDuration(method, time.Since(started))
c.node.metrics.observeCommandDuration(method, time.Since(started))
}()
switch t := err.(type) {
case *Disconnect:
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func (c *Client) dispatchCommand(cmd *protocol.Command, cmdSize int) (*Disconnec
if metricChannel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesReceived {
channelGroup = c.node.config.GetChannelNamespaceLabel(metricChannel)
}
incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize)
c.node.metrics.incTransportMessagesReceived(c.transport.Name(), channelGroup, cmdSize)
}()

if isPong(cmd) {
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func (c *Client) writeEncodedCommandReply(method commandMethodType, cmd *protoco
if c.node.LogEnabled(LogLevelInfo) {
c.node.logger.log(newLogEntry(LogLevelInfo, "client command error", map[string]any{"reply": fmt.Sprintf("%v", rep), "command": fmt.Sprintf("%v", cmd), "client": c.ID(), "user": c.UserID(), "error": rep.Error.Message, "code": rep.Error.Code}))
}
incReplyError(method, rep.Error.Code)
c.node.metrics.incReplyError(method, rep.Error.Code)
}

protoType := c.transport.Protocol().toProto()
Expand Down Expand Up @@ -2051,7 +2051,7 @@ func (c *Client) writeError(method commandMethodType, cmd *protocol.Command, err

func (c *Client) writeDisconnectOrErrorFlush(method commandMethodType, cmd *protocol.Command, replyError error, started time.Time, rw *replyWriter) {
defer func() {
observeCommandDuration(method, time.Since(started))
c.node.metrics.observeCommandDuration(method, time.Since(started))
}()
switch t := replyError.(type) {
case *Disconnect:
Expand Down Expand Up @@ -2125,7 +2125,7 @@ func (c *Client) getRPCCommandReply(res *protocol.RPCResult) (*protocol.Reply, e
func (c *Client) handleSend(req *protocol.SendRequest, cmd *protocol.Command, started time.Time) error {
// Send handler is a bit special since it's a one way command: client does not expect any reply.
if c.eventHub.messageHandler == nil {
observeCommandDuration(commandSend, time.Since(started))
c.node.metrics.observeCommandDuration(commandSend, time.Since(started))
// Return DisconnectNotAvailable here since otherwise client won't even know
// server does not have asynchronous message handler set.
return DisconnectNotAvailable
Expand Down Expand Up @@ -2451,7 +2451,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
if item.Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(item.Channel)
}
incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data))
c.node.metrics.incTransportMessagesSent(c.transport.Name(), channelGroup, len(item.Data))

if c.node.clientEvents.transportWriteHandler != nil {
pass := c.node.clientEvents.transportWriteHandler(c, TransportWriteEvent(item))
Expand Down Expand Up @@ -2488,7 +2488,7 @@ func (c *Client) startWriter(batchDelay time.Duration, maxMessagesInFrame int, q
if items[i].Channel != "" && c.node.config.GetChannelNamespaceLabel != nil && c.node.config.ChannelNamespaceLabelForMessagesSent {
channelGroup = c.node.config.GetChannelNamespaceLabel(items[i].Channel)
}
incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data))
c.node.metrics.incTransportMessagesSent(c.transport.Name(), channelGroup, len(items[i].Data))
}
writeMu.Lock()
defer writeMu.Unlock()
Expand Down Expand Up @@ -2757,7 +2757,7 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
latestOffset = historyResult.Offset
latestEpoch = historyResult.Epoch
res.Recovered = false
incRecover(res.Recovered)
c.node.metrics.incRecover(res.Recovered)
} else {
c.node.logger.log(newLogEntry(LogLevelError, "error on recover", map[string]any{"channel": channel, "user": c.user, "client": c.uid, "error": err.Error()}))
c.pubSubSync.StopBuffering(channel)
Expand All @@ -2773,7 +2773,7 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
var recovered bool
recoveredPubs, recovered = isRecovered(historyResult, cmdOffset, cmdEpoch)
res.Recovered = recovered
incRecover(res.Recovered)
c.node.metrics.incRecover(res.Recovered)
}
} else {
streamTop, err := c.node.streamTop(channel, reply.Options.HistoryMetaTTL)
Expand Down Expand Up @@ -3102,7 +3102,7 @@ func (c *Client) logDisconnectBadRequest(message string) error {

func (c *Client) logWriteInternalErrorFlush(method commandMethodType, cmd *protocol.Command, err error, message string, started time.Time, rw *replyWriter) {
defer func() {
observeCommandDuration(method, time.Since(started))
c.node.metrics.observeCommandDuration(method, time.Since(started))
}()
if clientErr, ok := err.(*Error); ok {
errorReply := &protocol.Reply{Error: clientErr.toProto()}
Expand Down
2 changes: 1 addition & 1 deletion handler_http_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const defaultMaxHTTPStreamingBodySize = 64 * 1024
const streamingResponseWriteTimeout = time.Second

func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
incTransportConnect(transportHTTPStream)
h.node.metrics.incTransportConnect(transportHTTPStream)

if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
Expand Down
2 changes: 1 addition & 1 deletion handler_sockjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *SockjsHandler) sockJSHandler(sess sockjs.Session) {

// sockJSHandler called when new client connection comes to SockJS endpoint.
func (s *SockjsHandler) handleSession(sess sockjs.Session) {
incTransportConnect(transportSockJS)
s.node.metrics.incTransportConnect(transportSockJS)

// Separate goroutine for better GC of caller's data.
go func() {
Expand Down
2 changes: 1 addition & 1 deletion handler_sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const connectUrlParam = "cf_connect"
const defaultMaxSSEBodySize = 64 * 1024

func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
incTransportConnect(transportSSE)
h.node.metrics.incTransportConnect(transportSSE)

var requestData []byte
if r.Method == http.MethodGet {
Expand Down
2 changes: 1 addition & 1 deletion handler_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler {
}

func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
incTransportConnect(transportWebsocket)
s.node.metrics.incTransportConnect(transportWebsocket)

var protoType = ProtocolTypeJSON
var useFramePingPong bool
Expand Down
Loading

0 comments on commit 94b98f0

Please sign in to comment.