diff --git a/server/events.go b/server/events.go index 362ec2a5a4..2f9af5e944 100644 --- a/server/events.go +++ b/server/events.go @@ -336,6 +336,17 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo { } } +// forProposal returns the minimum amount of ClientInfo we need for assignment proposals. +func (ci *ClientInfo) forProposal() *ClientInfo { + if ci == nil { + return nil + } + cci := *ci + cci.Jwt = _EMPTY_ + cci.IssuerKey = _EMPTY_ + return &cci +} + // ServerStats hold various statistics that we will periodically send out. type ServerStats struct { Start time.Time `json:"start"` diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index da7147b1e2..7831787d7b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7295,23 +7295,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset } func encodeAddStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeUpdateStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(updateStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } func encodeDeleteStreamAssignment(sa *streamAssignment) []byte { + csa := *sa + csa.Client = csa.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeStreamOp)) - json.NewEncoder(&bb).Encode(sa) + json.NewEncoder(&bb).Encode(csa) return bb.Bytes() } @@ -7721,16 +7727,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte { + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(removeConsumerOp)) - json.NewEncoder(&bb).Encode(ca) + json.NewEncoder(&bb).Encode(cca) return bb.Bytes() } @@ -7741,25 +7751,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) { } func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte { - b, err := json.Marshal(ca) - if err != nil { - return nil - } - // TODO(dlc) - Streaming better approach here probably. + cca := *ca + cca.Client = cca.Client.forProposal() var bb bytes.Buffer bb.WriteByte(byte(assignCompressedConsumerOp)) - bb.Write(s2.Encode(nil, b)) + s2e := s2.NewWriter(&bb) + json.NewEncoder(s2e).Encode(cca) + s2e.Close() return bb.Bytes() } func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) { var ca consumerAssignment - js, err := s2.Decode(nil, buf) - if err != nil { - return nil, err - } - err = json.Unmarshal(js, &ca) - return &ca, err + bb := bytes.NewBuffer(buf) + s2d := s2.NewReader(bb) + return &ca, json.NewDecoder(s2d).Decode(&ca) } var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")