Skip to content

Commit

Permalink
reconnect on error without restarting the client
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Dec 21, 2023
1 parent 7361e74 commit 611fce2
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 46 deletions.
39 changes: 19 additions & 20 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"net/url"
"os"
"time"
Expand All @@ -11,7 +12,6 @@ import (
backendUtils "github.com/kubescape/backend/pkg/utils"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/adapters/incluster/v1"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/core"
Expand Down Expand Up @@ -67,6 +67,11 @@ func main() {
defer logger.ShutdownOtel(ctx)
}

ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: cfg.InCluster.Account,
Cluster: cfg.InCluster.ClusterName,
})

// k8s client
k8sclient, err := utils.NewClient()
if err != nil {
Expand All @@ -87,36 +92,30 @@ func main() {
// start liveness probe
utils.StartLivenessProbe()

// websocket client
newConn := func() (net.Conn, error) {
conn, _, _, err := dialer.Dial(ctx, cfg.InCluster.ServerUrl)
if err != nil {
return nil, fmt.Errorf("unable to create websocket connection: %w", err)
}
return conn, nil
}

var conn net.Conn
for {
if err := start(ctx, cfg.InCluster, adapter, dialer); err != nil {
if conn, err = newConn(); err != nil {
d := 5 * time.Second // TODO: use exponential backoff for retries
logger.L().Ctx(ctx).Error("connection error", helpers.Error(err), helpers.String("retry in", d.String()))
time.Sleep(d)
} else {
break
}
}
logger.L().Info("exiting")
}

func start(ctx context.Context, cfg config.InCluster, adapter adapters.Adapter, dialer ws.Dialer) error {
// websocket client
conn, _, _, err := dialer.Dial(ctx, cfg.ServerUrl)
if err != nil {
return fmt.Errorf("unable to create websocket connection: %w", err)
}
defer conn.Close()

ctx = context.WithValue(ctx, domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: cfg.Account,
Cluster: cfg.ClusterName,
})

// synchronizer
synchronizer := core.NewSynchronizerClient(ctx, adapter, conn)
synchronizer := core.NewSynchronizerClient(ctx, adapter, conn, newConn)
err = synchronizer.Start(ctx)
if err != nil {
return fmt.Errorf("error during sync: %w", err)
logger.L().Fatal("error during sync, exiting", helpers.Error(err))
}
return nil
}
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
synchronizer := core.NewSynchronizerServer(r.Context(), adapter, conn)
err = synchronizer.Start(r.Context())
if err != nil {
logger.L().Error("error during sync", helpers.Error(err))
logger.L().Error("error during sync, closing listener", helpers.Error(err))
return
}
}()
Expand Down
75 changes: 51 additions & 24 deletions core/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,42 @@ import (
const maxMessageDepth = 8

type Synchronizer struct {
adapter adapters.Adapter
isClient bool // which side of the connection is this?
conn net.Conn
outPool *ants.PoolWithFunc
readDataFunc func(rw io.ReadWriter) ([]byte, error)
adapter adapters.Adapter
isClient bool // which side of the connection is this?
Conn *net.Conn
newConn func() (net.Conn, error)
outPool *ants.PoolWithFunc
readDataFunc func(rw io.ReadWriter) ([]byte, error)
writeDataFunc func(w io.Writer, p []byte) error
}

func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer {
return newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary)
func NewSynchronizerClient(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, newConn func() (net.Conn, error)) *Synchronizer {
s := newSynchronizer(mainCtx, adapter, conn, true, wsutil.ReadServerBinary, wsutil.WriteClientBinary)
s.newConn = newConn
return s
}

func NewSynchronizerServer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn) *Synchronizer {
return newSynchronizer(mainCtx, adapter, conn, false, wsutil.ReadClientBinary, wsutil.WriteServerBinary)
}

