Skip to content

Commit

Permalink
Refactor upgrade logic to skip pre upgrade job in case of patch upgrades
Browse files Browse the repository at this point in the history
  • Loading branch information
anshumanks committed Feb 7, 2025
1 parent 6f4b3fa commit 9606412
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 27 deletions.
3 changes: 2 additions & 1 deletion controllers/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
confTwillSecurityWorkerSecretDiskPath = "twill.security.worker.secret.disk.path"
confJMXServerPort = "jmx.metrics.collector.server.port"
confSecretMountDefaultMode = "secret.mount.default.mode"
confSkipPreUpgradeFlag = "cdap-operator.skip.preupgrade-job"

// default values
defaultImage = "gcr.io/cdapio/cdap:latest"
Expand Down Expand Up @@ -129,4 +130,4 @@ const (
kiloBytes = int64(1024)
megaBytes = int64(1024 * 1024)
gigaBytes = int64(1024 * 1024 * 1024)
)
)
9 changes: 9 additions & 0 deletions controllers/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ type VersionUpgradeJobSpec struct {
HConf string `json:"hadoopConf,omitempty"`
PreUpgrade bool `json:"preUpgrade,omitempty"`
PostUpgrade bool `json:"postUpgrade,omitempty"`
SkipPreUpgradeFlag bool `json:"skipPreUpgradeFlag,omitempty"`
SkipPreUpgrade bool `json:"skipPreUpgrade,omitempty"`
}

