From 530e439255e773c2b92bc7fb8ccedba1a38e188a Mon Sep 17 00:00:00 2001 From: Artem Glazychev Date: Thu, 23 Nov 2023 18:16:33 +0700 Subject: [PATCH] monitor: handle initial state transfer (#1483) * monitor: handle initial state transfer Signed-off-by: Artem Glazychev * Add healing test Signed-off-by: Artem Glazychev --------- Signed-off-by: Artem Glazychev --- pkg/networkservice/chains/nsmgr/heal_test.go | 80 +++++++++ .../chains/nsmgr/upstreamrefresh_test.go | 157 ++++++++++++++++-- pkg/networkservice/common/heal/eventloop.go | 7 - .../common/monitor/eventloop.go | 7 - .../common/upstreamrefresh/client_filter.go | 3 +- .../common/upstreamrefresh/eventloop.go | 7 - 6 files changed, 225 insertions(+), 36 deletions(-) diff --git a/pkg/networkservice/chains/nsmgr/heal_test.go b/pkg/networkservice/chains/nsmgr/heal_test.go index 0894a25d8..722cb6e21 100644 --- a/pkg/networkservice/chains/nsmgr/heal_test.go +++ b/pkg/networkservice/chains/nsmgr/heal_test.go @@ -37,6 +37,7 @@ import ( nsclient "github.com/networkservicemesh/sdk/pkg/networkservice/chains/client" "github.com/networkservicemesh/sdk/pkg/networkservice/common/heal" "github.com/networkservicemesh/sdk/pkg/networkservice/common/null" + "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/count" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/inject/injecterror" @@ -814,3 +815,82 @@ func TestNSMGR_RefreshFailed_DataPlaneBroken(t *testing.T) { _, err = nsc.Close(ctx, conn.Clone()) require.NoError(t, err) } + +// This test shows that healing successfully restores the connection if one of the components is killed during the Request +func TestNSMGR_RefreshFailed_ControlPlaneBroken(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). + Build() + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg := defaultRegistryService(t.Name()) + nsReg, err := nsRegistryClient.Register(ctx, nsReg) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + counter := new(count.Server) + domain.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, counter) + + request := defaultRequest(nsReg.Name) + + tokenDuration := time.Minute * 15 + clk := clockmock.New(ctx) + clk.Set(time.Now()) + + // syncCh is used to catch the situation when the forwarder dies during the Request (after the heal chain element) + syncCh := make(chan struct{}, 1) + + nsc := domain.Nodes[0].NewClient(ctx, + sandbox.GenerateExpiringToken(tokenDuration), + nsclient.WithHealClient(heal.NewClient(ctx)), + nsclient.WithAdditionalFunctionality( + checkrequest.NewClient(t, func(t *testing.T, request *networkservice.NetworkServiceRequest) { + <-syncCh + }), + ), + ) + + requestCtx, requestCalcel := context.WithTimeout(ctx, time.Second) + requestCtx = clock.WithClock(requestCtx, clk) + defer requestCalcel() + + // allow the first Request + syncCh <- struct{}{} + conn, err := nsc.Request(requestCtx, request.Clone()) + require.NoError(t, err) + require.Equal(t, 1, counter.Requests()) + + // refresh interval in this test is expected to be 3 minutes and a few milliseconds + clk.Add(time.Second * 190) + + // kill the forwarder during the healing Request (it is stopped by syncCh). 
Then continue - the healing process will fail. + for _, forwarder := range domain.Nodes[0].Forwarders { + forwarder.Cancel() + break + } + syncCh <- struct{}{} + + // create a new forwarder and allow the healing Request + forwarderReg := ®istry.NetworkServiceEndpoint{ + Name: sandbox.UniqueName("forwarder-2"), + NetworkServiceNames: []string{"forwarder"}, + } + domain.Nodes[0].NewForwarder(ctx, forwarderReg, sandbox.GenerateTestToken) + syncCh <- struct{}{} + + // wait till Request reached NSE + require.Eventually(t, func() bool { + return counter.Requests() == 2 + }, timeout, tick) + + _, err = nsc.Close(ctx, conn.Clone()) + require.NoError(t, err) + require.Equal(t, 2, counter.Requests()) +} diff --git a/pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go b/pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go index 57a93093e..62e1b1224 100644 --- a/pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go +++ b/pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Cisco and/or its affiliates. +// Copyright (c) 2022-2023 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -21,8 +21,10 @@ import ( "testing" "time" + "github.com/edwarnicke/genericsync" "github.com/google/uuid" "go.uber.org/goleak" + "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -62,7 +64,7 @@ func Test_UpstreamRefreshClient(t *testing.T) { ctx, nseReg, sandbox.GenerateTestToken, - newRefreshSenderServer(), + newRefreshMTUSenderServer(), counter, ) @@ -129,7 +131,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) { ctx, nseReg, sandbox.GenerateTestToken, - newRefreshSenderServer(), + newRefreshMTUSenderServer(), counter1, ) @@ -143,7 +145,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) { ctx, nseReg2, sandbox.GenerateTestToken, - newRefreshSenderServer(), + newRefreshMTUSenderServer(), counter2, ) @@ -201,46 +203,173 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) { require.NoError(t, err) } -type refreshSenderServer struct { - m map[string]*networkservice.Connection +// This test shows a case when the event monitor is faster than the backward request +func Test_UpstreamRefreshClientDelay(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + domain := sandbox.NewBuilder(ctx, t). + SetNodesCount(1). + SetNSMgrProxySupplier(nil). + SetRegistryProxySupplier(nil). 
+ Build() + + nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken) + + nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService("my-service")) + require.NoError(t, err) + + nseReg := defaultRegistryEndpoint(nsReg.Name) + + // This NSE will send REFRESH_REQUESTED events + // Channels coordinate the test to send the event at the right time + counter := new(count.Server) + ch1 := make(chan struct{}, 1) + ch2 := make(chan struct{}, 1) + _ = domain.Nodes[0].NewEndpoint( + ctx, + nseReg, + sandbox.GenerateTestToken, + newRefreshSenderServer(ch1, ch2), + counter, + ) + + // Create the client that has slow request processing + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality( + upstreamrefresh.NewClient(ctx), + newSlowHandlerClient(ch1, ch2)), + ) + + reqCtx, reqClose := context.WithTimeout(ctx, time.Second) + defer reqClose() + + req := defaultRequest(nsReg.Name) + req.Connection.Id = uuid.New().String() + + conn, err := nsc.Request(reqCtx, req) + require.NoError(t, err) + require.Equal(t, 1, counter.UniqueRequests()) + + // Eventually we should see a refresh + require.Eventually(t, func() bool { return counter.Requests() == 2 }, timeout, tick) + + _, err = nsc.Close(ctx, conn) + require.NoError(t, err) +} + +type refreshMTUSenderServer struct { + m *genericsync.Map[string, *networkservice.Connection] mtu uint32 } const defaultMtu = 9000 -func newRefreshSenderServer() *refreshSenderServer { - return &refreshSenderServer{ - m: make(map[string]*networkservice.Connection), +func newRefreshMTUSenderServer() *refreshMTUSenderServer { + return &refreshMTUSenderServer{ + m: new(genericsync.Map[string, *networkservice.Connection]), mtu: defaultMtu, } } -func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { +func (r *refreshMTUSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { conn, err := next.Server(ctx).Request(ctx, request) if err != nil { return nil, err } if conn.GetContext().GetMTU() != r.mtu { - if _, ok := r.m[conn.Id]; ok { + if _, ok := r.m.Load(conn.Id); ok { return conn, err } ec, _ := monitor.LoadEventConsumer(ctx, false) connectionsToSend := make(map[string]*networkservice.Connection) - for k, v := range r.m { + r.m.Range(func(k string, v *networkservice.Connection) bool { connectionsToSend[k] = v.Clone() connectionsToSend[k].State = networkservice.State_REFRESH_REQUESTED - } + return true + }) + _ = ec.Send(&networkservice.ConnectionEvent{ Type: networkservice.ConnectionEventType_UPDATE, Connections: connectionsToSend, }) } - r.m[conn.Id] = conn + r.m.Store(conn.GetId(), conn.Clone()) return conn, err } +func (r *refreshMTUSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { + return next.Server(ctx).Close(ctx, conn) +} + +type refreshSenderServer struct { + waitSignalCh <-chan struct{} + eventSentCh chan<- struct{} + + eventWasSent bool +} + +func newRefreshSenderServer(waitSignalCh <-chan struct{}, eventSentCh chan<- struct{}) *refreshSenderServer { + return &refreshSenderServer{ + waitSignalCh: waitSignalCh, + eventSentCh: eventSentCh, + } +} + +func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + conn, err := next.Server(ctx).Request(ctx, request) + if err != nil { + return nil, err + } + + if 
!r.eventWasSent { + ec, _ := monitor.LoadEventConsumer(ctx, false) + + c := conn.Clone() + c.State = networkservice.State_REFRESH_REQUESTED + + go func() { + <-r.waitSignalCh + _ = ec.Send(&networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{c.Id: c}, + }) + time.Sleep(time.Millisecond * 100) + r.eventWasSent = true + r.eventSentCh <- struct{}{} + }() + } else { + r.eventSentCh <- struct{}{} + } + return conn, err +} + func (r *refreshSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) { return next.Server(ctx).Close(ctx, conn) } + +type slowHandlerClient struct { + startSlowHandlingCh chan<- struct{} + finishSlowHandlingCh <-chan struct{} +} + +func newSlowHandlerClient(startSlowHandlingCh chan<- struct{}, finishSlowHandlingCh <-chan struct{}) networkservice.NetworkServiceClient { + return &slowHandlerClient{ + startSlowHandlingCh: startSlowHandlingCh, + finishSlowHandlingCh: finishSlowHandlingCh, + } +} + +func (s *slowHandlerClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) { + conn, err := next.Client(ctx).Request(ctx, request, opts...) + s.startSlowHandlingCh <- struct{}{} + <-s.finishSlowHandlingCh + return conn, err +} + +func (s *slowHandlerClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) { + return next.Client(ctx).Close(ctx, conn, opts...) +} diff --git a/pkg/networkservice/common/heal/eventloop.go b/pkg/networkservice/common/heal/eventloop.go index dbe858313..ea2373c2f 100644 --- a/pkg/networkservice/common/heal/eventloop.go +++ b/pkg/networkservice/common/heal/eventloop.go @@ -71,13 +71,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ return nil, nil, errors.Wrap(err, "failed get MonitorConnections client") } - // get the initial state transfer and use it to detect whether we have a real connection or not - _, err = client.Recv() - if err != nil { - eventLoopCancel() - return nil, nil, errors.Wrap(err, "failed to get the initial state transfer") - } - logger := log.FromContext(ctx).WithField("heal", "eventLoop") cev := &eventLoop{ heal: heal, diff --git a/pkg/networkservice/common/monitor/eventloop.go b/pkg/networkservice/common/monitor/eventloop.go index b01d61c58..0519ab8f4 100644 --- a/pkg/networkservice/common/monitor/eventloop.go +++ b/pkg/networkservice/common/monitor/eventloop.go @@ -57,13 +57,6 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter return nil, errors.Wrap(err, "failed to get a MonitorConnections client") } - // get the initial state transfer and use it to detect whether we have a real connection or not - _, err = client.Recv() - if err != nil { - eventLoopCancel() - return nil, errors.Wrap(err, "failed to get the initial state transfer") - } - cev := &eventLoop{ eventLoopCtx: eventLoopCtx, conn: conn, diff --git a/pkg/networkservice/common/upstreamrefresh/client_filter.go b/pkg/networkservice/common/upstreamrefresh/client_filter.go index b5b8f9e4e..9e78a67dd 100644 --- a/pkg/networkservice/common/upstreamrefresh/client_filter.go +++ b/pkg/networkservice/common/upstreamrefresh/client_filter.go @@ -50,7 +50,8 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) { Connections: make(map[string]*networkservice.Connection), } for _, connIn := range eventIn.GetConnections() { - if 
eventIn.GetType() != networkservice.ConnectionEventType_UPDATE { + if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE && + eventIn.GetType() != networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER { continue } diff --git a/pkg/networkservice/common/upstreamrefresh/eventloop.go b/pkg/networkservice/common/upstreamrefresh/eventloop.go index 5ffef487a..2d1a80fd4 100644 --- a/pkg/networkservice/common/upstreamrefresh/eventloop.go +++ b/pkg/networkservice/common/upstreamrefresh/eventloop.go @@ -64,13 +64,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ return nil, errors.Wrap(err, "failed to get a MonitorConnection client") } - // get the initial state transfer and use it to detect whether we have a real connection or not - _, err = client.Recv() - if err != nil { - eventLoopCancel() - return nil, errors.Wrap(err, "failed to get the initial state transfer") - } - logger := log.FromContext(ctx).WithField("upstreamrefresh", "eventLoop") cev := &eventLoop{ eventLoopCtx: eventLoopCtx,
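
For context on the behavioral change in the three event loops and in client_filter.go: the blocking Recv() of the initial state transfer is removed from newEventLoop, and the upstreamrefresh filter now forwards INITIAL_STATE_TRANSFER events alongside UPDATE events, so the initial snapshot is handled on the same path as later updates. Below is a minimal consumer-side sketch of what that enables; it uses only the networkservice API types visible in this patch, while the eventStream interface and the handleRefresh callback are illustrative stand-ins, not SDK types.

// Sketch only: not part of the patch. Assumes the networkservice API types
// shown above; eventStream and handleRefresh are hypothetical names.
package sketch

import (
	"github.com/networkservicemesh/api/pkg/api/networkservice"
)

// eventStream is the minimal surface this sketch needs; the SDK's clientFilter
// wraps the generated MonitorConnections client with the same Recv signature.
type eventStream interface {
	Recv() (*networkservice.ConnectionEvent, error)
}

// drainEvents treats the initial state transfer and later updates uniformly:
// with this patch both event types reach the consumer, so a connection that is
// already in State_REFRESH_REQUESTED when monitoring starts still triggers a
// refresh instead of being silently dropped during setup.
func drainEvents(stream eventStream, handleRefresh func(*networkservice.Connection)) error {
	for {
		event, err := stream.Recv()
		if err != nil {
			return err
		}
		switch event.GetType() {
		case networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
			networkservice.ConnectionEventType_UPDATE:
			for _, conn := range event.GetConnections() {
				if conn.GetState() == networkservice.State_REFRESH_REQUESTED {
					handleRefresh(conn)
				}
			}
		default:
			// DELETE and other event types are out of scope for this sketch.
		}
	}
}

This appears to be the scenario Test_UpstreamRefreshClientDelay exercises: when the endpoint emits REFRESH_REQUESTED before the client's monitor stream is established, the state arrives via the initial state transfer rather than a later UPDATE, and is no longer lost.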