func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net.Conn, isClient bool, readDataFunc func(rw io.ReadWriter) ([]byte, error), writeDataFunc func(w io.Writer, p []byte) error) *Synchronizer {
s := &Synchronizer{
adapter: adapter,
isClient: isClient,
Conn: &conn,
readDataFunc: readDataFunc,
writeDataFunc: writeDataFunc,
}
// outgoing message pool
outPool, err := ants.NewPoolWithFunc(10, func(i interface{}) {
var err error
s.outPool, err = ants.NewPoolWithFunc(1, func(i interface{}) {
data := i.([]byte)
err := writeDataFunc(conn, data)
if err != nil {
logger.L().Ctx(mainCtx).Error("cannot send message", helpers.Error(err))
return
}
s.sendData(mainCtx, data)
})
if err != nil {
logger.L().Ctx(mainCtx).Fatal("unable to create outgoing message pool", helpers.Error(err))
}
s := &Synchronizer{
adapter: adapter,
isClient: isClient,
conn: conn,
outPool: outPool,
readDataFunc: readDataFunc,
}
callbacks := domain.Callbacks{
DeleteObject: s.DeleteObjectCallback,
GetObject: s.GetObjectCallback,
Expand All @@ -66,6 +67,26 @@ func newSynchronizer(mainCtx context.Context, adapter adapters.Adapter, conn net
return s
}

func (s *Synchronizer) sendData(ctx context.Context, data []byte) {
err := s.writeDataFunc(*s.Conn, data)
if err != nil {
if s.isClient {
// try to reconnect
logger.L().Ctx(ctx).Warning("connection closed, trying to reconnect")
conn, err := s.newConn()
if err != nil {
logger.L().Ctx(ctx).Error("refreshing connection", helpers.Error(err))
return
}
logger.L().Ctx(ctx).Info("connection refreshed, synchronization will resume")
s.Conn = &conn
} else {
logger.L().Ctx(ctx).Error("cannot send message", helpers.Error(err))
return
}
}
}

func (s *Synchronizer) DeleteObjectCallback(ctx context.Context, id domain.KindName) error {
err := s.sendObjectDeleted(ctx, id)
if err != nil {
Expand Down Expand Up @@ -111,7 +132,7 @@ func (s *Synchronizer) VerifyObjectCallback(ctx context.Context, id domain.KindN

func (s *Synchronizer) Start(ctx context.Context) error {
identifiers := utils.ClientIdentifierFromContext(ctx)
logger.L().Info("starting sync",
logger.L().Info("starting synchronization",
helpers.String("account", identifiers.Account),
helpers.String("cluster", identifiers.Cluster))
if s.isClient {
Expand All @@ -134,7 +155,7 @@ func (s *Synchronizer) Start(ctx context.Context) error {
func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
clientId := utils.ClientIdentifierFromContext(ctx)
// incoming message pool
inPool, err := ants.NewPoolWithFunc(10, func(i interface{}) {
inPool, err := ants.NewPoolWithFunc(1, func(i interface{}) {
data := i.([]byte)
// unmarshal message
var generic domain.Generic
Expand Down Expand Up @@ -305,9 +326,15 @@ func (s *Synchronizer) listenForSyncEvents(ctx context.Context) error {
}
// process incoming messages
for {
data, err := s.readDataFunc(s.conn)
data, err := s.readDataFunc(*s.Conn)
if err != nil {
return fmt.Errorf("cannot read server data: %w", err)
if s.isClient {
logger.L().Ctx(ctx).Error("cannot read data, sleeping 1 minute before retrying", helpers.Error(err))
time.Sleep(1 * time.Minute)
continue
} else {
return fmt.Errorf("cannot read data: %w", err)
}
}
err = inPool.Invoke(data)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion core/synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.M
clientAdapter := adapters.NewMockAdapter(true)
serverAdapter := adapters.NewMockAdapter(false)
clientConn, serverConn := net.Pipe()
client := NewSynchronizerClient(ctx, clientAdapter, clientConn)
newConn := func() (net.Conn, error) {
return clientConn, nil
}
client := NewSynchronizerClient(ctx, clientAdapter, clientConn, newConn)
server := NewSynchronizerServer(ctx, serverAdapter, serverConn)
go func() {
_ = client.Start(ctx)
Expand Down

0 comments on commit 611fce2

Please sign in to comment.