From bf4c61fd861891287f254e7f81b9fd56b0caa1a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Wed, 7 Aug 2024 16:36:19 -0700 Subject: [PATCH 1/2] Improve Stream sourcing to a stream with discard new: max bytes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit You can have a (typically wq) stream that is sourcing have a discard new policy (so you can limit its size) was working only for the max messages limit, now is also working for the max bytes limit. Signed-off-by: Jean-Noël Moyne --- server/jetstream_cluster_4_test.go | 85 +++++++++++++++++++++++------- server/stream.go | 2 +- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 94d9952d81e..2897c79ba6c 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -231,6 +231,12 @@ func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) { } func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { + const ( + totalMsgs = 300 + maxMsgs = 100 + maxBytes = maxMsgs * 100 + msgPayloadFormat = "%0100d" // %0100d is 100 bytes. Must match payload value above. + ) c := createJetStreamClusterExplicit(t, "WQ3", 3) defer c.shutdown() @@ -240,24 +246,37 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { _, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}, Replicas: 3}) require_NoError(t, err) - _, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy, + _, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: maxMsgs, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy, + Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3}) + require_NoError(t, err) + + _, err = js.AddStream(&nats.StreamConfig{Name: "wq2", MaxBytes: maxBytes, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy, Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3}) require_NoError(t, err) sendBatch := func(subject string, n int) { for i := 0; i < n; i++ { - _, err = js.Publish(subject, []byte(strconv.Itoa(i))) + _, err = js.Publish(subject, []byte(fmt.Sprintf(msgPayloadFormat, i))) require_NoError(t, err) } } // Populate each one. - sendBatch("test", 300) + sendBatch("test", totalMsgs) checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { si, err := js.StreamInfo("wq") require_NoError(t, err) - if si.State.Msgs != 100 { - return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State) + if si.State.Msgs != maxMsgs { + return fmt.Errorf("expected %d msgs on stream wq, got state: %+v", maxMsgs, si.State) + } + return nil + }) + + checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq2") + require_NoError(t, err) + if si.State.Bytes > maxBytes { + return fmt.Errorf("expected no more than %d bytes on stream wq2, got state: %+v", maxBytes, si.State) } return nil }) @@ -265,11 +284,12 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { _, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) require_NoError(t, err) - ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc")) + ss1, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc")) require_NoError(t, err) + // we must have at least one message on the transformed subject name (ie no timeout) - f := func(done chan bool) { - for i := 0; i < 300; i++ { + f := func(ss *nats.Subscription, done chan bool) { + for i := 0; i < totalMsgs; i++ { m, err := ss.Fetch(1, nats.MaxWait(3*time.Second)) require_NoError(t, err) p, err := strconv.Atoi(string(m[0].Data)) @@ -282,24 +302,53 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { done <- true } - var doneChan = make(chan bool) - go f(doneChan) + var doneChan1 = make(chan bool) + go f(ss1, doneChan1) - checkFor(t, 6*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { si, err := js.StreamInfo("wq") require_NoError(t, err) - if si.State.Msgs > 0 && si.State.Msgs <= 100 { - return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs) - } else if si.State.Msgs > 100 { - t.Fatalf("Got more than our 100 message limit: %+v", si.State) + if si.State.Msgs > 0 && si.State.Msgs <= maxMsgs { + return fmt.Errorf("expected 0 msgs on stream wq, got: %d", si.State.Msgs) + } else if si.State.Msgs > maxMsgs { + t.Fatalf("got more than our %d message limit on stream wq: %+v", maxMsgs, si.State) } + return nil }) select { - case <-doneChan: - ss.Drain() - case <-time.After(5 * time.Second): + case <-doneChan1: + ss1.Drain() + case <-time.After(10 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + _, err = js.AddConsumer("wq2", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + ss2, err := js.PullSubscribe("test", "wqc", nats.Bind("wq2", "wqc")) + require_NoError(t, err) + + var doneChan2 = make(chan bool) + go f(ss2, doneChan2) + + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq2") + require_NoError(t, err) + if si.State.Bytes > 0 && si.State.Bytes <= maxBytes { + return fmt.Errorf("expected 0 bytes on stream wq2, got: %+v", si.State) + } else if si.State.Bytes > maxBytes { + t.Fatalf("got more than our %d bytes limit on stream wq2: %+v", maxMsgs, si.State) + } + + return nil + }) + + select { + case <-doneChan2: + ss2.Drain() + case <-time.After(20 * time.Second): t.Fatalf("Did not receive completion signal") } } diff --git a/server/stream.go b/server/stream.go index bd02a1265fb..8c0e81b5669 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3471,7 +3471,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { // Can happen temporarily all the time during normal operations when the sourcing stream // is working queue/interest with a limit and discard new. // TODO - Improve sourcing to WQ with limit and new to use flow control rather than re-creating the consumer. - if errors.Is(err, ErrMaxMsgs) { + if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) { // Do not need to do a full retry that includes finding the last sequence in the stream // for that source. Just re-create starting with the seq we couldn't store instead. mset.mu.Lock() From 822136916c3fe3e6cc8ebf6f77fb8159691dc9e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 8 Aug 2024 15:19:24 -0700 Subject: [PATCH 2/2] Improve Stream sourcing to a stream with discard new: max per subject MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit You can have a wq or interest stream that is sourcing have a discard new policy (so you can limit its size) was working only for the max messages and max bytes limits, now is also working for the max messages per subjects limit when discard new per subject is set. Signed-off-by: Jean-Noël Moyne --- server/jetstream_cluster.go | 45 ++++++++++++++++-- server/jetstream_cluster_4_test.go | 73 +++++++++++++++++++++++------- server/stream.go | 31 +++++++------ 3 files changed, 113 insertions(+), 36 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 85ac8e4c75f..a2b5f00b78e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3098,6 +3098,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco mset.clMu.Unlock() } + if mset.inflightSubjects != nil { + mset.clMu.Lock() + n := mset.inflightSubjects[subject] + if n > 1 { + mset.inflightSubjects[subject]-- + } else { + delete(mset.inflightSubjects, subject) + } + mset.clMu.Unlock() + } + if err != nil { if err == errLastSeqMismatch { @@ -7966,7 +7977,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq - interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes + interestPolicy, discard, maxMsgs, maxBytes, discardNewPerSubject, maxMsgsPer := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes, mset.cfg.DiscardNewPer, mset.cfg.MaxMsgsPer isLeader, isSealed, compressOK := mset.isLeader(), mset.cfg.Sealed, mset.compressOK mset.mu.RUnlock() @@ -8143,27 +8154,36 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ mset.clseq = lseq + mset.clfs } - // Check if we have an interest policy and discard new with max msgs or bytes. + // Check if we have an interest or working queue retention and discard new with max msgs or bytes. // We need to deny here otherwise it could succeed on some peers and not others // depending on consumer ack state. So we deny here, if we allow that means we know // it would succeed on every peer. - if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0) { + if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0 || (maxMsgsPer > 0 && discardNewPerSubject)) { // Track inflight. if mset.inflight == nil { mset.inflight = make(map[uint64]uint64) } + if stype == FileStorage { mset.inflight[mset.clseq] = fileStoreMsgSize(subject, hdr, msg) } else { mset.inflight[mset.clseq] = memStoreMsgSize(subject, hdr, msg) } + if mset.inflightSubjects == nil { + mset.inflightSubjects = make(map[string]uint64) + } + + mset.inflightSubjects[subject]++ + var state StreamState mset.store.FastState(&state) var err error - if maxMsgs > 0 && state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) { - err = ErrMaxMsgs + if maxMsgs > 0 { + if state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) { + err = ErrMaxMsgs + } } else if maxBytes > 0 { // TODO(dlc) - Could track this rollup independently. var bytesPending uint64 @@ -8173,9 +8193,24 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ if state.Bytes+bytesPending > uint64(maxBytes) { err = ErrMaxBytes } + } else if maxMsgsPer > 0 && discardNewPerSubject { + totals := mset.store.SubjectsTotals(subject) + total := totals[subject] + if (total + mset.inflightSubjects[subject]) > uint64(maxMsgsPer) { + err = ErrMaxMsgsPerSubject + } } + if err != nil { delete(mset.inflight, mset.clseq) + n := mset.inflightSubjects[subject] + + if n > 1 { + mset.inflightSubjects[subject] = n - 1 + } else { + delete(mset.inflightSubjects, subject) + } + mset.clMu.Unlock() if canRespond { var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}} diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 2897c79ba6c..648c9c3189e 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -254,13 +254,32 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3}) require_NoError(t, err) + _, err = js.AddStream(&nats.StreamConfig{Name: "wq3", MaxMsgsPerSubject: maxMsgs, Discard: nats.DiscardNew, DiscardNewPerSubject: true, Retention: nats.WorkQueuePolicy, + Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3}) + require_NoError(t, err) + sendBatch := func(subject string, n int) { for i := 0; i < n; i++ { _, err = js.Publish(subject, []byte(fmt.Sprintf(msgPayloadFormat, i))) require_NoError(t, err) } } - // Populate each one. + + f := func(ss *nats.Subscription, done chan bool) { + for i := 0; i < totalMsgs; i++ { + m, err := ss.Fetch(1, nats.MaxWait(3*time.Second)) + require_NoError(t, err) + p, err := strconv.Atoi(string(m[0].Data)) + require_NoError(t, err) + require_Equal(t, p, i) + time.Sleep(11 * time.Millisecond) + err = m[0].Ack() + require_NoError(t, err) + } + done <- true + } + + // Populate the sourced stream. sendBatch("test", totalMsgs) checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { @@ -281,27 +300,21 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { return nil }) + checkFor(t, 3*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq3") + require_NoError(t, err) + if si.State.Msgs != maxMsgs { + return fmt.Errorf("expected %d msgs on stream wq, got state: %+v", maxMsgs, si.State) + } + return nil + }) + _, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) require_NoError(t, err) ss1, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc")) require_NoError(t, err) - // we must have at least one message on the transformed subject name (ie no timeout) - f := func(ss *nats.Subscription, done chan bool) { - for i := 0; i < totalMsgs; i++ { - m, err := ss.Fetch(1, nats.MaxWait(3*time.Second)) - require_NoError(t, err) - p, err := strconv.Atoi(string(m[0].Data)) - require_NoError(t, err) - require_Equal(t, p, i) - time.Sleep(11 * time.Millisecond) - err = m[0].Ack() - require_NoError(t, err) - } - done <- true - } - var doneChan1 = make(chan bool) go f(ss1, doneChan1) @@ -351,6 +364,34 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) { case <-time.After(20 * time.Second): t.Fatalf("Did not receive completion signal") } + + _, err = js.AddConsumer("wq3", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy}) + require_NoError(t, err) + + ss3, err := js.PullSubscribe("test", "wqc", nats.Bind("wq3", "wqc")) + require_NoError(t, err) + + var doneChan3 = make(chan bool) + go f(ss3, doneChan3) + + checkFor(t, 10*time.Second, 250*time.Millisecond, func() error { + si, err := js.StreamInfo("wq3") + require_NoError(t, err) + if si.State.Msgs > 0 && si.State.Msgs <= maxMsgs { + return fmt.Errorf("expected 0 msgs on stream wq3, got: %d", si.State.Msgs) + } else if si.State.Msgs > maxMsgs { + t.Fatalf("got more than our %d message limit on stream wq3: %+v", maxMsgs, si.State) + } + + return nil + }) + + select { + case <-doneChan3: + ss3.Drain() + case <-time.After(10 * time.Second): + t.Fatalf("Did not receive completion signal") + } } func TestJetStreamClusterConsumerPauseViaConfig(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index 8c0e81b5669..8c69c9b366d 100644 --- a/server/stream.go +++ b/server/stream.go @@ -336,20 +336,21 @@ type stream struct { // TODO(dlc) - Hide everything below behind two pointers. // Clustered mode. - sa *streamAssignment // What the meta controller uses to assign streams to peers. - node RaftNode // Our RAFT node for the stream's group. - catchup atomic.Bool // Used to signal we are in catchup mode. - catchups map[string]uint64 // The number of messages that need to be caught per peer. - syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC"). - infoSub *subscription // Internal subscription for stream info requests. - clMu sync.Mutex // The mutex for clseq and clfs. - clseq uint64 // The current last seq being proposed to the NRG layer. - clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq. - inflight map[uint64]uint64 // Inflight message sizes per clseq. - lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit. - uch chan struct{} // The channel to signal updates to the monitor routine. - compressOK bool // True if we can do message compression in RAFT and catchup logic - inMonitor bool // True if the monitor routine has been started. + sa *streamAssignment // What the meta controller uses to assign streams to peers. + node RaftNode // Our RAFT node for the stream's group. + catchup atomic.Bool // Used to signal we are in catchup mode. + catchups map[string]uint64 // The number of messages that need to be caught per peer. + syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC"). + infoSub *subscription // Internal subscription for stream info requests. + clMu sync.Mutex // The mutex for clseq, clfs, inflight and inflightSubjects. + clseq uint64 // The current last seq being proposed to the NRG layer. + clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq. + inflight map[uint64]uint64 // Inflight message sizes per clseq. + inflightSubjects map[string]uint64 // Inflight number of messages per subject. + lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit. + uch chan struct{} // The channel to signal updates to the monitor routine. + compressOK bool // True if we can do message compression in RAFT and catchup logic + inMonitor bool // True if the monitor routine has been started. // Direct get subscription. directSub *subscription @@ -3471,7 +3472,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { // Can happen temporarily all the time during normal operations when the sourcing stream // is working queue/interest with a limit and discard new. // TODO - Improve sourcing to WQ with limit and new to use flow control rather than re-creating the consumer. - if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) { + if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) || errors.Is(err, ErrMaxMsgsPerSubject) { // Do not need to do a full retry that includes finding the last sequence in the stream // for that source. Just re-create starting with the seq we couldn't store instead. mset.mu.Lock()