Skip to content

Commit

Permalink
improve locking while result is transfered to clients
Browse files Browse the repository at this point in the history
  • Loading branch information
sni committed Jan 15, 2025
1 parent 272e5ae commit c2c6797
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 102 deletions.
1 change: 1 addition & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ next:
- use primary key index for all tables
- use output buffer for queries without fixed header to reduce locked time
- add lq cmd to simply send queries
- improve locking while result is transfered to clients

2.2.5 Wed Dec 18 14:54:16 CET 2024
- add lb benchmark tool
Expand Down
5 changes: 1 addition & 4 deletions pkg/lmd/client_con.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type ClientConnection struct {
logSlowQueryThreshold int
logHugeQueryThreshold int
keepAlive bool
keepOpen bool
}

// NewClientConnection creates a new client connection object.
Expand Down Expand Up @@ -86,9 +85,7 @@ func (cl *ClientConnection) Handle() {
}
}
}
if !cl.keepOpen {
cl.connection.Close()
}
cl.connection.Close()
}

// answer handles a single client connection.
Expand Down
111 changes: 34 additions & 77 deletions pkg/lmd/response.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package lmd

import (
"bufio"
"bytes"
"context"
"fmt"
Expand Down Expand Up @@ -31,6 +30,7 @@ type Response struct {
Lock *deadlock.RWMutex // must be used for Result and Failed access
Request *Request // the initial request
RawResults *RawResultSet // collected results from peers
lockedStores map[*Peer]*DataStore
Failed map[string]string // map of failed backends by key
Result ResultSet // final processed result table
SelectedPeers []*Peer // peers used for this response
Expand Down Expand Up @@ -86,6 +86,7 @@ func NewResponse(ctx context.Context, req *Request, client *ClientConnection) (r
}

// set locks for required stores
res.lockedStores = make(map[*Peer]*DataStore)
stores := make(map[*Peer]*DataStore)
for i := range res.SelectedPeers {
peer := res.SelectedPeers[i]
Expand All @@ -99,17 +100,12 @@ func NewResponse(ctx context.Context, req *Request, client *ClientConnection) (r
}
if !table.WorksUnlocked {
store.DataSet.lock.RLock()
res.lockedStores[peer] = store
}
stores[peer] = store
}

if !table.WorksUnlocked {
defer func() {
for _, s := range stores {
s.DataSet.lock.RUnlock()
}
}()
}
defer res.unlockStores()

res.RawResults = &RawResultSet{}
res.RawResults.Sort = req.Sort
Expand All @@ -128,6 +124,18 @@ func NewResponse(ctx context.Context, req *Request, client *ClientConnection) (r
return res, 0, err
}

func (res *Response) unlockStores() {
if res.lockedStores == nil {
return
}

for _, s := range res.lockedStores {
s.DataSet.lock.RUnlock()
}

res.lockedStores = nil
}

func (res *Response) prepareResponse(ctx context.Context, req *Request) {
if res.Failed == nil {
res.Failed = make(map[string]string)
Expand Down Expand Up @@ -387,54 +395,30 @@ func finalStatsApply(stat *Filter) (res float64) {

// Send converts the result object to a livestatus answer and writes the resulting bytes back to the client.
func (res *Response) Send(client *ClientConnection) (size int64, err error) {
if res.Request.ResponseFixed16 {
size, err = res.SendFixed16(client.connection)

localAddr := client.connection.LocalAddr().String()
promFrontendBytesSend.WithLabelValues(localAddr).Add(float64(size + 1))

return size, err
}

// use output buffer to prevent slow clients holding lmd locks
writer := bufio.NewWriterSize(client.connection, 65536)
size, err = res.SendUnbuffered(writer)
if err != nil {
return size, err
}

client.keepOpen = true
go func(writer *bufio.Writer) {
err2 := writer.Flush()
if err2 != nil {
logWith(res).Warnf("write error: %s", err2.Error())
}
err2 = client.connection.Close()
if err2 != nil {
logWith(res).Warnf("close error: %s", err2.Error())
}
}(writer)
size, err = res.send(client.connection)

localAddr := client.connection.LocalAddr().String()
promFrontendBytesSend.WithLabelValues(localAddr).Add(float64(size + 1))

return size, nil
return size, err
}

// SendFixed16 converts the result object to a livestatus answer and writes the resulting bytes back to the client.
func (res *Response) SendFixed16(conn io.Writer) (size int64, err error) {
// send converts the result object to a livestatus answer and writes the resulting bytes back to the client.
func (res *Response) send(conn io.Writer) (size int64, err error) {
resBuffer, err := res.Buffer()
if err != nil {
return 0, err
}
size = int64(resBuffer.Len())
headerFixed16 := fmt.Sprintf("%d %11d", res.Code, size+1)
logWith(res).Tracef("write: %s", headerFixed16)
_, err = fmt.Fprintf(conn, "%s\n", headerFixed16)
if err != nil {
logWith(res).Warnf("write error: %s", err.Error())
if res.Request.ResponseFixed16 {
headerFixed16 := fmt.Sprintf("%d %11d", res.Code, size+1)
logWith(res).Tracef("write: %s", headerFixed16)
_, err = fmt.Fprintf(conn, "%s\n", headerFixed16)
if err != nil {
logWith(res).Warnf("write error: %s", err.Error())

return 0, fmt.Errorf("write: %s", err.Error())
return 0, fmt.Errorf("write: %s", err.Error())
}
}
if log.IsV(LogVerbosityTrace) {
logWith(res).Tracef("write: %s", resBuffer.Bytes())
Expand All @@ -443,7 +427,7 @@ func (res *Response) SendFixed16(conn io.Writer) (size int64, err error) {
if err != nil {
logWith(res).Warnf("write error: %s", err.Error())

return 0, fmt.Errorf("writeTo: %s", err.Error())
return written, fmt.Errorf("writeTo: %s", err.Error())
}
if written != size {
logWith(res).Warnf("write error: written %d, size: %d", written, size)
Expand All @@ -455,50 +439,23 @@ func (res *Response) SendFixed16(conn io.Writer) (size int64, err error) {
if err != nil {
logWith(res).Warnf("write error: %s", err.Error())

return 0, fmt.Errorf("writeTo: %s", err.Error())
return written, fmt.Errorf("writeTo: %s", err.Error())
}

return written, nil
}

// SendUnbuffered directly prints the result to the client connection.
func (res *Response) SendUnbuffered(c io.Writer) (size int64, err error) {
countingWriter := NewWriteCounter(c)
if res.Error != nil {
logWith(res).Warnf("sending error response: %d - %s", res.Code, res.Error.Error())
_, err = countingWriter.Write([]byte(res.Error.Error()))
if err != nil {
return
}
_, err = countingWriter.Write([]byte("\n"))
size = countingWriter.Count

return
}
if res.Request.OutputFormat == OutputFormatWrappedJSON {
err = res.WrappedJSON(countingWriter)
} else {
err = res.JSON(countingWriter)
}
if err != nil {
logWith(res).Warnf("write error: %s", err.Error())

return
}
_, err = countingWriter.Write([]byte("\n"))
size = countingWriter.Count

return
}

// Buffer fills buffer with the response as bytes array.
func (res *Response) Buffer() (*bytes.Buffer, error) {
// we can unlock all stores as soon this function is over
defer res.unlockStores()

buf := new(bytes.Buffer)
if res.Error != nil {
logWith(res).Warnf("sending error response: %d - %s", res.Code, res.Error.Error())
buf.WriteString(res.Error.Error())

return buf, nil //nolint:nilerr // error has been forwared to the client but sending it worked
return buf, nil
}

if res.Request.OutputFormat == OutputFormatWrappedJSON {
Expand Down
21 changes: 0 additions & 21 deletions pkg/lmd/write_counter.go

This file was deleted.

0 comments on commit c2c6797

Please sign in to comment.