Skip to content

Commit

Permalink
api/v1/orc: TaskQuery handler to use GetActiveCondition
Browse files Browse the repository at this point in the history
This cleans up how active/pending conditions are looked up by the Orc API
for both Condition, Task lookups.

Additionally, the handler returns a 422 if the Task object being queried
is identified to be stale - from a previous condition. At which point
an operator intervention/reconciliation is required.
  • Loading branch information
joelrebel committed Jul 30, 2024
1 parent cb9c191 commit 3198c82
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 94 deletions.
56 changes: 20 additions & 36 deletions pkg/api/v1/orchestrator/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (r *Routes) conditionStatusUpdate(c *gin.Context) (int, *v1types.ServerResp
}
}

// @Summary taskQuery returns the active condition for a serverID
// @Summary taskQuery returns the active/pending condition for a serverID
// @Tag Conditions
// @Description Queries a *rivets.Task object from KV for a condition
// @Description Controllers will not have always know the taskID and so this enables querying
Expand All @@ -156,8 +156,8 @@ func (r *Routes) conditionStatusUpdate(c *gin.Context) (int, *v1types.ServerResp
// @Success 200 {object} v1types.ServerResponse
// Failure 400 {object} v1types.ServerResponse
// Failure 404 {object} v1types.ServerResponse
// Failure 422 {object} v1types.ServerResponse
// Failure 500 {object} v1types.ServerResponse
// Failure 503 {object} v1types.ServerResponse
// @Router /servers/{uuid}/condition-task/{conditionKind} [get]
func (r *Routes) taskQuery(c *gin.Context) (int, *v1types.ServerResponse) {
ctx, span := otel.Tracer(pkgName).Start(c.Request.Context(), "Routes.taskQuery")
Expand Down Expand Up @@ -185,53 +185,37 @@ func (r *Routes) taskQuery(c *gin.Context) (int, *v1types.ServerResponse) {
}
}

cr, err := r.repository.Get(ctx, serverID)
found, err := r.repository.GetActiveCondition(ctx, serverID)
if err != nil {
return http.StatusInternalServerError, &v1types.ServerResponse{
Message: "condition lookup: " + err.Error(),
}
}

var found bool
var activeCond *rctypes.Condition
for idx, cond := range cr.Conditions {
if cond.Kind == conditionKind {
found = true
}

if !rctypes.StateIsComplete(cond.State) {
activeCond = cr.Conditions[idx]
if errors.Is(err, store.ErrConditionNotFound) {
return http.StatusNotFound, &v1types.ServerResponse{
Message: "no pending/active condition not found for server",
}
}
}

if !found {
return http.StatusBadRequest, &v1types.ServerResponse{
Message: "no matching condition found in record: " + string(conditionKind),
}
}
r.logger.WithError(err).Info("condition record query error")

if activeCond == nil {
return http.StatusBadRequest, &v1types.ServerResponse{
Message: "no active condition found in record",
return http.StatusInternalServerError, &v1types.ServerResponse{
Message: "condition lookup: " + err.Error(),
}
}

task, err := r.taskKV.get(c.Request.Context(), conditionKind, activeCond.ID, serverID)
task, err := r.taskKV.get(c.Request.Context(), conditionKind, found.ID, serverID)
if err != nil {
r.logger.WithField("conditionID", activeCond.ID).WithError(err).Info("task KV query error")
if errors.Is(err, errStaleTask) {
// 422 indicates a stale task for this server in the KV and unless that is purged, we cannot proceed,
// an operator is required to clean up the stale task.
return http.StatusUnprocessableEntity, &v1types.ServerResponse{
Message: err.Error(),
}
}

r.logger.WithField("conditionID", found.ID).WithError(err).Info("task KV query error")
return http.StatusInternalServerError, &v1types.ServerResponse{
Message: err.Error(),
}
}

// A stale task was not cleaned up and now we have an odd situation
if activeCond.ID != task.ID {
return http.StatusBadRequest, &v1types.ServerResponse{
Message: fmt.Sprintf("TaskID: %s does not match active ConditionID: %s", task.ID, activeCond.ID),
}
}

return http.StatusOK, &v1types.ServerResponse{
Message: "Task identified",
Task: task,
Expand Down Expand Up @@ -303,7 +287,7 @@ func (r *Routes) taskPublish(c *gin.Context) (int, *v1types.ServerResponse) {
}

// the controller retrieved the condition from the queue which created the Task entry
// we expect an active condition to allow this publish
// we expect an active/pending condition to allow this publish
activeCond, err := r.repository.GetActiveCondition(ctx, serverID)
if err != nil {
if errors.Is(err, store.ErrConditionNotFound) {
Expand Down
87 changes: 29 additions & 58 deletions pkg/api/v1/orchestrator/routes/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ func TestTaskQuery(t *testing.T) {
{
name: "repository error",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(nil, errors.New("repository error")).
r.On("GetActiveCondition", mock.Anything, serverID).
Return(nil, errors.Wrap(store.ErrRepository, "cosmic rays")).
Once()
},
request: func(t *testing.T) *http.Request {
Expand All @@ -361,37 +361,10 @@ func TestTaskQuery(t *testing.T) {
},
},
{
name: "no matching condition found",
name: "no active/pending condition found",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(&store.ConditionRecord{
Conditions: []*rctypes.Condition{
{Kind: rctypes.Kind("other_kind"), State: rctypes.Active},
},
}, nil).
Once()
},
request: func(t *testing.T) *http.Request {
request, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, surl, http.NoBody)
if err != nil {
t.Fatal(err)
}
return request
},
assertResponse: func(t *testing.T, r *httptest.ResponseRecorder) {
assert.Equal(t, http.StatusBadRequest, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), "no matching condition found in record")
},
},
{
name: "no active condition found",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(&store.ConditionRecord{
Conditions: []*rctypes.Condition{
{Kind: conditionKind, State: rctypes.Succeeded},
},
}, nil).
r.On("GetActiveCondition", mock.Anything, serverID).
Return(nil, store.ErrConditionNotFound).
Once()
},
request: func(t *testing.T) *http.Request {
Expand All @@ -402,27 +375,27 @@ func TestTaskQuery(t *testing.T) {
return request
},
assertResponse: func(t *testing.T, r *httptest.ResponseRecorder) {
assert.Equal(t, http.StatusBadRequest, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), "no active condition found in record")
assert.Equal(t, http.StatusNotFound, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), store.ErrConditionNotFound.Error())
},
},
{
name: "task KV query error",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(&store.ConditionRecord{
Conditions: []*rctypes.Condition{
{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
},
}, nil).
r.On("GetActiveCondition", mock.Anything, serverID).
Return(
&rctypes.Condition{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
nil,
).
Once()
},
mockTaskKV: func(tk *MocktaskKV) {
tk.On("get", mock.Anything, conditionKind, conditionID, serverID).
Return(nil, fmt.Errorf("task KV query error")).
Return(nil, errQueryTask).
Once()
},
request: func(t *testing.T) *http.Request {
fmt.Println(surl)
request, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, surl, http.NoBody)
if err != nil {
t.Fatal(err)
Expand All @@ -431,23 +404,22 @@ func TestTaskQuery(t *testing.T) {
},
assertResponse: func(t *testing.T, r *httptest.ResponseRecorder) {
assert.Equal(t, http.StatusInternalServerError, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), "task KV query error")
assert.Contains(t, string(asBytes(t, r.Body)), errQueryTask.Error())
},
},
{
name: "task ID mismatch",
name: "task obj stale/mismatch",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(&store.ConditionRecord{
Conditions: []*rctypes.Condition{
{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
},
}, nil).
r.On("GetActiveCondition", mock.Anything, serverID).
Return(
&rctypes.Condition{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
nil,
).
Once()
},
mockTaskKV: func(tk *MocktaskKV) {
tk.On("get", mock.Anything, conditionKind, conditionID, serverID).
Return(&rctypes.Task[any, any]{ID: uuid.New(), Kind: conditionKind}, nil).
Return(nil, errStaleTask).
Once()
},
request: func(t *testing.T) *http.Request {
Expand All @@ -458,19 +430,18 @@ func TestTaskQuery(t *testing.T) {
return request
},
assertResponse: func(t *testing.T, r *httptest.ResponseRecorder) {
assert.Equal(t, http.StatusBadRequest, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), "does not match active ConditionID")
assert.Equal(t, http.StatusUnprocessableEntity, r.Code)
assert.Contains(t, string(asBytes(t, r.Body)), errStaleTask.Error())
},
},
{
name: "successful task query",
mockRepository: func(r *store.MockRepository) {
r.On("Get", mock.Anything, serverID).
Return(&store.ConditionRecord{
Conditions: []*rctypes.Condition{
{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
},
}, nil).
r.On("GetActiveCondition", mock.Anything, serverID).
Return(
&rctypes.Condition{ID: conditionID, Kind: conditionKind, State: rctypes.Active},
nil,
).
Once()
},
mockTaskKV: func(tk *MocktaskKV) {
Expand Down

0 comments on commit 3198c82

Please sign in to comment.