func newUpgradeJobSpec(master *v1alpha1.CDAPMaster, name string, labels map[string]string, startTimeMs int64, cconf, hconf string) *VersionUpgradeJobSpec {
Expand All @@ -595,6 +597,8 @@ func newUpgradeJobSpec(master *v1alpha1.CDAPMaster, name string, labels map[stri
s.StartTimeMs = startTimeMs
s.CConf = cconf
s.HConf = hconf
s.SkipPreUpgradeFlag = (master.Spec.Config[confSkipPreUpgradeFlag] == "true")
s.SkipPreUpgrade = false
return s
}

Expand All @@ -607,3 +611,8 @@ func (s *VersionUpgradeJobSpec) SetPostUpgrade(isPostUpgrade bool) *VersionUpgra
s.PostUpgrade = isPostUpgrade
return s
}

func (s *VersionUpgradeJobSpec) SetSkipPreUpgrade(isPatchUpgrade bool) *VersionUpgradeJobSpec {
s.SkipPreUpgrade = isPatchUpgrade && s.SkipPreUpgradeFlag
return s
}
3 changes: 2 additions & 1 deletion controllers/testdata/pre_upgrade_job.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
"args": [
"io.cdap.cdap.master.upgrade.UpgradeJobMain",
"cdap-test-router",
"11015"
"11015",
"false"
],
"image": "gcr.io/cloud-data-fusion-images/cloud-data-fusion:6.1.0.5",
"imagePullPolicy": "IfNotPresent",
Expand Down
59 changes: 39 additions & 20 deletions controllers/version_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ func init() {
/////////////////////////////////////////////////////////////

func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
versionComparison := compareVersion(curVersion, newVersion)
isPatchUpgrade := versionComparison == -2

// Let the current update complete if there is any
if isConditionTrue(master, updateStatus.Inprogress) {
log.Printf("Version update ingress. Continue... ")
return upgradeForBackend(master, labels, observed)
return upgradeForBackend(master, labels, observed, isPatchUpgrade)
}

if objs, versionUpdated, err := updateForUserInterface(master); err != nil {
Expand All @@ -45,21 +56,13 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
}

// Update backend service image version
curVersion, err := getCurrentImageVersion(master)
if err != nil {
return nil, err
}
newVersion, err := getNewImageVersion(master)
if err != nil {
return nil, err
}
if len(curVersion.rawString) == 0 {
setImageToUse(master)
return []reconciler.Object{}, nil
}

switch compareVersion(curVersion, newVersion) {
case -1:
switch versionComparison {
case -2, -1:
// Upgrade case

// Don't retry upgrade if it failed.
Expand All @@ -73,7 +76,7 @@ func handleVersionUpdate(master *v1alpha1.CDAPMaster, labels map[string]string,
setCondition(master, updateStatus.Inprogress)
master.Status.UpgradeStartTimeMillis = getCurrentTimeMs()
log.Printf("Version update: start upgrading %s -> %s ", curVersion.rawString, newVersion.rawString)
return upgradeForBackend(master, labels, observed)
return upgradeForBackend(master, labels, observed, isPatchUpgrade)
case 0:
// Reset all condition so that failed upgraded/downgrade can be retried later if needed.
// This is needed when last upgrade failed and user has reset the version in spec.
Expand Down Expand Up @@ -120,7 +123,7 @@ func downgradeForBackend(master *v1alpha1.CDAPMaster) ([]reconciler.Object, erro
return []reconciler.Object{}, nil
}

func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object) ([]reconciler.Object, error) {
func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, observed []reconciler.Object, isPatchUpgrade bool) ([]reconciler.Object, error) {
// Find either pre- or post- upgrade job
findJob := func(jobName string) *batchv1.Job {
var job *batchv1.Job = nil
Expand Down Expand Up @@ -163,7 +166,7 @@ func upgradeForBackend(master *v1alpha1.CDAPMaster, labels map[string]string, ob
if !isConditionTrue(master, updateStatus.PreUpgradeSucceeded) {
log.Printf("Version update: pre-upgrade job not completed")
preJobName := getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels)
preJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, labels, isPatchUpgrade)
job := findJob(preJobName)
if job == nil {
obj, err := createJob(preJobSpec)
Expand Down Expand Up @@ -406,6 +409,7 @@ func parseImageString(imageString string) (*Version, error) {
}

// compare two parsed versions
// -2: left < right, patch upgrade
// -1: left < right
// 0: left = right
// 1: left > right
Expand All @@ -418,9 +422,24 @@ func compareVersion(l, r *Version) int {
return -1
}

lenL, lenR := len(l.components), len(r.components)
// Check if it only a patch upgrade
if lenL == lenR && lenL > 0 && l.components[lenL-1] < r.components[lenL-1] {
allEqual := true
for i := 0; i < lenL-1; i++ {
if l.components[i] != r.components[i] {
allEqual = false
break
}
}
if allEqual {
return -2
}
}

i := 0
j := 0
for i < len(l.components) && j < len(r.components) {
for i < lenL && j < lenR {
if l.components[i] > r.components[j] {
return 1
} else if l.components[i] < r.components[j] {
Expand All @@ -429,13 +448,13 @@ func compareVersion(l, r *Version) int {
i++
j++
}
for i < len(l.components) {
for i < lenL {
if l.components[i] > 0 {
return 1
}
i++
}
for j < len(r.components) {
for j < lenR {
if r.components[j] > 0 {
return 1
}
Expand Down Expand Up @@ -513,12 +532,12 @@ func getPostUpgradeJobName(startTimeMs int64) string {
}

// Return pre-upgrade job spec
func buildPreUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string) *VersionUpgradeJobSpec {
func buildPreUpgradeJobSpec(jobName string, master *v1alpha1.CDAPMaster, labels map[string]string, isPatchUpgrade bool) *VersionUpgradeJobSpec {
startTimeMs := master.Status.UpgradeStartTimeMillis
cconf := getObjName(master, configMapCConf)
hconf := getObjName(master, configMapHConf)
name := getObjName(master, jobName)
return newUpgradeJobSpec(master, name, labels, startTimeMs, cconf, hconf).SetPreUpgrade(true)
return newUpgradeJobSpec(master, name, labels, startTimeMs, cconf, hconf).SetPreUpgrade(true).SetSkipPreUpgrade(isPatchUpgrade)
}

// Return post-upgrade job spec
Expand All @@ -537,4 +556,4 @@ func buildUpgradeJobObject(spec *VersionUpgradeJobSpec) (*reconciler.Object, err
return nil, err
}
return obj, nil
}
}
21 changes: 18 additions & 3 deletions controllers/version_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = Describe("Controller Suite", func() {
It("Compare image versions", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:latest"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.1.0"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:7"},
}
Expand All @@ -55,6 +55,21 @@ var _ = Describe("Controller Suite", func() {
Expect(compareVersion(high, low)).To(Equal(1))
}
})
It("Compare image versions in patch upgrade", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.1"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.3"},
Pair{"gcr.io/cdapio/cdap:6.0.0.0", "gcr.io/cdapio/cdap:6.0.0.9"},
}
for _, imagePair := range imagePairs {
low, err := parseImageString(imagePair.first.(string))
Expect(err).To(BeNil())
high, err := parseImageString(imagePair.second.(string))
Expect(err).To(BeNil())
Expect(compareVersion(low, high)).To(Equal(-2))
Expect(compareVersion(high, low)).To(Equal(1))
}
})
It("Compare same image versions", func() {
imagePairs := []Pair{
Pair{"gcr.io/cdapio/cdap:latest", "gcr.io/cdapio/cdap:latest"},
Expand Down Expand Up @@ -199,7 +214,7 @@ var _ = Describe("Controller Suite", func() {
ImageToUse: curUIImage,
},
}
postJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, emptyLabels)
postJobSpec := buildPreUpgradeJobSpec(getPreUpgradeJobName(master.Status.UpgradeStartTimeMillis), master, emptyLabels, false)
object, err := buildUpgradeJobObject(postJobSpec)
Expect(err).To(BeNil())

Expand Down Expand Up @@ -241,4 +256,4 @@ var _ = Describe("Controller Suite", func() {
Expect(diff.String()).To(Equal(jsondiff.SupersetMatch.String()), text)
})
})
})
})
4 changes: 2 additions & 2 deletions templates/upgrade-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
{{end}}
{{if .PreUpgrade}}
- name: pre-upgrade
args: ["io.cdap.cdap.master.upgrade.UpgradeJobMain", "{{.HostName}}", "11015"]
args: ["io.cdap.cdap.master.upgrade.UpgradeJobMain", "{{.HostName}}", "11015", "{{.SkipPreUpgrade}}"]
{{end}}
image: {{.Image}}
volumeMounts:
Expand Down Expand Up @@ -73,4 +73,4 @@ spec:
secretName: {{.SecuritySecret}}
{{end}}
restartPolicy: Never
backoffLimit: {{.BackoffLimit}}
backoffLimit: {{.BackoffLimit}}

0 comments on commit 9606412

Please sign in to comment.