parallelize the binding change (#667)
Co-authored-by: Ryan Zhang <[email protected]>
ryanzhang-oss and Ryan Zhang authored Feb 5, 2024
1 parent 0c9b86c commit 860835a
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions pkg/scheduler/framework/framework.go
@@ -268,16 +268,17 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 		klog.ErrorS(err, "Failed to collect bindings", "clusterSchedulingPolicySnapshot", policyRef)
 		return ctrl.Result{}, err
 	}
+	klog.V(2).InfoS("listed all the existing bindings belong to one crp", "clusterSchedulingPolicySnapshot", policyRef, "latency", time.Since(startTime).Milliseconds())
 
 	// Parse the bindings, find out
 	//
 	// * bound bindings, i.e., bindings that are associated with a normally operating cluster and
-	//   have been cleared for processing by the dispatcher; and
+	//   have been cleared for processing by the rollout controller; and
 	// * scheduled bindings, i.e., bindings that have been associated with a normally operating cluster,
-	//   but have not yet been cleared for processing by the dispatcher; and
+	//   but have not yet been cleared for processing by the rollout controller; and
 	// * obsolete bindings, i.e., bindings that are scheduled in accordance with an out-of-date
 	//   (i.e., no longer active) scheduling policy snapshot; it may or may have been cleared for
-	//   processing by the dispatcher; and
+	//   processing by the rollout controller; and
 	// * unscheduled bindings, i.e., bindings that are marked as unscheduled in the previous round
 	//   of scheduling activity; it can either produced by the same or different policy snapshot; and
 	// * dangling bindings, i.e., bindings that are associated with a cluster that is no longer
@@ -299,7 +300,7 @@ func (f *framework) RunSchedulingCycleFor(ctx context.Context, crpName string, p
 	// Prepare the cycle state for this run.
 	//
 	// Note that this state is shared between all plugins and the scheduler framework itself (though some fields are reserved by
-	// the framework). These resevered fields are never accessed concurrently, as each scheduling run has its own cycle and a run
+	// the framework). These reserved fields are never accessed concurrently, as each scheduling run has its own cycle and a run
 	// is always executed in one single goroutine; plugin access to the state is guarded by sync.Map.
 	state := NewCycleState(clusters, obsolete, bound, scheduled)
 
@@ -662,26 +663,53 @@ func (f *framework) manipulateBindings(
 
 // createBindings creates a list of new bindings.
 func (f *framework) createBindings(ctx context.Context, toCreate []*placementv1beta1.ClusterResourceBinding) error {
+	// issue all the create requests in parallel
+	errs, cctx := errgroup.WithContext(ctx)
 	for _, binding := range toCreate {
-		// TO-DO (chenyu1): Add some jitters here to avoid swarming the API when there is a large number of
-		// bindings to create.
-		if err := f.client.Create(ctx, binding); err != nil {
-			return controller.NewCreateIgnoreAlreadyExistError(fmt.Errorf("failed to create binding %s: %w", binding.Name, err))
-		}
+		newBinding := binding
+		errs.Go(func() error {
+			return retry.OnError(retry.DefaultBackoff,
+				func(err error) bool {
+					return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err)
+				},
+				func() error {
+					err := f.client.Create(cctx, newBinding)
+					if err != nil {
+						if apierrors.IsAlreadyExists(err) {
+							// The binding already exists, which is fine.
+							return nil
+						}
+						klog.ErrorS(err, "failed to create a new binding", "clusterResourceBinding", klog.KObj(newBinding))
+					}
+					return err
+				})
+		})
 	}
-	return nil
+	return errs.Wait()
 }
 
 // patchBindings patches a list of existing bindings using JSON patch.
 func (f *framework) patchBindings(ctx context.Context, toPatch []*bindingWithPatch) error {
-	// TODO (rzhang): issue those patches in parallel, retry if there is conflict
+	// issue all the patch requests in parallel
+	errs, cctx := errgroup.WithContext(ctx)
 	for _, bp := range toPatch {
-		// Use JSON patch to avoid races.
-		if err := f.client.Patch(ctx, bp.updated, bp.patch); err != nil {
-			return controller.NewUpdateIgnoreConflictError(fmt.Errorf("failed to patch binding %s: %w", bp.updated.Name, err))
-		}
+		patchBinding := bp
+		errs.Go(func() error {
+			return retry.OnError(retry.DefaultBackoff,
+				func(err error) bool {
+					return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err)
+				},
+				func() error {
+					// Use JSON patch to avoid races so we shouldn't get a conflict error.
+					err := f.client.Patch(cctx, patchBinding.updated, patchBinding.patch)
+					if err != nil {
+						klog.ErrorS(err, "failed to patch a binding", "clusterResourceBinding", klog.KObj(patchBinding.updated))
+					}
+					return err
+				})
+		})
 	}
-	return nil
+	return errs.Wait()
 }
 
 // updatePolicySnapshotStatusFromBindings updates the policy snapshot status, in accordance with the list of
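
Note: both functions in the last hunk apply the same pattern — fan the per-binding API calls out through an errgroup and wrap each call in retry.OnError so transient apiserver errors are retried. Below is a minimal, standalone sketch of that fan-out-with-retry pattern. The helper names forEachInParallel and isTransient, and the usage in main, are illustrative only and do not appear in the repository; the real pieces taken from the diff are errgroup.WithContext, retry.OnError with retry.DefaultBackoff, and the apierrors predicates (golang.org/x/sync, k8s.io/apimachinery, and k8s.io/client-go would need to be in go.mod).

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/retry"
)

// isTransient mirrors the retriable-error predicate used in the commit:
// retry only on apiserver "service unavailable" or "server timeout" errors.
func isTransient(err error) bool {
	return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err)
}

// forEachInParallel runs apply(item) for every item concurrently, retrying
// transient failures with client-go's default backoff. The first hard failure
// cancels the shared context and is returned by Wait.
func forEachInParallel[T any](ctx context.Context, items []T, apply func(ctx context.Context, item T) error) error {
	errs, cctx := errgroup.WithContext(ctx)
	for _, item := range items {
		item := item // capture the loop variable for the closure (needed before Go 1.22)
		errs.Go(func() error {
			return retry.OnError(retry.DefaultBackoff, isTransient, func() error {
				return apply(cctx, item)
			})
		})
	}
	return errs.Wait()
}

func main() {
	// Hypothetical usage: "create" three bindings by name; the closure stands in
	// for a real call such as f.client.Create(ctx, binding).
	names := []string{"binding-a", "binding-b", "binding-c"}
	err := forEachInParallel(context.Background(), names, func(ctx context.Context, name string) error {
		fmt.Println("creating", name)
		return nil
	})
	fmt.Println("done, err =", err)
}

In this arrangement errs.Wait() returns the first non-nil error and the shared context is canceled as soon as any goroutine fails, so one hard failure short-circuits the remaining calls; in the actual createBindings the already-exists case is swallowed, which keeps reruns of the scheduling cycle idempotent.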
