Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic ingress hostname and NodePort env vars #259

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 115 additions & 12 deletions cluster/kube/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package kube

import (
"context"

"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"

metricsutils "github.com/akash-network/node/util/metrics"

"github.com/akash-network/provider/cluster/kube/builder"
crdapi "github.com/akash-network/provider/pkg/client/clientset/versioned"
"k8s.io/client-go/util/retry"
)

func applyNS(ctx context.Context, kc kubernetes.Interface, b builder.NS) error {
Expand Down Expand Up @@ -115,18 +116,50 @@ func applyDeployment(ctx context.Context, kc kubernetes.Interface, b builder.Dep

switch {
case err == nil:
obj, err = b.Update(obj)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err = kc.AppsV1().Deployments(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})
if err != nil {
return err
}

if err == nil {
_, err = kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-update", err)
obj, err = b.Update(obj)
if err != nil {
return err
}

result, err := kc.AppsV1().Deployments(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().Deployments(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
})
if retryErr != nil {
return retryErr
}

case errors.IsNotFound(err):
obj, err = b.Create()
if err == nil {
_, err = kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
result, err := kc.AppsV1().Deployments(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "deployments-create", err)
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().Deployments(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().Deployments(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
}
}
return err
Expand All @@ -138,18 +171,50 @@ func applyStatefulSet(ctx context.Context, kc kubernetes.Interface, b builder.St

switch {
case err == nil:
obj, err = b.Update(obj)
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
obj, err = kc.AppsV1().StatefulSets(b.NS()).Get(ctx, b.Name(), metav1.GetOptions{})
if err != nil {
return err
}

if err == nil {
_, err = kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-update", err)
obj, err = b.Update(obj)
if err != nil {
return err
}

result, err := kc.AppsV1().StatefulSets(b.NS()).Update(ctx, obj, metav1.UpdateOptions{})
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
})
if retryErr != nil {
return retryErr
}

case errors.IsNotFound(err):
obj, err = b.Create()
if err == nil {
_, err = kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
result, err := kc.AppsV1().StatefulSets(b.NS()).Create(ctx, obj, metav1.CreateOptions{})
metricsutils.IncCounterVecWithLabelValues(kubeCallsCounter, "statefulset-create", err)
if err != nil {
return err
}

getFunc := func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error) {
return kc.AppsV1().StatefulSets(ns).Get(ctx, name, opts)
}
watchFunc := func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
return kc.AppsV1().StatefulSets(b.NS()).Watch(ctx, opts)
}
return waitForProcessedVersion(ctx, getFunc, watchFunc, b.NS(), b.Name(), result.ResourceVersion)
}
}
return err
Expand Down Expand Up @@ -199,3 +264,41 @@ func applyManifest(ctx context.Context, kc crdapi.Interface, b builder.Manifest)

return err
}

func waitForProcessedVersion(ctx context.Context,
getFunc func(ctx context.Context, name, ns string, opts metav1.GetOptions) (interface{}, error),
watchFunc func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error),
ns, name string, resourceVersion string) error {

// First try to get the object with the new resource version
_, err := getFunc(ctx, name, ns, metav1.GetOptions{
ResourceVersion: resourceVersion,
})
if err == nil {
return nil
}
if !errors.IsNotFound(err) && !errors.IsConflict(err) {
return err
}

// If we couldn't get it, then watch for changes
watcher, err := watchFunc(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
ResourceVersion: resourceVersion,
})
if err != nil {
return err
}
defer watcher.Stop()

