Skip to content

Commit

Permalink
NSP Target Registry chained server
Browse files Browse the repository at this point in the history
* New target registry server pattern allowing future feature
* Current chain: watch handler -> store (sqlite + keepalive)
  • Loading branch information
LionelJouin committed Aug 3, 2022
1 parent 61c9766 commit 34b917a
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 68 deletions.
57 changes: 39 additions & 18 deletions cmd/nsp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/nordix/meridio/pkg/configuration/registry"
"github.com/nordix/meridio/pkg/health"
"github.com/nordix/meridio/pkg/nsp"
nspRegistry "github.com/nordix/meridio/pkg/nsp/registry"
"github.com/nordix/meridio/pkg/nsp/watchhandler"
"github.com/pkg/errors"

keepAliveRegistry "github.com/nordix/meridio/pkg/nsp/registry/keepalive"
sqliteRegistry "github.com/nordix/meridio/pkg/nsp/registry/sqlite"
Expand Down Expand Up @@ -97,31 +100,18 @@ func main() {
ctx = health.CreateChecker(ctx)

// configuration
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
configurationRegistry := registry.New(configurationEventChan)
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
configurationManagerServer, err := CreateConfigurationManagerServer(ctx, &config)
if err != nil {
logrus.Fatalf("Unable to start configuration monitor: %v", err)
logrus.Fatalf("CreateConfigurationManagerServer err: %v", err)
}
go configurationMonitor.Start(context.Background())
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
go watcherNotifier.Start(context.Background())
configurationManagerServer := manager.NewServer(watcherNotifier)

// target registry
sqlr, err := sqliteRegistry.New(config.Datasource)
targetRegistryServer, err := CreateTargetRegistryServer(ctx, &config)
if err != nil {
logrus.Fatalf("Unable create sqlite registry: %v", err)
logrus.Fatalf("CreateTargetRegistryServer err: %v", err)
}
keepAliveRegistry, err := keepAliveRegistry.New(
keepAliveRegistry.WithRegistry(sqlr),
keepAliveRegistry.WithTimeout(config.EntryTimeout),
)
if err != nil {
logrus.Fatalf("Unable create keepalive registry: %v", err)
}
targetRegistryServer := nsp.NewServer(keepAliveRegistry)

// Create Server
server := grpc.NewServer(grpc.Creds(
credentials.GetServer(context.Background()),
))
Expand All @@ -141,3 +131,34 @@ func main() {

<-ctx.Done()
}

func CreateTargetRegistryServer(ctx context.Context, config *Config) (nspAPI.TargetRegistryServer, error) {
sqlr, err := sqliteRegistry.New(config.Datasource)
if err != nil {
return nil, errors.Wrap(err, "Unable create sqlite registry")
}
keepAliveRegistry, err := keepAliveRegistry.New(
keepAliveRegistry.WithRegistry(sqlr),
keepAliveRegistry.WithTimeout(config.EntryTimeout),
)
if err != nil {
return nil, errors.Wrap(err, "Unable create keepalive registry")
}
return nsp.NewServer(
nspRegistry.NewServer(keepAliveRegistry),
watchhandler.NewServer(keepAliveRegistry),
), nil
}

func CreateConfigurationManagerServer(ctx context.Context, config *Config) (nspAPI.ConfigurationManagerServer, error) {
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
configurationRegistry := registry.New(configurationEventChan)
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
if err != nil {
return nil, errors.Wrap(err, "Unable to start configuration monitor")
}
go configurationMonitor.Start(context.Background())
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
go watcherNotifier.Start(context.Background())
return manager.NewServer(watcherNotifier), nil
}
33 changes: 33 additions & 0 deletions pkg/nsp/next/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright (c) 2022 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next

// BuildNextTargetRegistryChain chains the target registry servers together.
// Each NextTargetRegistryServer must have a non nil NextTargetRegistryServerImpl.
// If the list of nextTargetRegistryServers is nil or empty, a nil value will be returned.
func BuildNextTargetRegistryChain(nextTargetRegistryServers ...NextTargetRegistryServer) NextTargetRegistryServer {
if len(nextTargetRegistryServers) <= 0 {
return nil
}
for i, ntrs := range nextTargetRegistryServers {
if i >= (len(nextTargetRegistryServers) - 1) {
break
}
ntrs.setNext(nextTargetRegistryServers[i+1])
}
return nextTargetRegistryServers[0]
}
83 changes: 83 additions & 0 deletions pkg/nsp/next/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright (c) 2022 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next_test

import (
"reflect"
"testing"

"github.com/nordix/meridio/pkg/nsp/next"
)

type fakeNextTargetRegistryServerImpl struct {
*next.NextTargetRegistryServerImpl
name string
}

func TestBuildNextTargetRegistryChain(t *testing.T) {
fakeA := &fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "a",
}
type args struct {
nextTargetRegistryServers []next.NextTargetRegistryServer
}
tests := []struct {
name string
args args
want next.NextTargetRegistryServer
}{
{
name: "empty",
args: args{},
want: nil,
},
{
name: "one",
args: args{
[]next.NextTargetRegistryServer{
fakeA,
},
},
want: fakeA,
},
{
name: "multiple",
args: args{
[]next.NextTargetRegistryServer{
fakeA,
&fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "b",
},
&fakeNextTargetRegistryServerImpl{
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
name: "c",
},
},
},
want: fakeA,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := next.BuildNextTargetRegistryChain(tt.args.nextTargetRegistryServers...); !reflect.DeepEqual(got, tt.want) {
t.Errorf("BuildNextTargetRegistryChain() = %v, want %v", got, tt.want)
}
})
}
}
67 changes: 67 additions & 0 deletions pkg/nsp/next/next.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright (c) 2021 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package next

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
)

