From 611fce2ec98f6df51de7cf48d89c13bb958910e1 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 18 Dec 2023 13:58:23 +0100 Subject: [PATCH] reconnect on error without restarting the client Signed-off-by: Matthias Bertschy --- cmd/client/main.go | 39 ++++++++++---------- cmd/server/main.go | 2 +- core/synchronizer.go | 75 ++++++++++++++++++++++++++------------- core/synchronizer_test.go | 5 ++- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index cd878bc..caf345d 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net" "net/url" "os" "time" @@ -11,7 +12,6 @@ import ( backendUtils "github.com/kubescape/backend/pkg/utils" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" - "github.com/kubescape/synchronizer/adapters" "github.com/kubescape/synchronizer/adapters/incluster/v1" "github.com/kubescape/synchronizer/config" "github.com/kubescape/synchronizer/core" @@ -67,6 +67,11 @@ func main() { defer logger.ShutdownOtel(ctx) } + ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{ + Account: cfg.InCluster.Account, + Cluster: cfg.InCluster.ClusterName, + }) + // k8s client k8sclient, err := utils.NewClient() if err != nil { @@ -87,8 +92,18 @@ func main() { // start liveness probe utils.StartLivenessProbe() + // websocket client + newConn := func() (net.Conn, error) { + conn, _, _, err := dialer.Dial(ctx, cfg.InCluster.ServerUrl) + if err != nil { + return nil, fmt.Errorf("unable to create websocket connection: %w", err) + } + return conn, nil + } + + var conn net.Conn for { - if err := start(ctx, cfg.InCluster, adapter, dialer); err != nil { + if conn, err = newConn(); err != nil { d := 5 * time.Second // TODO: use exponential backoff for retries logger.L().Ctx(ctx).Error("connection error", helpers.Error(err), helpers.String("retry in", d.String())) time.Sleep(d) @@ -96,27 +111,11 @@ func main() { break } } - logger.L().Info("exiting") -} - -func start(ctx context.Context, cfg config.InCluster, adapter adapters.Adapter, dialer ws.Dialer) error { - // websocket client - conn, _, _, err := dialer.Dial(ctx, cfg.ServerUrl) - if err != nil { - return fmt.Errorf("unable to create websocket connection: %w", err) - } - defer conn.Close() - - ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{ - Account: cfg.Account, - Cluster: cfg.ClusterName, - }) // synchronizer - synchronizer := core.NewSynchronizerClient(ctx, adapter, conn) + synchronizer := core.NewSynchronizerClient(ctx, adapter, conn, newConn) err = synchronizer.Start(ctx) if err != nil { - return fmt.Errorf("error during sync: %w", err) + logger.L().Fatal("error during sync, exiting", helpers.Error(err)) } - return nil } diff --git a/cmd/server/main.go b/cmd/server/main.go index 18454a1..adc313f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -77,7 +77,7 @@ func main() { synchronizer := core.NewSynchronizerServer(r.Context(), adapter, conn) err = synchronizer.Start(r.Context()) if err != nil { - logger.L().Error("error during sync", helpers.Error(err)) + logger.L().Error("error during sync, closing listener", helpers.Error(err)) return } }() diff --git a/core/synchronizer.go b/core/synchronizer.go index 19cae5b..b3ec883 100644 --- a/core/synchronizer.go +++ b/core/synchronizer.go @@ -20,15 +20,19 @@ import ( const maxMessageDepth = 8 type Synchronizer struct { - adapter adapters.Adapter - isClient bool // which side of the connection is this? - conn net.Conn - outPool *ants.PoolWithFunc - readDataFunc func(rw io.ReadWriter) ([]byte, error) + adapter adapters.Adapter + isClient bool // which side of the connection is this? + Conn *net.Conn + newConn func() (net.Conn, error) + outPool *ants.PoolWithFunc + readDataFunc func(rw io.ReadWriter) ([]byte, error) + writeDataFunc func(w io.Writer, p []byte) error } -func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer { - return newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary) +func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, newConn func() (net.Conn, error)) *Synchronizer { + s := newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary) + s.newConn = newConn + return s } func NewSynchronizerServer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer { @@ -36,25 +40,22 @@ func NewSynchronizerServer(mainCtx context.Context, adapter adapters.Adapter, co } func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, isClient bool, readDataFunc func(rw io.ReadWriter) ([]byte, error), writeDataFunc func(w io.Writer, p []byte) error) *Synchronizer { + s := &Synchronizer{ + adapter: adapter, + isClient: isClient, + Conn: &conn, + readDataFunc: readDataFunc, + writeDataFunc: writeDataFunc, + } // outgoing message pool - outPool, err := ants.NewPoolWithFunc(10, func(i interface{}) { + var err error + s.outPool, err = ants.NewPoolWithFunc(1, func(i interface{}) { data := i.([]byte) - err := writeDataFunc(conn, data) - if err != nil { - logger.L().Ctx(mainCtx).Error("cannot send message", helpers.Error(err)) - return - } + s.sendData(mainCtx, data) }) if err != nil { logger.L().Ctx(mainCtx).Fatal("unable to create outgoing message pool", helpers.Error(err)) } - s := &Synchronizer{ - adapter: adapter, - isClient: isClient, - conn: conn, - outPool: outPool, - readDataFunc: readDataFunc, - } callbacks := domain.Callbacks{ DeleteObject: s.DeleteObjectCallback, GetObject: s.GetObjectCallback, @@ -66,6 +67,26 @@ func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net return s } +func (s *Synchronizer) sendData(ctx context.Context, data []byte) { + err := s.writeDataFunc(*s.Conn, data) + if err != nil { + if s.isClient { + // try to reconnect + logger.L().Ctx(ctx).Warning("connection closed, trying to reconnect") + conn, err := s.newConn() + if err != nil { + logger.L().Ctx(ctx).Error("refreshing connection", helpers.Error(err)) + return + } + logger.L().Ctx(ctx).Info("connection refreshed, synchronization will resume") + s.Conn = &conn + } else { + logger.L().Ctx(ctx).Error("cannot send message", helpers.Error(err)) + return + } + } +} + func (s *Synchronizer) DeleteObjectCallback(ctx context.Context, id domain.KindName) error { err := s.sendObjectDeleted(ctx, id) if err != nil { @@ -111,7 +132,7 @@ func (s *Synchronizer) VerifyObjectCallback(ctx context.Context, id domain.KindN func (s *Synchronizer) Start(ctx context.Context) error { identifiers := utils.ClientIdentifierFromContext(ctx) - logger.L().Info("starting sync", + logger.L().Info("starting synchronization", helpers.String("account", identifiers.Account), helpers.String("cluster", identifiers.Cluster)) if s.isClient { @@ -134,7 +155,7 @@ func (s *Synchronizer) Start(ctx context.Context) error { func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { clientId := utils.ClientIdentifierFromContext(ctx) // incoming message pool - inPool, err := ants.NewPoolWithFunc(10, func(i interface{}) { + inPool, err := ants.NewPoolWithFunc(1, func(i interface{}) { data := i.([]byte) // unmarshal message var generic domain.Generic @@ -305,9 +326,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error { } // process incoming messages for { - data, err := s.readDataFunc(s.conn) + data, err := s.readDataFunc(*s.Conn) if err != nil { - return fmt.Errorf("cannot read server data: %w", err) + if s.isClient { + logger.L().Ctx(ctx).Error("cannot read data, sleeping 1 minute before retrying", helpers.Error(err)) + time.Sleep(1 * time.Minute) + continue + } else { + return fmt.Errorf("cannot read data: %w", err) + } } err = inPool.Invoke(data) if err != nil { diff --git a/core/synchronizer_test.go b/core/synchronizer_test.go index 34372c7..7ae0fe7 100644 --- a/core/synchronizer_test.go +++ b/core/synchronizer_test.go @@ -39,7 +39,10 @@ func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.M clientAdapter := adapters.NewMockAdapter(true) serverAdapter := adapters.NewMockAdapter(false) clientConn, serverConn := net.Pipe() - client := NewSynchronizerClient(ctx, clientAdapter, clientConn) + newConn := func() (net.Conn, error) { + return clientConn, nil + } + client := NewSynchronizerClient(ctx, clientAdapter, clientConn, newConn) server := NewSynchronizerServer(ctx, serverAdapter, serverConn) go func() { _ = client.Start(ctx)