Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/k8sobjects] Support running in kubernetes leader election mode. #24662

Closed
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
8a0a1ba
Support running in kubernetes leader election mode.
JaredTan95 Jul 28, 2023
49846ac
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Jul 28, 2023
dd0bbaa
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Jul 29, 2023
60d896a
fix proto
JaredTan95 Jul 29, 2023
1d6e851
fix UT.
JaredTan95 Jul 29, 2023
9b2b808
fix mod
JaredTan95 Jul 29, 2023
819f8f1
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 2, 2023
77b413c
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 2, 2023
342cfd1
Update .chloggen/support-kubernetes-leader-election.yaml
JaredTan95 Aug 3, 2023
73f5477
polish
JaredTan95 Aug 3, 2023
976fa57
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 3, 2023
b4f5362
fix UT
JaredTan95 Aug 3, 2023
f463b1d
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 3, 2023
0854e43
polish
JaredTan95 Aug 3, 2023
3438834
fix tidy
JaredTan95 Aug 3, 2023
47dbbde
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 3, 2023
2ee86aa
polish docs.
JaredTan95 Aug 4, 2023
c117ac7
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 4, 2023
a1c2e3d
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 4, 2023
c0dee21
Update receiver/k8sobjectsreceiver/receiver.go
JaredTan95 Aug 5, 2023
8d31074
Update receiver/k8sobjectsreceiver/receiver.go
JaredTan95 Aug 5, 2023
711cb97
polish
JaredTan95 Aug 5, 2023
671837d
Merge remote-tracking branch 'upstream/main' into k8sobjectreceiver_l…
JaredTan95 Aug 5, 2023
b391469
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 5, 2023
6148233
fix gomod
JaredTan95 Aug 5, 2023
1141aac
fix lint
JaredTan95 Aug 5, 2023
adc7a5b
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 7, 2023
b7943d9
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 7, 2023
e6d754f
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 8, 2023
85afbbd
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 11, 2023
c70e004
Update receiver/k8sobjectsreceiver/README.md
JaredTan95 Aug 12, 2023
0d64def
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 13, 2023
42e27f7
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
17ff65c
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
2034511
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
d1d0c24
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
9f290aa
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
0130b7c
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 14, 2023
8b98374
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 15, 2023
981559c
fix luck name set
JaredTan95 Aug 15, 2023
a61986b
polish docs.
JaredTan95 Aug 17, 2023
0713c76
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 17, 2023
62d503e
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 17, 2023
6ecd080
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 17, 2023
5afcf86
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 20, 2023
d1bfea3
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 21, 2023
229a0ab
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 25, 2023
d67dc0f
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 28, 2023
3765528
polish
JaredTan95 Aug 28, 2023
7c64ab8
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 28, 2023
c47bee6
fix non leader stop collecting logical
JaredTan95 Aug 31, 2023
0c588b5
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 31, 2023
3c7a4a1
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 31, 2023
783625b
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Aug 31, 2023
e6c4c0e
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Sep 1, 2023
4df103e
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Sep 3, 2023
147f6e6
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Sep 4, 2023
959d72e
Merge remote-tracking branch 'upstream/main' into k8sobjectreceiver_l…
JaredTan95 Sep 5, 2023
c93439e
gotidy
JaredTan95 Sep 5, 2023
4494de0
update logical
JaredTan95 Sep 5, 2023
0dbc925
Merge remote-tracking branch 'upstream/main' into k8sobjectreceiver_l…
JaredTan95 Sep 5, 2023
893d2fb
Merge remote-tracking branch 'upstream/main' into k8sobjectreceiver_l…
JaredTan95 Sep 30, 2023
8786687
add leader election pull mode
JaredTan95 Oct 1, 2023
57a6da5
Update receiver/k8sobjectsreceiver/receiver.go
JaredTan95 Oct 11, 2023
59e14f7
Merge branch 'main' into k8sobjectreceiver_leader_election
JaredTan95 Dec 10, 2023
a867048
fix mod
JaredTan95 Dec 10, 2023
050da0c
fix mod
JaredTan95 Dec 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .chloggen/support-kubernetes-leader-election.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support running selected Kubernetes components in "leader election mode", meaning that only one replica of the Collector will react to Kubernetes events at any given point in time.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [17369]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Support running in kubernetes leader election mode.
3 changes: 3 additions & 0 deletions extension/observer/k8sobserver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/k8sconfig/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 110 additions & 0 deletions internal/k8sconfig/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sconfig // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"

