Skip to content

Commit

Permalink
Merge pull request #25 from kubescape/clients
Browse files Browse the repository at this point in the history
fix all integration tests findings
  • Loading branch information
matthyx authored Dec 21, 2023
2 parents 1ccbca3 + 611fce2 commit 7584c90
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 94 deletions.
10 changes: 5 additions & 5 deletions adapters/backend/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending delete object message to producer",
logger.L().Debug("sending delete object message to producer",
helpers.String("account", msg.Account),
helpers.String("cluster", msg.Cluster),
helpers.String("kind", id.Kind.String()),
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending get object message to producer",
logger.L().Debug("sending get object message to producer",
helpers.String("account", msg.Account),
helpers.String("cluster", msg.Cluster),
helpers.String("kind", id.Kind.String()),
Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName,
Namespace: id.Namespace,
Patch: patch,
}
logger.L().Debug("Sending patch object message to producer",
logger.L().Debug("sending patch object message to producer",
helpers.String("account", msg.Account),
helpers.String("cluster", msg.Cluster),
helpers.String("kind", id.Kind.String()),
Expand Down Expand Up @@ -217,7 +217,7 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o
Namespace: id.Namespace,
Object: object,
}
logger.L().Debug("Sending put object message to producer",
logger.L().Debug("sending put object message to producer",
helpers.String("account", msg.Account),
helpers.String("cluster", msg.Cluster),
helpers.String("kind", id.Kind.String()),
Expand Down Expand Up @@ -248,7 +248,7 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName
Name: id.Name,
Namespace: id.Namespace,
}
logger.L().Debug("Sending verify object message to producer",
logger.L().Debug("sending verify object message to producer",
helpers.String("account", msg.Account),
helpers.String("cluster", msg.Cluster),
helpers.String("kind", id.Kind.String()),
Expand Down
72 changes: 39 additions & 33 deletions adapters/incluster/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,61 @@ func NewInClusterAdapter(cfg config.InCluster, k8sclient dynamic.Interface) *Ada

var _ adapters.Adapter = (*Adapter)(nil)

func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error {
func (a *Adapter) GetClient(id domain.KindName) (adapters.Client, error) {
if id.Kind == nil {
return fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
return nil, fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
}
if client, ok := a.clients[id.Kind.String()]; ok {
return client.DeleteObject(ctx, id)
client, ok := a.clients[id.Kind.String()]
if !ok {
client = NewClient(a.k8sclient, a.cfg.Account, a.cfg.ClusterName, config.Resource{
Group: id.Kind.Group,
Version: id.Kind.Version,
Resource: id.Kind.Resource,
Strategy: "copy",
})
a.clients[id.Kind.String()] = client
}
return fmt.Errorf("unknown resource %s", id.Kind.String())
return client, nil
}

func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
if id.Kind == nil {
return fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
}
if client, ok := a.clients[id.Kind.String()]; ok {
return client.GetObject(ctx, id, baseObject)
func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error {
client, err := a.GetClient(id)
if err != nil {
return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err)
}
return fmt.Errorf("unknown resource %s", id.Kind.String())
return client.DeleteObject(ctx, id)
}

func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
if id.Kind == nil {
return fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
client, err := a.GetClient(id)
if err != nil {
return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err)
}
return client.GetObject(ctx, id, baseObject)
}

if client, ok := a.clients[id.Kind.String()]; ok {
return client.PatchObject(ctx, id, checksum, patch)
func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
client, err := a.GetClient(id)
if err != nil {
return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err)
}
return fmt.Errorf("unknown resource %s", id.Kind.String())
return client.PatchObject(ctx, id, checksum, patch)
}

func (a *Adapter) PutObject(ctx context.Context, id domain.KindName, object []byte) error {
if id.Kind == nil {
return fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
client, err := a.GetClient(id)
if err != nil {
return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err)
}
if client, ok := a.clients[id.Kind.String()]; ok {
return client.PutObject(ctx, id, object)
return client.PutObject(ctx, id, object)
}

func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error {
client, err := a.GetClient(id)
if err != nil {
return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err)
}
return fmt.Errorf("unknown resource %s", id.Kind.String())
return client.VerifyObject(ctx, id, checksum)
}

func (a *Adapter) RegisterCallbacks(_ context.Context, callbacks domain.Callbacks) {
Expand All @@ -87,13 +103,3 @@ func (a *Adapter) Start(ctx context.Context) error {
}
return nil
}

func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error {
if id.Kind == nil {
return fmt.Errorf("invalid resource kind. resource name: %s", id.Name)
}
if client, ok := a.clients[id.Kind.String()]; ok {
return client.VerifyObject(ctx, id, checksum)
}
return fmt.Errorf("unknown resource %s", id.Kind.String())
}
21 changes: 11 additions & 10 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Client struct {
kind *domain.Kind
callbacks domain.Callbacks
res schema.GroupVersionResource
shadowObjects map[string][]byte
ShadowObjects map[string][]byte
Strategy domain.Strategy
}

Expand All @@ -42,7 +42,7 @@ func NewClient(client dynamic.Interface, account, cluster string, r config.Resou
Resource: res.Resource,
},
res: res,
shadowObjects: map[string][]byte{},
ShadowObjects: map[string][]byte{},
Strategy: r.Strategy,
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (c *Client) Start(ctx context.Context) error {
}
if c.Strategy == domain.PatchStrategy {
// remove from known resources
delete(c.shadowObjects, id.Name)
delete(c.ShadowObjects, id.Name)
}
case event.Type == watch.Modified:
logger.L().Debug("modified resource", helpers.String("id", id.String()))
Expand Down Expand Up @@ -185,9 +185,9 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
if c.Strategy == domain.PatchStrategy {
if len(baseObject) > 0 {
// update reference object
c.shadowObjects[id.Name] = baseObject
c.ShadowObjects[id.Name] = baseObject
}
if oldObject, ok := c.shadowObjects[id.Name]; ok {
if oldObject, ok := c.ShadowObjects[id.Name]; ok {
// calculate checksum
checksum, err := utils.CanonicalHash(newObject)
if err != nil {
Expand All @@ -209,7 +209,7 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
}
}
// add/update known resources
c.shadowObjects[id.Name] = newObject
c.ShadowObjects[id.Name] = newObject
} else {
err := c.callbacks.PutObject(ctx, id, newObject)
if err != nil {
Expand All @@ -235,7 +235,7 @@ func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, objec
func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error {
if c.Strategy == domain.PatchStrategy {
// remove from known resources
delete(c.shadowObjects, id.String())
delete(c.ShadowObjects, id.String())
}
return c.client.Resource(c.res).Namespace(id.Namespace).Delete(context.Background(), id.Name, metav1.DeleteOptions{})
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
return object, fmt.Errorf("checksum mismatch: %s != %s", newChecksum, checksum)
}
// update known resources
c.shadowObjects[id.Name] = modified
c.ShadowObjects[id.Name] = modified
// save object
return object, c.PutObject(ctx, id, modified)
}
Expand All @@ -298,9 +298,10 @@ func (c *Client) PutObject(_ context.Context, id domain.KindName, object []byte)
if err != nil {
return fmt.Errorf("unmarshal object: %w", err)
}
_, err = c.client.Resource(c.res).Namespace(id.Namespace).Create(context.Background(), &obj, metav1.CreateOptions{})
// use apply to create or update object, we want to overwrite existing objects
_, err = c.client.Resource(c.res).Namespace(id.Namespace).Apply(context.Background(), id.Name, &obj, metav1.ApplyOptions{FieldManager: "application/apply-patch"})
if err != nil {
return fmt.Errorf("create resource: %w", err)
return fmt.Errorf("apply resource: %w", err)
}
return nil
}
Expand Down
39 changes: 19 additions & 20 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"net/url"
"os"
"time"
Expand All @@ -11,7 +12,6 @@ import (
backendUtils "github.com/kubescape/backend/pkg/utils"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/adapters/incluster/v1"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/core"
Expand Down Expand Up @@ -67,6 +67,11 @@ func main() {
defer logger.ShutdownOtel(ctx)
}

ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: cfg.InCluster.Account,
Cluster: cfg.InCluster.ClusterName,
})

// k8s client
k8sclient, err := utils.NewClient()
if err != nil {
Expand All @@ -87,36 +92,30 @@ func main() {
// start liveness probe
utils.StartLivenessProbe()

// websocket client
newConn := func() (net.Conn, error) {
conn, _, _, err := dialer.Dial(ctx, cfg.InCluster.ServerUrl)
if err != nil {
return nil, fmt.Errorf("unable to create websocket connection: %w", err)
}
return conn, nil
}

var conn net.Conn
for {
if err := start(ctx, cfg.InCluster, adapter, dialer); err != nil {
if conn, err = newConn(); err != nil {
d := 5 * time.Second // TODO: use exponential backoff for retries
logger.L().Ctx(ctx).Error("connection error", helpers.Error(err), helpers.String("retry in", d.String()))
time.Sleep(d)
} else {
break
}
}
logger.L().Info("exiting")
}

func start(ctx context.Context, cfg config.InCluster, adapter adapters.Adapter, dialer ws.Dialer) error {
// websocket client
conn, _, _, err := dialer.Dial(ctx, cfg.ServerUrl)
if err != nil {
return fmt.Errorf("unable to create websocket connection: %w", err)
}
defer conn.Close()

ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: cfg.Account,
Cluster: cfg.ClusterName,
})

// synchronizer
synchronizer := core.NewSynchronizerClient(ctx, adapter, conn)
synchronizer := core.NewSynchronizerClient(ctx, adapter, conn, newConn)
err = synchronizer.Start(ctx)
if err != nil {
return fmt.Errorf("error during sync: %w", err)
logger.L().Fatal("error during sync, exiting", helpers.Error(err))
}
return nil
}
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
synchronizer := core.NewSynchronizerServer(r.Context(), adapter, conn)
err = synchronizer.Start(r.Context())
if err != nil {
logger.L().Error("error during sync", helpers.Error(err))
logger.L().Error("error during sync, closing listener", helpers.Error(err))
return
}
}()
Expand Down
Loading

0 comments on commit 7584c90

Please sign in to comment.