Reflect the VM statuses to the KubeVirtCluster status
If not all of the cluster's VMs are ready, the CAPK operator now sets the new
`AllMachinesAreReady` condition in the KubeVirtCluster CR to `False`.

Signed-off-by: Nahshon Unna-Tsameret <[email protected]>
nunnatsa committed Dec 20, 2023
1 parent 35cc2f1 commit afad5d5
Showing 10 changed files with 361 additions and 41 deletions.
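
For context on what the change does end-to-end, here is a minimal, illustrative sketch of how a cluster-level reconciler could roll the machines' `Status.Ready` values up into such a condition, reusing the cluster-api `conditions` helpers that also appear in the controller diff below. The constant `allMachinesAreReadyCondition`, the reason string `MachinesNotReady`, and the helper `setAllMachinesAreReady` are assumed names for illustration only, not necessarily the identifiers this commit introduces in the files not reproduced here.

// Illustrative sketch only: the condition type, reason, and function names here
// are assumptions, not the exact identifiers added by this commit.
package sketch

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/conditions"

	infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
)

// Assumed condition type; the real constant would live in api/v1alpha1.
const allMachinesAreReadyCondition clusterv1.ConditionType = "AllMachinesAreReady"

// setAllMachinesAreReady marks the condition False on the KubevirtCluster while
// any KubevirtMachine still reports Status.Ready == false, and True otherwise.
func setAllMachinesAreReady(kvCluster *infrav1.KubevirtCluster, machines []infrav1.KubevirtMachine) {
	notReady := 0
	for i := range machines {
		if !machines[i].Status.Ready {
			notReady++
		}
	}

	if notReady == 0 {
		conditions.MarkTrue(kvCluster, allMachinesAreReadyCondition)
		return
	}

	conditions.MarkFalse(kvCluster, allMachinesAreReadyCondition, "MachinesNotReady",
		clusterv1.ConditionSeverityInfo, "%d of %d KubevirtMachines are not ready", notReady, len(machines))
}
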
4 changes: 3 additions & 1 deletion api/v1alpha1/kubevirtmachine_types.go
@@ -71,7 +71,7 @@ type VirtualMachineBootstrapCheckSpec struct {
// KubevirtMachineStatus defines the observed state of KubevirtMachine.
type KubevirtMachineStatus struct {
// Ready denotes that the machine is ready
// +optional
// +kubebuilder:default=false
Ready bool `json:"ready"`

// LoadBalancerConfigured denotes that the machine has been
@@ -134,6 +134,8 @@ type KubevirtMachineStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Ready",type="boolean",JSONPath=".status.ready",description="Is machine ready"

// KubevirtMachine is the Schema for the kubevirtmachines API.
type KubevirtMachine struct {
@@ -18,7 +18,15 @@ spec:
singular: kubevirtmachine
scope: Namespaced
versions:
- name: v1alpha1
- additionalPrinterColumns:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- description: Is machine ready
jsonPath: .status.ready
name: Ready
type: boolean
name: v1alpha1
schema:
openAPIV3Schema:
description: KubevirtMachine is the Schema for the kubevirtmachines API.
@@ -4614,8 +4622,11 @@ spec:
Node of this KubevirtMachine
type: boolean
ready:
default: false
description: Ready denotes that the machine is ready
type: boolean
required:
- ready
type: object
type: object
served: true
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -45,6 +45,14 @@ rules:
verbs:
- delete
- list
- apiGroups:
- cdi.kubevirt.io
resources:
- datavolumes
verbs:
- get
- list
- watch
- apiGroups:
- cluster.x-k8s.io
resources:
24 changes: 14 additions & 10 deletions controllers/kubevirtmachine_controller.go
@@ -22,22 +22,13 @@ import (
"regexp"
"time"

"sigs.k8s.io/controller-runtime/pkg/builder"

"github.com/pkg/errors"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
kubevirthandler "sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
@@ -46,10 +37,19 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
kubevirthandler "sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
)

// KubevirtMachineReconciler reconciles a KubevirtMachine object.
Expand All @@ -66,6 +66,7 @@ type KubevirtMachineReconciler struct {
// +kubebuilder:rbac:groups="",resources=secrets;,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kubevirt.io,resources=virtualmachines;,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups=kubevirt.io,resources=virtualmachineinstances;,verbs=get;delete
// +kubebuilder:rbac:groups=cdi.kubevirt.io,resources=datavolumes;,verbs=get;list;watch

// Reconcile handles KubevirtMachine events.
func (r *KubevirtMachineReconciler) Reconcile(goctx gocontext.Context, req ctrl.Request) (_ ctrl.Result, rerr error) {
@@ -277,6 +278,9 @@ func (r *KubevirtMachineReconciler) reconcileNormal(ctx *context.MachineContext)
// Mark VMProvisionedCondition to indicate that the VM has successfully started
conditions.MarkTrue(ctx.KubevirtMachine, infrav1.VMProvisionedCondition)
} else {
reason, message := externalMachine.GetVMNotReadyReason()
conditions.MarkFalse(ctx.KubevirtMachine, infrav1.VMProvisionedCondition, reason, clusterv1.ConditionSeverityInfo, message)

// Waiting for VM to boot
ctx.KubevirtMachine.Status.Ready = false
ctx.Logger.Info("KubeVirt VM is not fully provisioned and running...")
@@ -476,7 +480,7 @@ func (r *KubevirtMachineReconciler) reconcileDelete(ctx *context.MachineContext)

// SetupWithManager will add watches for this controller.
func (r *KubevirtMachineReconciler) SetupWithManager(goctx gocontext.Context, mgr ctrl.Manager, options controller.Options) error {
clusterToKubevirtMachines, err := util.ClusterToObjectsMapper(mgr.GetClient(), &infrav1.KubevirtMachineList{}, mgr.GetScheme())
clusterToKubevirtMachines, err := util.ClusterToTypedObjectsMapper(mgr.GetClient(), &infrav1.KubevirtMachineList{}, mgr.GetScheme())
if err != nil {
return err
}
38 changes: 28 additions & 10 deletions main.go
@@ -21,10 +21,6 @@ import (
"flag"
"math/rand"
"os"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/webhookhandler"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"time"

"github.com/spf13/pflag"
@@ -35,23 +31,27 @@ import (
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
kubevirtv1 "kubevirt.io/api/core/v1"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/feature"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/controllers"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/infracluster"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/kubevirt"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/webhookhandler"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
// +kubebuilder:scaffold:imports
)

var (
myscheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

//flags.
@@ -67,12 +67,24 @@

func init() {
klog.InitFlags(nil)
}

_ = scheme.AddToScheme(myscheme)
_ = infrav1.AddToScheme(myscheme)
_ = clusterv1.AddToScheme(myscheme)
_ = kubevirtv1.AddToScheme(myscheme)
// +kubebuilder:scaffold:scheme
func registerScheme() (*runtime.Scheme, error) {
myscheme := runtime.NewScheme()

for _, f := range []func(*runtime.Scheme) error{
scheme.AddToScheme,
infrav1.AddToScheme,
clusterv1.AddToScheme,
kubevirtv1.AddToScheme,
cdiv1.AddToScheme,
// +kubebuilder:scaffold:scheme
} {
if err := f(myscheme); err != nil {
return nil, err
}
}
return myscheme, nil
}

func initFlags(fs *pflag.FlagSet) {
@@ -106,6 +118,12 @@ func main() {

ctrl.SetLogger(klogr.New())

myscheme, err := registerScheme()
if err != nil {
setupLog.Error(err, "can't register scheme")
os.Exit(1)
}

var defaultNamespaces map[string]cache.Config
if watchNamespace != "" {
setupLog.Info("Watching cluster-api objects only in namespace for reconciliation", "namespace", watchNamespace)
94 changes: 92 additions & 2 deletions pkg/kubevirt/machine.go
@@ -19,23 +19,25 @@ package kubevirt
import (
gocontext "context"
"fmt"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubedrain "k8s.io/kubectl/pkg/drain"
kubevirtv1 "kubevirt.io/api/core/v1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/noderefutil"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"

infrav1 "sigs.k8s.io/cluster-api-provider-kubevirt/api/v1alpha1"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/context"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/ssh"
"sigs.k8s.io/cluster-api-provider-kubevirt/pkg/workloadcluster"
)

const (
@@ -49,6 +51,7 @@ type Machine struct {
machineContext *context.MachineContext
vmiInstance *kubevirtv1.VirtualMachineInstance
vmInstance *kubevirtv1.VirtualMachine
dataVolumes []*cdiv1.DataVolume

sshKeys *ssh.ClusterNodeSshKeys
getCommandExecutor func(string, *ssh.ClusterNodeSshKeys) ssh.VMCommandExecutor
@@ -63,6 +66,7 @@ func NewMachine(ctx *context.MachineContext, client client.Client, namespace str
vmiInstance: nil,
vmInstance: nil,
sshKeys: sshKeys,
dataVolumes: nil,
getCommandExecutor: ssh.NewVMCommandExecutor,
}

@@ -90,6 +94,20 @@ func NewMachine(ctx *context.MachineContext, client client.Client, namespace str
machine.vmInstance = vm
}

if vm != nil {
for _, dvTemp := range vm.Spec.DataVolumeTemplates {
dv := &cdiv1.DataVolume{}
err = client.Get(ctx.Context, types.NamespacedName{Name: dvTemp.ObjectMeta.Name, Namespace: namespace}, dv)
if err != nil {
if !apierrors.IsNotFound(err) {
return nil, err
}
} else {
machine.dataVolumes = append(machine.dataVolumes, dv)
}
}
}

return machine, nil
}

@@ -219,6 +237,78 @@ func (m *Machine) IsReady() bool {
return m.hasReadyCondition()
}

const (
defaultCondReason = "VMNotReady"
defaultCondMessage = "VM is not ready"
)

func (m *Machine) GetVMNotReadyReason() (reason string, message string) {
reason = defaultCondReason

if m.vmInstance == nil {
message = defaultCondMessage
return
}

message = fmt.Sprintf("%s: %s", defaultCondMessage, m.vmInstance.Status.PrintableStatus)

cond := m.getVMCondition(kubevirtv1.VirtualMachineConditionType(corev1.PodScheduled))
if cond != nil {
if cond.Status == corev1.ConditionTrue {
return
} else if cond.Status == corev1.ConditionFalse {
if cond.Reason == "Unschedulable" {
return "Unschedulable", cond.Message
}
}
}

for _, dv := range m.dataVolumes {
dvReason, dvMessage, foundDVReason := m.getDVNotProvisionedReason(dv)
if foundDVReason {
return dvReason, dvMessage
}
}

return
}

func (m *Machine) getDVNotProvisionedReason(dv *cdiv1.DataVolume) (string, string, bool) {
msg := fmt.Sprintf("DataVolume %s is not ready; Phase: %s", dv.Name, dv.Status.Phase)
switch dv.Status.Phase {
case cdiv1.Succeeded: // DV's OK, return default reason & message
return "", "", false
case cdiv1.Pending:
return "DVPending", msg, true
case cdiv1.Failed:
return "DVFailed", msg, true
default:
for _, dvCond := range dv.Status.Conditions {
if dvCond.Type == cdiv1.DataVolumeRunning {
if dvCond.Status == corev1.ConditionFalse {
msg = fmt.Sprintf("DataVolume %s import is not running: %s", dv.Name, dvCond.Message)
}
break
}
}
return "DVNotReady", msg, true
}
}

func (m *Machine) getVMCondition(t kubevirtv1.VirtualMachineConditionType) *kubevirtv1.VirtualMachineCondition {
if m.vmInstance == nil {
return nil
}

for _, cond := range m.vmInstance.Status.Conditions {
if cond.Type == t {
return cond.DeepCopy()
}
}

return nil
}

// SupportsCheckingIsBootstrapped checks if we have a method of checking
// that this bootstrapper has completed.
func (m *Machine) SupportsCheckingIsBootstrapped() bool {
3 changes: 3 additions & 0 deletions pkg/kubevirt/machine_factory.go
@@ -38,6 +38,9 @@ type MachineInterface interface {
IsTerminal() (bool, string, error)

DrainNodeIfNeeded(workloadcluster.WorkloadCluster) (time.Duration, error)

// GetVMNotReadyReason returns the reason and message to set on the VMProvisionedCondition when the VM is not ready
GetVMNotReadyReason() (string, string)
}

// MachineFactory allows creating new instances of kubevirt.machine