Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(reconcileworker): modernize ReconcileWorker #138

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controllers/automigration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"

Check failure on line 44 in pkg/controllers/automigration/controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

no required module provides package github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver; to add it:

Check failure on line 44 in pkg/controllers/automigration/controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

no required module provides package github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver; to add it:
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink"
utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
Expand Down Expand Up @@ -109,7 +109,7 @@

c.worker = worker.NewReconcileWorker(
c.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("auto-migration-worker", c.typeConfig.GetFederatedType().Kind),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewFederateController(

c.worker = worker.NewReconcileWorker(
c.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
workerCount,
metrics,
delayingdeliver.NewMetricTags("federate-controller-worker", c.typeConfig.GetFederatedType().Kind),
Expand Down
7 changes: 3 additions & 4 deletions pkg/controllers/federatedcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,16 @@ func NewFederatedClusterController(

c.worker = worker.NewReconcileWorker(
c.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
workerCount,
metrics,
delayingdeliver.NewMetricTags("federatedcluster-worker", "FederatedCluster"),
)

c.statusCollectWorker = worker.NewReconcileWorker(
c.collectClusterStatus,
worker.WorkerTiming{
Interval: 50 * time.Millisecond,
InitialBackoff: 50 * time.Millisecond,
worker.RateLimiterOptions{
InitialDelay: 50 * time.Millisecond,
},
workerCount,
metrics,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/follower/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func NewFollowerController(
func(qualifiedName common.QualifiedName) worker.Result {
return reconcile(handles, qualifiedName)
},
worker.WorkerTiming{},
worker.RateLimiterOptions{},
workerCount,
c.metrics,
delayingdeliver.NewMetricTags("follower-controller-worker", handles.name),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/monitor/monitor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
genericclient "github.com/kubewharf/kubeadmiral/pkg/client/generic"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedtypeconfig"

Check failure on line 39 in pkg/controllers/monitor/monitor_controller.go

View workflow job for this annotation

GitHub Actions / test (1.19)

no required module provides package github.com/kubewharf/kubeadmiral/pkg/controllers/federatedtypeconfig; to add it:

Check failure on line 39 in pkg/controllers/monitor/monitor_controller.go

View workflow job for this annotation

GitHub Actions / test (1.20)

no required module provides package github.com/kubewharf/kubeadmiral/pkg/controllers/federatedtypeconfig; to add it:
"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
finalizersutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/finalizers"
Expand Down Expand Up @@ -97,7 +97,7 @@
stopChannels: make(map[string]chan struct{}),
}

c.worker = worker.NewReconcileWorker(c.reconcile, worker.WorkerTiming{}, 1, config.Metrics,
c.worker = worker.NewReconcileWorker(c.reconcile, worker.RateLimiterOptions{}, 1, config.Metrics,
delayingdeliver.NewMetricTags("monitor-worker", ""))
c.meters = &sync.Map{}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/monitor/monitor_subcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newMonitorSubController(
return nil, err
}

m.worker = worker.NewReconcileWorker(m.reconcile, worker.WorkerTiming{}, controllerConfig.WorkerCount,
m.worker = worker.NewReconcileWorker(m.reconcile, worker.RateLimiterOptions{}, controllerConfig.WorkerCount,
controllerConfig.Metrics, delayingdeliver.NewMetricTags("monitor-subcontroller", m.kind))

m.federatedStore, m.federatedController = util.NewResourceInformer(m.federatedClient,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/nsautoprop/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newController(

c.worker = worker.NewReconcileWorker(
c.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags(userAgent, federatedNamespaceApiResource.Kind),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/override/overridepolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newController(

c.worker = worker.NewReconcileWorker(
c.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags(c.name, federatedApiResource.Kind),
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/policyrc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func newController(controllerConfig *util.ControllerConfig,

c.countWorker = worker.NewReconcileWorker(
c.reconcileCount,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
1, // currently only one worker is meaningful due to the global mutex
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("policyrc-controller-count-worker", c.typeConfig.GetFederatedType().Kind),
Expand All @@ -114,7 +114,7 @@ func newController(controllerConfig *util.ControllerConfig,
func(qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist("propagation-policy", qualifiedName, c.pp.store, c.cpp.store, c.ppCounter)
},
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("policyrc-controller-persist-worker", c.typeConfig.GetFederatedType().Kind),
Expand All @@ -123,7 +123,7 @@ func newController(controllerConfig *util.ControllerConfig,
func(qualifiedName common.QualifiedName) worker.Result {
return c.reconcilePersist("override-policy", qualifiedName, c.op.store, c.cop.store, c.opCounter)
},
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("policyrc-controller-persist-worker", c.typeConfig.GetFederatedType().Kind),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewScheduler(

s.worker = worker.NewReconcileWorker(
s.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
workerCount,
metrics,
delayingdeliver.NewMetricTags("scheduler-worker", s.typeConfig.GetFederatedType().Kind),
Expand Down
30 changes: 17 additions & 13 deletions pkg/controllers/status/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
Expand All @@ -58,7 +59,6 @@ import (

const (
StatusControllerName = "status-controller"
allClustersKey = "ALL_CLUSTERS"
)

const (
Expand All @@ -71,7 +71,7 @@ type StatusController struct {

// For triggering reconciliation of all target resources. This is
// used when a new cluster becomes available.
clusterDeliverer *delayingdeliver.DelayingDeliverer
clusterQueue workqueue.DelayingInterface

// Informer for resources in member clusters
informer util.FederatedInformer
Expand Down Expand Up @@ -171,14 +171,14 @@ func newStatusController(

s.worker = worker.NewReconcileWorker(
s.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("status-worker", typeConfig.GetTargetType().Kind),
)

// Build deliverer for triggering cluster reconciliations.
s.clusterDeliverer = delayingdeliver.NewDelayingDeliverer()
// Build queue for triggering cluster reconciliations.
s.clusterQueue = workqueue.NewNamedDelayingQueue("status-controller-cluster-queue")

// Start informers on the resources for the federated type
enqueueObj := s.worker.EnqueueObject
Expand Down Expand Up @@ -214,11 +214,11 @@ func newStatusController(
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedcorev1a1.FederatedCluster) {
// When new cluster becomes available process all the target resources again.
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterAvailableDelay))
s.clusterQueue.AddAfter(struct{}{}, s.clusterAvailableDelay)
},
// When a cluster becomes unavailable process all the target resources again.
ClusterUnavailable: func(cluster *fedcorev1a1.FederatedCluster, _ []interface{}) {
s.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(s.clusterUnavailableDelay))
s.clusterQueue.AddAfter(struct{}{}, s.clusterUnavailableDelay)
},
},
)
Expand All @@ -239,14 +239,18 @@ func (s *StatusController) minimizeLatency() {

// Run runs the status controller
func (s *StatusController) Run(stopChan <-chan struct{}) {
go s.clusterDeliverer.RunMetricLoop(stopChan, 30*time.Second, s.metrics,
delayingdeliver.NewMetricTags("status-clusterDeliverer", s.typeConfig.GetTargetType().Kind))
go s.federatedController.Run(stopChan)
go s.statusController.Run(stopChan)
s.informer.Start()
s.clusterDeliverer.StartWithHandler(func(_ *delayingdeliver.DelayingDelivererItem) {
s.reconcileOnClusterChange()
})
go func() {
for {
_, shutdown := s.clusterQueue.Get()
if shutdown {
break
}
s.reconcileOnClusterChange()
}
}()

if !cache.WaitForNamedCacheSync(s.name, stopChan, s.HasSynced) {
return
Expand All @@ -258,7 +262,7 @@ func (s *StatusController) Run(stopChan <-chan struct{}) {
go func() {
<-stopChan
s.informer.Stop()
s.clusterDeliverer.Stop()
s.clusterQueue.ShutDown()
}()
}

Expand Down
31 changes: 17 additions & 14 deletions pkg/controllers/statusaggregator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
Expand All @@ -51,8 +52,6 @@ import (
const (
ControllerName = "status-aggregator-controller"

allClustersKey = "ALL_CLUSTERS"

EventReasonUpdateSourceObjectStatus = "UpdateSourceObjectStatus"
EventReasonUpdateSourceObjectAnnotation = "UpdateSourceObjectAnnotation"
)
Expand Down Expand Up @@ -80,7 +79,7 @@ type StatusAggregator struct {
informer util.FederatedInformer
// For triggering reconciliation of all target resources. This is
// used when a new cluster becomes available.
clusterDeliverer *delayingdeliver.DelayingDeliverer
clusterQueue workqueue.DelayingInterface
clusterAvailableDelay time.Duration
clusterUnavailableDelay time.Duration
objectEnqueueDelay time.Duration
Expand Down Expand Up @@ -146,15 +145,15 @@ func newStatusAggregator(controllerConfig *util.ControllerConfig,
return nil, err
}

// Build deliverer for triggering cluster reconciliations.
a.clusterDeliverer = delayingdeliver.NewDelayingDeliverer()
// Build queue for triggering cluster reconciliations.
a.clusterQueue = workqueue.NewNamedDelayingQueue("status-aggregator-cluster-queue")
a.clusterAvailableDelay = controllerConfig.ClusterAvailableDelay
a.clusterUnavailableDelay = controllerConfig.ClusterUnavailableDelay
a.objectEnqueueDelay = 10 * time.Second

a.worker = worker.NewReconcileWorker(
a.reconcile,
worker.WorkerTiming{},
worker.RateLimiterOptions{},
controllerConfig.WorkerCount,
controllerConfig.Metrics,
delayingdeliver.NewMetricTags("statusaggregator-worker", typeConfig.GetTargetType().Kind),
Expand Down Expand Up @@ -185,11 +184,11 @@ func newStatusAggregator(controllerConfig *util.ControllerConfig,
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *fedcorev1a1.FederatedCluster) {
// When new cluster becomes available process all the target resources again.
a.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(a.clusterAvailableDelay))
a.clusterQueue.AddAfter(struct{}{}, a.clusterAvailableDelay)
},
// When a cluster becomes unavailable process all the target resources again.
ClusterUnavailable: func(cluster *fedcorev1a1.FederatedCluster, _ []interface{}) {
a.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(a.clusterUnavailableDelay))
a.clusterQueue.AddAfter(struct{}{}, a.clusterUnavailableDelay)
},
},
)
Expand All @@ -206,11 +205,15 @@ func (a *StatusAggregator) Run(stopChan <-chan struct{}) {
go a.sourceController.Run(stopChan)
go a.federatedController.Run(stopChan)
a.informer.Start()
a.clusterDeliverer.StartWithHandler(func(_ *delayingdeliver.DelayingDelivererItem) {
a.reconcileOnClusterChange()
})
go a.clusterDeliverer.RunMetricLoop(stopChan, 30*time.Second, a.metrics,
delayingdeliver.NewMetricTags("schedulingpreference-clusterDeliverer", a.typeConfig.GetTargetType().Kind))
go func() {
for {
_, shutdown := a.clusterQueue.Get()
if shutdown {
break
}
a.reconcileOnClusterChange()
}
}()
if !cache.WaitForNamedCacheSync(a.name, stopChan, a.HasSynced) {
return
}
Expand All @@ -225,7 +228,7 @@ func (a *StatusAggregator) Run(stopChan <-chan struct{}) {
}()
<-stopChan
a.informer.Stop()
a.clusterDeliverer.Stop()
a.clusterQueue.ShutDown()
}()
}

Expand Down
Loading
Loading