Skip to content

feat: create multiple resource snapshots for same group index #642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 11, 2024
133 changes: 108 additions & 25 deletions pkg/controllers/clusterresourceplacement/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ func (r *Reconciler) deleteRedundantResourceSnapshots(ctx context.Context, crp *
return nil
}

// TODO handle all the resources selected by placement larger than 1MB size limit of k8s objects.
func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, envelopeObjCount int, resourceSnapshotSpec *fleetv1beta1.ResourceSnapshotSpec, revisionHistoryLimit int) (*fleetv1beta1.ClusterResourceSnapshot, error) {
resourceHash, err := generateResourceHash(resourceSnapshotSpec)
crpKObj := klog.KObj(crp)
Expand All @@ -424,24 +423,55 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
}

latestResourceSnapshotHash := ""
numberOfSnapshots := -1
if latestResourceSnapshot != nil {
latestResourceSnapshotHash, err = parseResourceGroupHashFromAnnotation(latestResourceSnapshot)
if err != nil {
klog.ErrorS(err, "Failed to get the ResourceGroupHashAnnotation", "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return nil, controller.NewUnexpectedBehaviorError(err)
}
numberOfSnapshots, err = annotations.ExtractNumberOfResourceSnapshotsFromResourceSnapshot(latestResourceSnapshot)
if err != nil {
klog.ErrorS(err, "Failed to get the NumberOfResourceSnapshotsAnnotation", "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return nil, controller.NewUnexpectedBehaviorError(err)
}
}

shouldCreateNewMasterClusterSnapshot := true
// This index indicates the selected resource in the split selectedResourceList, if this index is zero we start
// from creating the master clusterResourceSnapshot if it's greater than zero it means that the master clusterResourceSnapshot
// got created but not all sub-indexed clusterResourceSnapshots have been created yet. It covers the corner case where the
// controller crashes in the middle.
resourceSnapshotStartIndex := 0
if latestResourceSnapshot != nil && latestResourceSnapshotHash == resourceHash {
if err := r.ensureLatestResourceSnapshot(ctx, latestResourceSnapshot); err != nil {
return nil, err
}
klog.V(2).InfoS("Resources have not been changed and updated the existing clusterResourceSnapshot", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return latestResourceSnapshot, nil
// check to see all that the master cluster resource snapshot and sub-indexed snapshots belonging to the same group index exists.
latestGroupResourceLabelMatcher := client.MatchingLabels{
fleetv1beta1.ResourceIndexLabel: latestResourceSnapshot.Labels[fleetv1beta1.ResourceIndexLabel],
fleetv1beta1.CRPTrackingLabel: crp.Name,
}
resourceSnapshotList := &fleetv1beta1.ClusterResourceSnapshotList{}
if err := r.Client.List(ctx, resourceSnapshotList, latestGroupResourceLabelMatcher); err != nil {
klog.ErrorS(err, "Failed to list the latest group clusterResourceSnapshots associated with the clusterResourcePlacement",
"clusterResourcePlacement", crp.Name)
return nil, controller.NewAPIServerError(true, err)
}
if len(resourceSnapshotList.Items) == numberOfSnapshots {
klog.V(2).InfoS("ClusterResourceSnapshots have not changed", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
return latestResourceSnapshot, nil
}
// we should not create a new master cluster resource snapshot.
shouldCreateNewMasterClusterSnapshot = false
// set resourceSnapshotStartIndex to start from this index, so we don't try to recreate existing sub-indexed cluster resource snapshots.
resourceSnapshotStartIndex = len(resourceSnapshotList.Items)
}

// Need to create new snapshot when 1) there is no snapshots or 2) the latest snapshot hash != current one.
// mark the last resource snapshot as inactive if it is different from what we have now
// mark the last resource snapshot as inactive if it is different from what we have now or 3) when some
// sub-indexed cluster resource snapshots belonging to the same group have not been created, the master
// cluster resource snapshot should exist and be latest.
if latestResourceSnapshot != nil &&
latestResourceSnapshotHash != resourceHash &&
latestResourceSnapshot.Labels[fleetv1beta1.IsLatestSnapshotLabel] == strconv.FormatBool(true) {
Expand All @@ -453,44 +483,96 @@ func (r *Reconciler) getOrCreateClusterResourceSnapshot(ctx context.Context, crp
}
klog.V(2).InfoS("Marked the existing clusterResourceSnapshot as inactive", "clusterResourcePlacement", crpKObj, "clusterResourceSnapshot", klog.KObj(latestResourceSnapshot))
}
// delete redundant snapshot revisions before creating a new snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantResourceSnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err

// only delete redundant resource snapshots and increment the latest resource snapshot index if new master cluster resource snapshot is to be created.
if shouldCreateNewMasterClusterSnapshot {
// delete redundant snapshot revisions before creating a new master cluster resource snapshot to guarantee that the number of snapshots
// won't exceed the limit.
if err := r.deleteRedundantResourceSnapshots(ctx, crp, revisionHistoryLimit); err != nil {
return nil, err
}
latestResourceSnapshotIndex++
}
// split selected resources as list of lists.
selectedResourcesList := splitSelectedResources(resourceSnapshotSpec.SelectedResources)
var resourceSnapshot *fleetv1beta1.ClusterResourceSnapshot
for i := resourceSnapshotStartIndex; i < len(selectedResourcesList); i++ {
if i == 0 {
resourceSnapshot = buildMasterClusterResourceSnapshot(latestResourceSnapshotIndex, len(selectedResourcesList), envelopeObjCount, crp.Name, resourceHash, selectedResourcesList[i])
latestResourceSnapshot = resourceSnapshot
} else {
resourceSnapshot = buildSubIndexResourceSnapshot(latestResourceSnapshotIndex, i-1, crp.Name, selectedResourcesList[i])
}
if err = r.createResourceSnapshot(ctx, crp, resourceSnapshot); err != nil {
return nil, err
}
}
// shouldCreateNewMasterClusterSnapshot is used here to be defensive in case of the regression.
if shouldCreateNewMasterClusterSnapshot && len(selectedResourcesList) == 0 {
resourceSnapshot = buildMasterClusterResourceSnapshot(latestResourceSnapshotIndex, 1, envelopeObjCount, crp.Name, resourceHash, []fleetv1beta1.ResourceContent{})
latestResourceSnapshot = resourceSnapshot
if err = r.createResourceSnapshot(ctx, crp, resourceSnapshot); err != nil {
return nil, err
}
}
return latestResourceSnapshot, nil
}

// create a new resource snapshot
latestResourceSnapshotIndex++
latestResourceSnapshot = &fleetv1beta1.ClusterResourceSnapshot{
// buildMasterClusterResourceSnapshot builds and returns the master cluster resource snapshot for the latest resource snapshot index and selected resources.
func buildMasterClusterResourceSnapshot(latestResourceSnapshotIndex, resourceSnapshotCount, envelopeObjCount int, crpName, resourceHash string, selectedResources []fleetv1beta1.ResourceContent) *fleetv1beta1.ClusterResourceSnapshot {
return &fleetv1beta1.ClusterResourceSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1beta1.ResourceSnapshotNameFmt, crp.Name, latestResourceSnapshotIndex),
Name: fmt.Sprintf(fleetv1beta1.ResourceSnapshotNameFmt, crpName, latestResourceSnapshotIndex),
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crp.Name,
fleetv1beta1.CRPTrackingLabel: crpName,
fleetv1beta1.IsLatestSnapshotLabel: strconv.FormatBool(true),
fleetv1beta1.ResourceIndexLabel: strconv.Itoa(latestResourceSnapshotIndex),
},
Annotations: map[string]string{
fleetv1beta1.ResourceGroupHashAnnotation: resourceHash,
// TODO: need to update this once we support multiple snapshots
fleetv1beta1.NumberOfResourceSnapshotsAnnotation: "1",
fleetv1beta1.ResourceGroupHashAnnotation: resourceHash,
fleetv1beta1.NumberOfResourceSnapshotsAnnotation: strconv.Itoa(resourceSnapshotCount),
fleetv1beta1.NumberOfEnvelopedObjectsAnnotation: strconv.Itoa(envelopeObjCount),
},
},
Spec: *resourceSnapshotSpec,
Spec: fleetv1beta1.ResourceSnapshotSpec{
SelectedResources: selectedResources,
},
}
resourceSnapshotKObj := klog.KObj(latestResourceSnapshot)
if err := controllerutil.SetControllerReference(crp, latestResourceSnapshot, r.Scheme); err != nil {
}

// buildSubIndexResourceSnapshot builds and returns the sub index resource snapshot for the latestResourceSnapshotIndex, sub index and selected resources.
func buildSubIndexResourceSnapshot(latestResourceSnapshotIndex, resourceSnapshotSubIndex int, crpName string, selectedResources []fleetv1beta1.ResourceContent) *fleetv1beta1.ClusterResourceSnapshot {
return &fleetv1beta1.ClusterResourceSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(fleetv1beta1.ResourceSnapshotNameWithSubindexFmt, crpName, latestResourceSnapshotIndex, resourceSnapshotSubIndex),
Labels: map[string]string{
fleetv1beta1.CRPTrackingLabel: crpName,
fleetv1beta1.ResourceIndexLabel: strconv.Itoa(latestResourceSnapshotIndex),
},
Annotations: map[string]string{
fleetv1beta1.SubindexOfResourceSnapshotAnnotation: strconv.Itoa(resourceSnapshotSubIndex),
},
},
Spec: fleetv1beta1.ResourceSnapshotSpec{
SelectedResources: selectedResources,
},
}
}

