diff --git a/proxmox/session.go b/proxmox/session.go index 04916a36..2843b9d3 100644 --- a/proxmox/session.go +++ b/proxmox/session.go @@ -14,6 +14,9 @@ import ( "net/http" "net/http/httputil" "net/url" + "strings" + "sync" + "time" ) var Debug = new(bool) @@ -146,6 +149,24 @@ func ResponseJSON(resp *http.Response) (jbody map[string]interface{}, err error) return jbody, err } +func taskResponse(resp *http.Response) (Task, error) { + var jbody map[string]interface{} + var err error + if err = decodeResponse(resp, &jbody); err != nil { + return nil, err + } + if v, isSet := jbody["errors"]; isSet { + errJSON, _ := json.MarshalIndent(v, "", " ") + return nil, fmt.Errorf("error: %s", errJSON) + } + if v, isSet := jbody["data"]; isSet { + task := &task{} + task.mapToSDK_Unsafe(v.(string)) + return task, nil + } + return nil, nil +} + // Is this needed? func TypedResponse(resp *http.Response, v interface{}) error { var intermediate struct { @@ -421,3 +442,183 @@ func (s *Session) Put( } return s.Request(ctx, "PUT", url, params, headers, body) } + +type Task interface { + EndTime() time.Time + ExitStatus() string + ID() string + Node() string + OperationType() string + ProcessID() uint + StartTime() time.Time + Status() string + User() UserID + WaitForCompletion(context.Context, *Client) error +} + +type task struct { + id string + node string + operationType string + status map[string]interface{} + statusMutex sync.Mutex + user UserID +} + +const ( + taskApiKeyEndTime = "endtime" + taskApiKeyExitStatus = "exitstatus" + taskApiKeyProcessID = "pid" + taskApiKeyStartTime = "starttime" + taskApiKeyStatus = "status" +) + +// Returns the time the task ended. If the task has not ended, the zero time is returned. +func (t *task) EndTime() time.Time { + if t == nil { + return time.Time{} + } + t.statusMutex.Lock() + defer t.statusMutex.Unlock() + if v, isSet := t.status[taskApiKeyEndTime]; isSet { + return time.Unix(int64(v.(float64)), 0) + } + return time.Time{} +} + +// Returns the exit status of the task. If the task has not ended, an empty string is returned. +func (t *task) ExitStatus() string { + if t == nil { + return "" + } + t.statusMutex.Lock() + defer t.statusMutex.Unlock() + if v, isSet := t.status[taskApiKeyExitStatus]; isSet { + return v.(string) + } + return "" +} + +// Returns the ID of the task. +func (t *task) ID() string { + if t == nil { + return "" + } + return t.id +} + +// Returns the node the task was executed on. +func (t *task) Node() string { + if t == nil { + return "" + } + return t.node +} + +// Returns the operation type of the task. +func (t *task) OperationType() string { + if t == nil { + return "" + } + return t.operationType +} + +// Returns the process ID of the task. +func (t *task) ProcessID() uint { + if t == nil { + return 0 + } + t.statusMutex.Lock() + defer t.statusMutex.Unlock() + if v, isSet := t.status[taskApiKeyProcessID]; isSet { + return uint(v.(float64)) + } + return 0 +} + +// Returns the time the task started. +func (t *task) StartTime() time.Time { + if t == nil { + return time.Time{} + } + t.statusMutex.Lock() + defer t.statusMutex.Unlock() + if v, isSet := t.status[taskApiKeyStartTime]; isSet { + return time.Unix(int64(v.(float64)), 0) + } + return time.Time{} +} + +// Returns the status of the task. +func (t *task) Status() string { + if t == nil { + return "" + } + t.statusMutex.Lock() + defer t.statusMutex.Unlock() + if v, isSet := t.status[taskApiKeyStatus]; isSet { + return v.(string) + } + return "" +} + +// Returns the user that started the task. +func (t *task) User() UserID { + if t == nil { + return UserID{} + } + return t.user +} + +// Poll the API for task completion +func (t *task) WaitForCompletion(ctx context.Context, c *Client) error { + if t == nil { + return nil + } + var err error + var waited int + for waited < c.TaskTimeout { + err = t.getTaskStatus_Unsafe(ctx, c.session) + if err != nil && err != io.ErrUnexpectedEOF { // don't give up on ErrUnexpectedEOF + return err + } + time.Sleep(TaskStatusCheckInterval * time.Second) + if err = ctx.Err(); err != nil { + return err + } + waited += TaskStatusCheckInterval + } + return fmt.Errorf("Wait timeout for:" + t.id) +} + +// UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam: +// Requires the caller to ensure (t *task) is not nil. +func (t *task) mapToSDK_Unsafe(upID string) { + t.id = upID + indexA := strings.Index(upID[5:], ":") + 5 + t.node = upID[5:indexA] + indexB := strings.Index(upID[indexA+28:], ":") + indexA + 28 + t.operationType = upID[indexA+28 : indexB] + indexA = strings.Index(upID[indexB+1:], ":") + indexB + 1 + 1 // +1 because we are skipping a field + t.user = UserID{}.mapToStruct(upID[indexA : strings.Index(upID[indexA:], ":")+indexA]) +} + +// Requires the caller to ensure (t *task) is not nil. +func (t *task) getTaskStatus_Unsafe(ctx context.Context, session *Session) (err error) { + var data map[string]interface{} + _, err = session.GetJSON(ctx, "/nodes/"+t.node+"/tasks/"+t.id+"/status", nil, nil, &data) + if err != nil { + return + } + status := data["data"].(map[string]interface{}) + t.statusMutex.Lock() + t.status = status + t.statusMutex.Unlock() + if v, isSet := status[taskApiKeyExitStatus]; isSet { + exitStatus := v.(string) + if !(strings.HasPrefix(exitStatus, "OK") || strings.HasPrefix(exitStatus, "WARNINGS")) { + return fmt.Errorf(exitStatus) + } + } + return +} diff --git a/proxmox/session_test.go b/proxmox/session_test.go index 380c7569..25e75d85 100644 --- a/proxmox/session_test.go +++ b/proxmox/session_test.go @@ -2,6 +2,8 @@ package proxmox import ( "testing" + + "github.com/stretchr/testify/require" ) func TestParamsTo(t *testing.T) { @@ -94,3 +96,40 @@ func TestParamsToWithEmpty(t *testing.T) { }) } } + +func Test_nodeFromUpID_Unsafe(t *testing.T) { + tests := []struct { + name string + input string + output task + }{ + {name: "1", + input: "UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam:", + output: task{ + id: "UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam:", + node: "pve-test", + operationType: "qmmove", + user: UserID{ + Name: "root", + Realm: "pam"}}}, + {name: "2", + input: "UPID:pve:002860A9:051E01C1:67536165:qmshutdown:102:test-user@realm:", + output: task{ + id: "UPID:pve:002860A9:051E01C1:67536165:qmshutdown:102:test-user@realm:", + node: "pve", + operationType: "qmshutdown", + user: UserID{ + Name: "test-user", + Realm: "realm"}}}, + } + for i := range tests { + t.Run(tests[i].name, func(t *testing.T) { + tmpTask := &task{} + tmpTask.mapToSDK_Unsafe(tests[i].input) + require.Equal(t, tests[i].output.id, tmpTask.id) + require.Equal(t, tests[i].output.node, tmpTask.node) + require.Equal(t, tests[i].output.operationType, tmpTask.operationType) + require.Equal(t, tests[i].output.user, tmpTask.user) + }) + } +}