Skip to content

Commit

Permalink
monitor: handle initial state transfer
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <[email protected]>
  • Loading branch information
glazychev-art committed Nov 23, 2023
1 parent cb5a6d2 commit bace48f
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 36 deletions.
157 changes: 143 additions & 14 deletions pkg/networkservice/chains/nsmgr/upstreamrefresh_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,8 +21,10 @@ import (
"testing"
"time"

"github.com/edwarnicke/genericsync"
"github.com/google/uuid"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand Down Expand Up @@ -62,7 +64,7 @@ func Test_UpstreamRefreshClient(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter,
)

Expand Down Expand Up @@ -129,7 +131,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter1,
)

Expand All @@ -143,7 +145,7 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
ctx,
nseReg2,
sandbox.GenerateTestToken,
newRefreshSenderServer(),
newRefreshMTUSenderServer(),
counter2,
)

Expand Down Expand Up @@ -201,46 +203,173 @@ func Test_UpstreamRefreshClient_LocalNotifications(t *testing.T) {
require.NoError(t, err)
}

type refreshSenderServer struct {
m map[string]*networkservice.Connection
// This test shows a case when the event monitor is faster than the backward request
func Test_UpstreamRefreshClientDelay(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
SetNodesCount(1).
SetNSMgrProxySupplier(nil).
SetRegistryProxySupplier(nil).
Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService("my-service"))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)

// This NSE will send REFRESH_REQUESTED events
// Channels coordinate the test to send the event at the right time
counter := new(count.Server)
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
_ = domain.Nodes[0].NewEndpoint(
ctx,
nseReg,
sandbox.GenerateTestToken,
newRefreshSenderServer(ch1, ch2),
counter,
)

// Create the client that has slow request processing
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(
upstreamrefresh.NewClient(ctx),
newSlowHandlerClient(ch1, ch2)),
)

reqCtx, reqClose := context.WithTimeout(ctx, time.Second)
defer reqClose()

req := defaultRequest(nsReg.Name)
req.Connection.Id = uuid.New().String()

conn, err := nsc.Request(reqCtx, req)
require.NoError(t, err)
require.Equal(t, 1, counter.UniqueRequests())

// Eventually we should see a refresh
require.Eventually(t, func() bool { return counter.Requests() == 2 }, timeout, tick)

_, err = nsc.Close(ctx, conn)
require.NoError(t, err)
}

type refreshMTUSenderServer struct {
m *genericsync.Map[string, *networkservice.Connection]
mtu uint32
}

const defaultMtu = 9000

func newRefreshSenderServer() *refreshSenderServer {
return &refreshSenderServer{
m: make(map[string]*networkservice.Connection),
func newRefreshMTUSenderServer() *refreshMTUSenderServer {
return &refreshMTUSenderServer{
m: new(genericsync.Map[string, *networkservice.Connection]),
mtu: defaultMtu,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
func (r *refreshMTUSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}
if conn.GetContext().GetMTU() != r.mtu {
if _, ok := r.m[conn.Id]; ok {
if _, ok := r.m.Load(conn.Id); ok {
return conn, err
}
ec, _ := monitor.LoadEventConsumer(ctx, false)

connectionsToSend := make(map[string]*networkservice.Connection)
for k, v := range r.m {
r.m.Range(func(k string, v *networkservice.Connection) bool {
connectionsToSend[k] = v.Clone()
connectionsToSend[k].State = networkservice.State_REFRESH_REQUESTED
}
return true
})

_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: connectionsToSend,
})
}
r.m[conn.Id] = conn
r.m.Store(conn.GetId(), conn.Clone())

return conn, err
}

func (r *refreshMTUSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type refreshSenderServer struct {
waitSignalCh <-chan struct{}
eventSentCh chan<- struct{}

eventWasSent bool
}

func newRefreshSenderServer(waitSignalCh <-chan struct{}, eventSentCh chan<- struct{}) *refreshSenderServer {
return &refreshSenderServer{
waitSignalCh: waitSignalCh,
eventSentCh: eventSentCh,
}
}

func (r *refreshSenderServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
conn, err := next.Server(ctx).Request(ctx, request)
if err != nil {
return nil, err
}

if !r.eventWasSent {
ec, _ := monitor.LoadEventConsumer(ctx, false)

c := conn.Clone()
c.State = networkservice.State_REFRESH_REQUESTED

go func() {
<-r.waitSignalCh
_ = ec.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{c.Id: c},
})
time.Sleep(time.Millisecond * 100)
r.eventWasSent = true
r.eventSentCh <- struct{}{}
}()
} else {
r.eventSentCh <- struct{}{}
}
return conn, err
}

func (r *refreshSenderServer) Close(ctx context.Context, conn *networkservice.Connection) (*emptypb.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}

type slowHandlerClient struct {
startSlowHandlingCh chan<- struct{}
finishSlowHandlingCh <-chan struct{}
}

func newSlowHandlerClient(startSlowHandlingCh chan<- struct{}, finishSlowHandlingCh <-chan struct{}) networkservice.NetworkServiceClient {
return &slowHandlerClient{
startSlowHandlingCh: startSlowHandlingCh,
finishSlowHandlingCh: finishSlowHandlingCh,
}
}

func (s *slowHandlerClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
conn, err := next.Client(ctx).Request(ctx, request, opts...)
s.startSlowHandlingCh <- struct{}{}
<-s.finishSlowHandlingCh
return conn, err
}

func (s *slowHandlerClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
return next.Client(ctx).Close(ctx, conn, opts...)
}
7 changes: 0 additions & 7 deletions pkg/networkservice/common/heal/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, nil, errors.Wrap(err, "failed get MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("heal", "eventLoop")
cev := &eventLoop{
heal: heal,
Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/monitor/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInter
return nil, errors.Wrap(err, "failed to get a MonitorConnections client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
Expand Down
3 changes: 2 additions & 1 deletion pkg/networkservice/common/upstreamrefresh/client_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE &&
eventIn.GetType() != networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER {
continue
}

Expand Down
7 changes: 0 additions & 7 deletions pkg/networkservice/common/upstreamrefresh/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networ
return nil, errors.Wrap(err, "failed to get a MonitorConnection client")
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get the initial state transfer")
}

logger := log.FromContext(ctx).WithField("upstreamrefresh", "eventLoop")
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
Expand Down

0 comments on commit bace48f

Please sign in to comment.