Skip to content

Commit

Permalink
feat: implement Task interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Tinyblargon committed Dec 7, 2024
1 parent 976ef50 commit ca4e9bb
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 0 deletions.
201 changes: 201 additions & 0 deletions proxmox/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
)

var Debug = new(bool)
Expand Down Expand Up @@ -146,6 +149,24 @@ func ResponseJSON(resp *http.Response) (jbody map[string]interface{}, err error)
return jbody, err
}

func taskResponse(resp *http.Response) (Task, error) {
var jbody map[string]interface{}
var err error
if err = decodeResponse(resp, &jbody); err != nil {
return nil, err
}
if v, isSet := jbody["errors"]; isSet {
errJSON, _ := json.MarshalIndent(v, "", " ")
return nil, fmt.Errorf("error: %s", errJSON)
}
if v, isSet := jbody["data"]; isSet {
task := &task{}
task.mapToSDK_Unsafe(v.(string))
return task, nil
}
return nil, nil
}

// Is this needed?
func TypedResponse(resp *http.Response, v interface{}) error {
var intermediate struct {
Expand Down Expand Up @@ -421,3 +442,183 @@ func (s *Session) Put(
}
return s.Request(ctx, "PUT", url, params, headers, body)
}

type Task interface {
EndTime() time.Time
ExitStatus() string
ID() string
Node() string
OperationType() string
ProcessID() uint
StartTime() time.Time
Status() string
User() UserID
WaitForCompletion(context.Context, *Client) error
}

type task struct {
id string
node string
operationType string
status map[string]interface{}
statusMutex sync.Mutex
user UserID
}

const (
taskApiKeyEndTime = "endtime"
taskApiKeyExitStatus = "exitstatus"
taskApiKeyProcessID = "pid"
taskApiKeyStartTime = "starttime"
taskApiKeyStatus = "status"
)

// Returns the time the task ended. If the task has not ended, the zero time is returned.
func (t *task) EndTime() time.Time {
if t == nil {
return time.Time{}
}
t.statusMutex.Lock()
defer t.statusMutex.Unlock()
if v, isSet := t.status[taskApiKeyEndTime]; isSet {
return time.Unix(int64(v.(float64)), 0)
}
return time.Time{}
}

// Returns the exit status of the task. If the task has not ended, an empty string is returned.
func (t *task) ExitStatus() string {
if t == nil {
return ""
}
t.statusMutex.Lock()
defer t.statusMutex.Unlock()
if v, isSet := t.status[taskApiKeyExitStatus]; isSet {
return v.(string)
}
return ""
}

// Returns the ID of the task.
func (t *task) ID() string {
if t == nil {
return ""
}
return t.id
}

// Returns the node the task was executed on.
func (t *task) Node() string {
if t == nil {
return ""
}
return t.node
}

// Returns the operation type of the task.
func (t *task) OperationType() string {
if t == nil {
return ""
}
return t.operationType
}

// Returns the process ID of the task.
func (t *task) ProcessID() uint {
if t == nil {
return 0
}
t.statusMutex.Lock()
defer t.statusMutex.Unlock()
if v, isSet := t.status[taskApiKeyProcessID]; isSet {
return uint(v.(float64))
}
return 0
}

// Returns the time the task started.
func (t *task) StartTime() time.Time {
if t == nil {
return time.Time{}
}
t.statusMutex.Lock()
defer t.statusMutex.Unlock()
if v, isSet := t.status[taskApiKeyStartTime]; isSet {
return time.Unix(int64(v.(float64)), 0)
}
return time.Time{}
}

// Returns the status of the task.
func (t *task) Status() string {
if t == nil {
return ""
}
t.statusMutex.Lock()
defer t.statusMutex.Unlock()
if v, isSet := t.status[taskApiKeyStatus]; isSet {
return v.(string)
}
return ""
}

// Returns the user that started the task.
func (t *task) User() UserID {
if t == nil {
return UserID{}
}
return t.user
}

// Poll the API for task completion
func (t *task) WaitForCompletion(ctx context.Context, c *Client) error {
if t == nil {
return nil
}
var err error
var waited int
for waited < c.TaskTimeout {
err = t.getTaskStatus_Unsafe(ctx, c.session)
if err != nil && err != io.ErrUnexpectedEOF { // don't give up on ErrUnexpectedEOF
return err
}
time.Sleep(TaskStatusCheckInterval * time.Second)
if err = ctx.Err(); err != nil {
return err
}
waited += TaskStatusCheckInterval
}
return fmt.Errorf("Wait timeout for:" + t.id)
}

// UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam:
// Requires the caller to ensure (t *task) is not nil.
func (t *task) mapToSDK_Unsafe(upID string) {
t.id = upID
indexA := strings.Index(upID[5:], ":") + 5
t.node = upID[5:indexA]
indexB := strings.Index(upID[indexA+28:], ":") + indexA + 28
t.operationType = upID[indexA+28 : indexB]
indexA = strings.Index(upID[indexB+1:], ":") + indexB + 1 + 1 // +1 because we are skipping a field
t.user = UserID{}.mapToStruct(upID[indexA : strings.Index(upID[indexA:], ":")+indexA])
}

// Requires the caller to ensure (t *task) is not nil.
func (t *task) getTaskStatus_Unsafe(ctx context.Context, session *Session) (err error) {
var data map[string]interface{}
_, err = session.GetJSON(ctx, "/nodes/"+t.node+"/tasks/"+t.id+"/status", nil, nil, &data)
if err != nil {
return
}
status := data["data"].(map[string]interface{})
t.statusMutex.Lock()
t.status = status
t.statusMutex.Unlock()
if v, isSet := status[taskApiKeyExitStatus]; isSet {
exitStatus := v.(string)
if !(strings.HasPrefix(exitStatus, "OK") || strings.HasPrefix(exitStatus, "WARNINGS")) {
return fmt.Errorf(exitStatus)
}
}
return
}
39 changes: 39 additions & 0 deletions proxmox/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package proxmox

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestParamsTo(t *testing.T) {
Expand Down Expand Up @@ -94,3 +96,40 @@ func TestParamsToWithEmpty(t *testing.T) {
})
}
}

func Test_nodeFromUpID_Unsafe(t *testing.T) {
tests := []struct {
name string
input string
output task
}{
{name: "1",
input: "UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam:",
output: task{
id: "UPID:pve-test:002860A9:051E01C1:67536165:qmmove:102:root@pam:",
node: "pve-test",
operationType: "qmmove",
user: UserID{
Name: "root",
Realm: "pam"}}},
{name: "2",
input: "UPID:pve:002860A9:051E01C1:67536165:qmshutdown:102:test-user@realm:",
output: task{
id: "UPID:pve:002860A9:051E01C1:67536165:qmshutdown:102:test-user@realm:",
node: "pve",
operationType: "qmshutdown",
user: UserID{
Name: "test-user",
Realm: "realm"}}},
}
for i := range tests {
t.Run(tests[i].name, func(t *testing.T) {
tmpTask := &task{}
tmpTask.mapToSDK_Unsafe(tests[i].input)
require.Equal(t, tests[i].output.id, tmpTask.id)
require.Equal(t, tests[i].output.node, tmpTask.node)
require.Equal(t, tests[i].output.operationType, tmpTask.operationType)
require.Equal(t, tests[i].output.user, tmpTask.user)
})
}
}

0 comments on commit ca4e9bb

Please sign in to comment.