Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove ftc manager and ftc controller #142

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"

"github.com/kubewharf/kubeadmiral/cmd/controller-manager/app/options"
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllermanager"
"github.com/kubewharf/kubeadmiral/pkg/controllermanager/healthcheck"
fedleaderelection "github.com/kubewharf/kubeadmiral/pkg/controllermanager/leaderelection"
Expand All @@ -37,14 +36,12 @@ import (

const (
FederatedClusterControllerName = "cluster"
TypeConfigControllerName = "typeconfig"
MonitorControllerName = "monitor"
FollowerControllerName = "follower"
)

var knownControllers = map[string]controllermanager.StartControllerFunc{
FederatedClusterControllerName: startFederatedClusterController,
TypeConfigControllerName: startTypeConfigController,
MonitorControllerName: startMonitorController,
FollowerControllerName: startFollowerController,
}
Expand Down Expand Up @@ -77,7 +74,7 @@ func Run(ctx context.Context, opts *options.Options) {
defer klog.Infoln("Ready to stop controllers")
klog.Infoln("Ready to start controllers")

err := startControllers(ctx, controllerCtx, knownControllers, knownFTCSubControllers, opts.Controllers, healthCheckHandler)
err := startControllers(ctx, controllerCtx, knownControllers, opts.Controllers, healthCheckHandler)
if err != nil {
klog.Fatalf("Error starting controllers %s: %v", opts.Controllers, err)
}
Expand Down Expand Up @@ -127,7 +124,6 @@ func startControllers(
ctx context.Context,
controllerCtx *controllercontext.Context,
startControllerFuncs map[string]controllermanager.StartControllerFunc,
ftcSubControllerInitFuncs map[string]controllermanager.FTCSubControllerInitFuncs,
enabledControllers []string,
healthCheckHandler *healthcheck.MutableHealthCheckHandler,
) error {
Expand All @@ -153,26 +149,5 @@ func startControllers(
})
}

manager := NewFederatedTypeConfigManager(
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(),
controllerCtx,
healthCheckHandler,
controllerCtx.Metrics,
)
for controllerName, initFuncs := range ftcSubControllerInitFuncs {
controllerName := controllerName
initFuncs := initFuncs
manager.RegisterSubController(controllerName, initFuncs.StartFunc, func(typeConfig *fedcorev1a1.FederatedTypeConfig) bool {
if !isControllerEnabled(controllerName, controllersDisabledByDefault, enabledControllers) {
return false
}
if initFuncs.IsEnabledFunc != nil {
return initFuncs.IsEnabledFunc(typeConfig)
}
return true
})
}
go manager.Run(ctx)

return nil
}
21 changes: 0 additions & 21 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,6 @@ func startFederatedClusterController(ctx context.Context, controllerCtx *control
return clusterController, nil
}

func startTypeConfigController(ctx context.Context, controllerCtx *controllercontext.Context) (controllermanager.Controller, error) {
controllerConfig := controllerConfigFromControllerContext(controllerCtx)
//nolint:contextcheck
typeConfigController, err := federatedtypeconfig.NewController(
controllerConfig,
controllerCtx.KubeClientset,
controllerCtx.DynamicClientset,
controllerCtx.FedClientset,
controllerCtx.KubeInformerFactory,
controllerCtx.DynamicInformerFactory,
controllerCtx.FedInformerFactory,
)
if err != nil {
return nil, fmt.Errorf("error creating type config controller: %w", err)
}

go typeConfigController.Run(ctx.Done())

return typeConfigController, nil
}

func startMonitorController(ctx context.Context, controllerCtx *controllercontext.Context) (controllermanager.Controller, error) {
controllerConfig := controllerConfigFromControllerContext(controllerCtx)
//nolint:contextcheck
Expand Down
207 changes: 0 additions & 207 deletions cmd/controller-manager/app/ftcmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,7 @@ limitations under the License.
package app

import (
"context"
"fmt"
"net/http"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllermanager"
"github.com/kubewharf/kubeadmiral/pkg/controllermanager/healthcheck"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
"github.com/kubewharf/kubeadmiral/pkg/stats"
)

const (
Expand All @@ -59,191 +40,3 @@ var knownFTCSubControllers = map[string]controllermanager.FTCSubControllerInitFu
IsEnabledFunc: isAutoMigrationControllerEnabled,
},
}

type FederatedTypeConfigManager struct {
informer fedcorev1a1informers.FederatedTypeConfigInformer
handle cache.ResourceEventHandlerRegistration

lock sync.Mutex
registeredSubControllers map[string]controllermanager.StartFTCSubControllerFunc
isSubControllerEnabledFuncs map[string]controllermanager.IsFTCSubControllerEnabledFunc

subControllerContexts map[string]context.Context
subControllerCancelFuncs map[string]context.CancelFunc
startedSubControllers map[string]sets.Set[string]

healthCheckHandler *healthcheck.MutableHealthCheckHandler
worker worker.ReconcileWorker
controllerCtx *controllercontext.Context

metrics stats.Metrics
logger klog.Logger
}

