Skip to content

Commit

Permalink
feat(controller-runtime): Add keyvalue store spec and controller (#215)
Browse files Browse the repository at this point in the history
* Bump helm/kind-action from 1.10.0 to 1.11.0 (#213)

Bumps [helm/kind-action](https://github.com/helm/kind-action) from 1.10.0 to 1.11.0.
- [Release notes](https://github.com/helm/kind-action/releases)
- [Commits](helm/kind-action@v1.10.0...v1.11.0)

---
updated-dependencies:
- dependency-name: helm/kind-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump helm/kind-action from 1.11.0 to 1.12.0 (#214)

Bumps [helm/kind-action](https://github.com/helm/kind-action) from 1.11.0 to 1.12.0.
- [Release notes](https://github.com/helm/kind-action/releases)
- [Commits](helm/kind-action@v1.11.0...v1.12.0)

---
updated-dependencies:
- dependency-name: helm/kind-action
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* * Formatting

* Add initial definitions for KeyValue store

* Deps

* Fix test

* Add KeyValue controller

* Add KeyValue tests

* Update PreventUpdate behavior

* Minor error handling change

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
samuelattwood and dependabot[bot] committed Jan 31, 2025
1 parent c353ccc commit 5bd0945
Showing 84 changed files with 2,690 additions and 971 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ export GO111MODULE := on

SHELL=/usr/bin/env bash

ENVTEST_K8S_VERSION = 1.29.0
ENVTEST_K8S_VERSION = 1.31.0

now := $(shell date -u +%Y-%m-%dT%H:%M:%S%z)
gitBranch := $(shell git rev-parse --abbrev-ref HEAD)
@@ -197,7 +197,7 @@ mv "$$(echo "$(1)" | sed "s/-$(3)$$//")" $(1) ;\
endef

ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
ENVTEST_VERSION ?= release-0.17
ENVTEST_VERSION ?= release-0.19

.PHONY: envtest
envtest: $(ENVTEST) ## Download setup-envtest locally if necessary.
11 changes: 6 additions & 5 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,11 @@ import (
"errors"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/nats-io/nack/controllers/jetstream"
"github.com/nats-io/nack/internal/controller"
jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
@@ -29,12 +34,9 @@ import (
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
klog "k8s.io/klog/v2"
"os"
"os/signal"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"syscall"
"time"
)

var (
@@ -155,7 +157,6 @@ func run() error {
}

func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, controllerCfg *controller.Config) error {

// Setup scheme
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
4 changes: 2 additions & 2 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
@@ -244,7 +244,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.AcknowledgeExplicit())
case "":
default:
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'.", spec.AckPolicy)
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'", spec.AckPolicy)
}

if spec.AckWait != "" {
@@ -262,7 +262,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.ReplayAsReceived())
case "":
default:
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'.", spec.ReplayPolicy)
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'", spec.ReplayPolicy)
}

if spec.SampleFreq != "" {
22 changes: 12 additions & 10 deletions controllers/jetstream/controller.go
Original file line number Diff line number Diff line change
@@ -98,11 +98,11 @@ type Controller struct {

strLister listers.StreamLister
strSynced cache.InformerSynced
strQueue workqueue.RateLimitingInterface
strQueue workqueue.TypedRateLimitingInterface[any]

cnsLister listers.ConsumerLister
cnsSynced cache.InformerSynced
cnsQueue workqueue.RateLimitingInterface
cnsQueue workqueue.TypedRateLimitingInterface[any]

accLister listers.AccountLister

@@ -137,8 +137,8 @@ func NewController(opt Options) *Controller {
}

ji := opt.JetstreamIface.JetstreamV1beta2()
streamQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Streams")
consumerQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Consumers")
streamQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), "Streams")
consumerQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), "Consumers")

streamInformer.Informer().AddEventHandler(
eventHandlers(
@@ -156,6 +156,7 @@ func NewController(opt Options) *Controller {
if err != nil {
panic(err)
}
defer os.RemoveAll(cacheDir)

return &Controller{
ctx: opt.Ctx,
@@ -180,7 +181,6 @@ func NewController(opt Options) *Controller {
}

func (c *Controller) Run() error {

// Connect to NATS.
opts := make([]nats.Option, 0)
// Always attempt to have a connection to NATS.
@@ -647,7 +647,7 @@ func getStorageType(s string) (jsmapi.StorageType, error) {
}
}

func enqueueWork(q workqueue.RateLimitingInterface, item interface{}) (err error) {
func enqueueWork(q workqueue.TypedRateLimitingInterface[any], item interface{}) (err error) {
key, err := cache.MetaNamespaceKeyFunc(item)
if err != nil {
return fmt.Errorf("failed to enqueue work: %w", err)
@@ -657,10 +657,12 @@ func enqueueWork(q workqueue.RateLimitingInterface, item interface{}) (err error
return nil
}

type jsmClientFunc func(*natsContext) (jsmClient, error)
type processorFunc func(ns, name string, jmsClient jsmClientFunc) error
type (
jsmClientFunc func(*natsContext) (jsmClient, error)
processorFunc func(ns, name string, jmsClient jsmClientFunc) error
)

func processQueueNext(q workqueue.RateLimitingInterface, jmsClient jsmClientFunc, process processorFunc) {
func processQueueNext(q workqueue.TypedRateLimitingInterface[any], jmsClient jsmClientFunc, process processorFunc) {
item, shutdown := q.Get()
if shutdown {
return
@@ -730,7 +732,7 @@ func shouldEnqueue(prevObj, nextObj interface{}) bool {
return markedDelete || specChanged
}

func eventHandlers(q workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs {
func eventHandlers(q workqueue.TypedRateLimitingInterface[any]) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if err := enqueueWork(q, obj); err != nil {
8 changes: 4 additions & 4 deletions controllers/jetstream/controller_test.go
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ func TestGetStorageType(t *testing.T) {
func TestEnqueueWork(t *testing.T) {
t.Parallel()

limiter := workqueue.DefaultControllerRateLimiter()
limiter := workqueue.DefaultTypedControllerRateLimiter[any]()
q := workqueue.NewNamedRateLimitingQueue(limiter, "StreamsTest")
defer q.ShutDown()

@@ -97,7 +97,7 @@ func TestProcessQueueNext(t *testing.T) {
t.Run("bad item key", func(t *testing.T) {
t.Parallel()

limiter := workqueue.DefaultControllerRateLimiter()
limiter := workqueue.DefaultTypedControllerRateLimiter[any]()
q := workqueue.NewNamedRateLimitingQueue(limiter, "StreamsTest")
defer q.ShutDown()

@@ -122,7 +122,7 @@ func TestProcessQueueNext(t *testing.T) {
t.Run("process error", func(t *testing.T) {
t.Parallel()

limiter := workqueue.DefaultControllerRateLimiter()
limiter := workqueue.DefaultTypedControllerRateLimiter[any]()
q := workqueue.NewNamedRateLimitingQueue(limiter, "StreamsTest")
defer q.ShutDown()

@@ -156,7 +156,7 @@ func TestProcessQueueNext(t *testing.T) {
t.Run("process ok", func(t *testing.T) {
t.Parallel()

limiter := workqueue.DefaultControllerRateLimiter()
limiter := workqueue.DefaultTypedControllerRateLimiter[any]()
q := workqueue.NewNamedRateLimitingQueue(limiter, "StreamsTest")
defer q.ShutDown()

11 changes: 5 additions & 6 deletions controllers/jetstream/stream.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ import (
"time"

jsm "github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
jsmapi "github.com/nats-io/jsm.go/api"
apis "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
typed "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned/typed/jetstream/v1beta2"
@@ -223,9 +222,9 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e

switch spec.Compression {
case "s2":
opts = append(opts, jsm.Compression(api.S2Compression))
opts = append(opts, jsm.Compression(jsmapi.S2Compression))
case "none":
opts = append(opts, jsm.Compression(api.NoCompression))
opts = append(opts, jsm.Compression(jsmapi.NoCompression))
}

if spec.NoAck {
@@ -289,7 +288,7 @@ func createStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e
}

if spec.SubjectTransform != nil {
opts = append(opts, func(o *api.StreamConfig) error {
opts = append(opts, func(o *jsmapi.StreamConfig) error {
o.SubjectTransform = &jsmapi.SubjectTransformConfig{
Source: spec.SubjectTransform.Source,
Destination: spec.SubjectTransform.Dest,
@@ -425,9 +424,9 @@ func updateStream(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err e

switch spec.Compression {
case "s2":
config.Compression = api.S2Compression
config.Compression = jsmapi.S2Compression
case "none":
config.Compression = api.NoCompression
config.Compression = jsmapi.NoCompression
}

return js.UpdateConfiguration(config)
211 changes: 211 additions & 0 deletions deploy/crds.yml
Original file line number Diff line number Diff line change
@@ -1042,3 +1042,214 @@ spec:
password:
description: Key in the secret that contains the password.
type: string
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: keyvalues.jetstream.nats.io
spec:
group: jetstream.nats.io
scope: Namespaced
names:
kind: KeyValue
singular: keyvalue
plural: keyvalues
shortNames:
- kv
versions:
- name: v1beta2
served: true
storage: true
subresources:
status: {}
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
name:
description: A unique name for the Key/Value store.
type: string
description:
description: The description of the Key/Value store.
type: string
maxValueSize:
description: The maximum size of a value in bytes.
type: integer
history:
description: The number of historical values to keep per key.
type: integer
ttl:
description: The time expiry for keys.
type: string
maxBytes:
description: The maximum size of the Key/Value store in bytes.
type: integer
storage:
description: The storage backend to use for the Key/Value store.
type: string
enum:
- file
- memory
replicas:
description: The number of replicas to keep for the Key/Value store in clustered JetStream.
type: integer
minimum: 1
maximum: 5
placement:
description: The Key/Value store placement via tags or cluster name.
type: object
properties:
cluster:
type: string
tags:
type: array
items:
type: string
republish:
description: Republish configuration for the Key/Value store.
type: object
properties:
destination:
type: string
description: Messages will be additionally published to this subject after store.
source:
type: string
description: Messages will be published from this subject to the destination subject.
mirror:
description: A Key/Value store mirror.
type: object
properties:
name:
type: string
optStartSeq:
type: integer
optStartTime:
description: Time format must be RFC3339.
type: string
filterSubject:
type: string
externalApiPrefix:
type: string
externalDeliverPrefix:
type: string
subjectTransforms:
description: List of subject transforms for this mirror.
type: array
items:
description: A subject transform pair.
type: object
properties:
source:
description: Source subject.
type: string
dest:
description: Destination subject.
type: string
compression:
description: Key/Value store compression.
type: boolean
sources:
description: A Key/Value store's sources.
type: array
items:
type: object
properties:
name:
type: string
optStartSeq:
type: integer
optStartTime:
description: Time format must be RFC3339.
type: string
filterSubject:
type: string
externalApiPrefix:
type: string
externalDeliverPrefix:
type: string
subjectTransforms:
description: List of subject transforms for this mirror.
type: array
items:
description: A subject transform pair.
type: object
properties:
source:
description: Source subject.
type: string
dest:
description: Destination subject.
type: string
servers:
description: A list of servers for creating stream
type: array
items:
type: string
default: []
creds:
description: NATS user credentials for connecting to servers. Please make sure your controller has mounted the cerds on its path.
type: string
default: ''
nkey:
description: NATS user NKey for connecting to servers.
type: string
default: ''
tls:
description: A client's TLS certs and keys.
type: object
properties:
clientCert:
description: A client's cert filepath. Should be mounted.
type: string
clientKey:
description: A client's key filepath. Should be mounted.
type: string
rootCas:
description: A list of filepaths to CAs. Should be mounted.
type: array
items:
type: string
account:
description: Name of the account to which the Stream belongs.
type: string
pattern: '^[^.*>]*$'
preventDelete:
description: When true, the managed Stream will not be deleted when the resource is deleted
type: boolean
default: false
preventUpdate:
description: When true, the managed Stream will not be updated when the resource is updated
type: boolean
default: false
status:
type: object
properties:
observedGeneration:
type: integer
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
lastTransitionTime:
type: string
reason:
type: string
message:
type: string
additionalPrinterColumns:
- name: State
type: string
description: The current state of the Key/Value store.
jsonPath: .status.conditions[?(@.type == 'Ready')].reason
- name: Key/Value Store Name
type: string
description: The name of the Key/Value store.
jsonPath: .spec.name
3 changes: 3 additions & 0 deletions deploy/rbac.yml
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@ rules:
resources:
- streams
- streams/status
- keyvalues
- keyvalues/status
- consumers
- consumers/status
- streamtemplates
@@ -48,6 +50,7 @@ rules:
- jetstream.nats.io
resources:
- streams/finalizers
- keyvalues/finalizers
- consumers/finalizers
- accounts/finalizers
verbs:
72 changes: 34 additions & 38 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
module github.com/nats-io/nack

go 1.23
go 1.23.0

toolchain go1.23.2
toolchain go1.23.4

require (
github.com/fsnotify/fsnotify v1.8.0
github.com/go-logr/logr v1.4.2
github.com/nats-io/jsm.go v0.1.2
github.com/nats-io/nats-server/v2 v2.10.22
github.com/nats-io/nats.go v1.37.0
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nats.go v1.38.0
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.9.0
k8s.io/api v0.31.2
k8s.io/apimachinery v0.31.2
k8s.io/client-go v0.31.2
k8s.io/code-generator v0.31.2
github.com/stretchr/testify v1.10.0
golang.org/x/sync v0.10.0
k8s.io/api v0.32.0
k8s.io/apimachinery v0.32.0
k8s.io/client-go v0.32.0
k8s.io/code-generator v0.32.0
k8s.io/klog/v2 v2.130.1
sigs.k8s.io/controller-runtime v0.19.2
sigs.k8s.io/structured-merge-diff/v4 v4.4.3
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/structured-merge-diff/v4 v4.5.0
)

require (
@@ -29,66 +30,61 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gnostic-models v0.6.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt/v2 v2.7.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/jwt/v2 v2.7.3 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/text v0.20.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.27.0 // indirect
golang.org/x/tools v0.28.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
google.golang.org/protobuf v1.36.1 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.31.0 // indirect
k8s.io/apiextensions-apiserver v0.32.0 // indirect
k8s.io/gengo/v2 v2.0.0-20240911193312-2b36238f13e9 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078 // indirect
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7 // indirect
k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
136 changes: 66 additions & 70 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions internal/controller/account_controller.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package controller

import (
"context"

"k8s.io/klog/v2"

jetstreamnatsiov1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
4 changes: 2 additions & 2 deletions internal/controller/client.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package controller

import (
"fmt"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
@@ -46,7 +47,7 @@ func (o *NatsConfig) buildOptions() ([]nats.Option, error) {
opts = append(opts, nats.ClientCert(o.Certificate, o.Key))
}

if o.CAs != nil && len(o.CAs) > 0 {
if len(o.CAs) > 0 {
opts = append(opts, nats.RootCAs(o.CAs...))
}
}
@@ -62,7 +63,6 @@ type Closable interface {
// Returns a jetstream.Jetstream client and the Closable of the underlying connection.
// Close should be called when the client is no longer used.
func CreateJetStreamClient(cfg *NatsConfig, pedantic bool) (jetstream.JetStream, Closable, error) {

opts, err := cfg.buildOptions()
if err != nil {
return nil, nil, fmt.Errorf("nats options: %w", err)
46 changes: 35 additions & 11 deletions internal/controller/consumer_controller.go
Original file line number Diff line number Diff line change
@@ -20,14 +20,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"

api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -67,7 +68,7 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
)

// Update ready status to unknown when no status is set
if consumer.Status.Conditions == nil || len(consumer.Status.Conditions) == 0 {
if len(consumer.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
err := r.Status().Update(ctx, consumer)
@@ -113,7 +114,6 @@ func (r *ConsumerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger, consumer *api.Consumer) error {

// Set status to not false
consumer.Status.Conditions = updateReadyCondition(consumer.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, consumer); err != nil {
@@ -154,11 +154,9 @@ func (r *ConsumerReconciler) deleteConsumer(ctx context.Context, log logr.Logger
}

func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger, consumer *api.Consumer) error {

// Create or Update the stream based on the spec
if consumer.Spec.PreventUpdate || r.ReadOnly() {
if r.ReadOnly() {
log.Info("Skipping consumer creation or update.",
"preventDelete", consumer.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
@@ -171,9 +169,36 @@ func (r *ConsumerReconciler) createOrUpdate(ctx context.Context, log klog.Logger
}

err = r.WithJetStreamClient(consumerConnOpts(consumer.Spec), func(js jetstream.JetStream) error {
log.Info("Consumer created or updated.")
_, err := js.CreateOrUpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
consumerName := targetConfig.Name
if consumerName == "" {
consumerName = targetConfig.Durable
}

exists := false
_, err := js.Consumer(ctx, consumer.Spec.StreamName, consumerName)
if err == nil {
exists = true
} else if !errors.Is(err, jetstream.ErrConsumerNotFound) {
return err
}

if !exists {
log.Info("Creating Consumer.")
_, err := js.CreateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
}

if !consumer.Spec.PreventUpdate {
log.Info("Updating Consumer.")
_, err := js.UpdateConsumer(ctx, consumer.Spec.StreamName, *targetConfig)
return err
} else {
log.Info("Skipping Consumer update.",
"preventUpdate", consumer.Spec.PreventUpdate,
)
}

return nil
})
if err != nil {
err = fmt.Errorf("create or update consumer: %w", err)
@@ -211,7 +236,6 @@ func consumerConnOpts(spec api.ConsumerSpec) *connectionOptions {
}

func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, error) {

config := &jetstream.ConsumerConfig{
Durable: spec.DurableName,
Description: spec.Description,
@@ -269,7 +293,7 @@ func consumerSpecToConfig(spec *api.ConsumerSpec) (*jetstream.ConsumerConfig, er
config.AckWait = d
}

//BackOff
// BackOff
for _, bo := range spec.BackOff {
d, err := time.ParseDuration(bo)
if err != nil {
36 changes: 13 additions & 23 deletions internal/controller/consumer_controller_test.go
Original file line number Diff line number Diff line change
@@ -17,19 +17,19 @@ limitations under the License.
package controller

import (
"testing"
"time"

"github.com/nats-io/nats.go/jetstream"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -67,7 +67,7 @@ var _ = Describe("Consumer Controller", func() {
BeforeEach(func(ctx SpecContext) {
By("creating the custom resource for the Kind Consumer")
err := k8sClient.Get(ctx, typeNamespacedName, consumer)
if err != nil && errors.IsNotFound(err) {
if err != nil && k8serrors.IsNotFound(err) {
resource := &api.Consumer{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
@@ -149,9 +149,7 @@ var _ = Describe("Consumer Controller", func() {
})

When("reconciling a not initialized resource", func() {

It("should initialize a new resource", func(ctx SpecContext) {

By("re-queueing until it is initialized")
// Initialization can require multiple reconciliation loops
Eventually(func(ctx SpecContext) *api.Consumer {
@@ -177,7 +175,6 @@ var _ = Describe("Consumer Controller", func() {
})

When("reconciling an initialized resource", func() {

BeforeEach(func(ctx SpecContext) {
By("initializing the stream resource")

@@ -222,7 +219,6 @@ var _ = Describe("Consumer Controller", func() {
})

It("should create a new consumer", func(ctx SpecContext) {

By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -249,7 +245,6 @@ var _ = Describe("Consumer Controller", func() {
})

It("should update an existing consumer", func(ctx SpecContext) {

By("reconciling once to create the consumer")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -290,21 +285,20 @@ var _ = Describe("Consumer Controller", func() {
})

When("PreventUpdate is set", func() {

BeforeEach(func(ctx SpecContext) {
By("setting preventUpdate on the resource")
consumer.Spec.PreventUpdate = true
Expect(k8sClient.Update(ctx, consumer)).To(Succeed())
})
It("should not create the consumer", func(ctx SpecContext) {
It("should create the consumer", func(ctx SpecContext) {
By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
Expect(result.IsZero()).To(BeTrue())

By("checking that no consumer was created")
By("checking that consumer was created")
_, err = jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).To(MatchError(jetstream.ErrConsumerNotFound))
Expect(err).ToNot(HaveOccurred())
})
It("should not update the consumer", func(ctx SpecContext) {
By("creating the consumer")
@@ -318,12 +312,12 @@ var _ = Describe("Consumer Controller", func() {

By("checking that consumer was not updated")
c, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(c.CachedInfo().Config.Description).To(BeEmpty())
})
})

When("read-only mode is enabled", func() {

BeforeEach(func(ctx SpecContext) {
By("setting read only on the controller")
readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true})
@@ -355,12 +349,12 @@ var _ = Describe("Consumer Controller", func() {

By("checking that consumer was not updated")
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
})
})

When("namespace restriction is enabled", func() {

BeforeEach(func(ctx SpecContext) {
By("setting a namespace on the resource")
namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"})
@@ -392,12 +386,12 @@ var _ = Describe("Consumer Controller", func() {

By("checking that consumer was not updated")
s, err := jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
})
})

When("the resource is marked for deletion", func() {

BeforeEach(func(ctx SpecContext) {
By("marking the resource for deletion")
Expect(k8sClient.Delete(ctx, consumer)).To(Succeed())
@@ -495,7 +489,6 @@ var _ = Describe("Consumer Controller", func() {
}
})
It("should delete the resource and not delete the consumer", func(ctx SpecContext) {

By("reconciling")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -521,7 +514,6 @@ var _ = Describe("Consumer Controller", func() {
}
})
It("should not delete the resource and consumer", func(ctx SpecContext) {

By("reconciling")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -540,7 +532,6 @@ var _ = Describe("Consumer Controller", func() {
})

It("should create consumer on different server as specified in spec", func(ctx SpecContext) {

By("setting up the alternative server")
altServer := CreateTestServer()
defer altServer.Shutdown()
@@ -558,7 +549,7 @@ var _ = Describe("Consumer Controller", func() {
Expect(k8sClient.Update(ctx, consumer)).To(Succeed())

By("checking precondition, that the consumer does not yet exist")
got, err := jsClient.Consumer(ctx, streamName, consumerName)
_, err = jsClient.Consumer(ctx, streamName, consumerName)
Expect(err).To(MatchError(jetstream.ErrConsumerNotFound))

By("reconciling the resource")
@@ -569,7 +560,7 @@ var _ = Describe("Consumer Controller", func() {
Expect(result.IsZero()).To(BeTrue())

By("checking if the consumer was created on the alternative server")
got, err = altClient.Consumer(ctx, streamName, consumerName)
got, err := altClient.Consumer(ctx, streamName, consumerName)
Expect(err).NotTo(HaveOccurred())
Expect(got.CachedInfo().Created).To(BeTemporally("~", time.Now(), time.Second))

@@ -582,8 +573,7 @@ var _ = Describe("Consumer Controller", func() {
})

func Test_consumerSpecToConfig(t *testing.T) {

date := time.Date(2024, 12, 03, 16, 55, 5, 0, time.UTC)
date := time.Date(2024, 12, 3, 16, 55, 5, 0, time.UTC)
dateString := date.Format(time.RFC3339)

tests := []struct {
6 changes: 3 additions & 3 deletions internal/controller/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package controller

import (
"os"
"time"

api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"

"os"
"time"
)

func assertReadyStateMatches(condition api.Condition, status v1.ConditionStatus, reason string, message string, transitionTime time.Time) {
9 changes: 3 additions & 6 deletions internal/controller/jetstream_controller.go
Original file line number Diff line number Diff line change
@@ -2,14 +2,15 @@ package controller

import (
"fmt"
"strings"
"time"

js "github.com/nats-io/nack/controllers/jetstream"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

type connectionOptions struct {
@@ -39,7 +40,6 @@ type JetStreamController interface {
}

func NewJSController(k8sClient client.Client, natsConfig *NatsConfig, controllerConfig *Config) (JetStreamController, error) {

return &jsController{
Client: k8sClient,
config: natsConfig,
@@ -63,7 +63,6 @@ func (c *jsController) ValidNamespace(namespace string) bool {
}

func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js jetstream.JetStream) error) error {

// Build single use client
// TODO(future-feature): Use client-pool instead of single use client
cfg := c.buildNatsConfig(opts)
@@ -79,7 +78,6 @@ func (c *jsController) WithJetStreamClient(opts *connectionOptions, op func(js j

// buildNatsConfig uses given opts to override the base NatsConfig.
func (c *jsController) buildNatsConfig(opts *connectionOptions) *NatsConfig {

serverUrls := strings.Join(opts.Servers, ",")

// Takes opts values if present
@@ -132,7 +130,6 @@ func or[T comparable](v T, fallback T) T {

// updateReadyCondition returns the given conditions with an added or updated ready condition.
func updateReadyCondition(conditions []api.Condition, status v1.ConditionStatus, reason string, message string) []api.Condition {

var currentStatus v1.ConditionStatus
var lastTransitionTime string
for _, condition := range conditions {
6 changes: 3 additions & 3 deletions internal/controller/jetstream_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package controller

import (
"testing"
"time"

api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"testing"
"time"
)

func Test_updateReadyCondition(t *testing.T) {

pastTransition := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339Nano)
updatedTransition := "now"

312 changes: 312 additions & 0 deletions internal/controller/keyvalue_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats.go/jetstream"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// KeyValueReconciler reconciles a KeyValue object
type KeyValueReconciler struct {
JetStreamController
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// It performs three main operations:
// - Initialize finalizer and ready condition if not present
// - Delete KeyValue if it is marked for deletion.
// - Create or Update the KeyValue
//
// A call to reconcile may perform only one action, expecting the reconciliation to be triggered again by an update.
// For example: Setting the finalizer triggers a second reconciliation. Reconcile returns after setting the finalizer,
// to prevent parallel reconciliations performing the same steps.
func (r *KeyValueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := klog.FromContext(ctx)

if ok := r.ValidNamespace(req.Namespace); !ok {
log.Info("Controller restricted to namespace, skipping reconciliation.")
return ctrl.Result{}, nil
}

// Fetch KeyValue resource
keyValue := &api.KeyValue{}
if err := r.Get(ctx, req.NamespacedName, keyValue); err != nil {
if apierrors.IsNotFound(err) {
log.Info("KeyValue resource not found. Ignoring since object must be deleted.")
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get keyvalue resource '%s': %w", req.NamespacedName.String(), err)
}

log = log.WithValues("keyValueName", keyValue.Spec.Name)

// Update ready status to unknown when no status is set
if len(keyValue.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
err := r.Status().Update(ctx, keyValue)
if err != nil {
return ctrl.Result{}, fmt.Errorf("set condition unknown: %w", err)
}
return ctrl.Result{Requeue: true}, nil
}

// Add finalizer
if !controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) {
log.Info("Adding KeyValue finalizer.")
if ok := controllerutil.AddFinalizer(keyValue, keyValueFinalizer); !ok {
return ctrl.Result{}, errors.New("failed to add finalizer to keyvalue resource")
}

if err := r.Update(ctx, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("update keyvalue resource to add finalizer: %w", err)
}
return ctrl.Result{}, nil
}

// Check Deletion
markedForDeletion := keyValue.GetDeletionTimestamp() != nil
if markedForDeletion {
if controllerutil.ContainsFinalizer(keyValue, keyValueFinalizer) {
err := r.deleteKeyValue(ctx, log, keyValue)
if err != nil {
return ctrl.Result{}, fmt.Errorf("delete keyvalue: %w", err)
}
} else {
log.Info("KeyValue marked for deletion and already finalized. Ignoring.")
}

return ctrl.Result{}, nil
}

// Create or update KeyValue
if err := r.createOrUpdate(ctx, log, keyValue); err != nil {
return ctrl.Result{}, fmt.Errorf("create or update: %s", err)
}
return ctrl.Result{}, nil
}

func (r *KeyValueReconciler) deleteKeyValue(ctx context.Context, log logr.Logger, keyValue *api.KeyValue) error {
// Set status to not false
keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, keyValue); err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

if !keyValue.Spec.PreventDelete && !r.ReadOnly() {
log.Info("Deleting KeyValue.")
err := r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error {
return js.DeleteKeyValue(ctx, keyValue.Spec.Name)
})
if errors.Is(err, jetstream.ErrBucketNotFound) {
log.Info("KeyValue does not exist, unable to delete.", "keyValueName", keyValue.Spec.Name)
} else if err != nil {
return fmt.Errorf("delete keyvalue during finalization: %w", err)
}
} else {
log.Info("Skipping KeyValue deletion.",
"preventDelete", keyValue.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
}

log.Info("Removing KeyValue finalizer.")
if ok := controllerutil.RemoveFinalizer(keyValue, keyValueFinalizer); !ok {
return errors.New("failed to remove keyvalue finalizer")
}
if err := r.Update(ctx, keyValue); err != nil {
return fmt.Errorf("remove finalizer: %w", err)
}

return nil
}

func (r *KeyValueReconciler) createOrUpdate(ctx context.Context, log logr.Logger, keyValue *api.KeyValue) error {
// Create or Update the KeyValue based on the spec
if r.ReadOnly() {
log.Info("Skipping KeyValue creation or update.",
"read-only", r.ReadOnly(),
)
return nil
}

// Map spec to KeyValue targetConfig
targetConfig, err := keyValueSpecToConfig(&keyValue.Spec)
if err != nil {
return fmt.Errorf("map spec to keyvalue targetConfig: %w", err)
}

// UpdateKeyValue is called on every reconciliation when the stream is not to be deleted.
// TODO(future-feature): Do we need to check if config differs?
err = r.WithJetStreamClient(keyValueConnOpts(keyValue.Spec), func(js jetstream.JetStream) error {
exists := false
_, err := js.KeyValue(ctx, targetConfig.Bucket)
if err == nil {
exists = true
} else if !errors.Is(err, jetstream.ErrBucketNotFound) {
return err
}

if !exists {
log.Info("Creating KeyValue.")
_, err = js.CreateKeyValue(ctx, targetConfig)
return err
}

if !keyValue.Spec.PreventUpdate {
log.Info("Updating KeyValue.")
_, err = js.UpdateKeyValue(ctx, targetConfig)
return err
} else {
log.Info("Skipping KeyValue update.",
"preventUpdate", keyValue.Spec.PreventUpdate,
)
}

return nil
})
if err != nil {
err = fmt.Errorf("create or update keyvalue: %w", err)
keyValue.Status.Conditions = updateReadyCondition(keyValue.Status.Conditions, v1.ConditionFalse, "Errored", err.Error())
if err := r.Status().Update(ctx, keyValue); err != nil {
log.Error(err, "Failed to update ready condition to Errored.")
}
return err
}

// update the observed generation and ready status
keyValue.Status.ObservedGeneration = keyValue.Generation
keyValue.Status.Conditions = updateReadyCondition(
keyValue.Status.Conditions,
v1.ConditionTrue,
"Reconciling",
"KeyValue successfully created or updated.",
)
err = r.Status().Update(ctx, keyValue)
if err != nil {
return fmt.Errorf("update ready condition: %w", err)
}

return nil
}

// keyValueConnOpts extracts nats connection relevant fields from the given KeyValue spec as connectionOptions.
func keyValueConnOpts(spec api.KeyValueSpec) *connectionOptions {
return &connectionOptions{
Account: spec.Account,
Creds: spec.Creds,
Nkey: spec.Nkey,
Servers: spec.Servers,
TLS: spec.TLS,
}
}

// keyValueSpecToConfig creates a jetstream.KeyValueConfig matching the given KeyValue resource spec
func keyValueSpecToConfig(spec *api.KeyValueSpec) (jetstream.KeyValueConfig, error) {
// Set directly mapped fields
config := jetstream.KeyValueConfig{
Bucket: spec.Name,
Compression: spec.Compression,
Description: spec.Description,
History: uint8(spec.History),
MaxBytes: int64(spec.MaxBytes),
MaxValueSize: int32(spec.MaxValueSize),
Replicas: spec.Replicas,
}

// TTL
if spec.TTL != "" {
t, err := time.ParseDuration(spec.TTL)
if err != nil {
return jetstream.KeyValueConfig{}, fmt.Errorf("invalid ttl: %w", err)
}
config.TTL = t
}

// storage
if spec.Storage != "" {
err := config.Storage.UnmarshalJSON(asJsonString(spec.Storage))
if err != nil {
return jetstream.KeyValueConfig{}, fmt.Errorf("invalid storage: %w", err)
}
}

// placement
if spec.Placement != nil {
config.Placement = &jetstream.Placement{
Cluster: spec.Placement.Cluster,
Tags: spec.Placement.Tags,
}
}

// mirror
if spec.Mirror != nil {
ss, err := mapStreamSource(spec.Mirror)
if err != nil {
return jetstream.KeyValueConfig{}, fmt.Errorf("map mirror keyvalue source: %w", err)
}
config.Mirror = ss
}

// sources
if spec.Sources != nil {
config.Sources = []*jetstream.StreamSource{}
for _, source := range spec.Sources {
s, err := mapStreamSource(source)
if err != nil {
return jetstream.KeyValueConfig{}, fmt.Errorf("map keyvalue source: %w", err)
}
config.Sources = append(config.Sources, s)
}
}

// RePublish
if spec.Republish != nil {
config.RePublish = &jetstream.RePublish{
Source: spec.Republish.Source,
Destination: spec.Republish.Destination,
HeadersOnly: spec.Republish.HeadersOnly,
}
}

return config, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *KeyValueReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&api.KeyValue{}).
Owns(&api.KeyValue{}).
// Only trigger on generation changes
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
699 changes: 699 additions & 0 deletions internal/controller/keyvalue_controller_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion internal/controller/register.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package controller

import (
"fmt"

ctrl "sigs.k8s.io/controller-runtime"
)

@@ -19,7 +20,6 @@ type Config struct {
// natsCfg is specific to the nats server connection.
// controllerCfg defines behaviour of the registered controllers.
func RegisterAll(mgr ctrl.Manager, clientConfig *NatsConfig, config *Config) error {

baseController, err := NewJSController(mgr.GetClient(), clientConfig, config)
if err != nil {
return fmt.Errorf("create base jetstream controller: %w", err)
43 changes: 31 additions & 12 deletions internal/controller/stream_controller.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
"github.com/nats-io/nats.go/jetstream"
@@ -29,7 +31,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"
)

// StreamReconciler reconciles a Stream object
@@ -69,7 +70,7 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
log = log.WithValues("streamName", stream.Spec.Name)

// Update ready status to unknown when no status is set
if stream.Status.Conditions == nil || len(stream.Status.Conditions) == 0 {
if len(stream.Status.Conditions) == 0 {
log.Info("Setting initial ready condition to unknown.")
stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionUnknown, "Reconciling", "Starting reconciliation")
err := r.Status().Update(ctx, stream)
@@ -115,7 +116,6 @@ func (r *StreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, stream *api.Stream) error {

// Set status to not false
stream.Status.Conditions = updateReadyCondition(stream.Status.Conditions, v1.ConditionFalse, "Finalizing", "Performing finalizer operations.")
if err := r.Status().Update(ctx, stream); err != nil {
@@ -151,11 +151,9 @@ func (r *StreamReconciler) deleteStream(ctx context.Context, log logr.Logger, st
}

func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger, stream *api.Stream) error {

// Create or Update the stream based on the spec
if stream.Spec.PreventUpdate || r.ReadOnly() {
if r.ReadOnly() {
log.Info("Skipping stream creation or update.",
"preventDelete", stream.Spec.PreventDelete,
"read-only", r.ReadOnly(),
)
return nil
@@ -170,9 +168,31 @@ func (r *StreamReconciler) createOrUpdate(ctx context.Context, log logr.Logger,
// CreateOrUpdateStream is called on every reconciliation when the stream is not to be deleted.
// TODO(future-feature): Do we need to check if config differs?
err = r.WithJetStreamClient(streamConnOpts(stream.Spec), func(js jetstream.JetStream) error {
log.Info("Creating or updating stream.")
_, err = js.CreateOrUpdateStream(ctx, targetConfig)
return err
exists := false
_, err := js.Stream(ctx, targetConfig.Name)
if err == nil {
exists = true
} else if !errors.Is(err, jetstream.ErrStreamNotFound) {
return err
}

if !exists {
log.Info("Creating Stream.")
_, err := js.CreateStream(ctx, targetConfig)
return err
}

if !stream.Spec.PreventUpdate {
log.Info("Updating Stream.")
_, err := js.UpdateStream(ctx, targetConfig)
return err
} else {
log.Info("Skipping Stream update.",
"preventUpdate", stream.Spec.PreventUpdate,
)
}

return nil
})
if err != nil {
err = fmt.Errorf("create or update stream: %w", err)
@@ -212,7 +232,6 @@ func streamConnOpts(spec api.StreamSpec) *connectionOptions {

// streamSpecToConfig creates a jetstream.StreamConfig matching the given stream resource spec
func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) {

// Set directly mapped fields
config := jetstream.StreamConfig{
Name: spec.Name,
@@ -293,7 +312,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) {
if spec.Mirror != nil {
ss, err := mapStreamSource(spec.Mirror)
if err != nil {
return jetstream.StreamConfig{}, fmt.Errorf("map mirror stream soruce: %w", err)
return jetstream.StreamConfig{}, fmt.Errorf("map mirror stream source: %w", err)
}
config.Mirror = ss
}
@@ -304,7 +323,7 @@ func streamSpecToConfig(spec *api.StreamSpec) (jetstream.StreamConfig, error) {
for _, source := range spec.Sources {
s, err := mapStreamSource(source)
if err != nil {
return jetstream.StreamConfig{}, fmt.Errorf("map stream soruce: %w", err)
return jetstream.StreamConfig{}, fmt.Errorf("map stream source: %w", err)
}
config.Sources = append(config.Sources, s)
}
36 changes: 12 additions & 24 deletions internal/controller/stream_controller_test.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@ limitations under the License.
package controller

import (
"testing"
"time"

"github.com/nats-io/nats.go/jetstream"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -27,16 +30,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
)

var _ = Describe("Stream Controller", func() {

// The test stream resource
const resourceName = "test-stream"
const streamName = "orders"
@@ -136,9 +136,7 @@ var _ = Describe("Stream Controller", func() {
})

When("reconciling a not initialized resource", func() {

It("should initialize a new resource", func(ctx SpecContext) {

By("re-queueing until it is initialized")
// Initialization can require multiple reconciliation loops
Eventually(func(ctx SpecContext) *api.Stream {
@@ -160,11 +158,9 @@ var _ = Describe("Stream Controller", func() {

assertReadyStateMatches(stream.Status.Conditions[0], v1.ConditionUnknown, "Reconciling", "Starting reconciliation", time.Now())
})

})

When("reconciling an initialized resource", func() {

BeforeEach(func(ctx SpecContext) {
By("initializing the stream resource")

@@ -181,11 +177,9 @@ var _ = Describe("Stream Controller", func() {
LastTransitionTime: time.Now().Format(time.RFC3339Nano),
}}
Expect(k8sClient.Status().Update(ctx, stream)).To(Succeed())

})

It("should create a new stream", func(ctx SpecContext) {

By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -212,21 +206,20 @@ var _ = Describe("Stream Controller", func() {
})

When("PreventUpdate is set", func() {

BeforeEach(func(ctx SpecContext) {
By("setting preventDelete on the resource")
stream.Spec.PreventUpdate = true
Expect(k8sClient.Update(ctx, stream)).To(Succeed())
})
It("should not create the stream", func(ctx SpecContext) {
It("should create the stream", func(ctx SpecContext) {
By("running Reconcile")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
Expect(result.IsZero()).To(BeTrue())

By("checking that no stream was created")
By("checking that stream was created")
_, err = jsClient.Stream(ctx, streamName)
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))
Expect(err).NotTo(HaveOccurred())
})
It("should not update the stream", func(ctx SpecContext) {
By("creating the stream")
@@ -240,12 +233,12 @@ var _ = Describe("Stream Controller", func() {

By("checking that stream was not updated")
s, err := jsClient.Stream(ctx, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
})
})

When("read-only mode is enabled", func() {

BeforeEach(func(ctx SpecContext) {
By("setting read only on the controller")
readOnly, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{ReadOnly: true})
@@ -277,12 +270,12 @@ var _ = Describe("Stream Controller", func() {

By("checking that stream was not updated")
s, err := jsClient.Stream(ctx, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
})
})

When("namespace restriction is enabled", func() {

BeforeEach(func(ctx SpecContext) {
By("setting a namespace on the resource")
namespaced, err := NewJSController(k8sClient, &NatsConfig{ServerURL: testServer.ClientURL()}, &Config{Namespace: "other-namespace"})
@@ -314,6 +307,7 @@ var _ = Describe("Stream Controller", func() {

By("checking that stream was not updated")
s, err := jsClient.Stream(ctx, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(s.CachedInfo().Config.Description).To(BeEmpty())
})
})
@@ -359,7 +353,6 @@ var _ = Describe("Stream Controller", func() {
})

It("should set an error state when the nats server is not available", func(ctx SpecContext) {

By("setting up controller with unavailable nats server")
// Setup client for not running server
// Use actual test server to ensure port not used by other service on test instance
@@ -398,7 +391,6 @@ var _ = Describe("Stream Controller", func() {
})

When("the resource is marked for deletion", func() {

BeforeEach(func(ctx SpecContext) {
By("marking the resource for deletion")
Expect(k8sClient.Delete(ctx, stream)).To(Succeed())
@@ -480,7 +472,6 @@ var _ = Describe("Stream Controller", func() {
}
})
It("should delete the resource and not delete the stream", func(ctx SpecContext) {

By("reconciling")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -506,7 +497,6 @@ var _ = Describe("Stream Controller", func() {
}
})
It("should not delete the resource and stream", func(ctx SpecContext) {

By("reconciling")
result, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
@@ -535,7 +525,7 @@ var _ = Describe("Stream Controller", func() {
Expect(k8sClient.Update(ctx, stream)).To(Succeed())

By("checking precondition, that the stream does not yet exist")
got, err := jsClient.Stream(ctx, streamName)
_, err := jsClient.Stream(ctx, streamName)
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))

By("reconciling the resource")
@@ -550,21 +540,19 @@ var _ = Describe("Stream Controller", func() {
defer closer.Close()
Expect(err).NotTo(HaveOccurred())

got, err = altClient.Stream(ctx, streamName)
got, err := altClient.Stream(ctx, streamName)
Expect(err).NotTo(HaveOccurred())
Expect(got.CachedInfo().Created).To(BeTemporally("~", time.Now(), time.Second))

By("checking that the stream was NOT created on the original server")
_, err = jsClient.Stream(ctx, streamName)
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))

})
})
})

func Test_mapSpecToConfig(t *testing.T) {

date := time.Date(2024, 12, 03, 16, 55, 5, 0, time.UTC)
date := time.Date(2024, 12, 3, 16, 55, 5, 0, time.UTC)
dateString := date.Format(time.RFC3339)

tests := []struct {
23 changes: 14 additions & 9 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
@@ -18,13 +18,14 @@ package controller

import (
"fmt"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go/jetstream"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go/jetstream"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

@@ -41,12 +42,14 @@ import (
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.

var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment
var testServer *server.Server
var jsClient jetstream.JetStream
var baseController JetStreamController
var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
testServer *server.Server
jsClient jetstream.JetStream
baseController JetStreamController
)

func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
@@ -68,7 +71,7 @@ var _ = BeforeSuite(func() {
// Note that you must have the required binaries setup under the bin directory to perform
// the tests directly. When we run make test it will be setup and used automatically.
BinaryAssetsDirectory: filepath.Join("..", "..", "bin", "k8s",
fmt.Sprintf("1.29.0-%s-%s", runtime.GOOS, runtime.GOARCH)),
fmt.Sprintf("1.31.0-%s-%s", runtime.GOOS, runtime.GOARCH)),
}

var err error
@@ -90,7 +93,9 @@ var _ = BeforeSuite(func() {

testNatsConfig := &NatsConfig{ServerURL: testServer.ClientURL()}
baseController, err = NewJSController(k8sClient, testNatsConfig, &Config{})
Expect(err).NotTo(HaveOccurred())
jsClient, _, err = CreateJetStreamClient(testNatsConfig, true)
Expect(err).NotTo(HaveOccurred())
})

var _ = AfterSuite(func() {
1 change: 1 addition & 0 deletions internal/controller/types.go
Original file line number Diff line number Diff line change
@@ -3,5 +3,6 @@ package controller
const (
readyCondType = "Ready"
streamFinalizer = "stream.nats.io/finalizer"
keyValueFinalizer = "keyvalue.nats.io/finalizer"
consumerFinalizer = "consumer.nats.io/finalizer"
)
4 changes: 2 additions & 2 deletions pkg/bootconfig/bootconfig.go
Original file line number Diff line number Diff line change
@@ -121,15 +121,15 @@ func (c *Controller) Run(ctx context.Context) error {

clientAdvertiseConfig := fmt.Sprintf("\nclient_advertise = \"%s\"\n\n", externalAddress)

err = os.WriteFile(c.opts.ClientAdvertiseFileName, []byte(clientAdvertiseConfig), 0644)
err = os.WriteFile(c.opts.ClientAdvertiseFileName, []byte(clientAdvertiseConfig), 0o644)
if err != nil {
return fmt.Errorf("Could not write client advertise config: %s", err)
}
log.Infof("Successfully wrote client advertise config to %q", c.opts.ClientAdvertiseFileName)

gatewayAdvertiseConfig := fmt.Sprintf("\nadvertise = \"%s\"\n\n", externalAddress)

err = os.WriteFile(c.opts.GatewayAdvertiseFileName, []byte(gatewayAdvertiseConfig), 0644)
err = os.WriteFile(c.opts.GatewayAdvertiseFileName, []byte(gatewayAdvertiseConfig), 0o644)
if err != nil {
return fmt.Errorf("Could not write gateway advertise config: %s", err)
}
55 changes: 55 additions & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/keyvaluetypes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package v1beta2

import (
k8smeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Stream is a specification for a Stream resource
type KeyValue struct {
k8smeta.TypeMeta `json:",inline"`
k8smeta.ObjectMeta `json:"metadata,omitempty"`

Spec KeyValueSpec `json:"spec"`
Status Status `json:"status"`
}

func (s *KeyValue) GetSpec() interface{} {
return s.Spec
}

// StreamSpec is the spec for a Stream resource
type KeyValueSpec struct {
Account string `json:"account"`
Compression bool `json:"compression"`
Creds string `json:"creds"`
Description string `json:"description"`
History int `json:"history"`
MaxBytes int `json:"maxBytes"`
MaxValueSize int `json:"maxValueSize"`
Mirror *StreamSource `json:"mirror"`
Name string `json:"name"`
Nkey string `json:"nkey"`
Placement *StreamPlacement `json:"placement"`
PreventDelete bool `json:"preventDelete"`
PreventUpdate bool `json:"preventUpdate"`
Replicas int `json:"replicas"`
Republish *RePublish `json:"republish"`
Servers []string `json:"servers"`
Sources []*StreamSource `json:"sources"`
Storage string `json:"storage"`
TLS TLS `json:"tls"`
TTL string `json:"ttl"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// KeyValueList is a list of Stream resources
type KeyValueList struct {
k8smeta.TypeMeta `json:",inline"`
k8smeta.ListMeta `json:"metadata"`

Items []KeyValue `json:"items"`
}
2 changes: 2 additions & 0 deletions pkg/jetstream/apis/jetstream/v1beta2/register.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Stream{},
&StreamList{},
&KeyValue{},
&KeyValueList{},
&Consumer{},
&ConsumerList{},
&Account{},
111 changes: 110 additions & 1 deletion pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
Loading

0 comments on commit 5bd0945

Please sign in to comment.