Skip to content

Commit

Permalink
Cherry-picks for 2.10.26-RC.5 (#6567)
Browse files Browse the repository at this point in the history
Includes the following:

- #6524
- #6525
- #6526
- #5424
- #6565
- #6532

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Feb 21, 2025
2 parents 1d72828 + 18d9129 commit e5fc7ad
Show file tree
Hide file tree
Showing 20 changed files with 469 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:

- name: Coveralls
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit 3dfc5567390f6fa9267c0ee9c251e4c8c3f18949 = tag v2
uses: coverallsapp/github-action@643bc377ffa44ace6394b2b5d0d3950076de9f63
# Commit 648a8eb78e6d50909eff900e4ec85cab4524a45b = tag v2.3.6
uses: coverallsapp/github-action@648a8eb78e6d50909eff900e4ec85cab4524a45b
with:
github-token: ${{ secrets.github_token }}
file: src/github.com/nats-io/nats-server/coverage.lcov
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.0
toolchain go1.22.8

require (
github.com/klauspost/compress v1.17.11
github.com/klauspost/compress v1.18.0
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.7.3
github.com/nats-io/nats.go v1.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
Expand Down
2 changes: 1 addition & 1 deletion server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error {
a.mu.Unlock()

cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) {
c.processServiceImport(si, acc, msg)
c.pa.delivered = c.processServiceImport(si, acc, msg)
}
sub, err := c.processSubEx([]byte(subject), nil, []byte(sid), cb, true, true, false)
if err != nil {
Expand Down
30 changes: 29 additions & 1 deletion server/accounts_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2024 The NATS Authors
// Copyright 2018-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -3670,3 +3670,31 @@ func TestAccountServiceAndStreamExportDoubleDelivery(t *testing.T) {
time.Sleep(200 * time.Millisecond)
require_Equal(t, msgs.Load(), 1)
}

func TestAccountServiceImportNoResponders(t *testing.T) {
// Setup NATS server.
cf := createConfFile(t, []byte(`
port: -1
accounts: {
accExp: {
users: [{user: accExp, password: accExp}]
exports: [{service: "foo"}]
}
accImp: {
users: [{user: accImp, password: accImp}]
imports: [{service: {account: accExp, subject: "foo"}}]
}
}
`))

s, _ := RunServerWithConfig(cf)
defer s.Shutdown()

// Connect to the import account. We will not setup any responders, so a request should
// error out with ErrNoResponders.
nc := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp"))
defer nc.Close()

_, err := nc.Request("foo", []byte("request"), 250*time.Millisecond)
require_Error(t, err, nats.ErrNoResponders)
}
24 changes: 19 additions & 5 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3472,17 +3472,25 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
}
client.mu.Unlock()

// For service imports, track if we delivered.
didDeliver := true

// Internal account clients are for service imports and need the '\r\n'.
start := time.Now()
if client.kind == ACCOUNT {
sub.icb(sub, c, acc, string(subject), string(reply), msg)
// If we are a service import check to make sure we delivered the message somewhere.
if sub.si {
didDeliver = c.pa.delivered
}
} else {
sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
}
if dur := time.Since(start); dur >= readLoopReportThreshold {
srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
return true

return didDeliver
}

// If we are a client and we detect that the consumer we are
Expand Down Expand Up @@ -4196,17 +4204,17 @@ var (

// processServiceImport is an internal callback when a subscription matches an imported service
// from another account. This includes response mappings as well.
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) {
func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) bool {
// If we are a GW and this is not a direct serviceImport ignore.
isResponse := si.isRespServiceImport()
if (c.kind == GATEWAY || c.kind == ROUTER) && !isResponse {
return
return false
}
// Detect cycles and ignore (return) when we detect one.
if len(c.pa.psi) > 0 {
for i := len(c.pa.psi) - 1; i >= 0; i-- {
if psi := c.pa.psi[i]; psi.se == si.se {
return
return false
}
}
}
Expand All @@ -4227,7 +4235,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
// response service imports and rrMap entries which all will need to simply expire.
// TODO(dlc) - Come up with something better.
if shouldReturn || (checkJS && si.se != nil && si.se.acc == c.srv.SystemAccount()) {
return
return false
}

var nrr []byte
Expand Down Expand Up @@ -4375,6 +4383,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
c.in.rts = orts
c.pa = pacopy

// Before we undo didDeliver based on tracing and last mile, mark in the c.pa which informs us of no responders status.
// If we override due to tracing and traceOnly we do not want to send back a no responders.
c.pa.delivered = didDeliver

// Determine if we should remove this service import. This is for response service imports.
// We will remove if we did not deliver, or if we are a response service import and we are
// a singleton, or we have an EOF message.
Expand Down Expand Up @@ -4404,6 +4416,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
siAcc.removeRespServiceImport(rsi, reason)
}
}