func NewFederatedTypeConfigManager(
informer fedcorev1a1informers.FederatedTypeConfigInformer,
controllerCtx *controllercontext.Context,
healthCheckHandler *healthcheck.MutableHealthCheckHandler,
metrics stats.Metrics,
) *FederatedTypeConfigManager {
m := &FederatedTypeConfigManager{
informer: informer,
lock: sync.Mutex{},
registeredSubControllers: map[string]controllermanager.StartFTCSubControllerFunc{},
isSubControllerEnabledFuncs: map[string]controllermanager.IsFTCSubControllerEnabledFunc{},
subControllerContexts: map[string]context.Context{},
subControllerCancelFuncs: map[string]context.CancelFunc{},
startedSubControllers: map[string]sets.Set[string]{},
controllerCtx: controllerCtx,
healthCheckHandler: healthCheckHandler,
metrics: metrics,
logger: klog.LoggerWithValues(klog.Background(), "controller", "federated-type-config-manager"),
}

m.worker = worker.NewReconcileWorker(
m.reconcile,
worker.WorkerTiming{},
1,
metrics,
delayingdeliver.NewMetricTags("federated-type-config-manager", "FederatedTypeConfig"),
)

m.handle, _ = informer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(m.worker.EnqueueObject))

return m
}

func (m *FederatedTypeConfigManager) RegisterSubController(
name string,
startFunc controllermanager.StartFTCSubControllerFunc,
isEnabledFunc controllermanager.IsFTCSubControllerEnabledFunc,
) {
m.lock.Lock()
defer m.lock.Unlock()
m.registeredSubControllers[name] = startFunc
m.isSubControllerEnabledFuncs[name] = isEnabledFunc
}

func (m *FederatedTypeConfigManager) Run(ctx context.Context) {
m.logger.Info("Starting FederatedTypeConfig manager")
defer m.logger.Info("Stopping FederatedTypeConfig manager")

if !cache.WaitForNamedCacheSync("federated-type-config-manager", ctx.Done(), m.informer.Informer().HasSynced) {
return
}

m.worker.Run(ctx.Done())
<-ctx.Done()
}

func (m *FederatedTypeConfigManager) reconcile(qualifiedName common.QualifiedName) (status worker.Result) {
_ = m.metrics.Rate("federated-type-config-manager.throughput", 1)
key := qualifiedName.String()
logger := m.logger.WithValues("federated-type-config", key)
startTime := time.Now()

logger.V(3).Info("Start reconcile")
defer m.metrics.Duration("federated-type-config-manager.latency", startTime)
defer func() {
logger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcile")
}()

typeConfig, err := m.informer.Lister().Get(qualifiedName.Name)
if err != nil && apierrors.IsNotFound(err) {
logger.V(3).Info("Observed FederatedTypeConfig deletion")
m.processFTCDeletion(qualifiedName.Name)
return worker.StatusAllOK
}
if err != nil {
logger.Error(err, "Failed to get FederatedTypeConfig")
return worker.StatusError
}

m.lock.Lock()
defer m.lock.Unlock()

startedSubControllers, ok := m.startedSubControllers[qualifiedName.Name]
if !ok {
startedSubControllers = sets.New[string]()
m.startedSubControllers[qualifiedName.Name] = startedSubControllers
}
subControllerCtx, ok := m.subControllerContexts[qualifiedName.Name]
if !ok {
subControllerCtx, m.subControllerCancelFuncs[qualifiedName.Name] = context.WithCancel(context.TODO())
m.subControllerContexts[qualifiedName.Name] = subControllerCtx
}

needRetry := false
for controllerName, startFunc := range m.registeredSubControllers {
logger := logger.WithValues("subcontroller", controllerName)

if startedSubControllers.Has(controllerName) {
logger.V(3).Info("Subcontroller already started")
continue
}

isEnabledFunc := m.isSubControllerEnabledFuncs[controllerName]
if isEnabledFunc != nil && !isEnabledFunc(typeConfig) {
logger.V(3).Info("Skip starting subcontroller, is disabled")
continue
}

controller, err := startFunc(subControllerCtx, m.controllerCtx, typeConfig)
if err != nil {
logger.Error(err, "Failed to start subcontroller")
needRetry = true
continue
} else {
logger.Info("Started subcontroller")
startedSubControllers.Insert(controllerName)
}

m.healthCheckHandler.AddReadyzChecker(
resolveSubcontrollerName(controllerName, qualifiedName.Name),
func(_ *http.Request) error {
if controller.IsControllerReady() {
return nil
}
return fmt.Errorf("controller not ready")
},
)
}

// Since the controllers are created dynamically, we have to start the informer factories again, in case any new
// informers were accessed. Note that a different context is used in case a FTC is recreated and the same informer
// needs to be used again (SharedInformerFactory and SharedInformers do not support restarts).
ctx := context.TODO()
m.controllerCtx.KubeInformerFactory.Start(ctx.Done())
m.controllerCtx.DynamicInformerFactory.Start(ctx.Done())
m.controllerCtx.FedInformerFactory.Start(ctx.Done())

if needRetry {
return worker.StatusError
}

return worker.StatusAllOK
}

func (m *FederatedTypeConfigManager) processFTCDeletion(ftcName string) {
m.lock.Lock()
defer m.lock.Unlock()

cancel, ok := m.subControllerCancelFuncs[ftcName]
if !ok {
return
}

cancel()

for controller := range m.startedSubControllers[ftcName] {
m.healthCheckHandler.RemoveReadyzChecker(resolveSubcontrollerName(controller, ftcName))
}

delete(m.subControllerCancelFuncs, ftcName)
delete(m.subControllerContexts, ftcName)
delete(m.startedSubControllers, ftcName)
}

func resolveSubcontrollerName(baseName, ftcName string) string {
return fmt.Sprintf("%s[%s]", ftcName, baseName)
}
Loading
Loading