Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/leader elector] Initial implementation for the extension leader elector #37144

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions extension/leaderelector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
34 changes: 34 additions & 0 deletions extension/leaderelector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Leader Elector Extension

This extension enables OpenTelemetry components to run in HA mode across a Kubernetes cluster. The component that owns the lease becomes the leader and becomes the active instance.
## How It Works

The extension uses k8s.io/client-go/tools/leaderelection to perform leader election. The component that owns the lease becomes the leader and runs the function defined in onStartedLeading. If the leader loses the lease, it runs the function defined in onStoppedLeading, stops its operation, and waits to acquire the lease again.
## Configuration

```yaml
receivers:
my_awesome_receiver:
leader_elector: leader_elector
extensions:
leader_elector:
auth_type: kubeConfig
lease_name: foo
lease_namespace: default

service:
extensions: [leader_elector]
pipelines:
metrics:
receivers: [my_awesome_receiver]
```
### Leader Election Configuration
Copy link
Member

@JaredTan95 JaredTan95 Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to give a rolebingding example about the lease that serviceaccount needs to bind with ClusterRoleBinding ClusterRole.

See the example in Kubernetes Events Receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/k8seventsreceiver#rbac

| configuration | description | default value |
|---------------------|-------------------------------------------------------------------------------|-----------------|
| **auth_type** | Authorization type to be used (serviceAccount, kubeConfig). | none (required) |
| **lease_name** | The name of the lease object. | none (required) |
| **lease_namespace** | The namespace of the lease object. | none (required) |
| **lease_duration** | The duration of the lease. | 15s |
| **renew_deadline** | The deadline for renewing the lease. It must be less than the lease duration. | 10s |
| **retry_period** | The period for retrying the leader election. | 2s |
35 changes: 35 additions & 0 deletions extension/leaderelector/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package leaderelector

import (
"time"

"go.opentelemetry.io/collector/component"
"k8s.io/client-go/kubernetes"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

// Config is the configuration for the leader elector extension.
type Config struct {
k8sconfig.APIConfig `mapstructure:",squash"`
LeaseName string `mapstructure:"lease_name"`
LeaseNamespace string `mapstructure:"lease_namespace"`
LeaseDuration time.Duration `mapstructure:"lease_duration"`
RenewDuration time.Duration `mapstructure:"renew_deadline"`
RetryPeriod time.Duration `mapstructure:"retry_period"`
makeClient func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error)
}

type LeaderElector struct{}

func (cfg *Config) getK8sClient() (kubernetes.Interface, error) {
if cfg.makeClient == nil {
cfg.makeClient = k8sconfig.MakeClient
}
return cfg.makeClient(cfg.APIConfig)
}

var _ component.Config = (*Config)(nil)
67 changes: 67 additions & 0 deletions extension/leaderelector/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package leaderelector

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/leaderelector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expectedConfig component.Config
}{
{
id: component.NewIDWithName(metadata.Type, "defaults"),
expectedConfig: &Config{
APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
LeaseName: "foo",
LeaseNamespace: "default",
LeaseDuration: 15 * time.Second,
RenewDuration: 10 * time.Second,
RetryPeriod: 2 * time.Second,
},
},
{
id: component.NewIDWithName(metadata.Type, "with_lease_duration"),
expectedConfig: &Config{
APIConfig: k8sconfig.APIConfig{
AuthType: "kubeConfig",
},
LeaseName: "bar",
LeaseNamespace: "default",
LeaseDuration: 20 * time.Second,
RenewDuration: 10 * time.Second,
RetryPeriod: 2 * time.Second,
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

require.Equal(t, tt.expectedConfig, cfg)
})
}
}
8 changes: 8 additions & 0 deletions extension/leaderelector/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

// Package myextension implements an extension

package leaderelector
97 changes: 97 additions & 0 deletions extension/leaderelector/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package leaderelector

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
)

type (
StartCallback = func(context.Context)
StopCallback = func()
)

// LeaderElection Interface allows the invoker to set the callback functions
// that would be invoked when the leader wins or loss the election.
type LeaderElection interface {
extension.Extension
SetCallBackFuncs(StartCallback, StopCallback)
}

// SetCallBackFuncs set the functions that can be invoked when the leader wins or loss the election
func (lee *leaderElectionExtension) SetCallBackFuncs(onStartLeading StartCallback, onStopLeading StopCallback) {
lee.onStartedLeading = append(lee.onStartedLeading, onStartLeading)
lee.onStoppedLeading = append(lee.onStoppedLeading, onStopLeading)
}

// leaderElectionExtension is the main struct implementing the extension's behavior.
type leaderElectionExtension struct {
config *Config
cancel context.CancelFunc
client kubernetes.Interface
logger *zap.Logger
leaseHolderId string

onStartedLeading []StartCallback
onStoppedLeading []StopCallback
}