// NextTargetRegistryServer is the interface representing TargetRegistryServer with the
// support of the chaining feature.
type NextTargetRegistryServer interface {
nspAPI.TargetRegistryServer
setNext(NextTargetRegistryServer)
}

type NextTargetRegistryServerImpl struct {
nspAPI.UnimplementedTargetRegistryServer
next NextTargetRegistryServer
}

// Register will call the Register function of the next chain element
// If the next element is nil, then &empty.Empty{}, nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
if ntrsi.next == nil {
return &empty.Empty{}, nil
}
return ntrsi.next.Register(ctx, target)
}

// Unregister will call the Unregister function of the next chain element
// If the next element is nil, then &empty.Empty{}, nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
if ntrsi.next == nil {
return &empty.Empty{}, nil
}
return ntrsi.next.Unregister(ctx, target)
}

// Watch will call the Watch function of the next chain element
// If the next element is nil, then nil will be returned.
func (ntrsi *NextTargetRegistryServerImpl) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
if ntrsi.next == nil {
return nil
}
return ntrsi.next.Watch(t, watcher)
}

func (ntrsi *NextTargetRegistryServerImpl) setNext(ntrs NextTargetRegistryServer) {
ntrsi.next = ntrs
}
59 changes: 59 additions & 0 deletions pkg/nsp/registry/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright (c) 2021 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registry

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
"github.com/nordix/meridio/pkg/nsp/next"
"github.com/nordix/meridio/pkg/nsp/types"
)

type registry struct {
TargetRegistry types.TargetRegistry
*next.NextTargetRegistryServerImpl
}

// NewServer provides an implementation of TargetRegistryServer with the
// support of the chaining feature. This implementation handles Register
// and Unregister calls by adding or removing data into a storage (e.g
// memory or sqlite)
func NewServer(targetRegistry types.TargetRegistry) *registry {
r := &registry{
TargetRegistry: targetRegistry,
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
}
return r
}

func (r *registry) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
err := r.TargetRegistry.Set(ctx, target)
if err != nil {
return &empty.Empty{}, err
}
return r.NextTargetRegistryServerImpl.Register(ctx, target)
}

func (r *registry) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
err := r.TargetRegistry.Remove(ctx, target)
if err != nil {
return &empty.Empty{}, err
}
return r.NextTargetRegistryServerImpl.Unregister(ctx, target)
}
53 changes: 3 additions & 50 deletions pkg/nsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,11 @@ limitations under the License.
package nsp

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
nspAPI "github.com/nordix/meridio/api/nsp/v1"
"github.com/nordix/meridio/pkg/nsp/types"
"github.com/sirupsen/logrus"
"github.com/nordix/meridio/pkg/nsp/next"
)

type Server struct {
nspAPI.UnimplementedTargetRegistryServer
TargetRegistry types.TargetRegistry
}

// NewServer -
func NewServer(targetRegistry types.TargetRegistry) nspAPI.TargetRegistryServer {
networkServicePlateformService := &Server{
TargetRegistry: targetRegistry,
}

return networkServicePlateformService
}

func (s *Server) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
return &empty.Empty{}, s.TargetRegistry.Set(ctx, target)
}

func (s *Server) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
return &empty.Empty{}, s.TargetRegistry.Remove(ctx, target)
}

func (s *Server) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
targetWatcher, err := s.TargetRegistry.Watch(context.TODO(), t)
if err != nil {
return err
}
s.watcher(watcher, targetWatcher.ResultChan())
targetWatcher.Stop()
return nil
}

func (s *Server) watcher(watcher nspAPI.TargetRegistry_WatchServer, ch <-chan []*nspAPI.Target) {
for {
select {
case event := <-ch:
err := watcher.Send(&nspAPI.TargetResponse{
Targets: event,
})
if err != nil {
logrus.Errorf("err sending TrenchResponse: %v", err)
}
case <-watcher.Context().Done():
return
}
}
func NewServer(nextTargetRegistryServers ...next.NextTargetRegistryServer) nspAPI.TargetRegistryServer {
return next.BuildNextTargetRegistryChain(nextTargetRegistryServers...)
}
Loading

0 comments on commit 34b917a

Please sign in to comment.