Skip to content

Commit

Permalink
chore: kube client improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Aug 22, 2024
1 parent 15b0a45 commit edc1c6f
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 8 deletions.
2 changes: 1 addition & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (k *Context) KubernetesRestConfig() *rest.Config {
}

func (k *Context) KubernetesDynamicClient() *dutyKubernetes.DynamicClient {
return dutyKubernetes.NewKubeClient(k.KubernetesRestConfig())
return dutyKubernetes.NewKubeClient(k.Kubernetes(), k.KubernetesRestConfig())
}

func (k *Context) WithKubeconfig(input types.EnvVar) (*Context, error) {
Expand Down
55 changes: 53 additions & 2 deletions kubernetes/dynamic.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kubernetes

import (
"bytes"
"context"
"fmt"
"os"
"strings"
"time"
Expand All @@ -16,19 +18,22 @@ import (

"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/remotecommand"
)

// DynamicClient is an updated & stripped of kommons client
type DynamicClient struct {
client kubernetes.Interface
restMapper *restmapper.DeferredDiscoveryRESTMapper
dynamicClient *dynamic.DynamicClient
config *rest.Config
}

func NewKubeClient(config *rest.Config) *DynamicClient {
return &DynamicClient{config: config}
func NewKubeClient(client kubernetes.Interface, config *rest.Config) *DynamicClient {
return &DynamicClient{config: config, client: client}
}

func (c *DynamicClient) FetchResources(ctx context.Context, resources ...unstructured.Unstructured) ([]unstructured.Unstructured, error) {
Expand Down Expand Up @@ -138,3 +143,49 @@ func (c *DynamicClient) GetRestMapper() (meta.RESTMapper, error) {
c.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(cache)
return c.restMapper, err
}

func (c *DynamicClient) ExecutePodf(ctx context.Context, namespace, pod, container string, command ...string) (string, string, error) {
const tty = false
req := c.client.CoreV1().RESTClient().Post().
Resource("pods").
Name(pod).
Namespace(namespace).
SubResource("exec").
Param("container", container).
Param("stdin", fmt.Sprintf("%t", false)).
Param("stdout", fmt.Sprintf("%t", true)).
Param("stderr", fmt.Sprintf("%t", true)).
Param("tty", fmt.Sprintf("%t", tty))

for _, c := range command {
req.Param("command", c)
}

exec, err := remotecommand.NewSPDYExecutor(c.config, "POST", req.URL())
if err != nil {
return "", "", fmt.Errorf("ExecutePodf: Failed to get SPDY Executor: %v", err)
}
var stdout, stderr bytes.Buffer
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
Tty: tty,
})

_stdout := safeString(&stdout)
_stderr := safeString(&stderr)

if err != nil {
return "", "", fmt.Errorf("failed to execute command: %v, stdout=%s stderr=%s", err, _stdout, _stderr)
}

return _stdout, _stderr, nil
}

func safeString(buf *bytes.Buffer) string {
if buf == nil || buf.Len() == 0 {
return ""
}
return buf.String()
}
52 changes: 47 additions & 5 deletions kubernetes/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"net/http"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"

"github.com/flanksource/commons/files"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/cache"
"github.com/henvic/httpretty"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
Expand All @@ -19,30 +20,65 @@ import (

var Nil = fake.NewSimpleClientset()

var kubeCache = cache.NewCache[kubeCacheData]("kube-clients", time.Hour)

type kubeCacheData struct {
Client kubernetes.Interface
Config *rest.Config
}

func cacheResult(key string, k kubernetes.Interface, c *rest.Config, e error) (kubernetes.Interface, *rest.Config, error) {
if e != nil {
return nil, nil, e
}

data := kubeCacheData{
Client: k,
Config: c,
}

_ = kubeCache.Set(context.TODO(), key, data)
return k, c, e
}

func NewClient(logger logger.Logger, kubeconfigPaths ...string) (kubernetes.Interface, *rest.Config, error) {
if len(kubeconfigPaths) == 0 {
kubeconfigPaths = []string{os.Getenv("KUBECONFIG"), os.ExpandEnv("$HOME/.kube/config")}
}

for _, path := range kubeconfigPaths {
if cached, _ := kubeCache.Get(context.TODO(), path); cached.Config != nil {
return cached.Client, cached.Config, nil
}
if files.Exists(path) {
if configBytes, err := os.ReadFile(path); err != nil {
return nil, nil, err
} else {
logger.Infof("Using kubeconfig %s", path)
return NewClientWithConfig(logger, configBytes)
client, config, err := NewClientWithConfig(logger, configBytes)
return cacheResult(path, client, config, err)
}
}
}

inCluster := "in-cluster"
if cached, _ := kubeCache.Get(context.TODO(), inCluster); cached.Config != nil {
return cached.Client, cached.Config, nil
}

if config, err := rest.InClusterConfig(); err == nil {
client, err := kubernetes.NewForConfig(trace(logger, config))
return client, config, err
return cacheResult(inCluster, client, config, err)
}
return Nil, nil, nil
}

func NewClientWithConfig(logger logger.Logger, kubeConfig []byte) (kubernetes.Interface, *rest.Config, error) {

if cached, _ := kubeCache.Get(context.TODO(), string(kubeConfig)); cached.Config != nil {
return cached.Client, cached.Config, nil
}

clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeConfig)
if err != nil {
return nil, nil, err
Expand All @@ -52,14 +88,15 @@ func NewClientWithConfig(logger logger.Logger, kubeConfig []byte) (kubernetes.In
return nil, nil, err
} else {
client, err := kubernetes.NewForConfig(trace(logger, config))
return client, config, err
return cacheResult(string(kubeConfig), client, config, err)
}
}

func trace(clogger logger.Logger, config *rest.Config) *rest.Config {
if clogger.IsLevelEnabled(7) {
clogger.Infof("tracing kubernetes API calls")
logger := &httpretty.Logger{

Time: true,
TLS: clogger.IsLevelEnabled(8),
RequestHeader: true,
Expand All @@ -73,10 +110,15 @@ func trace(clogger logger.Logger, config *rest.Config) *rest.Config {
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return logger.RoundTripper(rt)
}
logger.SetBodyFilter(func(h http.Header) (skip bool, err error) {
return false, nil
})
}
return config
}

// ExecutePodf runs the specified shell command inside a container of the specified pod

func GetClusterName(config *rest.Config) string {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
Expand Down

0 comments on commit edc1c6f

Please sign in to comment.