diff --git a/app/post_upgrade.go b/app/post_upgrade.go index 68df7dc945..6e69e958d6 100644 --- a/app/post_upgrade.go +++ b/app/post_upgrade.go @@ -10,16 +10,24 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "github.com/longhorn/longhorn-manager/constant" "github.com/longhorn/longhorn-manager/types" + + longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" ) const ( + PostUpgradeEventer = "longhorn-post-upgrade" + RetryCounts = 360 RetryInterval = 5 * time.Second ) @@ -33,8 +41,10 @@ func PostUpgradeCmd() cli.Command { Usage: "Specify path to kube config (optional)", }, cli.StringFlag{ - Name: FlagNamespace, - EnvVar: types.EnvPodNamespace, + Name: FlagNamespace, + EnvVar: types.EnvPodNamespace, + Required: true, + Usage: "Specify Longhorn namespace", }, }, Action: func(c *cli.Context) { @@ -50,34 +60,61 @@ func PostUpgradeCmd() cli.Command { func postUpgrade(c *cli.Context) error { namespace := c.String(FlagNamespace) - if namespace == "" { - return errors.New("namespace is required") - } config, err := clientcmd.BuildConfigFromFlags("", c.String(FlagKubeConfig)) if err != nil { return errors.Wrap(err, "failed to get client config") } + eventBroadcaster, err := createEventBroadcaster(config) + if err != nil { + return errors.Wrap(err, "failed to create event broadcaster") + } + defer eventBroadcaster.Shutdown() + + scheme := runtime.NewScheme() + if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil { + return errors.Wrap(err, "failed to create scheme") + } + + eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PostUpgradeEventer}) + kubeClient, err := kubernetes.NewForConfig(config) if err != nil { return errors.Wrap(err, "failed to get k8s client") } - return newPostUpgrader(namespace, kubeClient).Run() + err = newPostUpgrader(namespace, kubeClient, eventRecorder).Run() + if err != nil { + logrus.Warnf("Done with Run() ... err is %v", err) + } + + return err } type postUpgrader struct { - namespace string - kubeClient kubernetes.Interface + namespace string + kubeClient kubernetes.Interface + eventRecorder record.EventRecorder } -func newPostUpgrader(namespace string, kubeClient kubernetes.Interface) *postUpgrader { - return &postUpgrader{namespace, kubeClient} +func newPostUpgrader(namespace string, kubeClient kubernetes.Interface, eventRecorder record.EventRecorder) *postUpgrader { + return &postUpgrader{namespace, kubeClient, eventRecorder} } func (u *postUpgrader) Run() error { - if err := u.waitManagerUpgradeComplete(); err != nil { + var err error + defer func() { + if err != nil { + u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer}, + corev1.EventTypeWarning, constant.EventReasonFailedUpgradePostCheck, err.Error()) + } else { + u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PostUpgradeEventer}, + corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "post-upgrade check passed") + } + }() + + if err = u.waitManagerUpgradeComplete(); err != nil { return err } diff --git a/app/pre_upgrade.go b/app/pre_upgrade.go index 47eeb4ad58..8bb26527d1 100644 --- a/app/pre_upgrade.go +++ b/app/pre_upgrade.go @@ -5,14 +5,24 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" + corev1 "k8s.io/api/core/v1" + + "github.com/longhorn/longhorn-manager/constant" "github.com/longhorn/longhorn-manager/types" + longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" lhclientset "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned" upgradeutil "github.com/longhorn/longhorn-manager/upgrade/util" ) +const ( + PreUpgradeEventer = "longhorn-pre-upgrade" +) + func PreUpgradeCmd() cli.Command { return cli.Command{ Name: "pre-upgrade", @@ -29,8 +39,8 @@ func PreUpgradeCmd() cli.Command { }, }, Action: func(c *cli.Context) { - logrus.Infof("Running pre-upgrade...") - defer logrus.Infof("Completed pre-upgrade.") + logrus.Info("Running pre-upgrade...") + defer logrus.Info("Completed pre-upgrade.") if err := preUpgrade(c); err != nil { logrus.WithError(err).Fatalf("Failed to run pre-upgrade") @@ -47,17 +57,61 @@ func preUpgrade(c *cli.Context) error { return errors.Wrap(err, "failed to get client config") } + eventBroadcaster, err := createEventBroadcaster(config) + if err != nil { + return errors.Wrap(err, "failed to create event broadcaster") + } + defer eventBroadcaster.Shutdown() + + scheme := runtime.NewScheme() + if err := longhorn.SchemeBuilder.AddToScheme(scheme); err != nil { + return errors.Wrap(err, "failed to create scheme") + } + + eventRecorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: PreUpgradeEventer}) + lhClient, err := lhclientset.NewForConfig(config) if err != nil { return errors.Wrap(err, "failed to get clientset") } - if err := upgradeutil.CheckUpgradePath(namespace, lhClient, nil, true); err != nil { + err = newPreUpgrader(namespace, lhClient, eventRecorder).Run() + if err != nil { + logrus.Warnf("Done with Run() ... err is %v", err) + } + + return err +} + +type preUpgrader struct { + namespace string + lhClient lhclientset.Interface + eventRecorder record.EventRecorder +} + +func newPreUpgrader(namespace string, lhClient lhclientset.Interface, eventRecorder record.EventRecorder) *preUpgrader { + return &preUpgrader{namespace, lhClient, eventRecorder} +} + +func (u *preUpgrader) Run() error { + var err error + defer func() { + if err != nil { + u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer}, + corev1.EventTypeWarning, constant.EventReasonFailedUpgradePreCheck, err.Error()) + } else { + u.eventRecorder.Event(&corev1.ObjectReference{Namespace: u.namespace, Name: PreUpgradeEventer}, + corev1.EventTypeNormal, constant.EventReasonPassedUpgradeCheck, "pre-upgrade check passed") + } + }() + + if err = upgradeutil.CheckUpgradePath(u.namespace, u.lhClient, u.eventRecorder, true); err != nil { return err } - if err := environmentCheck(); err != nil { - return errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host") + if err = environmentCheck(); err != nil { + err = errors.Wrap(err, "failed to check environment, please make sure you have iscsiadm/open-iscsi installed on the host") + return err } return nil diff --git a/app/recurring_job.go b/app/recurring_job.go index 1dd2296ed7..f6619e2c69 100644 --- a/app/recurring_job.go +++ b/app/recurring_job.go @@ -21,8 +21,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" "github.com/longhorn/longhorn-manager/constant" "github.com/longhorn/longhorn-manager/types" @@ -292,20 +290,6 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri }, nil } -func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) { - kubeClient, err := clientset.NewForConfig(config) - if err != nil { - return nil, errors.Wrap(err, "failed to get k8s client") - } - - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(logrus.Infof) - // TODO: remove the wrapper when every clients have moved to use the clientset. - eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) - - return eventBroadcaster, nil -} - func (job *Job) run() (err error) { job.logger.Info("job starts running") diff --git a/app/util.go b/app/util.go new file mode 100644 index 0000000000..20e3759705 --- /dev/null +++ b/app/util.go @@ -0,0 +1,26 @@ +package app + +import ( + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + clientset "k8s.io/client-go/kubernetes" + typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +func createEventBroadcaster(config *rest.Config) (record.EventBroadcaster, error) { + kubeClient, err := clientset.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "failed to get k8s client") + } + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(logrus.Infof) + // TODO: remove the wrapper when every clients have moved to use the clientset. + eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: typedv1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) + + return eventBroadcaster, nil +} diff --git a/constant/events.go b/constant/events.go index e8630a427a..903093914d 100644 --- a/constant/events.go +++ b/constant/events.go @@ -62,7 +62,10 @@ const ( EventReasonReady = "Ready" EventReasonUploaded = "Uploaded" - EventReasonUpgrade = "Upgrade" + EventReasonUpgrade = "Upgrade" + EventReasonFailedUpgradePreCheck = "FailedUpgradePreCheck" + EventReasonFailedUpgradePostCheck = "FailedUpgradePostCheck" + EventReasonPassedUpgradeCheck = "PassedUpgradeCheck" EventReasonRolloutSkippedFmt = "RolloutSkipped: %v %v" diff --git a/upgrade/upgrade.go b/upgrade/upgrade.go index e46aeacca4..4aa085cebb 100644 --- a/upgrade/upgrade.go +++ b/upgrade/upgrade.go @@ -241,7 +241,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl return nil } - // When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x. + // When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x. resourceMaps := map[string]interface{}{} if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 { logrus.Info("Walking through the resource upgrade path v1.4.x to v1.5.0") @@ -255,14 +255,14 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl return err } } - // When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x. + // When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x. if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 { logrus.Info("Walking through the resource upgrade path v1.5.x to v1.6.0") if err := v15xto160.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil { return err } } - // When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x. + // When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x. if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 { logrus.Info("Walking through the resource upgrade path v1.6.x to v1.7.0") if err := v16xto170.UpgradeResources(namespace, lhClient, kubeClient, resourceMaps); err != nil { @@ -279,7 +279,7 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl return err } - // When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.4.x. + // When lhVersionBeforeUpgrade < v1.5.0, it is v1.4.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.4.x. resourceMaps = map[string]interface{}{} if semver.Compare(lhVersionBeforeUpgrade, "v1.5.0") < 0 { logrus.Info("Walking through the resource status upgrade path v1.4.x to v1.5.0") @@ -287,14 +287,14 @@ func doResourceUpgrade(namespace string, lhClient *lhclientset.Clientset, kubeCl return err } } - // When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.5.x. + // When lhVersionBeforeUpgrade < v1.6.0, it is v1.5.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.5.x. if semver.Compare(lhVersionBeforeUpgrade, "v1.6.0") < 0 { logrus.Info("Walking through the resource status upgrade path v1.5.x to v1.6.0") if err := v15xto160.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil { return err } } - // When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePathSupported` method would have failed us out earlier if it was not v1.6.x. + // When lhVersionBeforeUpgrade < v1.7.0, it is v1.6.x. The `CheckUpgradePath` method would have failed us out earlier if it was not v1.6.x. if semver.Compare(lhVersionBeforeUpgrade, "v1.7.0") < 0 { logrus.Info("Walking through the resource status upgrade path v1.6.x to v1.7.0") if err := v16xto170.UpgradeResourcesStatus(namespace, lhClient, kubeClient, resourceMaps); err != nil {