Skip to content

Commit

Permalink
sync vkgo
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Jun 13, 2024
1 parent f56de38 commit 0523f36
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 6 deletions.
2 changes: 1 addition & 1 deletion internal/vkgo/rpc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Server) collectStats(localAddr net.Addr) map[string]string {
requestMem, _ := s.reqMemSem.Observe()
responseMem, _ := s.respMemSem.Observe()

workersTotal := s.workerPool.Created()
workersTotal, _ := s.workerPool.Created()

gc := srvfunc.GetGCStats()
gcPausesMs, _ := json.Marshal(gc.LastPausesMs)
Expand Down
8 changes: 7 additions & 1 deletion internal/vkgo/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ func (err *tagError) Error() string {
if err == nil {
return "<nil>"
}
return err.msg
if len(err.msg) != 0 {
return err.msg
}
if err.err != nil {
return err.err.Error()
}
return ""
}

func (err *tagError) Unwrap() error {
Expand Down
17 changes: 17 additions & 0 deletions internal/vkgo/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type Server struct {
statRequestsCurrent atomic.Int64
statRPS atomic.Int64
statHostname string
statLongPollsWaiting atomic.Int64

opts ServerOptions

Expand Down Expand Up @@ -1100,6 +1101,22 @@ func (s *Server) RequestsCurrent() int64 {
return s.statRequestsCurrent.Load()
}

func (s *Server) WorkersPoolSize() (current int, total int) {
return s.workerPool.Created()
}

func (s *Server) LongPollsWaiting() int64 {
return s.statLongPollsWaiting.Load()
}

func (s *Server) RequestsMemory() (current int64, total int64) {
return s.reqMemSem.Observe()
}

func (s *Server) ResponsesMemory() (current int64, total int64) {
return s.respMemSem.Observe()
}

func (s *Server) RPS() int64 {
return s.statRPS.Load()
}
4 changes: 4 additions & 0 deletions internal/vkgo/rpc/server_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (sc *serverConn) push(hctx *HandlerContext, isLongpoll bool) {
sc.mu.Unlock()
return // already released
}
sc.server.statLongPollsWaiting.Dec()
delete(sc.longpollResponses, resp.hctx.queryID)
}
if sc.closedFlag || hctx.noResult {
Expand Down Expand Up @@ -309,6 +310,7 @@ func (sc *serverConn) makeLongpollResponse(hctx *HandlerContext, canceller Hijac
sc.server.rareLog(&sc.server.lastHijackWarningLog, "[rpc.Server] - invariant violation, hijack response queryID collision")
return
}
sc.server.statLongPollsWaiting.Inc()
sc.longpollResponses[queryID] = hijackedResponse{canceller: canceller, hctx: hctx}
if debugTrace {
sc.server.addTrace(fmt.Sprintf("makeLongpollResponse %p %d", hctx, hctx.queryID))
Expand All @@ -331,6 +333,7 @@ func (sc *serverConn) cancelLongpollResponse(queryID int64) {
if debugPrint {
fmt.Printf("longpollResponses cancel %d\n", queryID)
}
sc.server.statLongPollsWaiting.Dec()
delete(sc.longpollResponses, queryID)
if debugTrace {
sc.server.addTrace(fmt.Sprintf("cancelLongpollResponse %p %d", resp.hctx, queryID))
Expand All @@ -344,6 +347,7 @@ func (sc *serverConn) cancelAllLongpollResponses() {
sc.mu.Lock()
longpollResponses := sc.longpollResponses
sc.longpollResponses = nil // If makeLongpollResponse is called, we'll panic. But this must be impossible if SyncHandler follows protocol
sc.server.statLongPollsWaiting.Sub(int64(len(longpollResponses)))
if debugTrace {
sc.server.addTrace("cancelAllLongpollResponses")
}
Expand Down
3 changes: 1 addition & 2 deletions internal/vkgo/rpc/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ func ServerWithRequestHandler(fn RequestHandlerFunc) ServerOptionsFunc {
}
}

// TODO: rename ServerWithResponseHandler
func ServerWithRequestEndHandler(fn ResponseHandlerFunc) ServerOptionsFunc {
func ServerWithResponseHandler(fn ResponseHandlerFunc) ServerOptionsFunc {
return func(opts *ServerOptions) {
opts.ResponseHandler = fn
}
Expand Down
4 changes: 2 additions & 2 deletions internal/vkgo/rpc/server_workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (t *workerPool) Close() {
t.cond.Broadcast()
}

func (t *workerPool) Created() int {
func (t *workerPool) Created() (current int, total int) {
t.mu.Lock()
defer t.mu.Unlock()
return t.created
return t.created, t.create
}

func (t *workerPool) Get(wg *WaitGroup) (*worker, bool) {
Expand Down

0 comments on commit 0523f36

Please sign in to comment.