Skip to content

Commit

Permalink
Cherry-picks for 2.10.25-RC.1 (#6345)
Browse files Browse the repository at this point in the history
#### Dependencies
- #6323
- #6324

####  Leafnode
- #6291

#### JetStream
- #6226
- #6235
- #6277
- #6279
- #6283
- #6289
- #6316
- #6317
- #6325
- #6326
- #6335
- #6338
- #6341
- #6344
- #6150
- #6351
- #6355

#### Tests
- #6278
- #6297
- #6300
- #6343
- #6329
- #6330
- #6331
- #6332
- #6334
- #6356
  • Loading branch information
wallyqs authored Jan 9, 2025
2 parents 1d6f7ea + 82d47e2 commit 339d1b3
Show file tree
Hide file tree
Showing 25 changed files with 1,191 additions and 367 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ require (
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.31.0
golang.org/x/sys v0.28.0
golang.org/x/time v0.8.0
golang.org/x/sys v0.29.0
golang.org/x/time v0.9.0
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwE
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
140 changes: 73 additions & 67 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,8 +1415,23 @@ func (o *consumer) unsubscribe(sub *subscription) {

// We need to make sure we protect access to the outq.
// Do all advisory sends here.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
o.outq.sendMsg(subj, msg)
func (o *consumer) sendAdvisory(subject string, e any) {
if o.acc == nil {
return
}

// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) {
return
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.outq.sendMsg(subject, j)
}

func (o *consumer) sendDeleteAdvisoryLocked() {
Expand All @@ -1432,13 +1447,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

func (o *consumer) sendCreateAdvisory() {
Expand All @@ -1457,13 +1467,8 @@ func (o *consumer) sendCreateAdvisory() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

// Created returns created time.
Expand Down Expand Up @@ -1573,6 +1578,8 @@ var (
consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
)

// deleteNotActive must only be called from time.AfterFunc or in its own
// goroutine, as it can block on clean-up.
func (o *consumer) deleteNotActive() {
o.mu.Lock()
if o.mset == nil {
Expand Down Expand Up @@ -1613,8 +1620,25 @@ func (o *consumer) deleteNotActive() {

s, js := o.mset.srv, o.srv.js.Load()
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
var qch, cqch chan struct{}
if o.srv != nil {
qch = o.srv.quitCh
}
if o.js != nil {
cqch = o.js.clusterQuitC()
}
o.mu.Unlock()

// Useful for pprof.
setGoRoutineLabels(pprofLabels{
"account": acc,
"stream": stream,
"consumer": name,
})

// We will delete locally regardless.
defer o.delete()

// If we are clustered, check if we still have this consumer assigned.
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if !isDirect && s.JetStreamIsClustered() {
Expand All @@ -1637,38 +1661,40 @@ func (o *consumer) deleteNotActive() {
if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-qch:
return
case <-cqch:
return
}
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}()
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}
}

// We will delete here regardless.
o.delete()
}

func (o *consumer) watchGWinterest() {
Expand Down Expand Up @@ -2382,12 +2408,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.nakEventT, j)
o.sendAdvisory(o.nakEventT, e)

// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
Expand Down Expand Up @@ -2462,15 +2483,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}

subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
return ackedInPlace
}

Expand Down Expand Up @@ -2765,12 +2779,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.ackEventT, j)
o.sendAdvisory(o.ackEventT, e)
}

// Process an ACK.
Expand Down Expand Up @@ -3515,12 +3524,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.deliveryExcEventT, j)
o.sendAdvisory(o.deliveryExcEventT, e)
}

// Check if the candidate subject matches a filter if its present.
Expand Down Expand Up @@ -5379,6 +5383,7 @@ func (o *consumer) requestNextMsgSubject() string {

func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Lock()

// Update our cached num pending only if we think deliverMsg has not done so.
if sseq >= o.sseq && o.isFilteredMatch(subj) {
o.npc--
Expand All @@ -5390,6 +5395,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
if o.rdc != nil {
rdc = o.rdc[sseq]
}

o.mu.Unlock()

// If it was pending process it like an ack.
Expand Down
22 changes: 22 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,28 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo {
}
}

// forProposal returns the minimum amount of ClientInfo we need for assignment proposals.
func (ci *ClientInfo) forProposal() *ClientInfo {
if ci == nil {
return nil
}
cci := *ci
cci.Jwt = _EMPTY_
cci.IssuerKey = _EMPTY_
return &cci
}

// forAdvisory returns the minimum amount of ClientInfo we need for JS advisory events.
func (ci *ClientInfo) forAdvisory() *ClientInfo {
if ci == nil {
return nil
}
cci := *ci
cci.Jwt = _EMPTY_
cci.Alternates = nil
return &cci
}

// ServerStats hold various statistics that we will periodically send out.
type ServerStats struct {
Start time.Time `json:"start"`
Expand Down
Loading

0 comments on commit 339d1b3

Please sign in to comment.