Skip to content

Commit

Permalink
feat(worker): add metrics for asyncworker
Browse files Browse the repository at this point in the history
Signed-off-by: chang.qiangqiang <[email protected]>
  • Loading branch information
CharlesQQ committed Nov 14, 2024
1 parent 90fe21e commit 050580f
Show file tree
Hide file tree
Showing 24 changed files with 226 additions and 88 deletions.
1 change: 1 addition & 0 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectorsForAgent()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)

if err = setupControllers(controllerManager, opts, ctx.Done()); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func Run(ctx context.Context, opts *options.Options) error {
crtlmetrics.Registry.MustRegister(metrics.ClusterCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.ResourceCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.PoolCollectors()...)
crtlmetrics.Registry.MustRegister(metrics.AsyncWorkerCollectors()...)

setupControllers(controllerManager, opts, ctx.Done())

Expand Down
10 changes: 5 additions & 5 deletions pkg/clusterdiscovery/clusterapi/clusterapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
"github.com/karmada-io/karmada/pkg/karmadactl/options"
"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
"github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

const (
Expand All @@ -62,7 +62,7 @@ type ClusterDetector struct {
ClusterAPIClient client.Client
InformerManager genericmanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
Processor worker.AsyncWorker
ConcurrentReconciles int

stopCh <-chan struct{}
Expand All @@ -74,12 +74,12 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
d.stopCh = ctx.Done()

d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "cluster-api cluster detector",
KeyFunc: ClusterWideKeyFunc,
ReconcileFunc: d.Reconcile,
}
d.Processor = util.NewAsyncWorker(workerOptions)
d.Processor = worker.NewAsyncWorker(workerOptions)
d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
d.discoveryCluster()

Expand Down Expand Up @@ -120,7 +120,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {

// Reconcile performs a full reconciliation for the object referred to by the key.
// The key will be re-queued if an error is non-nil.
func (d *ClusterDetector) Reconcile(key util.QueueKey) error {
func (d *ClusterDetector) Reconcile(key worker.QueueKey) error {
clusterWideKey, ok := key.(keys.ClusterWideKey)
if !ok {
klog.Errorf("Invalid key")
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusterdiscovery/clusterapi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package clusterapi

import (
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// ClusterWideKeyFunc generates a ClusterWideKey for object.
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
func ClusterWideKeyFunc(obj interface{}) (worker.QueueKey, error) {
return keys.ClusterWideKeyFunc(obj)
}
18 changes: 9 additions & 9 deletions pkg/controllers/cluster/taint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// TaintManagerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -51,8 +51,8 @@ type NoExecuteTaintManager struct {
ClusterTaintEvictionRetryFrequency time.Duration
ConcurrentReconciles int

bindingEvictionWorker util.AsyncWorker
clusterBindingEvictionWorker util.AsyncWorker
bindingEvictionWorker worker.AsyncWorker
clusterBindingEvictionWorker worker.AsyncWorker
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
Expand Down Expand Up @@ -117,27 +117,27 @@ func (tc *NoExecuteTaintManager) syncCluster(ctx context.Context, cluster *clust

// Start starts an asynchronous loop that handle evictions.
func (tc *NoExecuteTaintManager) Start(ctx context.Context) error {
bindingEvictionWorkerOptions := util.Options{
bindingEvictionWorkerOptions := worker.Options{
Name: "binding-eviction",
KeyFunc: nil,
ReconcileFunc: tc.syncBindingEviction,
}
tc.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
tc.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)
tc.bindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())

clusterBindingEvictionWorkerOptions := util.Options{
clusterBindingEvictionWorkerOptions := worker.Options{
Name: "cluster-binding-eviction",
KeyFunc: nil,
ReconcileFunc: tc.syncClusterBindingEviction,
}
tc.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
tc.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
tc.clusterBindingEvictionWorker.Run(tc.ConcurrentReconciles, ctx.Done())

<-ctx.Done()
return nil
}

func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
func (tc *NoExecuteTaintManager) syncBindingEviction(key worker.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to sync binding eviction as invalid key: %v", key)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (tc *NoExecuteTaintManager) syncBindingEviction(key util.QueueKey) error {
return nil
}

func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key util.QueueKey) error {
func (tc *NoExecuteTaintManager) syncClusterBindingEviction(key worker.QueueKey) error {
fedKey, ok := key.(keys.FederatedKey)
if !ok {
klog.Errorf("Failed to sync cluster binding eviction as invalid key: %v", key)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/cluster/taint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/worker"
)

func newNoExecuteTaintManager() *NoExecuteTaintManager {
Expand All @@ -57,19 +58,19 @@ func newNoExecuteTaintManager() *NoExecuteTaintManager {
WithIndex(&workv1alpha2.ResourceBinding{}, rbClusterKeyIndex, rbIndexerFunc).
WithIndex(&workv1alpha2.ClusterResourceBinding{}, crbClusterKeyIndex, crbIndexerFunc).Build(),
}
bindingEvictionWorkerOptions := util.Options{
bindingEvictionWorkerOptions := worker.Options{
Name: "binding-eviction",
KeyFunc: nil,
ReconcileFunc: mgr.syncBindingEviction,
}
mgr.bindingEvictionWorker = util.NewAsyncWorker(bindingEvictionWorkerOptions)
mgr.bindingEvictionWorker = worker.NewAsyncWorker(bindingEvictionWorkerOptions)

clusterBindingEvictionWorkerOptions := util.Options{
clusterBindingEvictionWorkerOptions := worker.Options{
Name: "cluster-binding-eviction",
KeyFunc: nil,
ReconcileFunc: mgr.syncClusterBindingEviction,
}
mgr.clusterBindingEvictionWorker = util.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
mgr.clusterBindingEvictionWorker = worker.NewAsyncWorker(clusterBindingEvictionWorkerOptions)
return mgr
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/worker"
)

const (
Expand All @@ -40,16 +40,16 @@ type HpaScaleTargetMarker struct {
DynamicClient dynamic.Interface
RESTMapper meta.RESTMapper

scaleTargetWorker util.AsyncWorker
scaleTargetWorker worker.AsyncWorker
}

// SetupWithManager creates a controller and register to controller manager.
func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
scaleTargetWorkerOptions := util.Options{
scaleTargetWorkerOptions := worker.Options{
Name: "scale target worker",
ReconcileFunc: r.reconcileScaleRef,
}
r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
r.scaleTargetWorker = worker.NewAsyncWorker(scaleTargetWorkerOptions)
r.scaleTargetWorker.Run(scaleTargetWorkerNum, context.Background().Done())

return controllerruntime.NewControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

type labelEventKind int
Expand All @@ -45,7 +46,7 @@ type labelEvent struct {
hpa *autoscalingv2.HorizontalPodAutoscaler
}

func (r *HpaScaleTargetMarker) reconcileScaleRef(key util.QueueKey) (err error) {
func (r *HpaScaleTargetMarker) reconcileScaleRef(key worker.QueueKey) (err error) {
event, ok := key.(labelEvent)
if !ok {
klog.Errorf("Found invalid key when reconciling hpa scale ref: %+v", key)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// ServiceExportControllerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -75,7 +76,7 @@ type ServiceExportController struct {
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
// worker process resources periodic from rateLimitingQueue.
worker util.AsyncWorker
worker worker.AsyncWorker
}

var (
Expand Down Expand Up @@ -136,12 +137,12 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *ServiceExportController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "service-export",
KeyFunc: nil,
ReconcileFunc: c.syncServiceExportOrEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)

go c.enqueueReportedEpsServiceExport()
Expand Down Expand Up @@ -191,7 +192,7 @@ func (c *ServiceExportController) enqueueReportedEpsServiceExport() {
}
}

func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error {
func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// EndpointSliceCollectController collects EndpointSlice from member clusters and reports them to control-plane.
Expand All @@ -64,7 +65,7 @@ type EndpointSliceCollectController struct {
// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
// "member1": instance of ResourceEventHandler
eventHandlers sync.Map
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.

ClusterCacheSyncTimeout metav1.Duration
}
Expand Down Expand Up @@ -124,16 +125,16 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *EndpointSliceCollectController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "endpointslice-collect",
KeyFunc: nil,
ReconcileFunc: c.collectEndpointSlice,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.WorkerNumber, c.StopChan)
}

func (c *EndpointSliceCollectController) collectEndpointSlice(key util.QueueKey) error {
func (c *EndpointSliceCollectController) collectEndpointSlice(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/status/work_status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/restmapper"
"github.com/karmada-io/karmada/pkg/util/worker"
)

// WorkStatusControllerName is the controller name that will be used when reporting events and metrics.
Expand All @@ -66,7 +67,7 @@ type WorkStatusController struct {
InformerManager genericmanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
worker worker.AsyncWorker // worker process resources periodic from rateLimitingQueue.
// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
ConcurrentWorkStatusSyncs int
ObjectWatcher objectwatcher.ObjectWatcher
Expand Down Expand Up @@ -142,17 +143,17 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {

// RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
workerOptions := util.Options{
workerOptions := worker.Options{
Name: "work-status",
KeyFunc: generateKey,
ReconcileFunc: c.syncWorkStatus,
}
c.worker = util.NewAsyncWorker(workerOptions)
c.worker = worker.NewAsyncWorker(workerOptions)
c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
}

// generateKey generates a key from obj, the key contains cluster, GVK, namespace and name.
func generateKey(obj interface{}) (util.QueueKey, error) {
func generateKey(obj interface{}) (worker.QueueKey, error) {
resource := obj.(*unstructured.Unstructured)
cluster, err := getClusterNameFromAnnotation(resource)
if err != nil {
Expand Down Expand Up @@ -183,7 +184,7 @@ func getClusterNameFromAnnotation(resource *unstructured.Unstructured) (string,
}

// syncWorkStatus will collect status of object referencing by key and update to work which holds the object.
func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
func (c *WorkStatusController) syncWorkStatus(key worker.QueueKey) error {
ctx := context.Background()
fedKey, ok := key.(keys.FederatedKey)
if !ok {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/status/work_status_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
"github.com/karmada-io/karmada/pkg/util/worker"
testhelper "github.com/karmada-io/karmada/test/helper"
)

Expand Down Expand Up @@ -325,7 +326,7 @@ func TestWorkStatusController_Reconcile(t *testing.T) {
}

func TestWorkStatusController_getEventHandler(t *testing.T) {
opt := util.Options{
opt := worker.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
Expand All @@ -339,7 +340,7 @@ func TestWorkStatusController_getEventHandler(t *testing.T) {
ClusterCacheSyncTimeout: metav1.Duration{},
RateLimiterOptions: ratelimiterflag.Options{},
eventHandler: nil,
worker: util.NewAsyncWorker(opt),
worker: worker.NewAsyncWorker(opt),
}

eventHandler := c.getEventHandler()
Expand Down Expand Up @@ -980,12 +981,12 @@ func TestWorkStatusController_registerInformersAndStart(t *testing.T) {
defer close(stopCh)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
c := newWorkStatusController(cluster)
opt := util.Options{
opt := worker.Options{
Name: "opt",
KeyFunc: nil,
ReconcileFunc: nil,
}
c.worker = util.NewAsyncWorker(opt)
c.worker = worker.NewAsyncWorker(opt)

workUID := "92345678-1234-5678-1234-567812345678"
raw := []byte(`{"apiVersion":"v1","kind":"Pod","metadata":{"name":"pod","namespace":"default"}}`)
Expand Down
Loading

0 comments on commit 050580f

Please sign in to comment.