Skip to content

Commit

Permalink
Add flink job name to job status (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Aug 9, 2021
1 parent 6d12274 commit 33a2ecb
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 89 deletions.
9 changes: 6 additions & 3 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,15 @@ type FlinkClusterControlStatus struct {

// JobStatus defines the status of a job.
type JobStatus struct {
// The name of the Kubernetes job resource.
Name string `json:"name,omitempty"`

// The ID of the Flink job.
ID string `json:"id,omitempty"`

// The Name of the Flink job.
Name string `json:"name,omitempty"`

// The name of the Kubernetes job resource.
SubmitterName string `json:"submitterName,omitempty"`

// The state of the Kubernetes job.
State string `json:"state"`

Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5850,6 +5850,8 @@ spec:
type: string
state:
type: string
submitterName:
type: string
required:
- state
type: object
Expand Down
63 changes: 17 additions & 46 deletions controllers/flinkcluster_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func (observer *ClusterStateObserver) observeJob(
var observedFlinkJobStatus FlinkJobStatus
var observedFlinkJobSubmitLog *FlinkJobSubmitLog

var recordedJobStatus = observed.cluster.Status.Components.Job
var err error
var log = observer.log

Expand Down Expand Up @@ -260,19 +259,8 @@ func (observer *ClusterStateObserver) observeJob(
}
observed.flinkJobSubmitLog = observedFlinkJobSubmitLog

// Get Flink job ID.
// While job state is pending and job submitter is completed, extract the job ID from the pod termination log.
var flinkJobID string
if observedFlinkJobSubmitLog != nil && observedFlinkJobSubmitLog.JobID != "" {
flinkJobID = observedFlinkJobSubmitLog.JobID
}
// Or get the job ID from the recorded job status which is written previous iteration.
if flinkJobID == "" && recordedJobStatus != nil {
flinkJobID = recordedJobStatus.ID
}

// Flink job status.
observer.observeFlinkJobStatus(observed, flinkJobID, &observedFlinkJobStatus)
observer.observeFlinkJobStatus(observed, &observedFlinkJobStatus)
observed.flinkJobStatus = observedFlinkJobStatus
return nil
}
Expand All @@ -282,24 +270,13 @@ func (observer *ClusterStateObserver) observeJob(
//
// This needs to be done after the job manager is ready, because we use it to detect whether the Flink API server is up
// and running.
func (observer *ClusterStateObserver) observeFlinkJobStatus(
observed *ObservedClusterState,
flinkJobID string,
flinkJobStatus *FlinkJobStatus) {
func (observer *ClusterStateObserver) observeFlinkJobStatus(observed *ObservedClusterState, flinkJobStatus *FlinkJobStatus) {
var log = observer.log

// Observe following
var flinkJob *flink.Job
var flinkJobExceptions *flink.JobExceptions
var flinkJobList *flink.JobsOverview
var flinkJobsUnexpected []string

// Wait until the job manager is ready.
var jmReady = observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
jmReady := observed.jmStatefulSet != nil && getStatefulSetState(observed.jmStatefulSet) == v1beta1.ComponentStateReady
if !jmReady {
log.Info(
"Skip getting Flink job status.",
"job manager ready", jmReady)
log.Info("Skip getting Flink job status; JobManager is not ready")
return
}

Expand All @@ -316,12 +293,17 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(
// Initialize flinkJobStatus if flink API is available.
flinkJobStatus.flinkJobList = flinkJobList

// Extract the current job status and unexpected jobs, if submitted job ID is provided.
if flinkJobID == "" {
// Check running jobs
if len(flinkJobList.Jobs) < 1 {
return
}

flinkJobExceptions, err = observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobID)
flinkJobStatus.flinkJob = &flinkJobList.Jobs[0]
for _, job := range flinkJobList.Jobs[1:] {
flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id)
}

flinkJobExceptions, err := observer.flinkClient.GetJobExceptions(flinkAPIBaseURL, flinkJobStatus.flinkJob.Id)
if err != nil {
// It is normal in many cases, not an error.
log.Info("Failed to get Flink job exceptions.", "error", err)
Expand All @@ -330,31 +312,20 @@ func (observer *ClusterStateObserver) observeFlinkJobStatus(
log.Info("Observed Flink job exceptions", "jobs", flinkJobExceptions)
flinkJobStatus.flinkJobExceptions = flinkJobExceptions

for _, job := range flinkJobList.Jobs {
if flinkJobID == job.Id {
*flinkJob = job
} else if getFlinkJobDeploymentState(job.State) == v1beta1.JobStateRunning {
flinkJobsUnexpected = append(flinkJobsUnexpected, job.Id)
}
}

flinkJobStatus.flinkJob = flinkJob
flinkJobStatus.flinkJobsUnexpected = flinkJobsUnexpected

// It is okay if there are multiple jobs, but at most one of them is
// expected to be running. This is typically caused by job client
// timed out and exited but the job submission was actually
// successfully. When retrying, it first cancels the existing running
// job which it has lost track of, then submit the job again.
if len(flinkJobsUnexpected) > 1 {
if len(flinkJobStatus.flinkJobsUnexpected) > 1 {
log.Error(
errors.New("more than one unexpected Flink job were found"),
"", "unexpected jobs", flinkJobsUnexpected)
}
if flinkJob != nil {
log.Info("Observed Flink job", "flink job", *flinkJob)
"", "unexpected jobs", flinkJobStatus.flinkJobsUnexpected)
}

if flinkJobStatus.flinkJob != nil {
log.Info("Observed Flink job", "flink job", *flinkJobStatus.flinkJob)
}
}

func (observer *ClusterStateObserver) observeSavepoint(observed *ObservedClusterState) error {
Expand Down
66 changes: 32 additions & 34 deletions controllers/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,74 +698,72 @@ func (updater *ClusterStatusUpdater) getFlinkJobID() *string {

func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
var observed = updater.observed
var observedJob = updater.observed.job
var observedFlinkJob = updater.observed.flinkJobStatus.flinkJob
var observedCluster = updater.observed.cluster
var observedSavepoint = updater.observed.savepoint
var recordedJobStatus = updater.observed.cluster.Status.Components.Job
var newJobStatus *v1beta1.JobStatus
var observedFlinkJob = observed.flinkJobStatus.flinkJob
var observedJobStatus = observed.cluster.Status.Components.Job

if recordedJobStatus == nil {
if observedJobStatus == nil {
return nil
}
newJobStatus = recordedJobStatus.DeepCopy()
newJobStatus := observedJobStatus.DeepCopy()

// Flink Job ID
if observedFlinkJob != nil {
newJobStatus.ID = observedFlinkJob.Id
newJobStatus.Name = observedFlinkJob.Name
}

if observed.job != nil {
newJobStatus.SubmitterName = observed.job.Name
}

// Determine job state
var jobState string
switch {
// Updating state
case isUpdateTriggered(observedCluster.Status) &&
(isJobStopped(recordedJobStatus) || observedCluster.Status.State == v1beta1.ClusterStateStopped):
case isUpdateTriggered(observed.cluster.Status) &&
(isJobStopped(observedJobStatus) || observed.cluster.Status.State == v1beta1.ClusterStateStopped):
jobState = v1beta1.JobStateUpdating
// Already terminated state
case isJobTerminated(observedCluster.Spec.Job.RestartPolicy, recordedJobStatus):
jobState = recordedJobStatus.State
case isJobTerminated(observed.cluster.Spec.Job.RestartPolicy, observedJobStatus):
jobState = observedJobStatus.State
// Derive state from the observed Flink job
case observedFlinkJob != nil:
jobState = getFlinkJobDeploymentState(observedFlinkJob.State)
if jobState == "" {
updater.log.Error(errors.New("failed to determine Flink job deployment state"), "observed flink job status", observedFlinkJob.State)
jobState = recordedJobStatus.State
jobState = observedJobStatus.State
}
// When Flink job not found
case isFlinkAPIReady(observed):
switch recordedJobStatus.State {
switch observedJobStatus.State {
case v1beta1.JobStateRunning:
jobState = v1beta1.JobStateLost
case v1beta1.JobStatePending:
// Flink job is submitted but not confirmed via job manager yet
var jobSubmitSucceeded = updater.getFlinkJobID() != nil
// Flink job submit is in progress
var jobSubmitInProgress = observedJob != nil &&
observedJob.Status.Succeeded == 0 &&
observedJob.Status.Failed == 0
var jobSubmitInProgress = observed.job != nil &&
observed.job.Status.Succeeded == 0 &&
observed.job.Status.Failed == 0
if jobSubmitSucceeded || jobSubmitInProgress {
jobState = v1beta1.JobStatePending
break
}
jobState = v1beta1.JobStateFailed
default:
jobState = recordedJobStatus.State
jobState = observedJobStatus.State
}
// When Flink API unavailable
default:
if recordedJobStatus.State == v1beta1.JobStatePending {
var jobSubmitFailed = observedJob != nil && observedJob.Status.Failed > 0
if observedJobStatus.State == v1beta1.JobStatePending {
var jobSubmitFailed = observed.job != nil && observed.job.Status.Failed > 0
if jobSubmitFailed {
jobState = v1beta1.JobStateFailed
break
}
}
jobState = recordedJobStatus.State
}

// Flink Job ID
if jobState == v1beta1.JobStateUpdating {
newJobStatus.ID = ""
} else if observedFlinkJob != nil {
newJobStatus.ID = observedFlinkJob.Id
jobState = observedJobStatus.State
}

// Update State
newJobStatus.State = jobState

Expand All @@ -778,23 +776,23 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
if newJobStatus.State == v1beta1.JobStateFailed {
if len(newJobStatus.FailureReasons) == 0 {
newJobStatus.FailureReasons = []string{}
exceptions := updater.observed.flinkJobStatus.flinkJobExceptions
exceptions := observed.flinkJobStatus.flinkJobExceptions
if exceptions != nil && len(exceptions.Exceptions) > 0 {
for _, e := range exceptions.Exceptions {
newJobStatus.FailureReasons = append(newJobStatus.FailureReasons, e.Exception)
}
}
if updater.observed.flinkJobSubmitLog != nil {
newJobStatus.FailureReasons = append(newJobStatus.FailureReasons, updater.observed.flinkJobSubmitLog.Message)
newJobStatus.FailureReasons = append(newJobStatus.FailureReasons, observed.flinkJobSubmitLog.Message)
}
}
}

// Savepoint
if observedSavepoint != nil && observedSavepoint.IsSuccessful() {
if observed.savepoint != nil && observed.savepoint.IsSuccessful() {
newJobStatus.SavepointGeneration++
newJobStatus.LastSavepointTriggerID = observedSavepoint.TriggerID
newJobStatus.SavepointLocation = observedSavepoint.Location
newJobStatus.LastSavepointTriggerID = observed.savepoint.TriggerID
newJobStatus.SavepointLocation = observed.savepoint.Location
setTimestamp(&newJobStatus.LastSavepointTime)
}

Expand Down
12 changes: 6 additions & 6 deletions controllers/flinkcluster_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestClusterStatus(t *testing.T) {
State: "NotReady",
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Pending",
SubmitterName: "my-job",
State: "Pending",
},
},
State: "Creating"}
Expand All @@ -91,8 +91,8 @@ func TestClusterStatus(t *testing.T) {
URLs: []string{"http://my-jobmanager"},
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: "Running",
SubmitterName: "my-job",
State: "Running",
},
},
State: "Creating"}
Expand Down Expand Up @@ -122,8 +122,8 @@ func TestClusterStatus(t *testing.T) {
State: v1beta1.ComponentStateNotReady,
},
Job: &v1beta1.JobStatus{
Name: "my-job",
State: v1beta1.JobStatePending,
SubmitterName: "my-job",
State: v1beta1.JobStatePending,
},
},
State: v1beta1.ClusterStateCreating,
Expand Down

0 comments on commit 33a2ecb

Please sign in to comment.