diff --git a/cmd/nsp/main.go b/cmd/nsp/main.go index 840f37b1..aa715681 100644 --- a/cmd/nsp/main.go +++ b/cmd/nsp/main.go @@ -32,6 +32,9 @@ import ( "github.com/nordix/meridio/pkg/configuration/registry" "github.com/nordix/meridio/pkg/health" "github.com/nordix/meridio/pkg/nsp" + nspRegistry "github.com/nordix/meridio/pkg/nsp/registry" + "github.com/nordix/meridio/pkg/nsp/watchhandler" + "github.com/pkg/errors" keepAliveRegistry "github.com/nordix/meridio/pkg/nsp/registry/keepalive" sqliteRegistry "github.com/nordix/meridio/pkg/nsp/registry/sqlite" @@ -97,31 +100,18 @@ func main() { ctx = health.CreateChecker(ctx) // configuration - configurationEventChan := make(chan *registry.ConfigurationEvent, 10) - configurationRegistry := registry.New(configurationEventChan) - configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry) + configurationManagerServer, err := CreateConfigurationManagerServer(ctx, &config) if err != nil { - logrus.Fatalf("Unable to start configuration monitor: %v", err) + logrus.Fatalf("CreateConfigurationManagerServer err: %v", err) } - go configurationMonitor.Start(context.Background()) - watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan) - go watcherNotifier.Start(context.Background()) - configurationManagerServer := manager.NewServer(watcherNotifier) // target registry - sqlr, err := sqliteRegistry.New(config.Datasource) + targetRegistryServer, err := CreateTargetRegistryServer(ctx, &config) if err != nil { - logrus.Fatalf("Unable create sqlite registry: %v", err) + logrus.Fatalf("CreateTargetRegistryServer err: %v", err) } - keepAliveRegistry, err := keepAliveRegistry.New( - keepAliveRegistry.WithRegistry(sqlr), - keepAliveRegistry.WithTimeout(config.EntryTimeout), - ) - if err != nil { - logrus.Fatalf("Unable create keepalive registry: %v", err) - } - targetRegistryServer := nsp.NewServer(keepAliveRegistry) + // Create Server server := grpc.NewServer(grpc.Creds( credentials.GetServer(context.Background()), )) @@ -141,3 +131,34 @@ func main() { <-ctx.Done() } + +func CreateTargetRegistryServer(ctx context.Context, config *Config) (nspAPI.TargetRegistryServer, error) { + sqlr, err := sqliteRegistry.New(config.Datasource) + if err != nil { + return nil, errors.Wrap(err, "Unable create sqlite registry") + } + keepAliveRegistry, err := keepAliveRegistry.New( + keepAliveRegistry.WithRegistry(sqlr), + keepAliveRegistry.WithTimeout(config.EntryTimeout), + ) + if err != nil { + return nil, errors.Wrap(err, "Unable create keepalive registry") + } + return nsp.NewServer( + nspRegistry.NewServer(keepAliveRegistry), + watchhandler.NewServer(keepAliveRegistry), + ), nil +} + +func CreateConfigurationManagerServer(ctx context.Context, config *Config) (nspAPI.ConfigurationManagerServer, error) { + configurationEventChan := make(chan *registry.ConfigurationEvent, 10) + configurationRegistry := registry.New(configurationEventChan) + configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry) + if err != nil { + return nil, errors.Wrap(err, "Unable to start configuration monitor") + } + go configurationMonitor.Start(context.Background()) + watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan) + go watcherNotifier.Start(context.Background()) + return manager.NewServer(watcherNotifier), nil +} diff --git a/pkg/nsp/next/builder.go b/pkg/nsp/next/builder.go new file mode 100644 index 00000000..0d2f0513 --- /dev/null +++ b/pkg/nsp/next/builder.go @@ -0,0 +1,33 @@ +/* +Copyright (c) 2022 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package next + +// BuildNextTargetRegistryChain chains the target registry servers together. +// Each NextTargetRegistryServer must have a non nil NextTargetRegistryServerImpl. +// If the list of nextTargetRegistryServers is nil or empty, a nil value will be returned. +func BuildNextTargetRegistryChain(nextTargetRegistryServers ...NextTargetRegistryServer) NextTargetRegistryServer { + if len(nextTargetRegistryServers) <= 0 { + return nil + } + for i, ntrs := range nextTargetRegistryServers { + if i >= (len(nextTargetRegistryServers) - 1) { + break + } + ntrs.setNext(nextTargetRegistryServers[i+1]) + } + return nextTargetRegistryServers[0] +} diff --git a/pkg/nsp/next/builder_test.go b/pkg/nsp/next/builder_test.go new file mode 100644 index 00000000..6be6afa2 --- /dev/null +++ b/pkg/nsp/next/builder_test.go @@ -0,0 +1,83 @@ +/* +Copyright (c) 2022 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package next_test + +import ( + "reflect" + "testing" + + "github.com/nordix/meridio/pkg/nsp/next" +) + +type fakeNextTargetRegistryServerImpl struct { + *next.NextTargetRegistryServerImpl + name string +} + +func TestBuildNextTargetRegistryChain(t *testing.T) { + fakeA := &fakeNextTargetRegistryServerImpl{ + NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{}, + name: "a", + } + type args struct { + nextTargetRegistryServers []next.NextTargetRegistryServer + } + tests := []struct { + name string + args args + want next.NextTargetRegistryServer + }{ + { + name: "empty", + args: args{}, + want: nil, + }, + { + name: "one", + args: args{ + []next.NextTargetRegistryServer{ + fakeA, + }, + }, + want: fakeA, + }, + { + name: "multiple", + args: args{ + []next.NextTargetRegistryServer{ + fakeA, + &fakeNextTargetRegistryServerImpl{ + NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{}, + name: "b", + }, + &fakeNextTargetRegistryServerImpl{ + NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{}, + name: "c", + }, + }, + }, + want: fakeA, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := next.BuildNextTargetRegistryChain(tt.args.nextTargetRegistryServers...); !reflect.DeepEqual(got, tt.want) { + t.Errorf("BuildNextTargetRegistryChain() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/nsp/next/next.go b/pkg/nsp/next/next.go new file mode 100644 index 00000000..2ae74e2b --- /dev/null +++ b/pkg/nsp/next/next.go @@ -0,0 +1,67 @@ +/* +Copyright (c) 2021 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package next + +import ( + "context" + + "github.com/golang/protobuf/ptypes/empty" + nspAPI "github.com/nordix/meridio/api/nsp/v1" +) + +// NextTargetRegistryServer is the interface representing TargetRegistryServer with the +// support of the chaining feature. +type NextTargetRegistryServer interface { + nspAPI.TargetRegistryServer + setNext(NextTargetRegistryServer) +} + +type NextTargetRegistryServerImpl struct { + nspAPI.UnimplementedTargetRegistryServer + next NextTargetRegistryServer +} + +// Register will call the Register function of the next chain element +// If the next element is nil, then &empty.Empty{}, nil will be returned. +func (ntrsi *NextTargetRegistryServerImpl) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { + if ntrsi.next == nil { + return &empty.Empty{}, nil + } + return ntrsi.next.Register(ctx, target) +} + +// Unregister will call the Unregister function of the next chain element +// If the next element is nil, then &empty.Empty{}, nil will be returned. +func (ntrsi *NextTargetRegistryServerImpl) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { + if ntrsi.next == nil { + return &empty.Empty{}, nil + } + return ntrsi.next.Unregister(ctx, target) +} + +// Watch will call the Watch function of the next chain element +// If the next element is nil, then nil will be returned. +func (ntrsi *NextTargetRegistryServerImpl) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error { + if ntrsi.next == nil { + return nil + } + return ntrsi.next.Watch(t, watcher) +} + +func (ntrsi *NextTargetRegistryServerImpl) setNext(ntrs NextTargetRegistryServer) { + ntrsi.next = ntrs +} diff --git a/pkg/nsp/registry/server.go b/pkg/nsp/registry/server.go new file mode 100644 index 00000000..9a8a60de --- /dev/null +++ b/pkg/nsp/registry/server.go @@ -0,0 +1,59 @@ +/* +Copyright (c) 2021 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "context" + + "github.com/golang/protobuf/ptypes/empty" + nspAPI "github.com/nordix/meridio/api/nsp/v1" + "github.com/nordix/meridio/pkg/nsp/next" + "github.com/nordix/meridio/pkg/nsp/types" +) + +type registry struct { + TargetRegistry types.TargetRegistry + *next.NextTargetRegistryServerImpl +} + +// NewServer provides an implementation of TargetRegistryServer with the +// support of the chaining feature. This implementation handles Register +// and Unregister calls by adding or removing data into a storage (e.g +// memory or sqlite) +func NewServer(targetRegistry types.TargetRegistry) *registry { + r := ®istry{ + TargetRegistry: targetRegistry, + NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{}, + } + return r +} + +func (r *registry) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { + err := r.TargetRegistry.Set(ctx, target) + if err != nil { + return &empty.Empty{}, err + } + return r.NextTargetRegistryServerImpl.Register(ctx, target) +} + +func (r *registry) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { + err := r.TargetRegistry.Remove(ctx, target) + if err != nil { + return &empty.Empty{}, err + } + return r.NextTargetRegistryServerImpl.Unregister(ctx, target) +} diff --git a/pkg/nsp/server.go b/pkg/nsp/server.go index 193b790f..07317bc7 100644 --- a/pkg/nsp/server.go +++ b/pkg/nsp/server.go @@ -17,58 +17,11 @@ limitations under the License. package nsp import ( - "context" - - "github.com/golang/protobuf/ptypes/empty" nspAPI "github.com/nordix/meridio/api/nsp/v1" - "github.com/nordix/meridio/pkg/nsp/types" - "github.com/sirupsen/logrus" + "github.com/nordix/meridio/pkg/nsp/next" ) -type Server struct { - nspAPI.UnimplementedTargetRegistryServer - TargetRegistry types.TargetRegistry -} - // NewServer - -func NewServer(targetRegistry types.TargetRegistry) nspAPI.TargetRegistryServer { - networkServicePlateformService := &Server{ - TargetRegistry: targetRegistry, - } - - return networkServicePlateformService -} - -func (s *Server) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { - return &empty.Empty{}, s.TargetRegistry.Set(ctx, target) -} - -func (s *Server) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) { - return &empty.Empty{}, s.TargetRegistry.Remove(ctx, target) -} - -func (s *Server) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error { - targetWatcher, err := s.TargetRegistry.Watch(context.TODO(), t) - if err != nil { - return err - } - s.watcher(watcher, targetWatcher.ResultChan()) - targetWatcher.Stop() - return nil -} - -func (s *Server) watcher(watcher nspAPI.TargetRegistry_WatchServer, ch <-chan []*nspAPI.Target) { - for { - select { - case event := <-ch: - err := watcher.Send(&nspAPI.TargetResponse{ - Targets: event, - }) - if err != nil { - logrus.Errorf("err sending TrenchResponse: %v", err) - } - case <-watcher.Context().Done(): - return - } - } +func NewServer(nextTargetRegistryServers ...next.NextTargetRegistryServer) nspAPI.TargetRegistryServer { + return next.BuildNextTargetRegistryChain(nextTargetRegistryServers...) } diff --git a/pkg/nsp/watchhandler/server.go b/pkg/nsp/watchhandler/server.go new file mode 100644 index 00000000..ab59383c --- /dev/null +++ b/pkg/nsp/watchhandler/server.go @@ -0,0 +1,72 @@ +/* +Copyright (c) 2021 Nordix Foundation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watchhandler + +import ( + "context" + + nspAPI "github.com/nordix/meridio/api/nsp/v1" + "github.com/nordix/meridio/pkg/nsp/next" + "github.com/nordix/meridio/pkg/nsp/types" + "github.com/sirupsen/logrus" +) + +type watchhandler struct { + TargetRegistry types.TargetRegistry + *next.NextTargetRegistryServerImpl +} + +// NewServer provides an implementation of TargetRegistryServer with the +// support of the chaining feature. This implementation handles Watch +// calls by sending corresponding watched resources on every change. +func NewServer(targetRegistry types.TargetRegistry) *watchhandler { + r := &watchhandler{ + TargetRegistry: targetRegistry, + NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{}, + } + return r +} + +func (wh *watchhandler) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error { + err := wh.NextTargetRegistryServerImpl.Watch(t, watcher) + if err != nil { + return err + } + targetWatcher, err := wh.TargetRegistry.Watch(context.TODO(), t) + if err != nil { + return err + } + wh.watcher(watcher, targetWatcher.ResultChan()) + targetWatcher.Stop() + return nil +} + +func (wh *watchhandler) watcher(watcher nspAPI.TargetRegistry_WatchServer, ch <-chan []*nspAPI.Target) { + for { + select { + case event := <-ch: + err := watcher.Send(&nspAPI.TargetResponse{ + Targets: event, + }) + if err != nil { + logrus.Errorf("err sending TrenchResponse: %v", err) + } + case <-watcher.Context().Done(): + return + } + } +}