import (
"context"
"errors"
"fmt"
"os"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// LeaderElectionConfig is used to enable leader election
type LeaderElectionConfig struct {
Enabled bool `mapstructure:"enabled"`
// LockName determines the name of the resource that leader election will use for holding the leader lock.
LockName string `mapstructure:"lock_name"`
LeaseDuration time.Duration `mapstructure:"lease_duration"`
RenewDeadline time.Duration `mapstructure:"renew_deadline"`
RetryPeriod time.Duration `mapstructure:"retry_period"`
}

const (
inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)

// NewResourceLock creates a new leases resource lock for use in a leader election loop
func newResourceLock(client kubernetes.Interface, lockName string) (resourcelock.Interface, error) {
if lockName == "" {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("lockName must be configured")
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
}

leaderElectionNamespace, err := getInClusterNamespace()
if err != nil {
return nil, fmt.Errorf("unable to find leader election namespace: %w", err)
}

// Leader id, needs to be unique, use pod name in kubernetes case.
id, err := os.Hostname()
if err != nil {
return nil, err
}

return resourcelock.New(resourcelock.LeasesResourceLock,
leaderElectionNamespace, lockName,
client.CoreV1(),
client.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
})
}

func getInClusterNamespace() (string, error) {
// Check whether the namespace file exists.
// If not, we are not running in cluster so can't guess the namespace.
_, err := os.Stat(inClusterNamespacePath)
if os.IsNotExist(err) {
return "", fmt.Errorf("not running in-cluster, unable to get namespace")
} else if err != nil {
return "", fmt.Errorf("error checking namespace file: %w", err)
}

// Load the namespace file and return its content
namespace, err := os.ReadFile(inClusterNamespacePath)
if err != nil {
return "", fmt.Errorf("error reading namespace file: %w", err)
}
return string(namespace), nil
}

// NewLeaderElector return a leader elector object using client-go
func NewLeaderElector(cfg LeaderElectionConfig, client kubernetes.Interface, startFunc func(context.Context), stopFunc func()) (*leaderelection.LeaderElector, error) {
resourceLock, err := newResourceLock(client, cfg.LockName)
if err != nil {
return &leaderelection.LeaderElector{}, err
}

leConfig := leaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: defaultLeaseDuration,
RenewDeadline: defaultRenewDeadline,
RetryPeriod: defaultRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: startFunc,
OnStoppedLeading: stopFunc,
},
}

if cfg.LeaseDuration != 0 {
leConfig.LeaseDuration = cfg.LeaseDuration
}

if cfg.RenewDeadline != 0 {
leConfig.RenewDeadline = cfg.RenewDeadline
}

if cfg.RetryPeriod != 0 {
leConfig.RetryPeriod = cfg.RetryPeriod
}
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved

return leaderelection.NewLeaderElector(leConfig)
}
3 changes: 3 additions & 0 deletions internal/kubelet/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions receiver/k8sobjectsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,32 @@ The following is example configuration
mode: watch
group: events.k8s.io
namespaces: [default]

