From f9a4683c9805a845695d5e85479dcd20e15ba3c8 Mon Sep 17 00:00:00 2001 From: Ivan Sabelnikov Date: Mon, 6 Jan 2025 17:07:33 +0000 Subject: [PATCH] Allow to specify user+password and token secrets in Account --- controllers/jetstream/consumer.go | 130 ++----------- controllers/jetstream/controller.go | 175 ++++++++++++++++++ controllers/jetstream/stream.go | 130 ++----------- .../apis/jetstream/v1beta2/accounttypes.go | 8 +- pkg/jetstream/apis/jetstream/v1beta2/types.go | 11 ++ 5 files changed, 213 insertions(+), 241 deletions(-) diff --git a/controllers/jetstream/consumer.go b/controllers/jetstream/consumer.go index 52f3f532..83520514 100644 --- a/controllers/jetstream/consumer.go +++ b/controllers/jetstream/consumer.go @@ -4,10 +4,7 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" "strconv" - "strings" "time" "github.com/nats-io/jsm.go" @@ -49,70 +46,9 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsm jsmClientFunc spec := cns.Spec ifc := c.ji.Consumers(ns) - var ( - remoteClientCert string - remoteClientKey string - remoteRootCA string - accServers []string - accUserCreds string - ) - if spec.Account != "" && c.opts.CRDConnect { - // Lookup the account using the REST client. - ctx, done := context.WithTimeout(context.Background(), 5*time.Second) - defer done() - acc, err := c.ji.Accounts(ns).Get(ctx, spec.Account, k8smeta.GetOptions{}) - if err != nil { - return err - } - - accServers = acc.Spec.Servers - - // Lookup the TLS secrets - if acc.Spec.TLS != nil && acc.Spec.TLS.Secret != nil { - secretName := acc.Spec.TLS.Secret.Name - secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) - if err != nil { - return err - } - - // Write this to the cacheDir - accDir := filepath.Join(c.cacheDir, ns, spec.Account) - if err := os.MkdirAll(accDir, 0755); err != nil { - return err - } - - remoteClientCert = filepath.Join(accDir, acc.Spec.TLS.ClientCert) - remoteClientKey = filepath.Join(accDir, acc.Spec.TLS.ClientKey) - remoteRootCA = filepath.Join(accDir, acc.Spec.TLS.RootCAs) - - for k, v := range secret.Data { - if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { - return err - } - } - } - // Lookup the UserCredentials. - if acc.Spec.Creds != nil { - secretName := acc.Spec.Creds.Secret.Name - secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) - if err != nil { - return err - } - - // Write the user credentials to the cache dir. - accDir := filepath.Join(c.cacheDir, ns, spec.Account) - if err := os.MkdirAll(accDir, 0755); err != nil { - return err - } - for k, v := range secret.Data { - if k == acc.Spec.Creds.File { - accUserCreds = filepath.Join(c.cacheDir, ns, spec.Account, k) - if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { - return err - } - } - } - } + acc, err := c.getAccountOverrides(spec.Account, ns) + if err != nil { + return err } defer func() { @@ -128,58 +64,14 @@ func (c *Controller) processConsumerObject(cns *apis.Consumer, jsm jsmClientFunc type operator func(ctx context.Context, c jsmClient, spec apis.ConsumerSpec) (err error) natsClientUtil := func(op operator) error { - servers := spec.Servers - if c.opts.CRDConnect { - // Create a new client - natsCtx := &natsContext{} - // Use JWT/NKEYS based credentials if present. - if spec.Creds != "" { - natsCtx.Credentials = spec.Creds - } else if spec.Nkey != "" { - natsCtx.Nkey = spec.Nkey - } - if spec.TLS.ClientCert != "" && spec.TLS.ClientKey != "" { - natsCtx.TLSCert = spec.TLS.ClientCert - natsCtx.TLSKey = spec.TLS.ClientKey - } - - // Use fetched secrets for the account and server if defined. - if remoteClientCert != "" && remoteClientKey != "" { - natsCtx.TLSCert = remoteClientCert - natsCtx.TLSKey = remoteClientKey - } - if remoteRootCA != "" { - natsCtx.TLSCAs = []string{remoteRootCA} - } - if accUserCreds != "" { - natsCtx.Credentials = accUserCreds - } - if len(spec.TLS.RootCAs) > 0 { - natsCtx.TLSCAs = spec.TLS.RootCAs - } - - natsServers := strings.Join(append(servers, accServers...), ",") - natsCtx.URL = natsServers - c.normalEvent(cns, "Connecting", "Connecting to new nats-servers") - jsmc, err := jsm(natsCtx) - if err != nil { - return err - } - defer jsmc.Close() - - if err := op(c.ctx, jsmc, spec); err != nil { - return err - } - } else { - jsmc, err := jsm(&natsContext{}) - if err != nil { - return err - } - if err := op(c.ctx, jsmc, spec); err != nil { - return err - } - } - return nil + return c.runWithJsmc(jsm, acc, &jsmcSpecOverrides{ + servers: spec.Servers, + tls: spec.TLS, + creds: spec.Creds, + nkey: spec.Nkey, + }, cns, func(jsmc jsmClient) error { + return op(c.ctx, jsmc, spec) + }) } deleteOK := cns.GetDeletionTimestamp() != nil diff --git a/controllers/jetstream/controller.go b/controllers/jetstream/controller.go index 785b3559..6b3495c8 100644 --- a/controllers/jetstream/controller.go +++ b/controllers/jetstream/controller.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "os" + "path/filepath" "strings" "time" @@ -414,6 +415,180 @@ func (c *Controller) warningEvent(o runtime.Object, reason, message string) { } } +type accountOverrides struct { + remoteClientCert string + remoteClientKey string + remoteRootCA string + servers []string + userCreds string + user string + password string + token string +} + +func (c *Controller) getAccountOverrides(account string, ns string) (*accountOverrides, error) { + overrides := &accountOverrides{} + + if account == "" || !c.opts.CRDConnect { + return overrides, nil + } + + // Lookup the account using the REST client. + ctx, done := context.WithTimeout(context.Background(), 5*time.Second) + defer done() + acc, err := c.ji.Accounts(ns).Get(ctx, account, k8smeta.GetOptions{}) + if err != nil { + return nil, err + } + + overrides.servers = acc.Spec.Servers + + // Lookup the TLS secrets + if acc.Spec.TLS != nil && acc.Spec.TLS.Secret != nil { + secretName := acc.Spec.TLS.Secret.Name + secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) + if err != nil { + return nil, err + } + + // Write this to the cacheDir. + accDir := filepath.Join(c.cacheDir, ns, account) + if err := os.MkdirAll(accDir, 0o755); err != nil { + return nil, err + } + + overrides.remoteClientCert = filepath.Join(accDir, acc.Spec.TLS.ClientCert) + overrides.remoteClientKey = filepath.Join(accDir, acc.Spec.TLS.ClientKey) + overrides.remoteRootCA = filepath.Join(accDir, acc.Spec.TLS.RootCAs) + + for k, v := range secret.Data { + if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { + return nil, err + } + } + } + // Lookup the UserCredentials. + if acc.Spec.Creds != nil { + secretName := acc.Spec.Creds.Secret.Name + secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) + if err != nil { + return nil, err + } + + // Write the user credentials to the cache dir. + accDir := filepath.Join(c.cacheDir, ns, account) + if err := os.MkdirAll(accDir, 0o755); err != nil { + return nil, err + } + for k, v := range secret.Data { + if k == acc.Spec.Creds.File { + overrides.userCreds = filepath.Join(c.cacheDir, ns, account, k) + if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { + return nil, err + } + } + } + } + + // Lookup the Token. + if acc.Spec.Token != nil { + secretName := acc.Spec.Token.Secret.Name + secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) + if err != nil { + return nil, err + } + + for k, v := range secret.Data { + if k == acc.Spec.Token.Token { + overrides.token = string(v) + } + } + } + + // Lookup the UserWithPassword. + if acc.Spec.UserWithPassword != nil { + secretName := acc.Spec.UserWithPassword.Secret.Name + secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) + if err != nil { + return nil, err + } + + for k, v := range secret.Data { + if k == acc.Spec.UserWithPassword.User { + overrides.user = string(v) + } + if k == acc.Spec.UserWithPassword.Password { + overrides.password = string(v) + } + } + } + + return overrides, nil +} + +type jsmcSpecOverrides struct { + servers []string + tls apis.TLS + creds string + nkey string +} + +func (c *Controller) runWithJsmc(jsm jsmClientFunc, acc *accountOverrides, spec *jsmcSpecOverrides, o runtime.Object, op func(jsmClient) error) error { + if !c.opts.CRDConnect { + jsmc, err := jsm(&natsContext{}) + if err != nil { + return err + } + + return op(jsmc) + } + + // Create a new client + natsCtx := &natsContext{} + // Use JWT/NKEYS based credentials if present. + if spec.creds != "" { + natsCtx.Credentials = spec.creds + } else if spec.nkey != "" { + natsCtx.Nkey = spec.nkey + } + if spec.tls.ClientCert != "" && spec.tls.ClientKey != "" { + natsCtx.TLSCert = spec.tls.ClientCert + natsCtx.TLSKey = spec.tls.ClientKey + } + + // Use fetched secrets for the account and server if defined. + if acc.remoteClientCert != "" && acc.remoteClientKey != "" { + natsCtx.TLSCert = acc.remoteClientCert + natsCtx.TLSKey = acc.remoteClientKey + } + if acc.remoteRootCA != "" { + natsCtx.TLSCAs = []string{acc.remoteRootCA} + } + if acc.userCreds != "" { + natsCtx.Credentials = acc.userCreds + } + + natsCtx.Username = acc.user + natsCtx.Password = acc.password + natsCtx.Token = acc.token + + if len(spec.tls.RootCAs) > 0 { + natsCtx.TLSCAs = spec.tls.RootCAs + } + + natsServers := strings.Join(append(spec.servers, acc.servers...), ",") + natsCtx.URL = natsServers + c.normalEvent(o, "Connecting", "Connecting to new nats-servers") + jsmc, err := jsm(natsCtx) + if err != nil { + return fmt.Errorf("failed to connect to nats-servers(%s): %w", natsServers, err) + } + + defer jsmc.Close() + + return op(jsmc) +} + func splitNamespaceName(item interface{}) (ns string, name string, err error) { defer func() { if err != nil { diff --git a/controllers/jetstream/stream.go b/controllers/jetstream/stream.go index 7dc2ec89..5fae6744 100644 --- a/controllers/jetstream/stream.go +++ b/controllers/jetstream/stream.go @@ -17,9 +17,6 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" - "strings" "time" jsm "github.com/nats-io/jsm.go" @@ -63,71 +60,9 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsm jsmClientFunc) (e ns := str.Namespace readOnly := c.opts.ReadOnly - var ( - remoteClientCert string - remoteClientKey string - remoteRootCA string - accServers []string - acc *apis.Account - accUserCreds string - ) - if spec.Account != "" && c.opts.CRDConnect { - // Lookup the account using the REST client. - ctx, done := context.WithTimeout(context.Background(), 5*time.Second) - defer done() - acc, err = c.ji.Accounts(ns).Get(ctx, spec.Account, k8smeta.GetOptions{}) - if err != nil { - return err - } - - accServers = acc.Spec.Servers - - // Lookup the TLS secrets - if acc.Spec.TLS != nil && acc.Spec.TLS.Secret != nil { - secretName := acc.Spec.TLS.Secret.Name - secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) - if err != nil { - return err - } - - // Write this to the cacheDir. - accDir := filepath.Join(c.cacheDir, ns, spec.Account) - if err := os.MkdirAll(accDir, 0o755); err != nil { - return err - } - - remoteClientCert = filepath.Join(accDir, acc.Spec.TLS.ClientCert) - remoteClientKey = filepath.Join(accDir, acc.Spec.TLS.ClientKey) - remoteRootCA = filepath.Join(accDir, acc.Spec.TLS.RootCAs) - - for k, v := range secret.Data { - if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { - return err - } - } - } - // Lookup the UserCredentials. - if acc.Spec.Creds != nil { - secretName := acc.Spec.Creds.Secret.Name - secret, err := c.ki.Secrets(ns).Get(c.ctx, secretName, k8smeta.GetOptions{}) - if err != nil { - return err - } - - // Write the user credentials to the cache dir. - accDir := filepath.Join(c.cacheDir, ns, spec.Account) - if err := os.MkdirAll(accDir, 0o755); err != nil { - return err - } - for k, v := range secret.Data { - if k == acc.Spec.Creds.File { - accUserCreds = filepath.Join(c.cacheDir, ns, spec.Account, k) - if err := os.WriteFile(filepath.Join(accDir, k), v, 0o644); err != nil { - return err - } - } - } - } + acc, err := c.getAccountOverrides(spec.Account, ns) + if err != nil { + return err } defer func() { @@ -143,57 +78,14 @@ func (c *Controller) processStreamObject(str *apis.Stream, jsm jsmClientFunc) (e type operator func(ctx context.Context, c jsmClient, spec apis.StreamSpec) (err error) natsClientUtil := func(op operator) error { - servers := spec.Servers - if c.opts.CRDConnect { - // Create a new client - natsCtx := &natsContext{} - // Use JWT/NKEYS based credentials if present. - if spec.Creds != "" { - natsCtx.Credentials = spec.Creds - } else if spec.Nkey != "" { - natsCtx.Nkey = spec.Nkey - } - if spec.TLS.ClientCert != "" && spec.TLS.ClientKey != "" { - natsCtx.TLSCert = spec.TLS.ClientCert - natsCtx.TLSKey = spec.TLS.ClientKey - } - - // Use fetched secrets for the account and server if defined. - if remoteClientCert != "" && remoteClientKey != "" { - natsCtx.TLSCert = remoteClientCert - natsCtx.TLSKey = remoteClientKey - } - if remoteRootCA != "" { - natsCtx.TLSCAs = []string{remoteRootCA} - } - if accUserCreds != "" { - natsCtx.Credentials = accUserCreds - } - if len(spec.TLS.RootCAs) > 0 { - natsCtx.TLSCAs = spec.TLS.RootCAs - } - - natsServers := strings.Join(append(servers, accServers...), ",") - natsCtx.URL = natsServers - c.normalEvent(str, "Connecting", "Connecting to new nats-servers") - jsmc, err := jsm(natsCtx) - if err != nil { - return fmt.Errorf("failed to connect to nats-servers(%s): %w", natsServers, err) - } - defer jsmc.Close() - if err := op(c.ctx, jsmc, spec); err != nil { - return err - } - } else { - jsmc, err := jsm(&natsContext{}) - if err != nil { - return err - } - if err := op(c.ctx, jsmc, spec); err != nil { - return err - } - } - return nil + return c.runWithJsmc(jsm, acc, &jsmcSpecOverrides{ + servers: spec.Servers, + tls: spec.TLS, + creds: spec.Creds, + nkey: spec.Nkey, + }, str, func(jsmc jsmClient) error { + return op(c.ctx, jsmc, spec) + }) } deleteOK := str.GetDeletionTimestamp() != nil diff --git a/pkg/jetstream/apis/jetstream/v1beta2/accounttypes.go b/pkg/jetstream/apis/jetstream/v1beta2/accounttypes.go index 01679053..f5f99a14 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/accounttypes.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/accounttypes.go @@ -22,9 +22,11 @@ func (c *Account) GetSpec() interface{} { // AccountSpec is the spec for a Account resource type AccountSpec struct { - Servers []string `json:"servers"` - TLS *TLSSecret `json:"tls"` - Creds *CredsSecret `json:"creds"` + Servers []string `json:"servers"` + TLS *TLSSecret `json:"tls"` + Creds *CredsSecret `json:"creds"` + Token *TokenSecret `json:"token"` + UserWithPassword *UserWithPassword `json:"userWithPassword"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/jetstream/apis/jetstream/v1beta2/types.go b/pkg/jetstream/apis/jetstream/v1beta2/types.go index 099c7bd6..e71a28c4 100644 --- a/pkg/jetstream/apis/jetstream/v1beta2/types.go +++ b/pkg/jetstream/apis/jetstream/v1beta2/types.go @@ -40,6 +40,17 @@ type CredsSecret struct { Secret SecretRef `json:"secret"` } +type TokenSecret struct { + Token string `json:"token"` + Secret SecretRef `json:"secret"` +} + +type UserWithPassword struct { + User string `json:"user"` + Password string `json:"password"` + Secret SecretRef `json:"secret"` +} + type SecretRef struct { Name string `json:"name"` }