Skip to content

Commit

Permalink
Relocate utils (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Mar 24, 2022
1 parent 0776c83 commit 9d7cccd
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 112 deletions.
5 changes: 3 additions & 2 deletions controllers/flinkcluster/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/flink"
"github.com/spotify/flink-on-k8s-operator/internal/model"
"github.com/spotify/flink-on-k8s-operator/internal/util"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -775,12 +776,12 @@ func convertFromSavepoint(jobSpec *v1beta1.JobSpec, jobStatus *v1beta1.JobStatus
switch {
// Creating for the first time
case jobStatus == nil:
if !IsBlank(jobSpec.FromSavepoint) {
if !util.IsBlank(jobSpec.FromSavepoint) {
return jobSpec.FromSavepoint
}
return nil
// Updating with FromSavepoint provided
case revision.IsUpdateTriggered() && !IsBlank(jobSpec.FromSavepoint):
case revision.IsUpdateTriggered() && !util.IsBlank(jobSpec.FromSavepoint):
return jobSpec.FromSavepoint
// Latest savepoint
case jobStatus.SavepointLocation != "":
Expand Down
5 changes: 3 additions & 2 deletions controllers/flinkcluster/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/controllers/history"
flink "github.com/spotify/flink-on-k8s-operator/internal/flink"
"github.com/spotify/flink-on-k8s-operator/internal/util"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -604,7 +605,7 @@ func (observer *ClusterStateObserver) syncRevisionStatus(observed *ObservedClust
}

// create a new revision from the current cluster
nextRevision, err := newRevision(cluster, getNextRevisionNumber(revisions), &collisionCount)
nextRevision, err := newRevision(cluster, util.GetNextRevisionNumber(revisions), &collisionCount)
if err != nil {
return err
}
Expand Down Expand Up @@ -675,7 +676,7 @@ func (observer *ClusterStateObserver) truncateHistory(observed *ObservedClusterS
historyLimit = 10
}

nonLiveHistory := getNonLiveHistory(revisions, historyLimit)
nonLiveHistory := util.GetNonLiveHistory(revisions, historyLimit)

// delete any non-live history to maintain the revision limit.
for i := 0; i < len(nonLiveHistory); i++ {
Expand Down
15 changes: 8 additions & 7 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
schedulerTypes "github.com/spotify/flink-on-k8s-operator/internal/batchscheduler/types"
"github.com/spotify/flink-on-k8s-operator/internal/flink"
"github.com/spotify/flink-on-k8s-operator/internal/model"
"github.com/spotify/flink-on-k8s-operator/internal/util"

appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -572,7 +573,7 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
if recorded.Revision.IsUpdateTriggered() {
log.Info("Preparing job update")
var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate
var shouldSuspend = takeSavepoint && IsBlank(jobSpec.FromSavepoint)
var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint)
if shouldSuspend {
newSavepointStatus, err = reconciler.trySuspendJob()
} else if shouldUpdateJob(&observed) {
Expand Down Expand Up @@ -826,7 +827,7 @@ func (reconciler *ClusterReconciler) shouldTakeSavepoint() v1beta1.SavepointReas
case jobSpec.AutoSavepointSeconds != nil:
// When previous try was failed, check retry interval.
if savepoint.IsFailed() && savepoint.TriggerReason == v1beta1.SavepointReasonScheduled {
var nextRetryTime = GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second)
var nextRetryTime = util.GetTime(savepoint.UpdateTime).Add(SavepointRetryIntervalSeconds * time.Second)
if time.Now().After(nextRetryTime) {
return v1beta1.SavepointReasonScheduled
} else {
Expand Down Expand Up @@ -931,7 +932,7 @@ func (reconciler *ClusterReconciler) updateStatus(
if controlStatus != nil {
newStatus.Control = controlStatus
}
SetTimestamp(&newStatus.LastUpdateTime)
util.SetTimestamp(&newStatus.LastUpdateTime)
log.Info("Updating cluster status", "clusterClone", clusterClone, "newStatus", newStatus)
statusUpdateErr = reconciler.k8sClient.Status().Update(reconciler.context, clusterClone)
if statusUpdateErr == nil {
Expand Down Expand Up @@ -966,8 +967,8 @@ func (reconciler *ClusterReconciler) updateJobDeployStatus() error {
newJob.CompletionTime = nil

// Mark as job submitter is deployed.
SetTimestamp(&newJob.DeployTime)
SetTimestamp(&clusterClone.Status.LastUpdateTime)
util.SetTimestamp(&newJob.DeployTime)
util.SetTimestamp(&clusterClone.Status.LastUpdateTime)

// Latest savepoint location should be fromSavepoint.
var fromSavepoint = getFromSavepoint(desiredJobSubmitter.Spec)
Expand All @@ -992,7 +993,7 @@ func (reconciler *ClusterReconciler) getNewSavepointStatus(triggerID string, tri
var jobID = reconciler.getFlinkJobID()
var savepointState string
var now string
SetTimestamp(&now)
util.SetTimestamp(&now)

if triggerSuccess {
savepointState = v1beta1.SavepointStateInProgress
Expand All @@ -1014,7 +1015,7 @@ func (reconciler *ClusterReconciler) getNewSavepointStatus(triggerID string, tri
// Convert raw time to object and add `addedSeconds` to it,
// getting a time object for the parsed `rawTime` with `addedSeconds` added to it.
func getTimeAfterAddedSeconds(rawTime string, addedSeconds int64) time.Time {
var tc = &TimeConverter{}
var tc = &util.TimeConverter{}
var lastTriggerTime = time.Time{}
if len(rawTime) != 0 {
lastTriggerTime = tc.FromString(rawTime)
Expand Down
13 changes: 7 additions & 6 deletions controllers/flinkcluster/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/go-logr/logr"
v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/util"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (updater *ClusterStatusUpdater) updateStatusIfChanged() (
updater.observed.cluster.Status,
"new", newStatus)
updater.createStatusChangeEvents(oldStatus, newStatus)
var tc = &TimeConverter{}
var tc = &util.TimeConverter{}
newStatus.LastUpdateTime = tc.ToString(time.Now())
return true, updater.updateClusterStatus(newStatus)
}
Expand Down Expand Up @@ -632,7 +633,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
newJob.RestartCount++
}
case newJob.State == v1beta1.JobStateRunning:
SetTimestamp(&newJob.StartTime)
util.SetTimestamp(&newJob.StartTime)
newJob.CompletionTime = nil
// When job started, the savepoint is not the final state of the job any more.
if oldJob.FinalSavepoint {
Expand Down Expand Up @@ -675,7 +676,7 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
// Currently savepoint complete timestamp is not included in savepoints API response.
// Whereas checkpoint API returns the timestamp latest_ack_timestamp.
// Note: https://ci.apache.org/projects/flink/flink-docs-stable/ops/rest_api.html#jobs-jobid-checkpoints-details-checkpointid
SetTimestamp(&newJob.SavepointTime)
util.SetTimestamp(&newJob.SavepointTime)
}

return newJob
Expand Down Expand Up @@ -953,7 +954,7 @@ func deriveControlStatus(
}
// Update time when state changed.
if c.State != v1beta1.ControlStateInProgress {
SetTimestamp(&c.UpdateTime)
util.SetTimestamp(&c.UpdateTime)
}
return c
}
Expand Down Expand Up @@ -981,10 +982,10 @@ func deriveRevisionStatus(
}

// Update revision status.
r.NextRevision = getRevisionWithNameNumber(observedRevision.nextRevision)
r.NextRevision = util.GetRevisionWithNameNumber(observedRevision.nextRevision)
if r.CurrentRevision == "" {
if recordedRevision.CurrentRevision == "" {
r.CurrentRevision = getRevisionWithNameNumber(observedRevision.currentRevision)
r.CurrentRevision = util.GetRevisionWithNameNumber(observedRevision.currentRevision)
} else {
r.CurrentRevision = recordedRevision.CurrentRevision
}
Expand Down
93 changes: 4 additions & 89 deletions controllers/flinkcluster/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ package flinkcluster

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"
"time"

"github.com/spotify/flink-on-k8s-operator/internal/flink"
"github.com/spotify/flink-on-k8s-operator/internal/util"

v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/controllers/history"
Expand Down Expand Up @@ -125,40 +124,6 @@ func getSubmitterJobName(clusterName string) string {
return clusterName + "-job-submitter"
}

// TimeConverter converts between time.Time and string.
type TimeConverter struct{}

// FromString converts string to time.Time.
func (tc *TimeConverter) FromString(timeStr string) time.Time {
timestamp, err := time.Parse(
time.RFC3339, timeStr)
if err != nil {
panic(fmt.Sprintf("Failed to parse time string: %s", timeStr))
}
return timestamp
}

// ToString converts time.Time to string.
func (tc *TimeConverter) ToString(timestamp time.Time) string {
return timestamp.Format(time.RFC3339)
}

// SetTimestamp sets the current timestamp to the target.
func SetTimestamp(target *string) {
var tc = &TimeConverter{}
var now = time.Now()
*target = tc.ToString(now)
}

func GetTime(timeStr string) time.Time {
var tc TimeConverter
return tc.FromString(timeStr)
}

func IsBlank(s *string) bool {
return s == nil || strings.TrimSpace(*s) == ""
}

// Checks whether it is possible to take savepoint.
func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
var jobSpec = cluster.Spec.Job
Expand Down Expand Up @@ -242,14 +207,6 @@ func getPatch(cluster *v1beta1.FlinkCluster) ([]byte, error) {
return patch, err
}

func getNextRevisionNumber(revisions []*appsv1.ControllerRevision) int64 {
count := len(revisions)
if count <= 0 {
return 1
}
return revisions[count-1].Revision + 1
}

func getCurrentRevisionName(r *v1beta1.RevisionStatus) string {
return r.CurrentRevision[:strings.LastIndex(r.CurrentRevision, "-")]
}
Expand All @@ -258,11 +215,6 @@ func getNextRevisionName(r *v1beta1.RevisionStatus) string {
return r.NextRevision[:strings.LastIndex(r.NextRevision, "-")]
}

// Compose revision in FlinkClusterStatus with name and number of ControllerRevision
func getRevisionWithNameNumber(cr *appsv1.ControllerRevision) string {
return fmt.Sprintf("%v-%v", cr.Name, cr.Revision)
}

func getRetryCount(data map[string]string) (string, error) {
var err error
var retries, ok = data["retries"]
Expand Down Expand Up @@ -292,7 +244,7 @@ func getControlStatus(controlName string, state string) *v1beta1.FlinkClusterCon
var controlStatus = new(v1beta1.FlinkClusterControlStatus)
controlStatus.Name = controlName
controlStatus.State = state
SetTimestamp(&controlStatus.UpdateTime)
util.SetTimestamp(&controlStatus.UpdateTime)
return controlStatus
}

Expand Down Expand Up @@ -359,7 +311,7 @@ func isUserControlFinished(controlStatus *v1beta1.FlinkClusterControlStatus) boo

// Check time has passed
func hasTimeElapsed(timeToCheckStr string, now time.Time, intervalSec int) bool {
tc := &TimeConverter{}
tc := &util.TimeConverter{}
timeToCheck := tc.FromString(timeToCheckStr)
intervalPassedTime := timeToCheck.Add(time.Duration(int64(intervalSec) * int64(time.Second)))
return now.After(intervalPassedTime)
Expand Down Expand Up @@ -489,20 +441,6 @@ func shouldUpdateCluster(observed *ObservedClusterState) bool {
return !job.IsActive() && observed.updateState == UpdateStateInProgress
}

func getNonLiveHistory(revisions []*appsv1.ControllerRevision, historyLimit int) []*appsv1.ControllerRevision {

history := append([]*appsv1.ControllerRevision{}, revisions...)
nonLiveHistory := make([]*appsv1.ControllerRevision, 0)

historyLen := len(history)
if historyLen <= historyLimit {
return nonLiveHistory
}

nonLiveHistory = append(nonLiveHistory, history[:(historyLen-historyLimit)]...)
return nonLiveHistory
}

func getFlinkJobDeploymentState(flinkJobState string) string {
switch flinkJobState {
case "INITIALIZING", "CREATED", "RUNNING", "FAILING", "CANCELLING", "RESTARTING", "RECONCILING", "SUSPENDED":
Expand All @@ -518,32 +456,9 @@ func getFlinkJobDeploymentState(flinkJobState string) string {
}
}

func getPodLogs(clientset *kubernetes.Clientset, pod *corev1.Pod) (string, error) {
if pod == nil {
return "", fmt.Errorf("no job pod found, even though submission completed")
}
pods := clientset.CoreV1().Pods(pod.Namespace)

req := pods.GetLogs(pod.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(context.TODO())
if err != nil {
return "", fmt.Errorf("failed to get logs for pod %s: %v", pod.Name, err)
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
return "", fmt.Errorf("error in copy information from pod logs to buf")
}
str := buf.String()

return str, nil
}

// getFlinkJobSubmitLog extract logs from the job submitter pod.
func getFlinkJobSubmitLog(clientset *kubernetes.Clientset, observedPod *corev1.Pod) (*SubmitterLog, error) {
log, err := getPodLogs(clientset, observedPod)
log, err := util.GetPodLogs(clientset, observedPod)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"

v1beta1 "github.com/spotify/flink-on-k8s-operator/apis/flinkcluster/v1beta1"
"github.com/spotify/flink-on-k8s-operator/internal/util"
"gotest.tools/v3/assert"
)

func TestTimeConverter(t *testing.T) {
var tc = &TimeConverter{}
var tc = &util.TimeConverter{}

var str1 = "2019-10-23T05:10:36Z"
var tm1 = tc.FromString(str1)
Expand Down Expand Up @@ -199,11 +200,11 @@ func TestCanTakeSavepoint(t *testing.T) {

func TestGetNextRevisionNumber(t *testing.T) {
var revisions []*appsv1.ControllerRevision
var nextRevision = getNextRevisionNumber(revisions)
var nextRevision = util.GetNextRevisionNumber(revisions)
assert.Equal(t, nextRevision, int64(1))

revisions = []*appsv1.ControllerRevision{{Revision: 1}, {Revision: 2}}
nextRevision = getNextRevisionNumber(revisions)
nextRevision = util.GetNextRevisionNumber(revisions)
assert.Equal(t, nextRevision, int64(3))
}

Expand Down Expand Up @@ -308,7 +309,7 @@ func TestGetUpdateState(t *testing.T) {
}

func TestHasTimeElapsed(t *testing.T) {
var tc = &TimeConverter{}
var tc = &util.TimeConverter{}
var timeToCheckStr = "2020-01-01T00:00:00+00:00"
var timeToCompare = tc.FromString("2020-01-01T00:00:20+00:00")
var elapsed = hasTimeElapsed(timeToCheckStr, timeToCompare, 10)
Expand Down Expand Up @@ -348,12 +349,12 @@ func TestGetNonLiveHistory(t *testing.T) {
revisions := []*appsv1.ControllerRevision{&revison0, &revison1}

historyLimit := 1
nonLiveHistory := getNonLiveHistory(revisions, historyLimit)
nonLiveHistory := util.GetNonLiveHistory(revisions, historyLimit)
assert.Equal(t, len(nonLiveHistory), 1)
assert.Equal(t, nonLiveHistory[0].Revision, int64(0))

historyLimit = 3
nonLiveHistory = getNonLiveHistory(revisions, historyLimit)
nonLiveHistory = util.GetNonLiveHistory(revisions, historyLimit)
assert.Equal(t, len(nonLiveHistory), 0)
}

Expand Down
Loading

0 comments on commit 9d7cccd

Please sign in to comment.