Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Control Loop #225

Merged
merged 22 commits into from
Feb 18, 2025
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a99b3a6
Migrate to kube_codegen.sh
samuelattwood Nov 20, 2024
4c499e0
Migrate to reconciliation loop using controller-runtime (#208)
adriandieter Dec 9, 2024
e9dd49c
feat(controller-runtime): Implement stream controller (#211)
adriandieter Dec 19, 2024
c353ccc
feat(controller-runtime): Add consumer controller (#212)
adriandieter Dec 31, 2024
5bd0945
feat(controller-runtime): Add keyvalue store spec and controller (#215)
samuelattwood Jan 7, 2025
60d50f8
Add initial object store types. Update Stream config and reorg to kee…
samuelattwood Jan 7, 2025
f15f695
Add ObjectStore tests and remaining options
samuelattwood Jan 7, 2025
726b0d5
Add test for sealed stream option
samuelattwood Jan 7, 2025
180099a
Add Account Controller (#224)
samuelattwood Jan 16, 2025
af63722
Deps
samuelattwood Jan 16, 2025
4b8a530
Create configured cache dir if DNE
samuelattwood Jan 21, 2025
aee234b
Move stream controller to jsm.go for pedantic mode
samuelattwood Jan 21, 2025
f9e8121
Move consumer controller to jsm.go for pedantic mode
samuelattwood Jan 21, 2025
b0f8736
Remove debug log entry
samuelattwood Jan 21, 2025
5db0cff
deps
samuelattwood Jan 24, 2025
ad4cf12
Improve connection config priority. Add missing option from consumer …
samuelattwood Jan 31, 2025
18dec43
Deps. Fix placement config enforcement
samuelattwood Jan 31, 2025
6ddcced
Bump jsm.go. Fix typo
samuelattwood Jan 31, 2025
577ce79
Log diff on resource update
samuelattwood Jan 31, 2025
ad9693f
Merge branch 'main' into feature/controller-runtime
samuelattwood Feb 18, 2025
fc8ad13
Improve README. Modernize examples.
samuelattwood Feb 18, 2025
7a196bb
Avoid excess disk writes to cache directory. README tweaks.
samuelattwood Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move consumer controller to jsm.go for pedantic mode
samuelattwood committed Jan 31, 2025

Verified

This commit was signed with the committer’s verified signature.
samuelattwood Samuel Attwood
commit f9e8121d8b00d4a8ff92d1cfb4c2fb0f31615184
220 changes: 150 additions & 70 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
@@ -21,10 +21,14 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/nats-io/jsm.go"
jsmapi "github.com/nats-io/jsm.go/api"

"github.com/go-logr/logr"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
@@ -149,21 +153,21 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
}

if !consumer.Spec.PreventDelete && !r.ReadOnly() {
err := r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
_, err := getServerConsumerState(ctx, js, consumer)
err := r.WithJSMClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js *jsm.Manager) error {
_, err := getServerConsumerState(js, consumer)
// If we have no known state for this consumer it has never been reconciled.
// If we are also receiving an error fetching state, either the consumer does not exist
// or this resource config is invalid.
if err != nil && storedState == nil {
return nil
}

return js.DeleteConsumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
return js.DeleteConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
})
switch {
case errors.Is(err, jetstream.ErrConsumerNotFound):
case jsm.IsNatsError(err, JSConsumerNotFoundErr):
log.Info("Consumer does not exist. Unable to delete.")
case errors.Is(err, jetstream.ErrStreamNotFound):
case jsm.IsNatsError(err, JSStreamNotFoundErr):
log.Info("Stream of consumer does not exist. Unable to delete.")
case err != nil:
if storedState == nil {
@@ -201,13 +205,13 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return fmt.Errorf("map consumer spec to target config: %w", err)
}

err = r.WithJetStreamClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js jetstream.JetStream) error {
err = r.WithJSMClient(consumer.Spec.ConnectionOpts, consumer.Namespace, func(js *jsm.Manager) error {
storedState, err := getStoredConsumerState(consumer)
if err != nil {
log.Error(err, "Failed to fetch stored consumer state.")
}

serverState, err := getServerConsumerState(ctx, js, consumer)
serverState, err := getServerConsumerState(js, consumer)
if err != nil {
return err
}
@@ -232,18 +236,28 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return nil
}

var updatedConsumer jetstream.Consumer
var updatedConsumer *jsm.Consumer
err = nil

if serverState == nil {
log.Info("Creating Consumer.")
updatedConsumer, err = js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
updatedConsumer, err = js.NewConsumer(consumer.Spec.StreamName, targetConfig...)
if err != nil {
return err
}
} else if !consumer.Spec.PreventUpdate {
log.Info("Updating Consumer.")
updatedConsumer, err = js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
c, err := js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
return err
}

err = c.UpdateConfiguration(targetConfig...)
if err != nil {
return err
}

updatedConsumer, err = js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if err != nil {
return err
}
@@ -255,7 +269,7 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger

if updatedConsumer != nil {
// Store known state in annotation
updatedState, err := json.Marshal(updatedConsumer.CachedInfo().Config)
updatedState, err := json.Marshal(updatedConsumer.Configuration())
if err != nil {
return err
}
@@ -295,8 +309,8 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
return nil
}

func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
var storedState *jetstream.ConsumerConfig
func getStoredConsumerState(consumer *api.Consumer) (*jsmapi.ConsumerConfig, error) {
var storedState *jsmapi.ConsumerConfig
if state, ok := consumer.Annotations[stateAnnotationConsumer]; ok {
err := json.Unmarshal([]byte(state), &storedState)
if err != nil {
@@ -309,88 +323,148 @@ func getStoredConsumerState(consumer *api.Consumer) (*jetstream.ConsumerConfig,

// Fetch the current state of the consumer from the server.
// ErrConsumerNotFound is considered a valid response and does not return error
func getServerConsumerState(ctx context.Context, js jetstream.JetStream, consumer *api.Consumer) (*jetstream.ConsumerConfig, error) {
c, err := js.Consumer(ctx, consumer.Spec.StreamName, consumer.Spec.DurableName)
if errors.Is(err, jetstream.ErrConsumerNotFound) {
func getServerConsumerState(js *jsm.Manager, consumer *api.Consumer) (*jsmapi.ConsumerConfig, error) {
c, err := js.LoadConsumer(consumer.Spec.StreamName, consumer.Spec.DurableName)
if jsm.IsNatsError(err, JSConsumerNotFoundErr) {
return nil, nil
}
if err != nil {
return nil, err
}

return &c.CachedInfo().Config, nil
consumerCfg := c.Configuration()
return &consumerCfg, nil
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {
config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Description: spec.Description,
OptStartSeq: uint64(spec.OptStartSeq),
MaxDeliver: spec.MaxDeliver,
FilterSubject: spec.FilterSubject,
RateLimit: uint64(spec.RateLimitBps),
SampleFrequency: spec.SampleFreq,
MaxWaiting: spec.MaxWaiting,
MaxAckPending: spec.MaxAckPending,
HeadersOnly: spec.HeadersOnly,
MaxRequestBatch: spec.MaxRequestBatch,
MaxRequestMaxBytes: spec.MaxRequestMaxBytes,
Replicas: spec.Replicas,
MemoryStorage: spec.MemStorage,
FilterSubjects: spec.FilterSubjects,
Metadata: spec.Metadata,
}

// DeliverPolicy
if spec.DeliverPolicy != "" {
err := config.DeliverPolicy.UnmarshalJSON(jsonString(spec.DeliverPolicy))
func consumerSpecToConfig(spec *api.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts := []jsm.ConsumerOption{
jsm.ConsumerDescription(spec.Description),
jsm.DeliverySubject(spec.DeliverSubject),
jsm.DeliverGroup(spec.DeliverGroup),
jsm.DurableName(spec.DurableName),
jsm.MaxAckPending(uint(spec.MaxAckPending)),
jsm.MaxWaiting(uint(spec.MaxWaiting)),
jsm.RateLimitBitsPerSecond(uint64(spec.RateLimitBps)),
jsm.MaxRequestBatch(uint(spec.MaxRequestBatch)),
jsm.MaxRequestMaxBytes(spec.MaxRequestMaxBytes),
jsm.ConsumerOverrideReplicas(spec.Replicas),
jsm.ConsumerMetadata(spec.Metadata),
}

// ackPolicy
switch spec.AckPolicy {
case "none":
opts = append(opts, jsm.AcknowledgeNone())
case "all":
opts = append(opts, jsm.AcknowledgeAll())
case "explicit":
opts = append(opts, jsm.AcknowledgeExplicit())
case "":
default:
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'", spec.AckPolicy)
}

// ackWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if err != nil {
return nil, fmt.Errorf("invalid delivery policy: %w", err)
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
}
opts = append(opts, jsm.AckWait(d))
}

// OptStartTime RFC3339
if spec.OptStartTime != "" {
// deliverPolicy
switch spec.DeliverPolicy {
case "all":
opts = append(opts, jsm.DeliverAllAvailable())
case "last":
opts = append(opts, jsm.StartWithLastReceived())
case "new":
opts = append(opts, jsm.StartWithNextReceived())
case "byStartSequence":
opts = append(opts, jsm.StartAtSequence(uint64(spec.OptStartSeq)))
case "byStartTime":
if spec.OptStartTime == "" {
return nil, fmt.Errorf("'optStartTime' is required for deliver policy 'byStartTime'")
}
t, err := time.Parse(time.RFC3339, spec.OptStartTime)
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
return nil, err
}
config.OptStartTime = &t
opts = append(opts, jsm.StartAtTime(t))
case "":
default:
return nil, fmt.Errorf("invalid value for 'deliverPolicy': '%s'. Must be one of 'all', 'last', 'new', 'byStartSequence', 'byStartTime'", spec.DeliverPolicy)
}

// AckPolicy
if spec.AckPolicy != "" {
err := config.AckPolicy.UnmarshalJSON(jsonString(spec.AckPolicy))
if err != nil {
return nil, fmt.Errorf("invalid ack policy: %w", err)
}
// filterSubject
if spec.FilterSubject != "" && len(spec.FilterSubjects) > 0 {
return nil, errors.New("cannot set both 'filterSubject' and 'filterSubjects'")
}

// AckWait
if spec.AckWait != "" {
d, err := time.ParseDuration(spec.AckWait)
if spec.FilterSubject != "" {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubject))
} else if len(spec.FilterSubjects) > 0 {
opts = append(opts, jsm.FilterStreamBySubject(spec.FilterSubjects...))
}

// flowControl
if spec.FlowControl {
opts = append(opts, jsm.PushFlowControl())
}

// heartbeatInterval
if spec.HeartbeatInterval != "" {
d, err := time.ParseDuration(spec.HeartbeatInterval)
if err != nil {
return nil, fmt.Errorf("invalid ack wait duration: %w", err)
return nil, fmt.Errorf("invalid heartbeat interval: %w", err)
}
config.AckWait = d

opts = append(opts, jsm.IdleHeartbeat(d))
}

// BackOff
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
// maxDeliver
if spec.MaxDeliver != 0 {
opts = append(opts, jsm.MaxDeliveryAttempts(spec.MaxDeliver))
}

// backoff
if len(spec.BackOff) > 0 {
backoffs := make([]time.Duration, 0)
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
return nil, fmt.Errorf("invalid backoff: %w", err)
}
backoffs = append(backoffs, d)
}

config.BackOff = append(config.BackOff, d)
opts = append(opts, jsm.BackoffIntervals(backoffs...))
}

// ReplayPolicy
if spec.ReplayPolicy != "" {
err := config.ReplayPolicy.UnmarshalJSON(jsonString(spec.ReplayPolicy))
// replayPolicy
switch spec.ReplayPolicy {
case "instant":
opts = append(opts, jsm.ReplayInstantly())
case "original":
opts = append(opts, jsm.ReplayAsReceived())
case "":
default:
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'", spec.ReplayPolicy)
}

if spec.SampleFreq != "" {
n, err := strconv.Atoi(
strings.TrimSuffix(spec.SampleFreq, "%"),
)
if err != nil {
return nil, fmt.Errorf("invalid replay policy: %w", err)
return nil, err
}
opts = append(opts, jsm.SamplePercent(n))
}

if spec.HeadersOnly {
opts = append(opts, jsm.DeliverHeadersOnly())
}

// MaxRequestExpires
@@ -399,18 +473,24 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
if err != nil {
return nil, fmt.Errorf("invalid opt start time: %w", err)
}
config.MaxRequestExpires = d
opts = append(opts, jsm.MaxRequestExpires(d))
}

// inactiveThreshold
if spec.InactiveThreshold != "" {
d, err := time.ParseDuration(spec.InactiveThreshold)
if err != nil {
return nil, fmt.Errorf("invalid inactive threshold: %w", err)
}
config.InactiveThreshold = d
opts = append(opts, jsm.InactiveThreshold(d))
}

// memStorage
if spec.MemStorage {
opts = append(opts, jsm.ConsumerOverrideMemoryStorage())
}

return config, nil
return opts, nil
}

// SetupWithManager sets up the controller with the Manager.
104 changes: 92 additions & 12 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"testing"
"time"

jsmapi "github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go/jetstream"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -584,13 +585,13 @@ func Test_consumerSpecToConfig(t *testing.T) {
tests := []struct {
name string
spec *api.ConsumerSpec
want *jetstream.ConsumerConfig
want *jsmapi.ConsumerConfig
wantErr bool
}{
{
name: "empty spec",
spec: &api.ConsumerSpec{},
want: &jetstream.ConsumerConfig{},
want: &jsmapi.ConsumerConfig{},
wantErr: false,
},
{
@@ -600,11 +601,11 @@ func Test_consumerSpecToConfig(t *testing.T) {
AckWait: "10ns",
BackOff: []string{"1s", "5m"},
DeliverGroup: "",
DeliverPolicy: "new",
DeliverPolicy: "byStartSequence",
DeliverSubject: "",
Description: "test consumer",
DurableName: "test-consumer",
FilterSubject: "time.us.>",
FilterSubject: "",
FilterSubjects: []string{"time.us.east", "time.us.west"},
FlowControl: false,
HeadersOnly: true,
@@ -617,7 +618,7 @@ func Test_consumerSpecToConfig(t *testing.T) {
MaxWaiting: 5,
MemStorage: true,
OptStartSeq: 17,
OptStartTime: dateString,
OptStartTime: "",
RateLimitBps: 512,
ReplayPolicy: "instant",
Replicas: 9,
@@ -638,18 +639,17 @@ func Test_consumerSpecToConfig(t *testing.T) {
},
},
},
want: &jetstream.ConsumerConfig{
want: &jsmapi.ConsumerConfig{
Durable: "test-consumer",
Description: "test consumer",
DeliverPolicy: jetstream.DeliverNewPolicy,
DeliverPolicy: jsmapi.DeliverByStartSequence,
OptStartSeq: 17,
OptStartTime: &date,
AckPolicy: jetstream.AckExplicitPolicy,
AckPolicy: jsmapi.AckExplicit,
AckWait: 10 * time.Nanosecond,
MaxDeliver: 3,
BackOff: []time.Duration{time.Second, 5 * time.Minute},
FilterSubject: "time.us.>",
ReplayPolicy: jetstream.ReplayInstantPolicy,
FilterSubject: "",
ReplayPolicy: jsmapi.ReplayInstant,
RateLimit: 512,
SampleFrequency: "25%",
MaxWaiting: 5,
@@ -668,14 +668,94 @@ func Test_consumerSpecToConfig(t *testing.T) {
},
wantErr: false,
},
{
name: "full spec alt",
spec: &api.ConsumerSpec{
AckPolicy: "all",
AckWait: "20ns",
BackOff: []string{"1s", "5m"},
DeliverGroup: "",
DeliverPolicy: "byStartTime",
DeliverSubject: "",
Description: "test consumer",
DurableName: "test-consumer",
FilterSubject: "time.us.>",
FlowControl: true,
HeadersOnly: false,
HeartbeatInterval: "",
MaxAckPending: 5,
MaxDeliver: 6,
MaxRequestBatch: 7,
MaxRequestExpires: "8s",
MaxRequestMaxBytes: 1024,
MaxWaiting: 5,
MemStorage: false,
OptStartSeq: 17,
OptStartTime: dateString,
RateLimitBps: 1024,
ReplayPolicy: "original",
Replicas: 9,
SampleFreq: "30%",
StreamName: "",
Metadata: map[string]string{
"meta": "data",
},
BaseStreamConfig: api.BaseStreamConfig{
PreventDelete: false,
PreventUpdate: false,
ConnectionOpts: api.ConnectionOpts{
Account: "",
Creds: "",
Nkey: "",
TLS: api.TLS{},
Servers: nil,
},
},
},
want: &jsmapi.ConsumerConfig{
Durable: "test-consumer",
Description: "test consumer",
DeliverPolicy: jsmapi.DeliverByStartTime,
OptStartSeq: 0,
OptStartTime: &date,
AckPolicy: jsmapi.AckAll,
AckWait: 20 * time.Nanosecond,
MaxDeliver: 6,
BackOff: []time.Duration{time.Second, 5 * time.Minute},
FlowControl: true,
FilterSubject: "time.us.>",
ReplayPolicy: jsmapi.ReplayOriginal,
RateLimit: 1024,
SampleFrequency: "30%",
MaxWaiting: 5,
MaxAckPending: 5,
HeadersOnly: false,
MaxRequestBatch: 7,
MaxRequestExpires: 8 * time.Second,
MaxRequestMaxBytes: 1024,
InactiveThreshold: 0, // TODO no value?
Replicas: 9,
MemoryStorage: false,
Metadata: map[string]string{
"meta": "data",
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := consumerSpecToConfig(tt.spec)
cOpts, err := consumerSpecToConfig(tt.spec)
if (err != nil) != tt.wantErr {
t.Errorf("consumerSpecToConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}

got := &jsmapi.ConsumerConfig{}
for _, o := range cOpts {
o(got)
}

assert.EqualValues(t, tt.want, got, "consumerSpecToConfig(%v)", tt.spec)
})
}
5 changes: 5 additions & 0 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
JSConsumerNotFoundErr uint16 = 10014
JSStreamNotFoundErr uint16 = 10059
)

var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)

type JetStreamController interface {
5 changes: 2 additions & 3 deletions internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st

return js.DeleteStream(stream.Spec.Name)
})
if errors.Is(err, jetstream.ErrStreamNotFound) {
if jsmapi.IsNatsErr(err, JSStreamNotFoundErr) {
log.Info("Stream does not exist, unable to delete.", "streamName", stream.Spec.Name)
} else if err != nil && storedState == nil {
log.Info("Stream not reconciled and no state received from server. Removing finalizer.")
@@ -303,8 +303,7 @@ func getStoredStreamState(stream *api.Stream) (*jsmapi.StreamConfig, error) {
// JSStreamNotFoundErr is considered a valid response and does not return error
func getServerStreamState(jsm *jsm.Manager, stream *api.Stream) (*jsmapi.StreamConfig, error) {
s, err := jsm.LoadStream(stream.Spec.Name)
// 10059 -> JSStreamNotFoundErr
if jsmapi.IsNatsErr(err, 10059) {
if jsmapi.IsNatsErr(err, JSStreamNotFoundErr) {
return nil, nil
}
if err != nil {
32 changes: 14 additions & 18 deletions pkg/jetstream/apis/jetstream/v1beta2/consumertypes.go
Original file line number Diff line number Diff line change
@@ -22,39 +22,35 @@ func (c *Consumer) GetSpec() interface{} {

// ConsumerSpec is the spec for a Consumer resource
type ConsumerSpec struct {
DurableName string `json:"durableName"` // Maps to Durable
Description string `json:"description"`
DeliverPolicy string `json:"deliverPolicy"`
OptStartSeq int `json:"optStartSeq"`
OptStartTime string `json:"optStartTime"`
AckPolicy string `json:"ackPolicy"`
AckWait string `json:"ackWait"`
DeliverPolicy string `json:"deliverPolicy"`
DeliverSubject string `json:"deliverSubject"`
DeliverGroup string `json:"deliverGroup"`
DurableName string `json:"durableName"` // Maps to Durable
FilterSubject string `json:"filterSubject"`
FilterSubjects []string `json:"filterSubjects"`
FlowControl bool `json:"flowControl"`
HeartbeatInterval string `json:"heartbeatInterval"` // Maps to Heartbeat
MaxAckPending int `json:"maxAckPending"`
MaxDeliver int `json:"maxDeliver"`
BackOff []string `json:"backoff"`
FilterSubject string `json:"filterSubject"`
ReplayPolicy string `json:"replayPolicy"`
RateLimitBps int `json:"rateLimitBps"` // Maps to RateLimit
SampleFreq string `json:"sampleFreq"` // Maps to SampleFrequency
MaxWaiting int `json:"maxWaiting"`
MaxAckPending int `json:"maxAckPending"`
OptStartSeq int `json:"optStartSeq"`
OptStartTime string `json:"optStartTime"`
RateLimitBps int `json:"rateLimitBps"` // Maps to RateLimit
ReplayPolicy string `json:"replayPolicy"`
SampleFreq string `json:"sampleFreq"` // Maps to SampleFrequency
HeadersOnly bool `json:"headersOnly"`
MaxRequestBatch int `json:"maxRequestBatch"`
MaxRequestExpires string `json:"maxRequestExpires"`
MaxRequestMaxBytes int `json:"maxRequestMaxBytes"`
InactiveThreshold string `json:"inactiveThreshold"`
Replicas int `json:"replicas"`
MemStorage bool `json:"memStorage"` // Maps to MemoryStorage
FilterSubjects []string `json:"filterSubjects"`
Metadata map[string]string `json:"metadata"`

// Legacy API options for Push Consumers.
// controller-runtime implementation moves to modern JetStream API over legacy
// which does not support Push Consumers
FlowControl bool `json:"flowControl"`
DeliverSubject string `json:"deliverSubject"`
DeliverGroup string `json:"deliverGroup"`
HeartbeatInterval string `json:"heartbeatInterval"`

StreamName string `json:"streamName"`
BaseStreamConfig
}
18 changes: 9 additions & 9 deletions pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

Large diffs are not rendered by default.