return didDeliver
}

func (c *client) addSubToRouteTargets(sub *subscription) {
Expand Down
17 changes: 17 additions & 0 deletions server/config_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,23 @@ func TestConfigCheck(t *testing.T) {
errorLine: 9,
errorPos: 9,
},
{
name: "invalid duration for remote leafnode first info timeout",
config: `
leafnodes {
port: -1
remotes [
{
url: "nats://127.0.0.1:123"
first_info_timeout: abc
}
]
}
`,
err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"),
errorLine: 7,
errorPos: 8,
},
{
name: "show warnings on empty configs without values",
config: ``,
Expand Down
8 changes: 8 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5308,6 +5308,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down Expand Up @@ -6030,6 +6031,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(bsubj, SimpleState{
Msgs: 1,
Expand Down Expand Up @@ -8057,8 +8059,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
ss.firstNeedsUpdate = false
ss.lastNeedsUpdate = false
return
}

endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
Expand All @@ -8085,6 +8090,8 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
// Only need to reset ss.lastNeedsUpdate, ss.firstNeedsUpdate is already reset above.
ss.lastNeedsUpdate = false
return
}
buf := mb.cache.buf[li:]
Expand Down Expand Up @@ -8208,6 +8215,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
ss.lastNeedsUpdate = false
} else {
mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7655,7 +7655,11 @@ func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) {
return err
} else if o := mset.lookupConsumer("CONSUMER"); o == nil {
return errors.New("consumer doesn't exist")
} else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ {
} else if rn := o.raftNode(); rn == nil {
return errors.New("consumer raft node doesn't exist")
} else if ccrg := rn.Group(); ccrg == _EMPTY_ {
return errors.New("consumer raft group doesn't exist")
} else if consumerRg == _EMPTY_ {
consumerRg = ccrg
} else if consumerRg != ccrg {
return errors.New("consumer raft groups don't match")
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7051,7 +7051,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
defer nc.Close()

_, err = nc.Request(getSubj, nil, time.Second)
require_Error(t, err, nats.ErrTimeout)
require_Error(t, err, nats.ErrNoResponders)

// Now start all and make sure they all eventually have subs for direct access.
c.restartAll()
Expand Down
3 changes: 1 addition & 2 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6189,8 +6189,7 @@ func TestJWTAccountProtectedImport(t *testing.T) {

// ensure service fails
_, err = ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_Error(t, err)
require_Contains(t, err.Error(), "timeout")
require_Error(t, err, nats.ErrNoResponders)
s.AccountResolver().Store(exportPub, exportJWTOn)
// ensure stream fails
err = ncExp.Publish(strmSub, []byte("hello"))
Expand Down
7 changes: 5 additions & 2 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

var tlsFirst bool
var infoTimeout time.Duration
if remote != nil {
solicited = true
remote.Lock()
Expand All @@ -1006,6 +1007,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
c.leaf.isSpoke = true
}
tlsFirst = remote.TLSHandshakeFirst
infoTimeout = remote.FirstInfoTimeout
remote.Unlock()
c.acc = acc
} else {
Expand Down Expand Up @@ -1063,7 +1065,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
}
}
// We need to wait for the info, but not for too long.
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
}

// We will process the INFO from the readloop and finish by
Expand Down Expand Up @@ -2897,6 +2899,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
compress := remote.Websocket.Compression
// By default the server will mask outbound frames, but it can be disabled with this option.
noMasking := remote.Websocket.NoMasking
infoTimeout := remote.FirstInfoTimeout
remote.RUnlock()
// Will do the client-side TLS handshake if needed.
tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
Expand Down Expand Up @@ -2949,14 +2952,14 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot
if noMasking {
req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
}
c.nc.SetDeadline(time.Now().Add(infoTimeout))
if err := req.Write(c.nc); err != nil {
return nil, WriteError, err
}

var resp *http.Response

br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT))
resp, err = http.ReadResponse(br, req)
if err == nil &&
(resp.StatusCode != 101 ||
Expand Down
Loading

0 comments on commit e5fc7ad

Please sign in to comment.