diff --git a/go.mod b/go.mod index b767e07..68d3f3a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,10 @@ require ( golang.org/x/sync v0.2.0 ) +require github.com/golang-jwt/jwt/v5 v5.0.0 // indirect + require ( + github.com/adobe/ims-go v0.16.1 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 321f579..42bd826 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/adobe/ims-go v0.16.1 h1:n1gYlfAV9djx9+r9VGU8I49G8fnIrIsZvX1s/XJVD3w= +github.com/adobe/ims-go v0.16.1/go.mod h1:nsvzRhDFespyZnnLxQlsfBmlfGzqcy/zuS2HGebDHMI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -65,6 +67,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= +github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/kafka/client_config_helper.go b/kafka/client_config_helper.go index 35f5bb5..dca12c1 100644 --- a/kafka/client_config_helper.go +++ b/kafka/client_config_helper.go @@ -108,6 +108,18 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) { }.AsMechanism() opts = append(opts, kgo.SASL(kerberosMechanism)) } + + if cfg.SASL.Mechanism == SASLMechanismOAuthBearer { + var opt *kgo.Opt + switch cfg.SASL.OAUTHBEARER.Type { + case AdobeOAUTH: + bearer := NewAdobeOAUTHBearer(cfg.SASL.OAUTHBEARER.AdobeIMS) + opt, _ = bearer.Opt() + default: + return nil, fmt.Errorf("unknown oauthbearer type '%v'", cfg.SASL.OAUTHBEARER.Type) + } + opts = append(opts, opt) + } } // Configure TLS diff --git a/kafka/config_sasl.go b/kafka/config_sasl.go index 522f522..80bd2e6 100644 --- a/kafka/config_sasl.go +++ b/kafka/config_sasl.go @@ -18,7 +18,8 @@ type SASLConfig struct { Mechanism string `koanf:"mechanism"` // SASL Mechanisms that require more configuration than username & password - GSSAPI SASLGSSAPIConfig `koanf:"gssapi"` + GSSAPI SASLGSSAPIConfig `koanf:"gssapi"` + OAUTHBEARER OAUTHBEARERConfig `koanf:"oauthbearer"` } // SetDefaults for SASL Config @@ -35,10 +36,8 @@ func (c *SASLConfig) Validate() error { } switch c.Mechanism { - case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI: + case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI, SASLMechanismOAuthBearer: // Valid and supported - case SASLMechanismOAuthBearer: - return fmt.Errorf("sasl mechanism '%v' is valid but not yet supported. Please submit an issue if you need it", c.Mechanism) default: return fmt.Errorf("given sasl mechanism '%v' is invalid", c.Mechanism) } diff --git a/kafka/config_sasl_oauthbearer.go b/kafka/config_sasl_oauthbearer.go new file mode 100644 index 0000000..7efcb11 --- /dev/null +++ b/kafka/config_sasl_oauthbearer.go @@ -0,0 +1,13 @@ +package kafka + +// SASLGSSAPIConfig represents the Kafka Kerberos config +type OAUTHBEARERConfig struct { + Type OAUTHBEARERType `koanf:"type"` + AdobeIMS AdobeIMSConfig `koanf:"adobeIMS"` +} + +type OAUTHBEARERType string + +const ( + AdobeOAUTH OAUTHBEARERType = "AdobeIMS" +) diff --git a/kafka/oauth_adobe_ims.go b/kafka/oauth_adobe_ims.go new file mode 100644 index 0000000..2b3984d --- /dev/null +++ b/kafka/oauth_adobe_ims.go @@ -0,0 +1,55 @@ +package kafka + +import ( + "context" + "fmt" + + "github.com/adobe/ims-go/ims" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sasl/oauth" +) + +type AdobeIMSConfig struct { + Code string `koanf:"imsClientCode"` + ClientID string `koanf:"imsClientId"` + ClientSecret string `koanf:"imsClientSecret"` + Endpoint string `koanf:"imsEndpoint"` +} + +type AdobeOAUTHBearer struct { + config AdobeIMSConfig + client *ims.Client + token string +} + +func NewAdobeOAUTHBearer(config AdobeIMSConfig) (*AdobeOAUTHBearer, error) { + client, err := ims.NewClient(&ims.ClientConfig{ + URL: config.Endpoint, + }) + if err != nil { + return nil, fmt.Errorf("Unable to retrieve token: %v", err) + } + + token, err := client.Token(&ims.TokenRequest{ + Code: config.Code, + ClientID: config.ClientID, + ClientSecret: config.ClientSecret, + }) + if err != nil { + return nil, fmt.Errorf("Unable to get token from IMS: %v", err) + } + + return &AdobeOAUTHBearer{ + config: config, + client: client, + token: token.AccessToken, + }, nil +} + +func (a *AdobeOAUTHBearer) Opt() kgo.Opt { + return kgo.SASL(oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) { + return oauth.Auth{ + Token: a.token, + }, nil + })) +}