Skip to content

Commit

Permalink
Strip unnecessary ClientInfo fields from stream & consumer assignme…
Browse files Browse the repository at this point in the history
…nt proposals

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jan 6, 2025
1 parent d3bcbfc commit 10fb8f4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
11 changes: 11 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo {
}
}

// forProposal returns the minimum amount of ClientInfo we need for assignment proposals.
func (ci *ClientInfo) forProposal() *ClientInfo {
if ci == nil {
return nil
}
cci := *ci
cci.Jwt = _EMPTY_
cci.IssuerKey = _EMPTY_
return &cci
}

// ServerStats hold various statistics that we will periodically send out.
type ServerStats struct {
Start time.Time `json:"start"`
Expand Down
40 changes: 23 additions & 17 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7295,23 +7295,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset
}

func encodeAddStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}

func encodeUpdateStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(updateStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}

func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(removeStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}

Expand Down Expand Up @@ -7721,16 +7727,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}

func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignConsumerOp))
json.NewEncoder(&bb).Encode(ca)
json.NewEncoder(&bb).Encode(cca)
return bb.Bytes()
}

func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(removeConsumerOp))
json.NewEncoder(&bb).Encode(ca)
json.NewEncoder(&bb).Encode(cca)
return bb.Bytes()
}

Expand All @@ -7741,25 +7751,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
}

func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
b, err := json.Marshal(ca)
if err != nil {
return nil
}
// TODO(dlc) - Streaming better approach here probably.
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignCompressedConsumerOp))
bb.Write(s2.Encode(nil, b))
s2e := s2.NewWriter(&bb)
json.NewEncoder(s2e).Encode(cca)
s2e.Close()
return bb.Bytes()
}

func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) {
var ca consumerAssignment
js, err := s2.Decode(nil, buf)
if err != nil {
return nil, err
}
err = json.Unmarshal(js, &ca)
return &ca, err
bb := bytes.NewBuffer(buf)
s2d := s2.NewReader(bb)
return &ca, json.NewDecoder(s2d).Decode(&ca)
}

var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")
Expand Down

0 comments on commit 10fb8f4

Please sign in to comment.