From c79d802450a63a9f1f757b3872cd2ce7680eecb5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 6 Jan 2025 12:17:37 +0000 Subject: [PATCH] Strip unnecessary `ClientInfo` fields from stream & consumer assignment proposals Signed-off-by: Neil Twigg --- server/events.go | 11 ++++++++++ server/jetstream_cluster.go | 40 +++++++++++++++++++++---------------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/server/events.go b/server/events.go index a91eaec92c..eeff78c18e 100644 --- a/server/events.go +++ b/server/events.go @@ -336,6 +336,17 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo { } } +// forProposal returns the minimum amount of ClientInfo we need for assignment proposals. +func (ci *ClientInfo) forProposal() *ClientInfo { + if ci == nil { + return nil + } + cci := *ci + cci.Jwt = _EMPTY_ + cci.IssuerKey = _EMPTY_ + return &cci +} + // ServerStats hold various statistics that we will periodically send out. type ServerStats struct { Start time.Time `json:"start"` diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 8e250fbff7..59f1d9f9e4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7297,23 +7297,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset } func encodeAddStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } @@ -7723,16 +7729,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } @@ -7743,25 +7753,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { - b, err := json.Marshal(ca) - if err != nil { - return nil - } - // TODO(dlc) - Streaming better approach here probably. + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) - bb.Write(s2.Encode(nil, b)) + s2e := s2.NewWriter(&bb) + json.NewEncoder(s2e).Encode(cca) + s2e.Close() return bb.Bytes() } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { var ca consumerAssignment - js, err := s2.Decode(nil, buf) - if err != nil { - return nil, err - } - err = json.Unmarshal(js, &ca) - return &ca, err + bb := bytes.NewBuffer(buf) + s2d := s2.NewReader(bb) + return &ca, json.NewDecoder(s2d).Decode(&ca) } var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")