Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAUTH support for AdobeIMS #10

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ require (
golang.org/x/sync v0.2.0
)

require github.com/golang-jwt/jwt/v5 v5.0.0 // indirect

require (
github.com/adobe/ims-go v0.16.1
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/adobe/ims-go v0.16.1 h1:n1gYlfAV9djx9+r9VGU8I49G8fnIrIsZvX1s/XJVD3w=
github.com/adobe/ims-go v0.16.1/go.mod h1:nsvzRhDFespyZnnLxQlsfBmlfGzqcy/zuS2HGebDHMI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -65,6 +67,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down
12 changes: 12 additions & 0 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
}.AsMechanism()
opts = append(opts, kgo.SASL(kerberosMechanism))
}

if cfg.SASL.Mechanism == SASLMechanismOAuthBearer {
var opt *kgo.Opt
switch cfg.SASL.OAUTHBEARER.Type {
case AdobeOAUTH:
bearer := NewAdobeOAUTHBearer(cfg.SASL.OAUTHBEARER.AdobeIMS)
opt, _ = bearer.Opt()
default:
return nil, fmt.Errorf("unknown oauthbearer type '%v'", cfg.SASL.OAUTHBEARER.Type)
}
opts = append(opts, opt)
}
}

// Configure TLS
Expand Down
7 changes: 3 additions & 4 deletions kafka/config_sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type SASLConfig struct {
Mechanism string `koanf:"mechanism"`

// SASL Mechanisms that require more configuration than username & password
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
GSSAPI SASLGSSAPIConfig `koanf:"gssapi"`
OAUTHBEARER OAUTHBEARERConfig `koanf:"oauthbearer"`
}

// SetDefaults for SASL Config
Expand All @@ -35,10 +36,8 @@ func (c *SASLConfig) Validate() error {
}

switch c.Mechanism {
case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI:
case SASLMechanismPlain, SASLMechanismScramSHA256, SASLMechanismScramSHA512, SASLMechanismGSSAPI, SASLMechanismOAuthBearer:
// Valid and supported
case SASLMechanismOAuthBearer:
return fmt.Errorf("sasl mechanism '%v' is valid but not yet supported. Please submit an issue if you need it", c.Mechanism)
default:
return fmt.Errorf("given sasl mechanism '%v' is invalid", c.Mechanism)
}
Expand Down
13 changes: 13 additions & 0 deletions kafka/config_sasl_oauthbearer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kafka

// SASLGSSAPIConfig represents the Kafka Kerberos config
type OAUTHBEARERConfig struct {
Type OAUTHBEARERType `koanf:"type"`
AdobeIMS AdobeIMSConfig `koanf:"adobeIMS"`
}

type OAUTHBEARERType string

const (
AdobeOAUTH OAUTHBEARERType = "AdobeIMS"
)
55 changes: 55 additions & 0 deletions kafka/oauth_adobe_ims.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kafka

import (
"context"
"fmt"

"github.com/adobe/ims-go/ims"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/oauth"
)

type AdobeIMSConfig struct {
Code string `koanf:"imsClientCode"`
ClientID string `koanf:"imsClientId"`
ClientSecret string `koanf:"imsClientSecret"`
Endpoint string `koanf:"imsEndpoint"`
}

type AdobeOAUTHBearer struct {
config AdobeIMSConfig
client *ims.Client
token string
}

func NewAdobeOAUTHBearer(config AdobeIMSConfig) (*AdobeOAUTHBearer, error) {
client, err := ims.NewClient(&ims.ClientConfig{
URL: config.Endpoint,
})
if err != nil {
return nil, fmt.Errorf("Unable to retrieve token: %v", err)
}

token, err := client.Token(&ims.TokenRequest{
Code: config.Code,
ClientID: config.ClientID,
ClientSecret: config.ClientSecret,
})
if err != nil {
return nil, fmt.Errorf("Unable to get token from IMS: %v", err)
}

return &AdobeOAUTHBearer{
config: config,
client: client,
token: token.AccessToken,
}, nil
}

func (a *AdobeOAUTHBearer) Opt() kgo.Opt {
return kgo.SASL(oauth.Oauth(func(ctx context.Context) (oauth.Auth, error) {
return oauth.Auth{
Token: a.token,
}, nil
}))
}