// If the receiver sets a callback function then it would be invoked when the leader wins the election
func (lee *leaderElectionExtension) startedLeading(ctx context.Context) {
for _, callback := range lee.onStartedLeading {
callback(ctx)
}
}

// If the receiver sets a callback function then it would be invoked when the leader loss the election
func (lee *leaderElectionExtension) stoppedLeading() {
for _, callback := range lee.onStoppedLeading {
callback()
}
}

// Start begins the extension's processing.
func (lee *leaderElectionExtension) Start(_ context.Context, host component.Host) error {
lee.logger.Info("Starting Leader Elector")

ctx := context.Background()
ctx, lee.cancel = context.WithCancel(ctx)

// Create the leader elector
leaderElector, err := NewLeaderElector(lee.config, lee.client, lee.startedLeading, lee.stoppedLeading, lee.leaseHolderId)
if err != nil {
lee.logger.Error("Failed to create leader elector", zap.Error(err))
return err
}

go func() {
// Leader election loop stops if context is canceled or the leader elector loses the lease.
// The loop allows continued participation in leader election, even if the lease is lost.
for {
leaderElector.Run(ctx)

if ctx.Err() != nil {
break
}

lee.logger.Info("Leader lease lost. Returning to standby mode...")
}
}()

return nil
}

// Shutdown cleans up the extension when stopping.
func (lee *leaderElectionExtension) Shutdown(ctx context.Context) error {
lee.logger.Info("Stopping Leader Elector")
if lee.cancel != nil {
lee.cancel()
}
return nil
}
74 changes: 74 additions & 0 deletions extension/leaderelector/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package leaderelector

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

func TestExtension(t *testing.T) {
config := &Config{
LeaseName: "foo",
LeaseNamespace: "default",
LeaseDuration: 15 * time.Second,
RenewDuration: 10 * time.Second,
RetryPeriod: 2 * time.Second,
}

iamInvokedOnLeading := false

ctx := context.TODO()
fakeClient := fake.NewClientset()
config.makeClient = func(apiConfig k8sconfig.APIConfig) (kubernetes.Interface, error) {
return fakeClient, nil
}

observedZapCore, _ := observer.New(zap.WarnLevel)

leaderElection := leaderElectionExtension{
config: config,
client: fakeClient,
logger: zap.New(observedZapCore),
leaseHolderId: "foo",
}

leaderElection.SetCallBackFuncs(
func(ctx context.Context) {
iamInvokedOnLeading = true
fmt.Printf("LeaderElection started leading")
},
func() {
fmt.Printf("LeaderElection stopped leading")
},
)

require.NoError(t, leaderElection.Start(ctx, componenttest.NewNopHost()))

expectedLeaseDurationSeconds := ptr.To(int32(15))

require.Eventually(t, func() bool {
lease, err := fakeClient.CoordinationV1().Leases("default").Get(ctx, "foo", metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, lease)
require.Equal(t, expectedLeaseDurationSeconds, lease.Spec.LeaseDurationSeconds)
return true
}, 10*time.Second, 100*time.Millisecond)

require.True(t, iamInvokedOnLeading)
require.NoError(t, leaderElection.Shutdown(ctx))
}
76 changes: 76 additions & 0 deletions extension/leaderelector/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package leaderelector

import (
"context"
"errors"
"os"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/leaderelector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

const (
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
)

// CreateDefaultConfig returns the default configuration for the extension.
func CreateDefaultConfig() component.Config {
return &Config{
APIConfig: k8sconfig.APIConfig{
AuthType: k8sconfig.AuthTypeServiceAccount,
},
LeaseDuration: defaultLeaseDuration,
RenewDuration: defaultRenewDeadline,
RetryPeriod: defaultRetryPeriod,
}
}

// CreateExtension creates the extension instance based on the configuration.
func CreateExtension(
ctx context.Context,
set extension.Settings,
cfg component.Config,
) (extension.Extension, error) {
baseCfg, ok := cfg.(*Config)
if !ok {
return nil, errors.New("Invalid config, cannot create extension leaderelector")
}

// Initialize k8s client in factory as doing it in extension.Start()
// should cause race condition as http Proxy gets shared.
client, err := baseCfg.getK8sClient()
if err != nil {
return nil, errors.New("failed to create k8s client")
}

leaseHolderID, err := os.Hostname()
if err != nil {
return nil, err
}

return &leaderElectionExtension{
config: baseCfg,
logger: set.Logger,
client: client,
leaseHolderId: leaseHolderID,
}, nil
}

// NewFactory creates a new factory for your extension.
func NewFactory() extension.Factory {
return extension.NewFactory(
component.MustNewType(metadata.Type.String()),
CreateDefaultConfig,
CreateExtension,
component.StabilityLevelDevelopment,
)
}
Loading
Loading