k8sobjects/leader-election:
leader_election:
enabled: true
lock_name: "k8sobjects-leader-election"
auth_type: serviceAccount
objects:
- name: pods
mode: pull
label_selector: environment in (production),tier in (frontend)
field_selector: status.phase=Running
interval: 15m
- name: events
mode: watch
group: events.k8s.io
namespaces: [default]
```

Brief description of configuration properties:
- `leader_election`: Leader election mode helps to ensure that only one instance (as leader instance) is collecting `k8sobjects` data, and if the leader instance fails, another one is elected as leader and takes its place.
*Note*: You need to add additional RBAC to perform the leader election mode. See below [Leader Election RBAC](#RBAC-Leader-Election) for more information.
- `enabled` (default = `false`): whether run in leader election mode.
- `lock_name` (default = `k8sobjects`): the identity name of holder, will use component's ID if not set.
- `lease_duration` (default = `15s`): the duration that non-leader candidates will wait to force acquire leadership.
- `renew_deadline` (default = `10s`): the duration that the acting master will retry refreshing leadership before giving up.
- `retry_period` (default = `2s`): the duration the LeaderElector clients should wait between tries of actions.
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
- `auth_type` (default = `serviceAccount`): Determines how to authenticate to
the K8s API server. This can be one of `none` (for no auth), `serviceAccount`
(to use the standard service account token provided to the agent pod), or
Expand Down Expand Up @@ -151,6 +174,46 @@ rules:
EOF
```

### RBAC-Leader-Election

Use the below commands to create a `ClusterRole` with `leader election` required permissions and a
`ClusterRoleBinding` to grant the role to the service account created above.

```bash
<<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: otelcontribcol
labels:
app: otelcontribcol
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- create
- get
- update
- apiGroups:
- ""
resources:
- events
- pods
verbs:
- get
- list
- watch
- apiGroups:
- "events.k8s.io"
resources:
- events
verbs:
- watch
EOF
```

```bash
<<EOF | kubectl apply -f -
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
17 changes: 16 additions & 1 deletion receiver/k8sobjectsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)
Expand Down Expand Up @@ -46,11 +47,13 @@ type K8sObjectsConfig struct {
type Config struct {
k8sconfig.APIConfig `mapstructure:",squash"`

Objects []*K8sObjectsConfig `mapstructure:"objects"`
Objects []*K8sObjectsConfig `mapstructure:"objects"`
LeaderElection k8sconfig.LeaderElectionConfig `mapstructure:"leader_election"`

// For mocking purposes only.
makeDiscoveryClient func() (discovery.ServerResourcesInterface, error)
makeDynamicClient func() (dynamic.Interface, error)
makeClient func() (kubernetes.Interface, error)
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -92,6 +95,18 @@ func (c *Config) Validate() error {
return nil
}

func (c *Config) getClient() (kubernetes.Interface, error) {
if c.makeClient != nil {
return c.makeClient()
}
client, err := k8sconfig.MakeClient(c.APIConfig)
if err != nil {
return nil, err
}

return client, nil
}

func (c *Config) getDiscoveryClient() (discovery.ServerResourcesInterface, error) {
if c.makeDiscoveryClient != nil {
return c.makeDiscoveryClient()
Expand Down
11 changes: 9 additions & 2 deletions receiver/k8sobjectsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -37,7 +39,7 @@ func TestLoadConfig(t *testing.T) {
err = component.ValidateConfig(cfg)
require.NoError(t, err)

expected := []*K8sObjectsConfig{
expectedObjects := []*K8sObjectsConfig{
{
Name: "pods",
Mode: PullMode,
Expand All @@ -63,7 +65,12 @@ func TestLoadConfig(t *testing.T) {
},
},
}
assert.EqualValues(t, expected, cfg.Objects)
expectedLeaderElection := k8sconfig.LeaderElectionConfig{
Enabled: false,
}

assert.EqualValues(t, expectedObjects, cfg.Objects)
assert.EqualValues(t, expectedLeaderElection, cfg.LeaderElection)

}

Expand Down
3 changes: 3 additions & 0 deletions receiver/k8sobjectsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func createDefaultConfig() component.Config {
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
LeaderElection: k8sconfig.LeaderElectionConfig{
Enabled: false,
},
}
}

Expand Down
3 changes: 3 additions & 0 deletions receiver/k8sobjectsreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func TestDefaultConfig(t *testing.T) {
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
LeaderElection: k8sconfig.LeaderElectionConfig{
Enabled: false,
},
}, rCfg)
}

Expand Down
Loading