Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NSP Target Registry chained server #258

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions cmd/nsp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/nordix/meridio/pkg/configuration/registry"
"github.com/nordix/meridio/pkg/health"
"github.com/nordix/meridio/pkg/nsp"
nspRegistry "github.com/nordix/meridio/pkg/nsp/registry"
"github.com/nordix/meridio/pkg/nsp/watchhandler"
"github.com/pkg/errors"

keepAliveRegistry "github.com/nordix/meridio/pkg/nsp/registry/keepalive"
sqliteRegistry "github.com/nordix/meridio/pkg/nsp/registry/sqlite"
Expand Down Expand Up @@ -97,31 +100,18 @@ func main() {
ctx = health.CreateChecker(ctx)

// configuration
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
configurationRegistry := registry.New(configurationEventChan)
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
configurationManagerServer, err := CreateConfigurationManagerServer(ctx, &config)
if err != nil {
logrus.Fatalf("Unable to start configuration monitor: %v", err)
logrus.Fatalf("CreateConfigurationManagerServer err: %v", err)
}
go configurationMonitor.Start(context.Background())
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
go watcherNotifier.Start(context.Background())
configurationManagerServer := manager.NewServer(watcherNotifier)

// target registry
sqlr, err := sqliteRegistry.New(config.Datasource)
targetRegistryServer, err := CreateTargetRegistryServer(ctx, &config)
if err != nil {
logrus.Fatalf("Unable create sqlite registry: %v", err)
logrus.Fatalf("CreateTargetRegistryServer err: %v", err)
}
keepAliveRegistry, err := keepAliveRegistry.New(
keepAliveRegistry.WithRegistry(sqlr),
keepAliveRegistry.WithTimeout(config.EntryTimeout),
)
if err != nil {
logrus.Fatalf("Unable create keepalive registry: %v", err)
}
targetRegistryServer := nsp.NewServer(keepAliveRegistry)

// Create Server
server := grpc.NewServer(grpc.Creds(
credentials.GetServer(context.Background()),
))
Expand All @@ -141,3 +131,34 @@ func main() {

<-ctx.Done()
}

func CreateTargetRegistryServer(ctx context.Context, config *Config) (nspAPI.TargetRegistryServer, error) {
sqlr, err := sqliteRegistry.New(config.Datasource)
if err != nil {
return nil, errors.Wrap(err, "Unable create sqlite registry")
}
keepAliveRegistry, err := keepAliveRegistry.New(
keepAliveRegistry.WithRegistry(sqlr),
keepAliveRegistry.WithTimeout(config.EntryTimeout),
)
if err != nil {
return nil, errors.Wrap(err, "Unable create keepalive registry")
}
return nsp.NewServer(
nspRegistry.NewServer(keepAliveRegistry),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separation of nspRegistry and watchhandler seems forced to me. They both rely on the same keepAliveRegistry.

watchhandler.NewServer(keepAliveRegistry),
), nil
}

func CreateConfigurationManagerServer(ctx context.Context, config *Config) (nspAPI.ConfigurationManagerServer, error) {
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
configurationRegistry := registry.New(configurationEventChan)
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
if err != nil {
return nil, errors.Wrap(err, "Unable to start configuration monitor")
}
go configurationMonitor.Start(context.Background())
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
go watcherNotifier.Start(context.Background())
return manager.NewServer(watcherNotifier), nil
}
33 changes: 33 additions & 0 deletions pkg/nsp/next/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright (c) 2022 Nordix Foundation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next

// BuildNextTargetRegistryChain chains the target registry servers together.
// Each NextTargetRegistryServer must have a non nil NextTargetRegistryServerImpl.
// If the list of nextTargetRegistryServers is nil or empty, a nil value will be returned.
func BuildNextTargetRegistryChain(nextTargetRegistryServers ...NextTargetRegistryServer) NextTargetRegistryServer {
if len(nextTargetRegistryServers) <= 0 {
return nil
}
for i, ntrs := range nextTargetRegistryServers {
if i >= (len(nextTargetRegistryServers) - 1) {
break
}
ntrs.setNext(nextTargetRegistryServers[i+1])
}
return nextTargetRegistryServers[0]
}
83 changes: 83 additions & 0 deletions pkg/nsp/next/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright (c) 2022 Nordix Foundation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next_test

import (
"reflect"
"testing"

"github.com/nordix/meridio/pkg/nsp/next"
)

type fakeNextTargetRegistryServerImpl struct {
*next.NextTargetRegistryServerImpl
name string
}

func TestBuildNextTargetRegistryChain(t *testing.T) {
fakeA := &fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "a",
}
type args struct {
nextTargetRegistryServers []next.NextTargetRegistryServer
}
tests := []struct {
name string
args args
want next.NextTargetRegistryServer
}{
{
name: "empty",
args: args{},
want: nil,
},
{
name: "one",
args: args{
[]next.NextTargetRegistryServer{
fakeA,
},
},
want: fakeA,
},
{
name: "multiple",
args: args{
[]next.NextTargetRegistryServer{
fakeA,
&fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "b",
},
&fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "c",
},
},
},
want: fakeA,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := next.BuildNextTargetRegistryChain(tt.args.nextTargetRegistryServers...); !reflect.DeepEqual(got, tt.want) {
t.Errorf("BuildNextTargetRegistryChain() = %v, want %v", got, tt.want)
}
})
}
}
67 changes: 67 additions & 0 deletions pkg/nsp/next/next.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright (c) 2021 Nordix Foundation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
)

