-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpath_token.go
112 lines (97 loc) · 3.1 KB
/
path_token.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"context"
"encoding/base64"
"errors"
"time"
"github.com/IBM/sarama"
"github.com/hashicorp/vault/sdk/framework"
"github.com/hashicorp/vault/sdk/logical"
)
const (
tokenPath = "token/"
maxLifetimeKey = "max_life_period"
)
func (b *kafkaScramBackend) pathToken() *framework.Path {
return &framework.Path{
Pattern: tokenPath + framework.GenericNameRegex(nameKey),
Fields: map[string]*framework.FieldSchema{
nameKey: {
Type: framework.TypeString,
Description: "Name of the principal used to issue the token",
Required: true,
},
maxLifetimeKey: {
Type: framework.TypeInt64,
Description: "The maximum lifetime of the token in seconds",
},
},
Callbacks: map[logical.Operation]framework.OperationFunc{
logical.ReadOperation: b.createToken,
},
HelpSynopsis: "Create a Kafka delegation token owned by the user specified in the path",
HelpDescription: `This returns a Token ID and HMAC which may be used as the username and password
to authenticate to the Kafka cluster on the same advertised listener that this plugin is configured for.
For JVM clients, make sure that the sasl.jaas.config in your client.properties ends with: tokenauth=true;
For non JVM-clients, be sure to follow the pseudo-SCRAM procedure for delegation token authentication
defined in KIP-48 (client-first-message suffixed with: "tokenauth=true")`,
}
}
func (b *kafkaScramBackend) createToken(
ctx context.Context,
req *logical.Request,
data *framework.FieldData) (*logical.Response, error) {
if err := data.Validate(); err != nil {
return logical.ErrorResponse(err.Error()), nil
}
name, err := getName(data)
if err != nil {
return logical.ErrorResponse(err.Error()), nil
}
maxLife := -1 * time.Millisecond
if v, ok := data.GetOk(maxLifetimeKey); ok {
if vtyped, ok := v.(int64); ok {
maxLife = time.Duration(vtyped)
} else {
return logical.ErrorResponse("'%s' must be a integer", maxLifetimeKey), nil
}
}
config, err := getConfig(ctx, req.Storage)
if err != nil {
return nil, err
}
admin, err := b.getAdminClient(ctx, req.Storage)
if err != nil {
return nil, err
}
desc, err := admin.DescribeUserScramCredentials([]string{name})
if err != nil {
return nil, err
} else if !errors.Is(desc[0].ErrorCode, sarama.ErrNoError) {
msg := desc[0].ErrorCode.Error()
if desc[0].ErrorMessage != nil {
msg = *desc[0].ErrorMessage
}
return logical.ErrorResponse(msg), nil
}
token, err := admin.CreateDelegationToken([]string{config.Username}, &name, maxLife)
if err != nil {
return nil, err
}
hmac := base64.StdEncoding.EncodeToString(token.HMAC)
resp := b.Secret(kafkaDelegationToken).Response(
map[string]interface{}{
"Token ID": token.TokenID,
"HMAC": hmac,
"Expiry time": token.ExpiryTime,
"Max life time": token.MaxLifeTime,
"Issue time": token.IssueTime,
},
map[string]interface{}{hmacKey: hmac},
)
resp.Secret.Renewable = true
resp.Secret.IssueTime = token.IssueTime
resp.Secret.TTL = time.Until(token.ExpiryTime)
resp.Secret.MaxTTL = time.Until(token.MaxLifeTime)
return resp, nil
}