Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Control Loop #225

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/nats-boot-config
/nats-boot-config.docker
/tools
/bin
/.idea
67 changes: 50 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
export GO111MODULE := on

SHELL=/usr/bin/env bash

ENVTEST_K8S_VERSION = 1.31.0

now := $(shell date -u +%Y-%m-%dT%H:%M:%S%z)
gitBranch := $(shell git rev-parse --abbrev-ref HEAD)
gitCommit := $(shell git rev-parse --short HEAD)
Expand All @@ -9,8 +13,7 @@ VERSION ?= version-not-set
linkerVars := -X main.BuildTime=$(now) -X main.GitInfo=$(gitBranch)-$(gitCommit)$(repoDirty) -X main.Version=$(VERSION)
drepo ?= natsio

jetstreamGenIn:= $(shell grep -l -R -F "// +k8s:" pkg/jetstream/apis)
jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
jetstreamSrc := $(shell find cmd/jetstream-controller pkg/jetstream internal/controller controllers/jetstream -name "*.go") pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go

configReloaderSrc := $(shell find cmd/nats-server-config-reloader/ pkg/natsreloader/ -name "*.go")

Expand All @@ -27,20 +30,21 @@ default:
# make nats-server-config-reloader
# make nats-boot-config

pkg/jetstream/generated pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go: fetch-modules $(jetstreamGenIn) pkg/k8scodegen/file-header.txt
generate: fetch-modules pkg/k8scodegen/file-header.txt
rm -rf pkg/jetstream/generated
# Temporary chmod fix until we migrate to kube_codegen.sh
D="$(codeGeneratorDir)"; : "$${D:=`go list -m -f '{{.Dir}}' k8s.io/code-generator`}"; \
chmod u+x "$$D/generate-internal-groups.sh"; \
GOFLAGS='' bash "$$D/generate-groups.sh" all \
github.com/nats-io/nack/pkg/jetstream/generated \
github.com/nats-io/nack/pkg/jetstream/apis \
"jetstream:v1beta2" \
--output-base . \
--go-header-file pkg/k8scodegen/file-header.txt
mv github.com/nats-io/nack/pkg/jetstream/generated pkg/jetstream/generated
mv github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go pkg/jetstream/apis/jetstream/v1beta2/zz_generated.deepcopy.go
rm -rf github.com
source "$$D/kube_codegen.sh" ; \
kube::codegen::gen_helpers \
--boilerplate pkg/k8scodegen/file-header.txt \
pkg/jetstream/apis; \
kube::codegen::gen_client \
--with-watch \
--with-applyconfig \
--boilerplate pkg/k8scodegen/file-header.txt \
--output-dir pkg/jetstream/generated \
--output-pkg github.com/nats-io/nack/pkg/jetstream/generated \
--one-input-api jetstream/v1beta2 \
pkg/jetstream/apis

jetstream-controller: $(jetstreamSrc)
go build -race -o $@ \
Expand Down Expand Up @@ -173,10 +177,39 @@ fetch-modules:
.PHONY: build
build: jetstream-controller nats-server-config-reloader nats-boot-config

# Setup envtest tools based on a operator-sdk project makefile
LOCALBIN ?= $(shell pwd)/bin
$(LOCALBIN):
mkdir -p $(LOCALBIN)

# go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist
# $1 - target path with name of binary (ideally with version)
# $2 - package url which can be installed
# $3 - specific version of package
define go-install-tool
@[ -f $(1) ] || { \
set -e; \
package=$(2)@$(3) ;\
echo "Downloading $${package}" ;\
GOBIN=$(LOCALBIN) go install $${package} ;\
mv "$$(echo "$(1)" | sed "s/-$(3)$$//")" $(1) ;\
}
endef

ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
ENVTEST_VERSION ?= release-0.19

.PHONY: envtest
envtest: $(ENVTEST) ## Download setup-envtest locally if necessary.
$(ENVTEST): $(LOCALBIN)
$(call go-install-tool,$(ENVTEST),sigs.k8s.io/controller-runtime/tools/setup-envtest,$(ENVTEST_VERSION))