// createResourceSnapshot sets ClusterResourcePlacement owner reference on the ClusterResourceSnapshot and create it.
func (r *Reconciler) createResourceSnapshot(ctx context.Context, crp *fleetv1beta1.ClusterResourcePlacement, rs *fleetv1beta1.ClusterResourceSnapshot) error {
resourceSnapshotKObj := klog.KObj(rs)
if err := controllerutil.SetControllerReference(crp, rs, r.Scheme); err != nil {
klog.ErrorS(err, "Failed to set owner reference", "clusterResourceSnapshot", resourceSnapshotKObj)
// should never happen
return nil, controller.NewUnexpectedBehaviorError(err)
return controller.NewUnexpectedBehaviorError(err)
}

if err := r.Client.Create(ctx, latestResourceSnapshot); err != nil {
if err := r.Client.Create(ctx, rs); err != nil {
klog.ErrorS(err, "Failed to create new clusterResourceSnapshot", "clusterResourceSnapshot", resourceSnapshotKObj)
return nil, controller.NewAPIServerError(false, err)
return controller.NewAPIServerError(false, err)
}
klog.V(2).InfoS("Created new clusterResourceSnapshot", "clusterResourcePlacement", klog.KObj(crp), "clusterSchedulingPolicySnapshot", resourceSnapshotKObj)
return latestResourceSnapshot, nil
klog.V(2).InfoS("Created new clusterResourceSnapshot", "clusterResourcePlacement", klog.KObj(crp), "clusterResourceSnapshot", resourceSnapshotKObj)
return nil
}

// splitSelectedResources splits selected resources in a ClusterResourcePlacement into separate lists
Expand Down Expand Up @@ -584,6 +666,7 @@ func (r *Reconciler) ensureLatestResourceSnapshot(ctx context.Context, latest *f
klog.ErrorS(err, "Failed to update the clusterResourceSnapshot", "ClusterResourceSnapshot", klog.KObj(latest))
return controller.NewUpdateIgnoreConflictError(err)
}
klog.V(2).InfoS("ClusterResourceSnapshot's IsLatestSnapshotLabel was updated to true", "clusterResourceSnapshot", klog.KObj(latest))
return nil
}

Expand Down
Loading