From 09dde3743fb4b5139876075e78d66d8bde95e528 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Fri, 15 Dec 2023 12:46:41 +0100 Subject: [PATCH 1/3] create missing client on-demand to create resources from backend Signed-off-by: Matthias Bertschy --- adapters/backend/v1/client.go | 10 ++--- adapters/incluster/v1/adapter.go | 72 +++++++++++++++++--------------- adapters/incluster/v1/client.go | 5 ++- 3 files changed, 47 insertions(+), 40 deletions(-) diff --git a/adapters/backend/v1/client.go b/adapters/backend/v1/client.go index d238eca..7eee18f 100644 --- a/adapters/backend/v1/client.go +++ b/adapters/backend/v1/client.go @@ -123,7 +123,7 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName Name: id.Name, Namespace: id.Namespace, } - logger.L().Debug("Sending delete object message to producer", + logger.L().Debug("sending delete object message to producer", helpers.String("account", msg.Account), helpers.String("cluster", msg.Cluster), helpers.String("kind", id.Kind.String()), @@ -153,7 +153,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b Name: id.Name, Namespace: id.Namespace, } - logger.L().Debug("Sending get object message to producer", + logger.L().Debug("sending get object message to producer", helpers.String("account", msg.Account), helpers.String("cluster", msg.Cluster), helpers.String("kind", id.Kind.String()), @@ -185,7 +185,7 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName, Namespace: id.Namespace, Patch: patch, } - logger.L().Debug("Sending patch object message to producer", + logger.L().Debug("sending patch object message to producer", helpers.String("account", msg.Account), helpers.String("cluster", msg.Cluster), helpers.String("kind", id.Kind.String()), @@ -217,7 +217,7 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o Namespace: id.Namespace, Object: object, } - logger.L().Debug("Sending put object message to producer", + logger.L().Debug("sending put object message to producer", helpers.String("account", msg.Account), helpers.String("cluster", msg.Cluster), helpers.String("kind", id.Kind.String()), @@ -248,7 +248,7 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName Name: id.Name, Namespace: id.Namespace, } - logger.L().Debug("Sending verify object message to producer", + logger.L().Debug("sending verify object message to producer", helpers.String("account", msg.Account), helpers.String("cluster", msg.Cluster), helpers.String("kind", id.Kind.String()), diff --git a/adapters/incluster/v1/adapter.go b/adapters/incluster/v1/adapter.go index 1539523..229b956 100644 --- a/adapters/incluster/v1/adapter.go +++ b/adapters/incluster/v1/adapter.go @@ -27,45 +27,61 @@ func NewInClusterAdapter(cfg config.InCluster, k8sclient dynamic.Interface) *Ada var _ adapters.Adapter = (*Adapter)(nil) -func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error { +func (a *Adapter) getClient(id domain.KindName) (adapters.Client, error) { if id.Kind == nil { - return fmt.Errorf("invalid resource kind. resource name: %s", id.Name) + return nil, fmt.Errorf("invalid resource kind. resource name: %s", id.Name) } - if client, ok := a.clients[id.Kind.String()]; ok { - return client.DeleteObject(ctx, id) + client, ok := a.clients[id.Kind.String()] + if !ok { + client = NewClient(a.k8sclient, a.cfg.Account, a.cfg.ClusterName, config.Resource{ + Group: id.Kind.Group, + Version: id.Kind.Version, + Resource: id.Kind.Resource, + Strategy: "copy", + }) + a.clients[id.Kind.String()] = client } - return fmt.Errorf("unknown resource %s", id.Kind.String()) + return client, nil } -func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error { - if id.Kind == nil { - return fmt.Errorf("invalid resource kind. resource name: %s", id.Name) - } - if client, ok := a.clients[id.Kind.String()]; ok { - return client.GetObject(ctx, id, baseObject) +func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error { + client, err := a.getClient(id) + if err != nil { + return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } - return fmt.Errorf("unknown resource %s", id.Kind.String()) + return client.DeleteObject(ctx, id) } -func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error { - if id.Kind == nil { - return fmt.Errorf("invalid resource kind. resource name: %s", id.Name) +func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error { + client, err := a.getClient(id) + if err != nil { + return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } + return client.GetObject(ctx, id, baseObject) +} - if client, ok := a.clients[id.Kind.String()]; ok { - return client.PatchObject(ctx, id, checksum, patch) +func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error { + client, err := a.getClient(id) + if err != nil { + return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } - return fmt.Errorf("unknown resource %s", id.Kind.String()) + return client.PatchObject(ctx, id, checksum, patch) } func (a *Adapter) PutObject(ctx context.Context, id domain.KindName, object []byte) error { - if id.Kind == nil { - return fmt.Errorf("invalid resource kind. resource name: %s", id.Name) + client, err := a.getClient(id) + if err != nil { + return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } - if client, ok := a.clients[id.Kind.String()]; ok { - return client.PutObject(ctx, id, object) + return client.PutObject(ctx, id, object) +} + +func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error { + client, err := a.getClient(id) + if err != nil { + return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } - return fmt.Errorf("unknown resource %s", id.Kind.String()) + return client.VerifyObject(ctx, id, checksum) } func (a *Adapter) RegisterCallbacks(_ context.Context, callbacks domain.Callbacks) { @@ -87,13 +103,3 @@ func (a *Adapter) Start(ctx context.Context) error { } return nil } - -func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error { - if id.Kind == nil { - return fmt.Errorf("invalid resource kind. resource name: %s", id.Name) - } - if client, ok := a.clients[id.Kind.String()]; ok { - return client.VerifyObject(ctx, id, checksum) - } - return fmt.Errorf("unknown resource %s", id.Kind.String()) -} diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index bb76627..8a9dd28 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -298,9 +298,10 @@ func (c *Client) PutObject(_ context.Context, id domain.KindName, object []byte) if err != nil { return fmt.Errorf("unmarshal object: %w", err) } - _, err = c.client.Resource(c.res).Namespace(id.Namespace).Create(context.Background(), &obj, metav1.CreateOptions{}) + // use apply to create or update object, we want to overwrite existing objects + _, err = c.client.Resource(c.res).Namespace(id.Namespace).Apply(context.Background(), id.Name, &obj, metav1.ApplyOptions{FieldManager: "application/apply-patch"}) if err != nil { - return fmt.Errorf("create resource: %w", err) + return fmt.Errorf("apply resource: %w", err) } return nil } From 7361e7405b539ca25cb2b8435cd77020c45e43f6 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Fri, 15 Dec 2023 15:11:37 +0100 Subject: [PATCH 2/3] export some fields for testing Signed-off-by: Matthias Bertschy --- adapters/incluster/v1/adapter.go | 12 ++++++------ adapters/incluster/v1/client.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/adapters/incluster/v1/adapter.go b/adapters/incluster/v1/adapter.go index 229b956..6df55f2 100644 --- a/adapters/incluster/v1/adapter.go +++ b/adapters/incluster/v1/adapter.go @@ -27,7 +27,7 @@ func NewInClusterAdapter(cfg config.InCluster, k8sclient dynamic.Interface) *Ada var _ adapters.Adapter = (*Adapter)(nil) -func (a *Adapter) getClient(id domain.KindName) (adapters.Client, error) { +func (a *Adapter) GetClient(id domain.KindName) (adapters.Client, error) { if id.Kind == nil { return nil, fmt.Errorf("invalid resource kind. resource name: %s", id.Name) } @@ -45,7 +45,7 @@ func (a *Adapter) getClient(id domain.KindName) (adapters.Client, error) { } func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error { - client, err := a.getClient(id) + client, err := a.GetClient(id) if err != nil { return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } @@ -53,7 +53,7 @@ func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error { } func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error { - client, err := a.getClient(id) + client, err := a.GetClient(id) if err != nil { return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } @@ -61,7 +61,7 @@ func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject } func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error { - client, err := a.getClient(id) + client, err := a.GetClient(id) if err != nil { return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } @@ -69,7 +69,7 @@ func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum } func (a *Adapter) PutObject(ctx context.Context, id domain.KindName, object []byte) error { - client, err := a.getClient(id) + client, err := a.GetClient(id) if err != nil { return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } @@ -77,7 +77,7 @@ func (a *Adapter) PutObject(ctx context.Context, id domain.KindName, object []by } func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error { - client, err := a.getClient(id) + client, err := a.GetClient(id) if err != nil { return fmt.Errorf("failed to get client for resource %s: %w", id.Kind, err) } diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index 8a9dd28..d7e3ec8 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -26,7 +26,7 @@ type Client struct { kind *domain.Kind callbacks domain.Callbacks res schema.GroupVersionResource - shadowObjects map[string][]byte + ShadowObjects map[string][]byte Strategy domain.Strategy } @@ -42,7 +42,7 @@ func NewClient(client dynamic.Interface, account, cluster string, r config.Resou Resource: res.Resource, }, res: res, - shadowObjects: map[string][]byte{}, + ShadowObjects: map[string][]byte{}, Strategy: r.Strategy, } } @@ -142,7 +142,7 @@ func (c *Client) Start(ctx context.Context) error { } if c.Strategy == domain.PatchStrategy { // remove from known resources - delete(c.shadowObjects, id.Name) + delete(c.ShadowObjects, id.Name) } case event.Type == watch.Modified: logger.L().Debug("modified resource", helpers.String("id", id.String())) @@ -185,9 +185,9 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj if c.Strategy == domain.PatchStrategy { if len(baseObject) > 0 { // update reference object - c.shadowObjects[id.Name] = baseObject + c.ShadowObjects[id.Name] = baseObject } - if oldObject, ok := c.shadowObjects[id.Name]; ok { + if oldObject, ok := c.ShadowObjects[id.Name]; ok { // calculate checksum checksum, err := utils.CanonicalHash(newObject) if err != nil { @@ -209,7 +209,7 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj } } // add/update known resources - c.shadowObjects[id.Name] = newObject + c.ShadowObjects[id.Name] = newObject } else { err := c.callbacks.PutObject(ctx, id, newObject) if err != nil { @@ -235,7 +235,7 @@ func (c *Client) callVerifyObject(ctx context.Context, id domain.KindName, objec func (c *Client) DeleteObject(_ context.Context, id domain.KindName) error { if c.Strategy == domain.PatchStrategy { // remove from known resources - delete(c.shadowObjects, id.String()) + delete(c.ShadowObjects, id.String()) } return c.client.Resource(c.res).Namespace(id.Namespace).Delete(context.Background(), id.Name, metav1.DeleteOptions{}) } @@ -287,7 +287,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s return object, fmt.Errorf("checksum mismatch: %s != %s", newChecksum, checksum) } // update known resources - c.shadowObjects[id.Name] = modified + c.ShadowObjects[id.Name] = modified // save object return object, c.PutObject(ctx, id, modified) } From 611fce2ec98f6df51de7cf48d89c13bb958910e1 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 18 Dec 2023 13:58:23 +0100 Subject: [PATCH 3/3] reconnect on error without restarting the client Signed-off-by: Matthias Bertschy --- cmd/client/main.go | 39 ++++++++++---------- cmd/server/main.go | 2 +- core/synchronizer.go | 75 ++++++++++++++++++++++++++------------- core/synchronizer_test.go | 5 ++- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index cd878bc..caf345d 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net" "net/url" "os" "time" @@ -11,7 +12,6 @@ import ( backendUtils "github.com/kubescape/backend/pkg/utils" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" - "github.com/kubescape/synchronizer/adapters" "github.com/kubescape/synchronizer/adapters/incluster/v1" "github.com/kubescape/synchronizer/config" "github.com/kubescape/synchronizer/core" @@ -67,6 +67,11 @@ func main() { defer logger.ShutdownOtel(ctx) } + ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{ + Account: cfg.InCluster.Account, + Cluster: cfg.InCluster.ClusterName, + }) + // k8s client k8sclient, err := utils.NewClient() if err != nil { @@ -87,8 +92,18 @@ func main() { // start liveness probe utils.StartLivenessProbe() + // websocket client + newConn := func() (net.Conn, error) { + conn, _, _, err := dialer.Dial(ctx, cfg.InCluster.ServerUrl) + if err != nil { + return nil, fmt.Errorf("unable to create websocket connection: %w", err) + } + return conn, nil + } + + var conn net.Conn for { - if err := start(ctx, cfg.InCluster, adapter, dialer); err != nil { + if conn, err = newConn(); err != nil { d := 5 * time.Second // TODO: use exponential backoff for retries logger.L().Ctx(ctx).Error("connection error", helpers.Error(err), helpers.String("retry in", d.String())) time.Sleep(d) @@ -96,27 +111,11 @@ func main() { break } } - logger.L().Info("exiting") -} - -func start(ctx context.Context, cfg config.InCluster, adapter adapters.Adapter, dialer ws.Dialer) error { - // websocket client - conn, _, _, err := dialer.Dial(ctx, cfg.ServerUrl) - if err != nil { - return fmt.Errorf("unable to create websocket connection: %w", err) - } - defer conn.Close() - - ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{ - Account: cfg.Account, - Cluster: cfg.ClusterName, - }) // synchronizer - synchronizer := core.NewSynchronizerClient(ctx, adapter, conn) + synchronizer := core.NewSynchronizerClient(ctx, adapter, conn, newConn) err = synchronizer.Start(ctx) if err != nil { - return fmt.Errorf("error during sync: %w", err) + logger.L().Fatal("error during sync, exiting", helpers.Error(err)) } - return nil } diff --git a/cmd/server/main.go b/cmd/server/main.go index 18454a1..adc313f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -77,7 +77,7 @@ func main() { synchronizer := core.NewSynchronizerServer(r.Context(), adapter, conn) err = synchronizer.Start(r.Context()) if err != nil { - logger.L().Error("error during sync", helpers.Error(err)) + logger.L().Error("error during sync, closing listener", helpers.Error(err)) return } }() diff --git a/core/synchronizer.go b/core/synchronizer.go index 19cae5b..b3ec883 100644 --- a/core/synchronizer.go +++ b/core/synchronizer.go @@ -20,15 +20,19 @@ import ( const maxMessageDepth = 8 type Synchronizer struct { - adapter adapters.Adapter - isClient bool // which side of the connection is this? - conn net.Conn - outPool *ants.PoolWithFunc - readDataFunc func(rw io.ReadWriter) ([]byte, error) + adapter adapters.Adapter + isClient bool // which side of the connection is this? + Conn *net.Conn + newConn func() (net.Conn, error) + outPool *ants.PoolWithFunc + readDataFunc func(rw io.ReadWriter) ([]byte, error) + writeDataFunc func(w io.Writer, p []byte) error } -func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer { - return newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary) +func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, newConn func() (net.Conn, error)) *Synchronizer { + s := newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary) + s.newConn = newConn + return s } func NewSynchronizerServer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer { @@ -36,25 +40,22 @@ func NewSynchronizerServer(mainCtx context.Context, adapter adapters.Adapter, co } func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, isClient bool, readDataFunc func(rw io.ReadWriter) ([]byte, error), writeDataFunc func(w io.Writer, p []byte) error) *Synchronizer { + s := &Synchronizer{ + adapter: adapter, + isClient: isClient, + Conn: &conn, + readDataFunc: readDataFunc, + writeDataFunc: writeDataFunc, + } // outgoing message pool - outPool, err := ants.NewPoolWithFunc(10, func(i interface{}) { + var err error + s.outPool, err = ants.NewPoolWithFunc(1, func(i interface{}) { data := i.([]byte) - err := writeDataFunc(conn, data) - if err != nil { - logger.L().Ctx(mainCtx).Error("cannot send message", helpers.Error(err)) - return - } + s.sendData(mainCtx, data) }) if err != nil { logger.L().Ctx(mainCtx).Fatal("unable to create outgoing message pool", helpers.Error(err)) } - s := &Synchronizer{ - adapter: adapter, - isClient: isClient, - conn: conn, - outPool: outPool, - readDataFunc: readDataFunc, - } callbacks := domain.Callbacks{ DeleteObject: s.DeleteObjectCallback, GetObject: s.GetObjectCallback, @@ -66,6 +67,26 @@ func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net return s } +func (s *Synchronizer) sendData(ctx context.Context, data []byte) { + err := s.writeDataFunc(*s.Conn, data) + if err != nil { + if s.isClient { + // try to reconnect + logger.L().Ctx(ctx).Warning("connection closed, trying to reconnect") + conn, err := s.newConn() + if err != nil { + logger.L().Ctx(ctx).Error("refreshing connection", helpers.Error(err)) + return + } + logger.L().Ctx(ctx).Info("connection refreshed, synchronization will resume") + s.Conn = &conn + } else { + logger.L().Ctx(ctx).Error("cannot send message", helpers.Error(err)) + return + } + } +} + func (s *Synchronizer) DeleteObjectCallback(ctx context.Context, id domain.KindName) error { err := s.sendObjectDeleted(ctx, id) if err != nil { @@ -111,7 +132,7 @@ func (s *Synchronizer) VerifyObjectCallback(ctx context.Context, id domain.KindN func (s *Synchronizer) Start(ctx context.Context) error { identifiers := utils.ClientIdentifierFromContext(ctx) - logger.L().Info("starting sync", + logger.L().Info("starting synchronization", helpers.String("account", identifiers.Account), helpers.String("cluster", identifiers.Cluster)) if s.isClient { @@ -134,7 +155,7 @@ func (s *Synchronizer) Start(ctx context.Context) error { func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { clientId := utils.ClientIdentifierFromContext(ctx) // incoming message pool - inPool, err := ants.NewPoolWithFunc(10, func(i interface{}) { + inPool, err := ants.NewPoolWithFunc(1, func(i interface{}) { data := i.([]byte) // unmarshal message var generic domain.Generic @@ -305,9 +326,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { } // process incoming messages for { - data, err := s.readDataFunc(s.conn) + data, err := s.readDataFunc(*s.Conn) if err != nil { - return fmt.Errorf("cannot read server data: %w", err) + if s.isClient { + logger.L().Ctx(ctx).Error("cannot read data, sleeping 1 minute before retrying", helpers.Error(err)) + time.Sleep(1 * time.Minute) + continue + } else { + return fmt.Errorf("cannot read data: %w", err) + } } err = inPool.Invoke(data) if err != nil { diff --git a/core/synchronizer_test.go b/core/synchronizer_test.go index 34372c7..7ae0fe7 100644 --- a/core/synchronizer_test.go +++ b/core/synchronizer_test.go @@ -39,7 +39,10 @@ func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.M clientAdapter := adapters.NewMockAdapter(true) serverAdapter := adapters.NewMockAdapter(false) clientConn, serverConn := net.Pipe() - client := NewSynchronizerClient(ctx, clientAdapter, clientConn) + newConn := func() (net.Conn, error) { + return clientConn, nil + } + client := NewSynchronizerClient(ctx, clientAdapter, clientConn, newConn) server := NewSynchronizerServer(ctx, serverAdapter, serverConn) go func() { _ = client.Start(ctx)