Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Jeff Ortel <[email protected]>
  • Loading branch information
jortel committed Feb 19, 2024
1 parent e1fa5f6 commit bdde133
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 90 deletions.
52 changes: 26 additions & 26 deletions binding/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,17 +662,17 @@ func (r *Client) send(rb func() (*http.Request, error)) (response *http.Response
response.StatusCode,
request.Method,
request.URL.Path))
if response.StatusCode == http.StatusUnauthorized {
refreshed, nErr := r.refreshToken(request)
if nErr != nil {
r.Error = liberr.Wrap(nErr)
err = r.Error
return
}
if refreshed {
continue
}
}
if response.StatusCode == http.StatusUnauthorized {
refreshed, nErr := r.refreshToken(request)
if nErr != nil {
r.Error = liberr.Wrap(nErr)
err = r.Error
return
}
if refreshed {
continue
}
}
break
}
}
Expand Down Expand Up @@ -779,19 +779,19 @@ func (f *Field) disposition() (d string) {

// refreshToken refreshes the token.
func (r *Client) refreshToken(request *http.Request) (refreshed bool, err error) {
if r.token.Token == "" ||
strings.HasSuffix(request.URL.Path, api.AuthRefreshRoot) {
return
}
login := &api.Login{Refresh: r.token.Refresh}
err = r.Post(api.AuthRefreshRoot, login)
if err == nil {
r.token.Token = login.Token
refreshed = true
return
}
if errors.Is(err, &RestError{}) {
err = nil
}
return
if r.token.Token == "" ||
strings.HasSuffix(request.URL.Path, api.AuthRefreshRoot) {
return
}
login := &api.Login{Refresh: r.token.Refresh}
err = r.Post(api.AuthRefreshRoot, login)
if err == nil {
r.token.Token = login.Token
refreshed = true
return
}
if errors.Is(err, &RestError{}) {
err = nil
}
return
}
9 changes: 9 additions & 0 deletions settings/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
EnvRwxSupported = "RWX_SUPPORTED"
EnvCachePath = "CACHE_PATH"
EnvCachePvc = "CACHE_PVC"
EnvSharedPath = "SHARED_PATH"
EnvPassphrase = "ENCRYPTION_PASSPHRASE"
EnvTaskReapCreated = "TASK_REAP_CREATED"
EnvTaskReapSucceeded = "TASK_REAP_SUCCEEDED"
Expand Down Expand Up @@ -52,6 +53,10 @@ type Hub struct {
Path string
PVC string
}
// Shared mount settings.
Shared struct {
Path string
}
// Encryption settings.
Encryption struct {
Passphrase string
Expand Down Expand Up @@ -115,6 +120,10 @@ func (r *Hub) Load() (err error) {
if !found {
r.Cache.Path = "/cache"
}
r.Shared.Path, found = os.LookupEnv(EnvSharedPath)
if !found {
r.Shared.Path = "/shared"
}
r.Encryption.Passphrase, found = os.LookupEnv(EnvPassphrase)
if !found {
r.Encryption.Passphrase = "tackle"
Expand Down
197 changes: 133 additions & 64 deletions task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (
Unit = time.Second
)

const (
Shared = "shared"
Cache = "cache"
)

var (
Settings = &settings.Settings
Log = logr.WithName("task-scheduler")
Expand Down Expand Up @@ -335,35 +340,15 @@ func (r *Task) Reflect(client k8s.Client) (err error) {
}
return
}
mark := time.Now()
status := pod.Status
switch status.Phase {
switch pod.Status.Phase {
case core.PodPending:
r.podPending(pod)
case core.PodRunning:
r.State = Running
r.podRunning(pod, client)
case core.PodSucceeded:
r.State = Succeeded
r.Terminated = &mark
r.podSucceeded(pod)
case core.PodFailed:
r.Error(
"Error",
"Pod failed: %s",
pod.Status.ContainerStatuses[0].State.Terminated.Reason)
switch pod.Status.ContainerStatuses[0].State.Terminated.ExitCode {
case 137: // Killed.
if r.Retries < Settings.Hub.Task.Retries {
_ = client.Delete(context.TODO(), pod)
r.Pod = ""
r.State = Ready
r.Errors = nil
r.Retries++
} else {
r.State = Failed
r.Terminated = &mark
}
default:
r.State = Failed
r.Terminated = &mark
}
r.podFailed(pod, client)
}

return
Expand Down Expand Up @@ -398,6 +383,86 @@ func (r *Task) Delete(client k8s.Client) (err error) {
return
}

// podPending handles pod pending.
func (r *Task) podPending(pod *core.Pod) {
for _, status := range pod.Status.InitContainerStatuses {
if status.Started == nil {
continue
}
if *status.Started {
r.State = Running
return
}
}
}

// podRunning handles pod running.
func (r *Task) podRunning(pod *core.Pod, client k8s.Client) {
var statuses []core.ContainerStatus
statuses = append(
statuses,
pod.Status.InitContainerStatuses...)
statuses = append(
statuses,
pod.Status.ContainerStatuses...)
for _, status := range statuses {
if status.State.Terminated == nil {
continue
}
switch status.State.Terminated.ExitCode {
case 0: // Succeeded.
default: // failed.
r.podFailed(pod, client)
}
}
}

// podFailed handles pod succeeded.
func (r *Task) podSucceeded(pod *core.Pod) {
mark := time.Now()
r.State = Succeeded
r.Terminated = &mark
}

// podFailed handles pod failed.
func (r *Task) podFailed(pod *core.Pod, client k8s.Client) {
mark := time.Now()
var statuses []core.ContainerStatus
statuses = append(
statuses,
pod.Status.InitContainerStatuses...)
statuses = append(
statuses,
pod.Status.ContainerStatuses...)
for _, status := range statuses {
if status.State.Terminated == nil {
continue
}
switch status.State.Terminated.ExitCode {
case 0: // Succeeded.
case 137: // Killed.
if r.Retries < Settings.Hub.Task.Retries {
_ = client.Delete(context.TODO(), pod)
r.Pod = ""
r.State = Ready
r.Errors = nil
r.Retries++
return
}
fallthrough
default: // Error.
r.State = Failed
r.Terminated = &mark
r.Error(
"Error",
"Container (%s) failed: %s",
status.Name,
status.State.Terminated.Reason)
return
}
}
}

// Cancel the task.
func (r *Task) Cancel(client k8s.Client) (err error) {
err = r.Delete(client)
Expand Down Expand Up @@ -478,8 +543,14 @@ func (r *Task) pod(addon *crd.Addon, owner *crd.Tackle, secret *core.Secret) (po

// specification builds a Pod specification.
func (r *Task) specification(addon *crd.Addon, secret *core.Secret) (specification core.PodSpec) {
shared := core.Volume{
Name: Shared,
VolumeSource: core.VolumeSource{
EmptyDir: &core.EmptyDirVolumeSource{},
},
}
cache := core.Volume{
Name: "cache",
Name: Cache,
}
if Settings.Cache.RWX {
cache.VolumeSource = core.VolumeSource{
Expand All @@ -499,6 +570,7 @@ func (r *Task) specification(addon *crd.Addon, secret *core.Secret) (specificati
r.container(addon, secret),
},
Volumes: []core.Volume{
shared,
cache,
},
}
Expand All @@ -509,47 +581,44 @@ func (r *Task) specification(addon *crd.Addon, secret *core.Secret) (specificati
// container builds the pod container.
func (r *Task) container(addon *crd.Addon, secret *core.Secret) (container core.Container) {
userid := int64(0)
policy := core.PullIfNotPresent
if addon.Spec.ImagePullPolicy != "" {
policy = addon.Spec.ImagePullPolicy
}
container = core.Container{
Name: "main",
Image: r.Image,
ImagePullPolicy: policy,
Resources: addon.Spec.Resources,
Env: []core.EnvVar{
{
Name: settings.EnvHubBaseURL,
Value: Settings.Addon.Hub.URL,
},
{
Name: settings.EnvTask,
Value: strconv.Itoa(int(r.Task.ID)),
},
{
Name: settings.EnvHubToken,
ValueFrom: &core.EnvVarSource{
SecretKeyRef: &core.SecretKeySelector{
Key: settings.EnvHubToken,
LocalObjectReference: core.LocalObjectReference{
Name: secret.Name,
},
},
},
token := &core.EnvVarSource{
SecretKeyRef: &core.SecretKeySelector{
Key: settings.EnvHubToken,
LocalObjectReference: core.LocalObjectReference{
Name: secret.Name,
},
},
VolumeMounts: []core.VolumeMount{
{
Name: "cache",
MountPath: Settings.Cache.Path,
},
}
if container.ImagePullPolicy == "" {
container.ImagePullPolicy = core.PullAlways
}
container.SecurityContext = &core.SecurityContext{
RunAsUser: &userid,
}
container.VolumeMounts = append(
container.VolumeMounts,
core.VolumeMount{
Name: Shared,
MountPath: Settings.Shared.Path,
},
core.VolumeMount{
Name: Cache,
MountPath: Settings.Cache.Path,
})
container.Env = append(
container.Env,
core.EnvVar{
Name: settings.EnvHubBaseURL,
Value: Settings.Addon.Hub.URL,
},
SecurityContext: &core.SecurityContext{
RunAsUser: &userid,
core.EnvVar{
Name: settings.EnvTask,
Value: strconv.Itoa(int(r.Task.ID)),
},
}

core.EnvVar{
Name: settings.EnvHubToken,
ValueFrom: token,
})
return
}

Expand Down

0 comments on commit bdde133

Please sign in to comment.