// NextTargetRegistryServer is the interface representing TargetRegistryServer with the
// support of the chaining feature.
type NextTargetRegistryServer interface {
nspAPI.TargetRegistryServer
setNext(NextTargetRegistryServer)
}

type NextTargetRegistryServerImpl struct {
nspAPI.UnimplementedTargetRegistryServer
next NextTargetRegistryServer
}

// Register will call the Register function of the next chain element
// If the next element is nil, then &empty.Empty{}, nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
if ntrsi.next == nil {
return &empty.Empty{}, nil
}
return ntrsi.next.Register(ctx, target)
}

// Unregister will call the Unregister function of the next chain element
// If the next element is nil, then &empty.Empty{}, nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
if ntrsi.next == nil {
return &empty.Empty{}, nil
}
return ntrsi.next.Unregister(ctx, target)
}

// Watch will call the Watch function of the next chain element
// If the next element is nil, then nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably this is similar to NSM's Monitor thingy, but it seems only one chain entry is allowed to implement a "real" Watch function as it needs to block forever. This is probably not an issue, but easy to make mistakes if the user is not aware :)

(Also, I read somewhere that grpc streams are not thread-safe. So, for example let's say the backend i.e the db was partitioned. Then letting loose multiple go routines wouldn't work properly without synchronization. And probably it would require a special tail chain entry that would wait for the watch routines to keep the stream open.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, only one chain element would be allowed to implement the real watch function.

For the second part, do you mean we should have 1 goroutine to handle all watcher?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just meant that special care should be taken in that case, and the default chaining couldn't be used as is.

if ntrsi.next == nil {
return nil
}
return ntrsi.next.Watch(t, watcher)
}

func (ntrsi *NextTargetRegistryServerImpl) setNext(ntrs NextTargetRegistryServer) {
ntrsi.next = ntrs
}
59 changes: 59 additions & 0 deletions pkg/nsp/registry/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright (c) 2021 Nordix Foundation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registry

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
"github.com/nordix/meridio/pkg/nsp/next"
"github.com/nordix/meridio/pkg/nsp/types"
)

type registry struct {
TargetRegistry types.TargetRegistry
*next.NextTargetRegistryServerImpl
}

// NewServer provides an implementation of TargetRegistryServer with the
// support of the chaining feature. This implementation handles Register
// and Unregister calls by adding or removing data into a storage (e.g
// memory or sqlite)
func NewServer(targetRegistry types.TargetRegistry) *registry {
r := &registry{
TargetRegistry: targetRegistry,
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
}
return r
}

func (r *registry) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
err := r.TargetRegistry.Set(ctx, target)
if err != nil {
return &empty.Empty{}, err
}
return r.NextTargetRegistryServerImpl.Register(ctx, target)
}

func (r *registry) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
err := r.TargetRegistry.Remove(ctx, target)
if err != nil {
return &empty.Empty{}, err
}
return r.NextTargetRegistryServerImpl.Unregister(ctx, target)
}
53 changes: 3 additions & 50 deletions pkg/nsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,11 @@ limitations under the License.
package nsp

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
"github.com/nordix/meridio/pkg/nsp/types"
"github.com/sirupsen/logrus"
"github.com/nordix/meridio/pkg/nsp/next"
)

type Server struct {
nspAPI.UnimplementedTargetRegistryServer
TargetRegistry types.TargetRegistry
}

// NewServer -
func NewServer(targetRegistry types.TargetRegistry) nspAPI.TargetRegistryServer {
networkServicePlateformService := &Server{
TargetRegistry: targetRegistry,
}

return networkServicePlateformService
}

func (s *Server) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
return &empty.Empty{}, s.TargetRegistry.Set(ctx, target)
}

func (s *Server) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
return &empty.Empty{}, s.TargetRegistry.Remove(ctx, target)
}

func (s *Server) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
targetWatcher, err := s.TargetRegistry.Watch(context.TODO(), t)
if err != nil {
return err
}
s.watcher(watcher, targetWatcher.ResultChan())
targetWatcher.Stop()
return nil
}

func (s *Server) watcher(watcher nspAPI.TargetRegistry_WatchServer, ch <-chan []*nspAPI.Target) {
for {
select {
case event := <-ch:
err := watcher.Send(&nspAPI.TargetResponse{
Targets: event,
})
if err != nil {
logrus.Errorf("err sending TrenchResponse: %v", err)
}
case <-watcher.Context().Done():
return
}
}
func NewServer(nextTargetRegistryServers ...next.NextTargetRegistryServer) nspAPI.TargetRegistryServer {
return next.BuildNextTargetRegistryChain(nextTargetRegistryServers...)
}
Loading