From e5cd4564f63960824c8e433d7ed18c8ecd7958f2 Mon Sep 17 00:00:00 2001 From: Sam Arnold Date: Fri, 11 Jun 2021 19:11:17 -0400 Subject: [PATCH] feat: shared secret auth to influxdb in OSS (#2576) * feat: shared secret auth to influxdb in OSS * test: fix default values in update test --- CHANGELOG.md | 1 + influxdb/client.go | 7 +- influxdb/token_client.go | 186 +++++++++++++++++++++++++++++++++++ server/server.go | 8 +- server/server_test.go | 1 + services/influxdb/config.go | 15 +-- services/influxdb/service.go | 6 ++ 7 files changed, 213 insertions(+), 11 deletions(-) create mode 100644 influxdb/token_client.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f4fcb3bcc..fc23bea73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - [#2555](https://github.com/influxdata/kapacitor/pull/2555): run flux tasks with built-in flux engine - [#2559](https://github.com/influxdata/kapacitor/pull/2559): kapacitor cli supports flux tasks - [#2560](https://github.com/influxdata/kapacitor/pull/2560): enable new-style slack apps +- [#2576](https://github.com/influxdata/kapacitor/pull/2576): shared secret auth to influxdb in OSS ### Bugfixes - [#2564](https://github.com/influxdata/kapacitor/pull/2564): Fix a panic in the scraper handler when debug mode is enabled diff --git a/influxdb/client.go b/influxdb/client.go index 9435277e3..3a5398d10 100644 --- a/influxdb/client.go +++ b/influxdb/client.go @@ -126,13 +126,14 @@ type Credentials struct { Method AuthenticationMethod // UserAuthentication fields - Username string Password string - // BearerAuthentication fields - + // TokenAuthentication fields Token string + + // BearerAuthentication fields + HttpSharedSecret bool } // HTTPClient is safe for concurrent use. diff --git a/influxdb/token_client.go b/influxdb/token_client.go new file mode 100644 index 000000000..c9c500f69 --- /dev/null +++ b/influxdb/token_client.go @@ -0,0 +1,186 @@ +package influxdb + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/dgrijalva/jwt-go" + "github.com/influxdata/flux" + "github.com/influxdata/kapacitor/keyvalue" + "github.com/pkg/errors" +) + +type Diagnostic interface { + Error(msg string, err error, ctx ...keyvalue.T) +} + +type TokenClientCreator struct { + httpSharedSecret []byte + tokenDuration time.Duration + diag Diagnostic +} + +func NewTokenClientCreator(httpSharedSecret []byte, tokenDuration time.Duration, d Diagnostic) *TokenClientCreator { + return &TokenClientCreator{ + httpSharedSecret: httpSharedSecret, + tokenDuration: tokenDuration, + diag: d, + } +} + +func (cc *TokenClientCreator) Create(config Config) (ClientUpdater, error) { + if config.Credentials.Method != BearerAuthentication { + // Config doesn't need token auth, use normal client + return NewHTTPClient(config) + } + if !config.Credentials.HttpSharedSecret { + // This should not happen since we only set BearerAuthentication in services/influxdb.httpConfig + // if http-shared-secret was false. Eventually we could add an additional config value to set a shared secret + // that is not the one from KAPACITOR_HTTP_SHARED_SECRET + return nil, errors.New("invalid config: bearer auth configured but http-shared-secret was false") + } + // Generate the first token + token, err := generateToken(config.Credentials.Username, cc.httpSharedSecret, cc.tokenDuration) + if err != nil { + return nil, errors.Wrap(err, "generating first token") + } + // Update credentials to use token + config.Credentials.Method = BearerAuthentication + config.Credentials.Token = token + + cli, err := NewHTTPClient(config) + if err != nil { + return nil, err + } + tcli := &tokenClient{ + client: cli, + username: config.Credentials.Username, + sharedSecret: cc.httpSharedSecret, + tokenDuration: cc.tokenDuration, + diag: cc.diag, + closing: make(chan struct{}), + closed: true, + } + tcli.configValue.Store(config) + + if err := tcli.Open(); err != nil { + return nil, errors.Wrap(err, "failed to open client") + } + return tcli, nil +} + +type tokenClient struct { + wg sync.WaitGroup + configValue atomic.Value // influxdb.Config + client *HTTPClient + username string + sharedSecret []byte + tokenDuration time.Duration + + mu sync.Mutex + closing chan struct{} + closed bool + diag Diagnostic +} + +// Update updates the running configuration. +func (tc *tokenClient) Update(c Config) error { + tc.configValue.Store(c) + return tc.client.Update(c) +} + +// Ping checks that status of cluster +func (tc *tokenClient) Ping(c context.Context) (time.Duration, string, error) { + return tc.client.Ping(c) +} + +// Write takes a BatchPoints object and writes all Points to InfluxDB. +func (tc *tokenClient) Write(bp BatchPoints) error { + return tc.client.Write(bp) +} + +// Query makes an InfluxDB Query on the database. +func (tc *tokenClient) Query(q Query) (*Response, error) { + return tc.client.Query(q) +} + +// WriteV2 writes to InfluxDB using the v2 write protocol +func (tc *tokenClient) WriteV2(w FluxWrite) error { + return tc.client.WriteV2(w) +} + +// QueryFlux makes a flux query to InfluxDB +func (tc *tokenClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) { + return tc.client.QueryFlux(q) +} + +// QueryFluxResponse makes a flux query to InfluxDB and translates it to a Response +func (tc *tokenClient) QueryFluxResponse(q FluxQuery) (*Response, error) { + return tc.client.QueryFluxResponse(q) +} + +func (tc *tokenClient) Open() error { + tc.mu.Lock() + defer tc.mu.Unlock() + if !tc.closed { + return nil + } + tc.closed = false + // Start background routine to preemptively update the token before it expires. + tc.wg.Add(1) + go func() { + defer tc.wg.Done() + tc.manageToken() + }() + return nil +} + +// Close the client. +func (tc *tokenClient) Close() error { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.closed { + return nil + } + close(tc.closing) + tc.closed = true + tc.wg.Wait() + return tc.client.Close() +} + +// Preemptively update the token before it can expire. +func (tc *tokenClient) manageToken() { + ticker := time.NewTicker(tc.tokenDuration / 2) + defer ticker.Stop() + for { + select { + case <-ticker.C: + token, err := generateToken(tc.username, tc.sharedSecret, tc.tokenDuration) + if err != nil { + tc.diag.Error("failed to generate new token", err) + continue + } + c := tc.configValue.Load().(Config) + c.Credentials.Token = token + tc.Update(c) + case <-tc.closing: + return + } + } +} + +// Generate a new signed token for the user. The token will expire after tokenDuration has elapsed. +func generateToken(username string, secret []byte, tokenDuration time.Duration) (string, error) { + // Create a new token object, specifying signing method and the claims + // you would like it to contain. + token := jwt.NewWithClaims(jwt.SigningMethodHS512, jwt.MapClaims{ + "username": username, + "exp": time.Now().Add(tokenDuration).Unix(), + }) + + // Sign and get the complete encoded token as a string using the secret + tokenString, err := token.SignedString(secret) + return tokenString, errors.Wrap(err, "signing authentication token") +} diff --git a/server/server.go b/server/server.go index 7da34a7aa..c21292b57 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "runtime" "runtime/pprof" "sync" + "time" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/services/collectd" @@ -473,6 +474,11 @@ func (s *Server) appendLoadService() error { return nil } +const ( + // Tokens for the InfluxDB clusters are generate with an expiration this far into the future. + tokenExpirationDuration = 10 * time.Minute +) + func (s *Server) appendInfluxDBService() error { c := s.config.InfluxDB d := s.DiagService.NewInfluxDBHandler() @@ -493,7 +499,7 @@ func (s *Server) appendInfluxDBService() error { srv.HTTPDService = s.HTTPDService srv.PointsWriter = s.TaskMaster srv.AuthService = s.AuthService - srv.ClientCreator = iclient.ClientCreator{} + srv.ClientCreator = iclient.NewTokenClientCreator([]byte(s.config.HTTP.SharedSecret), tokenExpirationDuration, s.DiagService.NewInfluxDBHandler()) s.InfluxDBService = srv s.TaskMaster.InfluxDBService = srv diff --git a/server/server_test.go b/server/server_test.go index aa5b751e9..e7df2c17f 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6497,6 +6497,7 @@ func TestServer_UpdateConfig(t *testing.T) { "http-port": float64(0), "insecure-skip-verify": false, "kapacitor-hostname": "", + "http-shared-secret": false, "name": "default", "password": true, "ssl-ca": "", diff --git a/services/influxdb/config.go b/services/influxdb/config.go index 10a1a4c46..e503805fd 100644 --- a/services/influxdb/config.go +++ b/services/influxdb/config.go @@ -30,13 +30,14 @@ const ( ) type Config struct { - Enabled bool `toml:"enabled" override:"enabled"` - Name string `toml:"name" override:"name"` - Default bool `toml:"default" override:"default"` - URLs []string `toml:"urls" override:"urls"` - Username string `toml:"username" override:"username"` - Password string `toml:"password" override:"password,redact"` - Token string `toml:"token" override:"token,redact"` + Enabled bool `toml:"enabled" override:"enabled"` + Name string `toml:"name" override:"name"` + Default bool `toml:"default" override:"default"` + URLs []string `toml:"urls" override:"urls"` + Username string `toml:"username" override:"username"` + Password string `toml:"password" override:"password,redact"` + Token string `toml:"token" override:"token,redact"` + HttpSharedSecret bool `toml:"http-shared-secret" override:"http-shared-secret"` // Path to CA file SSLCA string `toml:"ssl-ca" override:"ssl-ca"` diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 7e0106bc4..65020a942 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -511,6 +511,12 @@ func httpConfig(c Config) (influxdb.Config, error) { Method: influxdb.TokenAuthentication, Token: c.Token, } + } else if c.HttpSharedSecret { + credentials = influxdb.Credentials{ + Method: influxdb.BearerAuthentication, + Username: c.Username, + HttpSharedSecret: true, + } } else if c.Username != "" { credentials = influxdb.Credentials{ Method: influxdb.UserAuthentication,