.PHONY: test
test:
go vet ./controllers/... ./pkg/natsreloader/...
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/...
test: envtest
go vet ./controllers/... ./pkg/natsreloader/... ./internal/controller/...
$(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path ## Get k8s binaries
go test -race -cover -count=1 -timeout 10s ./controllers/... ./pkg/natsreloader/... ./internal/controller/...

.PHONY: clean
clean:
Expand Down
2 changes: 1 addition & 1 deletion cicd/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#syntax=docker/dockerfile-upstream:1.5
#syntax=docker/dockerfile-upstream:1.13
ARG GO_APP

FROM alpine:3.21.2 as deps
Expand Down
2 changes: 1 addition & 1 deletion cicd/Dockerfile_goreleaser
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#syntax=docker/dockerfile-upstream:1.5
#syntax=docker/dockerfile-upstream:1.13
FROM --platform=$BUILDPLATFORM golang:1.23.5-bullseye as build

RUN <<EOT
Expand Down
112 changes: 96 additions & 16 deletions cmd/jetstream-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ import (
"syscall"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/nats-io/nack/controllers/jetstream"
"github.com/nats-io/nack/internal/controller"
v1beta2 "github.com/nats-io/nack/pkg/jetstream/apis/jetstream/v1beta2"
clientset "github.com/nats-io/nack/pkg/jetstream/generated/clientset/versioned"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
klog "k8s.io/klog/v2"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
)

var (
Expand All @@ -49,7 +55,10 @@ func main() {

func run() error {
klog.InitFlags(nil)
kubeConfig := flag.String("kubeconfig", "", "Path to kubeconfig")

// Explicitly register controller-runtime flags
ctrl.RegisterFlags(nil)

namespace := flag.String("namespace", v1.NamespaceAll, "Restrict to a namespace")
version := flag.Bool("version", false, "Print the version and exit")
creds := flag.String("creds", "", "NATS Credentials")
Expand All @@ -59,9 +68,13 @@ func run() error {
ca := flag.String("tlsca", "", "NATS TLS certificate authority chain")
tlsfirst := flag.Bool("tlsfirst", false, "If enabled, forces explicit TLS without waiting for Server INFO")
server := flag.String("s", "", "NATS Server URL")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config")
crdConnect := flag.Bool("crd-connect", false, "If true, then NATS connections will be made from CRD config, not global config. Ignored if running with control loop, CRD options will always override global config")
cleanupPeriod := flag.Duration("cleanup-period", 30*time.Second, "Period to run object cleanup")
readOnly := flag.Bool("read-only", false, "Starts the controller without causing changes to the NATS resources")
cacheDir := flag.String("cache-dir", "", "Directory to store cached credential and TLS files")
controlLoop := flag.Bool("control-loop", false, "Experimental: Run controller with a full reconciliation control loop.")
controlLoopSyncInterval := flag.Duration("sync-interval", 5*time.Minute, "Interval to perform scheduled reconcile")

flag.Parse()

if *version {
Expand All @@ -73,18 +86,37 @@ func run() error {
return errors.New("NATS Server URL is required")
}

var config *rest.Config
var err error
if *kubeConfig == "" {
config, err = rest.InClusterConfig()
if err != nil {
return err
config, err := ctrl.GetConfig()
if err != nil {
return fmt.Errorf("get kubernetes rest config: %w", err)
}

if *controlLoop {
klog.Warning("Starting JetStream controller in experimental control loop mode")

natsCfg := &controller.NatsConfig{
ClientName: "jetstream-controller",
Credentials: *creds,
NKey: *nkey,
ServerURL: *server,
CAs: []string{},
Certificate: *cert,
Key: *key,
TLSFirst: *tlsfirst,
}
} else {
config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig)
if err != nil {
return err

if *ca != "" {
natsCfg.CAs = []string{*ca}
}

controllerCfg := &controller.Config{
ReadOnly: *readOnly,
Namespace: *namespace,
CacheDir: *cacheDir,
RequeueInterval: *controlLoopSyncInterval,
}

return runControlLoop(config, natsCfg, controllerCfg)
}

// K8S API Client.
Expand Down Expand Up @@ -129,6 +161,54 @@ func run() error {
return ctrl.Run()
}

func runControlLoop(config *rest.Config, natsCfg *controller.NatsConfig, controllerCfg *controller.Config) error {
// Setup scheme
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))

log.SetLogger(klog.NewKlogr())

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Logger: log.Log,
})
if err != nil {
return fmt.Errorf("unable to start manager: %w", err)
}

if controllerCfg.CacheDir == "" {
samuelattwood marked this conversation as resolved.
Show resolved Hide resolved
cacheDir, err := os.MkdirTemp(".", "nack")
if err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
defer os.RemoveAll(cacheDir)
controllerCfg.CacheDir = cacheDir
} else {
if _, err := os.Stat(controllerCfg.CacheDir); os.IsNotExist(err) {
err = os.MkdirAll(controllerCfg.CacheDir, 0o755)
if err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
}
}

err = controller.RegisterAll(mgr, natsCfg, controllerCfg)
if err != nil {
return fmt.Errorf("register jetstream controllers: %w", err)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up health check: %w", err)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
return fmt.Errorf("unable to set up ready check: %w", err)
}

klog.Info("starting manager")
return mgr.Start(ctrl.SetupSignalHandler())
}

func handleSignals(cancel context.CancelFunc) {
sigc := make(chan os.Signal, 2)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
Expand Down
8 changes: 4 additions & 4 deletions controllers/jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.AcknowledgeExplicit())
case "":
default:
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'.", spec.AckPolicy)
return nil, fmt.Errorf("invalid value for 'ackPolicy': '%s'. Must be one of 'none', 'all', 'explicit'", spec.AckPolicy)
}

if spec.AckWait != "" {
Expand All @@ -262,7 +262,7 @@ func consumerSpecToOpts(spec apis.ConsumerSpec) ([]jsm.ConsumerOption, error) {
opts = append(opts, jsm.ReplayAsReceived())
case "":
default:
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'.", spec.ReplayPolicy)
return nil, fmt.Errorf("invalid value for 'replayPolicy': '%s'. Must be one of 'instant', 'original'", spec.ReplayPolicy)
}

if spec.SampleFreq != "" {
Expand Down Expand Up @@ -354,7 +354,7 @@ func setConsumerOK(ctx context.Context, s *apis.Consumer, i typed.ConsumerInterf
sc := s.DeepCopy()

sc.Status.ObservedGeneration = s.Generation
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionTrue,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down Expand Up @@ -382,7 +382,7 @@ func setConsumerErrored(ctx context.Context, s *apis.Consumer, sif typed.Consume
}

sc := s.DeepCopy()
sc.Status.Conditions = upsertCondition(sc.Status.Conditions, apis.Condition{
sc.Status.Conditions = UpsertCondition(sc.Status.Conditions, apis.Condition{
Type: readyCondType,
Status: k8sapi.ConditionFalse,
LastTransitionTime: time.Now().UTC().Format(time.RFC3339Nano),
Expand Down
Loading