Skip to content

Commit

Permalink
feat: support volume expansion (#567)
Browse files Browse the repository at this point in the history
* feat: support volume expansion

Signed-off-by: Aylei <[email protected]>

* Update pkg/controllers/common/volume.go

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* tidy

Signed-off-by: Aylei <[email protected]>

* fix e2e

Signed-off-by: Aylei <[email protected]>

---------

Signed-off-by: Aylei <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
aylei and github-actions[bot] authored Nov 20, 2024
1 parent 581017c commit 684a75a
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 10 deletions.
2 changes: 1 addition & 1 deletion examples/mo-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: mo
spec:
imageRepository: matrixorigin/matrixone
version: #TAG
version: 1.2.3
logService:
replicas: 3
sharedStorage:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
k8s.io/apiserver v0.27.2
k8s.io/client-go v0.27.2
k8s.io/component-helpers v0.27.2
k8s.io/klog v1.0.0
k8s.io/kubelet v0.27.2
k8s.io/kubernetes v1.27.2
k8s.io/utils v0.0.0-20230209194617-a36077c30491
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8b
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
Expand Down Expand Up @@ -869,6 +870,8 @@ k8s.io/component-base v0.27.2 h1:neju+7s/r5O4x4/txeUONNTS9r1HsPbyoPBAtHsDCpo=
k8s.io/component-base v0.27.2/go.mod h1:5UPk7EjfgrfgRIuDBFtsEFAe4DAvP3U+M8RTzoSJkpo=
k8s.io/component-helpers v0.27.2 h1:i9TgWJ6TH8lQ9x4ExHOwhVitrRpBOr7Wn8aZLbBWxkc=
k8s.io/component-helpers v0.27.2/go.mod h1:NwcpSKo1xzXtUtrUjj5NTSVWex84UPua/z0PYDcCzNo=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=
Expand Down
10 changes: 7 additions & 3 deletions pkg/controllers/cnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ func (c *Actor) Observe(ctx *recon.Context[*v1alpha1.CNSet]) (recon.Action[*v1al
return c.with(cs).Scale, nil
}

if cn.Spec.CacheVolume != nil {
if err := common.SyncCloneSetVolumeSize(ctx, cn, cn.Spec.CacheVolume.Size, cs); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}
}

if recon.IsReady(&cn.Status.ConditionalStatus) {
cn.Status.Host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
cn.Status.Port = CNSQLPort
Expand Down Expand Up @@ -263,6 +269,7 @@ func (c *Actor) Create(ctx *recon.Context[*v1alpha1.CNSet]) error {
if err := syncCloneSet(ctx, cnSet); err != nil {
return errors.WrapPrefix(err, "sync clone set", 0)
}
syncPersistentVolumeClaim(cn, cnSet)

// create all resources
err := lo.Reduce[client.Object, error]([]client.Object{
Expand Down Expand Up @@ -331,9 +338,6 @@ func syncCloneSet(ctx *recon.Context[*v1alpha1.CNSet], cs *kruisev1alpha1.CloneS
if ctx.Dep != nil {
syncPodSpec(ctx.Obj, cs, ctx.Dep.Deps.LogSet.Spec.SharedStorage)
}
// support update cacheVolume, NOTE: pvc only updated when pod rolling updated
// ref: https://openkruise.io/zh/docs/next/user-manuals/cloneset/#%E6%94%AF%E6%8C%81-pvc-%E6%A8%A1%E6%9D%BF
syncPersistentVolumeClaim(cn, cs)
if pooling {
if cs.Annotations == nil {
cs.Annotations = map[string]string{}
Expand Down
125 changes: 125 additions & 0 deletions pkg/controllers/common/volume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2024 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"fmt"

"github.com/go-errors/errors"
recon "github.com/matrixorigin/controller-runtime/pkg/reconciler"
kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1 "github.com/openkruise/kruise-api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func SyncCloneSetVolumeSize(kubeCli recon.KubeClient, owner client.Object, size resource.Quantity, cs *kruisev1alpha1.CloneSet) error {
var changed bool
for i := range cs.Spec.VolumeClaimTemplates {
if cs.Spec.VolumeClaimTemplates[i].Name == DataVolume {
oldSize := cs.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage]
c := oldSize.Cmp(size)
if c < 0 {
changed = true
cs.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage] = size
} else if c > 0 {
return errors.New(fmt.Sprintf("volume size cannot be decreased from %s to %s", oldSize.String(), size.String()))
}
}
}
if !changed {
return nil
}
podList := &corev1.PodList{}
err := kubeCli.List(podList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(SubResourceLabels(owner)))
if err != nil {
return errors.WrapPrefix(err, "list pods", 0)
}
for i := range podList.Items {
pod := &podList.Items[i]
instanceId := pod.Labels[kruisev1alpha1.CloneSetInstanceID]
if instanceId == "" {
continue
}
pvcList := &corev1.PersistentVolumeClaimList{}
err := kubeCli.List(pvcList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(map[string]string{
kruisev1alpha1.CloneSetInstanceID: instanceId,
}))

klog.Infof("sync volume size for %s, pvc list: %v", owner.GetName(), pvcList.Items)
if err != nil {
return errors.WrapPrefix(err, "list volumes", 0)
}
for j := range pvcList.Items {
pvc := &pvcList.Items[j]
current := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
if current.Cmp(size) < 0 {
if err := kubeCli.Patch(pvc, func() error {
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = size
return nil
}); err != nil {
return errors.WrapPrefix(err, "patch volume size", 0)
}
}
}
}
if err := kubeCli.Update(cs); err != nil {
return errors.WrapPrefix(err, "sync volume size", 0)
}
return nil
}

// SyncStsVolumeSize syncs the volume size of component backed by kruise statefuset
func SyncStsVolumeSize(kubeCli recon.KubeClient, owner client.Object, size resource.Quantity, sts *kruisev1.StatefulSet) error {
var changed bool
for i := range sts.Spec.VolumeClaimTemplates {
if sts.Spec.VolumeClaimTemplates[i].Name == DataVolume {
oldSize := sts.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage]
c := oldSize.Cmp(size)
if c < 0 {
changed = true
sts.Spec.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage] = size
} else if c > 0 {
return errors.New(fmt.Sprintf("volume size cannot be decreased from %s to %s", oldSize.String(), size.String()))
}
}
}
if !changed {
return nil
}
pvcList := &corev1.PersistentVolumeClaimList{}
err := kubeCli.List(pvcList, client.InNamespace(owner.GetNamespace()), client.MatchingLabels(SubResourceLabels(owner)))
if err != nil {
return errors.WrapPrefix(err, "list volumes", 0)
}
for i := range pvcList.Items {
pvc := &pvcList.Items[i]
current := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
if current.Cmp(size) < 0 {
if err := kubeCli.Patch(pvc, func() error {
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = size
return nil
}); err != nil {
return errors.WrapPrefix(err, "patch volume size", 0)
}
}
}
if err := kubeCli.Update(sts); err != nil {
return errors.WrapPrefix(err, "sync volume size", 0)
}
return nil
}
10 changes: 8 additions & 2 deletions pkg/controllers/dnset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package dnset

import (
"github.com/matrixorigin/matrixone-operator/api/features"
"github.com/matrixorigin/matrixone-operator/pkg/utils"
"strconv"
"time"

"github.com/matrixorigin/matrixone-operator/api/features"
"github.com/matrixorigin/matrixone-operator/pkg/utils"

"github.com/go-errors/errors"
recon "github.com/matrixorigin/controller-runtime/pkg/reconciler"
"github.com/matrixorigin/controller-runtime/pkg/util"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (d *Actor) Observe(ctx *recon.Context[*v1alpha1.DNSet]) (recon.Action[*v1al
return d.with(sts, svc).Update, nil
}

if dn.Spec.CacheVolume != nil {
if err := common.SyncStsVolumeSize(ctx, dn, dn.Spec.CacheVolume.Size, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}
}
if err := d.syncMetricService(ctx); err != nil {
return nil, errors.WrapPrefix(err, "sync metric service", 0)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/logset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (r *Actor) Observe(ctx *recon.Context[*v1alpha1.LogSet]) (recon.Action[*v1a
return r.with(sts).Update, nil
}

if err = common.SyncStsVolumeSize(ctx, ls, ls.Spec.Volume.Size, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync volume size", 0)
}

if err = r.syncBucketClaim(ctx, sts); err != nil {
return nil, errors.WrapPrefix(err, "sync bucket claim", 0)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/webhook/cnset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (c *cnSetValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runt
return nil, unexpectedKindError("CNSet", newObj)
}
errs = append(errs, validatePodSetUpdate(&oldCN.Spec.PodSet, &newCN.Spec.PodSet, field.NewPath("spec"))...)
errs = append(errs, validateVolumeUpdate(oldCN.Spec.CacheVolume, newCN.Spec.CacheVolume, field.NewPath("spec").Child("cacheVolume"))...)
return nil, invalidOrNil(errs, newCN)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/webhook/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,17 @@ func validatePodSetUpdate(oldPodSet, newPodSet *v1alpha1.PodSet, path *field.Pat
}
return errs
}

func validateVolumeUpdate(oldVolume, newVolume *v1alpha1.Volume, path *field.Path) field.ErrorList {
var errs field.ErrorList
if oldVolume == nil || newVolume == nil {
return nil
}
if newVolume.StorageClassName != nil && oldVolume.StorageClassName != nil && *newVolume.StorageClassName != *oldVolume.StorageClassName {
errs = append(errs, field.Invalid(path.Child("storageClassName"), *newVolume.StorageClassName, "storageClassName is immutable"))
}
if newVolume.Size.Cmp(oldVolume.Size) < 0 {
errs = append(errs, field.Invalid(path.Child("size"), newVolume.Size, "volume size cannot be decreased"))
}
return errs
}
14 changes: 12 additions & 2 deletions pkg/webhook/dnset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,22 @@ func (d *dnSetValidator) ValidateCreate(_ context.Context, obj runtime.Object) (
return nil, invalidOrNil(errs, dnSet)
}

func (d *dnSetValidator) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (warnings admission.Warnings, err error) {
func (d *dnSetValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) {
var errs field.ErrorList
warnings, err = d.ValidateCreate(ctx, newObj)
if err != nil {
return warnings, err
}
return warnings, nil
oldDN, ok := oldObj.(*v1alpha1.DNSet)
if !ok {
return nil, unexpectedKindError("DNSet", oldObj)
}
newDN, ok := newObj.(*v1alpha1.DNSet)
if !ok {
return nil, unexpectedKindError("DNSet", newObj)
}
errs = append(errs, validateVolumeUpdate(oldDN.Spec.CacheVolume, newDN.Spec.CacheVolume, field.NewPath("spec").Child("cacheVolume"))...)
return warnings, invalidOrNil(errs, newDN)
}

func (d *dnSetValidator) ValidateDelete(_ context.Context, _ runtime.Object) (warnings admission.Warnings, err error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/webhook/logset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ func (l *logSetValidator) ValidateCreate(_ context.Context, obj runtime.Object)
}

func (l *logSetValidator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) (warnings admission.Warnings, err error) {
var errs field.ErrorList
old := oldObj.(*v1alpha1.LogSet)
logSet := newObj.(*v1alpha1.LogSet)
errs := l.ValidateSpecUpdate(&old.Spec, &logSet.Spec, logSet.ObjectMeta)
errs = append(errs, l.ValidateSpecUpdate(&old.Spec, &logSet.Spec, logSet.ObjectMeta)...)
errs = append(errs, validateVolumeUpdate(&old.Spec.Volume, &logSet.Spec.Volume, field.NewPath("spec").Child("volume"))...)
return nil, invalidOrNil(errs, logSet)
}

Expand Down
3 changes: 2 additions & 1 deletion test/e2e/claim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package e2e
import (
"context"
"fmt"
"time"

"github.com/matrixorigin/controller-runtime/pkg/util"
"github.com/matrixorigin/matrixone-operator/api/core/v1alpha1"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -27,7 +29,6 @@ import (
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"
)

const (
Expand Down

0 comments on commit 684a75a

Please sign in to comment.