for {
select {
case event := <-watcher.ResultChan():
if event.Type == watch.Modified {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
3 changes: 3 additions & 0 deletions cluster/kube/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
envVarAkashOwner = "AKASH_OWNER"
envVarAkashProvider = "AKASH_PROVIDER"
envVarAkashClusterPublicHostname = "AKASH_CLUSTER_PUBLIC_HOSTNAME"
envVarAkashIngressHostname = "AKASH_INGRESS_HOST"
envVarAkashIngressCustomHostname = "AKASH_INGRESS_CUSTOM_HOST"
envVarAkashExternalPort = "AKASH_EXTERNAL_PORT"
)

var (
Expand Down
5 changes: 4 additions & 1 deletion cluster/kube/builder/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ type deployment struct {

var _ Deployment = (*deployment)(nil)

func NewDeployment(workload Workload) Deployment {
func NewDeployment(workload Workload, svc Service) Deployment {
ss := &deployment{
Workload: workload,
}

ss.Workload.log = ss.Workload.log.With("object", "deployment", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
if svc != nil {
ss.setService(svc.(*service))
}

return ss
}
Expand Down
22 changes: 21 additions & 1 deletion cluster/kube/builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package builder

import (
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -21,6 +20,7 @@ type Service interface {
type service struct {
Workload
requireNodePort bool
portMap map[int32]int32
}

var _ Service = (*service)(nil)
Expand All @@ -29,6 +29,7 @@ func BuildService(workload Workload, requireNodePort bool) Service {
ss := &service{
Workload: workload,
requireNodePort: requireNodePort,
portMap: make(map[int32]int32),
}

ss.Workload.log = ss.Workload.log.With("object", "service", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
Expand Down Expand Up @@ -68,6 +69,8 @@ func (b *service) Create() (*corev1.Service, error) { // nolint:golint,unparam
},
}

b.updatePortMap(ports)

return svc, nil
}

Expand Down Expand Up @@ -101,9 +104,26 @@ func (b *service) Update(obj *corev1.Service) (*corev1.Service, error) { // noli
}

obj.Spec.Ports = ports

b.updatePortMap(ports)

return obj, nil
}

func (b *service) updatePortMap(ports []corev1.ServicePort) {
b.log.Debug("provider/cluster/kube/builder: updating port map", "requireNodePort", b.requireNodePort, "service", b.Name)
if !b.requireNodePort {
return
}

b.portMap = make(map[int32]int32)
for _, port := range ports {
if port.NodePort > 0 {
b.portMap[port.TargetPort.IntVal] = port.NodePort
}
}
}

func (b *service) Any() bool {
service := &b.deployment.ManifestGroup().Services[b.serviceIdx]

Expand Down
5 changes: 4 additions & 1 deletion cluster/kube/builder/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ type statefulSet struct {

var _ StatefulSet = (*statefulSet)(nil)

func BuildStatefulSet(workload Workload) StatefulSet {
func BuildStatefulSet(workload Workload, svc Service) StatefulSet {
ss := &statefulSet{
Workload: workload,
}

ss.Workload.log = ss.Workload.log.With("object", "statefulset", "service-name", ss.deployment.ManifestGroup().Services[ss.serviceIdx].Name)
if svc != nil {
ss.setService(svc.(*service))
}

return ss
}
Expand Down
34 changes: 34 additions & 0 deletions cluster/kube/builder/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/akash-network/node/sdl"
sdlutil "github.com/akash-network/node/sdl/util"

pmanifest "github.com/akash-network/provider/manifest"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
)

Expand All @@ -32,6 +33,7 @@ type workloadBase interface {
type Workload struct {
builder
serviceIdx int
service *service
}

var _ workloadBase = (*Workload)(nil)
Expand Down Expand Up @@ -382,6 +384,10 @@ func (b *Workload) imagePullSecrets() []corev1.LocalObjectReference {
return []corev1.LocalObjectReference{{Name: sname}}
}

func (b *Workload) setService(service *service) {
b.service = service
}

func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, env []corev1.EnvVar) []corev1.EnvVar {
lid := b.deployment.LeaseID()

Expand All @@ -393,5 +399,33 @@ func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, e
env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashProvider, lid.Provider)
env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashClusterPublicHostname, b.settings.ClusterPublicHostname)

ingressHost := pmanifest.IngressHost(lid, b.Name())
env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashIngressHostname, fmt.Sprintf("%s.%s", ingressHost, b.settings.DeploymentIngressDomain))

svc := &b.deployment.ManifestGroup().Services[b.serviceIdx]

// Add hostnames from service expose configurations
for _, expose := range svc.Expose {
if expose.IsIngress() {
// Add custom hostnames if specified
for idx, hostname := range expose.Hosts {
env = addIfNotPresent(envVarsAlreadyAdded, env,
fmt.Sprintf("%s_%d_%d", envVarAkashIngressCustomHostname, expose.Port, idx),
hostname)
}
}

if expose.Global {
// Add external port mappings
if svc := b.service; svc != nil {
if nodePort, exists := svc.portMap[int32(expose.Port)]; exists {
env = addIfNotPresent(envVarsAlreadyAdded, env,
fmt.Sprintf("%s_%d", envVarAkashExternalPort, expose.Port),
fmt.Sprintf("%d", nodePort))
}
}
}
}

return env
}
Loading