From 0f0966c68a55670a9b859d5b0d43e3cd6a098c46 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 19 Jul 2023 11:24:27 +0000 Subject: [PATCH 1/2] Support multiple roles for banyand Signed-off-by: Gao Hongtao --- CHANGES.md | 1 + banyand/internal/cmd/liaison.go | 108 ++++++++++++++++++++++ banyand/internal/cmd/meta.go | 87 ++++++++++++++++++ banyand/internal/cmd/root.go | 3 + banyand/internal/cmd/standalone.go | 14 +-- banyand/internal/cmd/storage.go | 141 +++++++++++++++++++++++++++++ banyand/liaison/grpc/property.go | 2 +- banyand/liaison/grpc/registry.go | 12 +-- banyand/liaison/grpc/server.go | 2 +- banyand/liaison/liaison.go | 2 +- banyand/metadata/client.go | 25 +++++ banyand/query/processor.go | 4 +- banyand/query/query.go | 4 +- docs/concept/clustering.md | 8 ++ docs/installation.md | 83 ++++++++--------- pkg/run/run.go | 36 ++++++++ pkg/test/setup/setup.go | 2 +- 17 files changed, 470 insertions(+), 64 deletions(-) create mode 100644 banyand/internal/cmd/liaison.go create mode 100644 banyand/internal/cmd/meta.go create mode 100644 banyand/internal/cmd/storage.go create mode 100644 banyand/metadata/client.go diff --git a/CHANGES.md b/CHANGES.md index dc7ab55bc..15a8a8ae1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. - List all properties in a group. - Implement Write-ahead Logging - Document the clustering. +- Support multiple roles for banyand server. ### Bugs diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go new file mode 100644 index 000000000..6c8834680 --- /dev/null +++ b/banyand/internal/cmd/liaison.go @@ -0,0 +1,108 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/banyand/liaison/http" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var liaisonGroup = run.NewGroup("liaison") + +func newLiaisonCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + repo, err := discovery.NewServiceRepo(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate service repository") + } + // nolint: staticcheck + pipeline, err := queue.NewQueue(ctx, repo) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate data pipeline") + } + metaSvc, err := metadata.NewClient(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + httpServer := http.NewService() + + units := []run.Unit{ + new(signal.Handler), + repo, + pipeline, + tcp, + httpServer, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + liaisonGroup.Register(units...) + logging := logger.Logging{} + liaisonCmd := &cobra.Command{ + Use: "liaison", + Version: version.Build(), + Short: "Run as the liaison server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a liaison server") + // Spawn our go routines and wait for shutdown. + if err := liaisonGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet) + return liaisonCmd +} diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go new file mode 100644 index 000000000..df7530d26 --- /dev/null +++ b/banyand/internal/cmd/meta.go @@ -0,0 +1,87 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var metaGroup = run.NewGroup("meta") + +func newMetaCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + metaSvc, err := metadata.NewService(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + + units := []run.Unit{ + new(signal.Handler), + metaSvc, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + metaGroup.Register(units...) + logging := logger.Logging{} + metaCmd := &cobra.Command{ + Use: "meta", + Version: version.Build(), + Short: "Run as the meta server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a meta server") + // Spawn our go routines and wait for shutdown. + if err := metaGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", metaGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet) + return metaCmd +} diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index c5dd966a1..da67766da 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -44,5 +44,8 @@ BanyanDB, as an observability database, aims to ingest, analyze and store Metric `, } cmd.AddCommand(newStandaloneCmd()) + cmd.AddCommand(newMetaCmd()) + cmd.AddCommand(newStorageCmd()) + cmd.AddCommand(newLiaisonCmd()) return cmd } diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 7a91cd777..027e0407d 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -40,7 +40,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/version" ) -var g = run.NewGroup("standalone") +var standaloneGroup = run.NewGroup("standalone") func newStandaloneCmd() *cobra.Command { l := logger.GetLogger("bootstrap") @@ -65,7 +65,7 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } @@ -93,12 +93,12 @@ func newStandaloneCmd() *cobra.Command { units = append(units, metricSvc) } // Meta the run Group units. - g.Register(units...) + standaloneGroup.Register(units...) logging := logger.Logging{} standaloneCmd := &cobra.Command{ Use: "standalone", Version: version.Build(), - Short: "Run as the standalone mode", + Short: "Run as the standalone server", PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { if err = config.Load("logging", cmd.Flags()); err != nil { return err @@ -109,8 +109,8 @@ func newStandaloneCmd() *cobra.Command { fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a standalone server") // Spawn our go routines and wait for shutdown. - if err := g.Run(); err != nil { - logger.GetLogger().Error().Err(err).Stack().Str("name", g.Name()).Msg("Exit") + if err := standaloneGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", standaloneGroup.Name()).Msg("Exit") os.Exit(-1) } return nil @@ -121,6 +121,6 @@ func newStandaloneCmd() *cobra.Command { standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") standaloneCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") - standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet) + standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet) return standaloneCmd } diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go new file mode 100644 index 000000000..80c3311b9 --- /dev/null +++ b/banyand/internal/cmd/storage.go @@ -0,0 +1,141 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/query" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var storageGroup = run.NewGroup("storage") + +const ( + storageModeData = "data" + storageModeQuery = "query" + storageModeMix = "mix" +) + +var flagStorageMode string + +func newStorageCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + repo, err := discovery.NewServiceRepo(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate service repository") + } + // nolint: staticcheck + pipeline, err := queue.NewQueue(ctx, repo) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate data pipeline") + } + metaSvc, err := metadata.NewClient(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate stream service") + } + measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate measure service") + } + // TODO: remove streamSVC and measureSvc from query processor. To use metaSvc instead. + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate query processor") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + + units := []run.Unit{ + new(signal.Handler), + repo, + pipeline, + measureSvc, + streamSvc, + q, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + storageGroup.Register(units...) + logging := logger.Logging{} + storageCmd := &cobra.Command{ + Use: "storage", + Version: version.Build(), + Short: "Run as the storage server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + PreRun: func(cmd *cobra.Command, args []string) { + if flagStorageMode == storageModeMix { + return + } + switch flagStorageMode { + case storageModeData: + storageGroup.Deregister(q) + case storageModeQuery: + storageGroup.Deregister(streamSvc) + storageGroup.Deregister(measureSvc) + default: + l.Fatal().Str("mode", flagStorageMode).Msg("unknown storage mode") + } + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a storage server") + // Spawn our go routines and wait for shutdown. + if err := storageGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", storageGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]") + storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet) + return storageCmd +} diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 51b0eba7e..05a542f8a 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -26,7 +26,7 @@ import ( type propertyServer struct { propertyv1.UnimplementedPropertyServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) { diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index 87927e7f1..93e49138c 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -32,7 +32,7 @@ import ( type streamRegistryServer struct { databasev1.UnimplementedStreamRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *streamRegistryServer) Create(ctx context.Context, @@ -123,7 +123,7 @@ func groupExist(ctx context.Context, errResource error, metadata *commonv1.Metad type indexRuleBindingRegistryServer struct { databasev1.UnimplementedIndexRuleBindingRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context, @@ -205,7 +205,7 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req *databa type indexRuleRegistryServer struct { databasev1.UnimplementedIndexRuleRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) ( @@ -281,7 +281,7 @@ func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req *databasev1.In type measureRegistryServer struct { databasev1.UnimplementedMeasureRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) ( @@ -355,7 +355,7 @@ func (rs *measureRegistryServer) Exist(ctx context.Context, req *databasev1.Meas type groupRegistryServer struct { databasev1.UnimplementedGroupRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) ( @@ -429,7 +429,7 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupR type topNAggregationRegistryServer struct { databasev1.UnimplementedTopNAggregationRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (ts *topNAggregationRegistryServer) Create(ctx context.Context, diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index a92ff7a64..9ead16c37 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -85,7 +85,7 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit { +func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Repo) run.Unit { streamSVC := &streamService{ discoveryService: newDiscoveryService(pipeline), } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index bb59f228a..539d2f45e 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -29,6 +29,6 @@ import ( ) // NewEndpoint return a new endpoint which is the entry point for the database server. -func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) (run.Unit, error) { +func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Repo) (run.Unit, error) { return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil } diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go new file mode 100644 index 000000000..f89745aad --- /dev/null +++ b/banyand/metadata/client.go @@ -0,0 +1,25 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import "context" + +// NewClient returns a new metadata client. +func NewClient(_ context.Context) (Repo, error) { + return nil, nil +} diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 12c7a1066..2884f8d2c 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -27,7 +27,6 @@ import ( "github.com/apache/skywalking-banyandb/api/data" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -54,8 +53,7 @@ var ( type queryService struct { log *logger.Logger // TODO: remove the metaService once https://github.com/apache/skywalking/issues/10121 is fixed. - metaService metadata.Service - serviceRepo discovery.ServiceRepo + metaService metadata.Repo pipeline queue.Queue sqp *streamQueryProcessor mqp *measureQueryProcessor diff --git a/banyand/query/query.go b/banyand/query/query.go index 3e4eab1e9..fbff50306 100644 --- a/banyand/query/query.go +++ b/banyand/query/query.go @@ -21,7 +21,6 @@ package query import ( "context" - "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -31,11 +30,10 @@ import ( // NewService return a new query service. func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, - metaService metadata.Service, serviceRepo discovery.ServiceRepo, pipeline queue.Queue, + metaService metadata.Repo, pipeline queue.Queue, ) (run.Unit, error) { svc := &queryService{ metaService: metaService, - serviceRepo: serviceRepo, pipeline: pipeline, } // measure query processor diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md index 82b5859d8..733f14e8d 100644 --- a/docs/concept/clustering.md +++ b/docs/concept/clustering.md @@ -35,6 +35,14 @@ BanyanDB integrates multiple roles into a single process in the standalone mode, In this mode, the single process performs the roles of the Liaison Node, Query Node, Data Node, and Meta Node. It receives requests, maintains metadata, processes queries, and handles data, all within a unified setup. +### 1.6 Mix Mode in Storage Nodes + +Query nodes and data nodes are implemented by a same executable binary, Storage Node. With the flag "mode", the storage node can be started as a query node or a data node. The default mode is "mix", which means the storage node is both a query node and a data node. + +If the workload of query is high, you can start more storage nodes with the flag "mode" set to "query". If the workload of write is high, you can start more storage nodes with the flag "mode" set to "data". + +Or you can start storage nodes with the flag "mode" set to "mix" to balance the workload of query and write. + ## 2. Communication within a Cluster All nodes within a BanyanDB cluster communicate with other nodes according to their roles: diff --git a/docs/installation.md b/docs/installation.md index 6e3ef2d2e..d29ab2132 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -92,7 +92,10 @@ Usage: Available Commands: completion generate the autocompletion script for the specified shell help Help about any command - standalone Run as the standalone mode + liaison Run as the liaison server + meta Run as the meta server + standalone Run as the standalone server + storage Run as the storage server Flags: -h, --help help for this command @@ -101,7 +104,7 @@ Flags: Use " [command] --help" for more information about a command. ``` -Banyand is running as a standalone process by +Banyand is running as a standalone server by ```shell $ ./banyand-server standalone @@ -119,46 +122,44 @@ $ ./banyand-server standalone The banyand-server would be listening on the `0.0.0.0:17912` if no errors occurred. -To discover more options to configure the banyand by +## Setup Multiple Banyand as Cluster(TBD) + +### Setup Standalone Nodes + +The standalone node is running as a standalone process by ```shell -$ ./banyand-server standalone -h -Usage: - standalone [flags] +$ ./banyand-server standalone -id n1 +$ ./banyand-server standalone -id n2 +$ ./banyand-server standalone -id n3 +``` -Flags: - --addr string the address of banyand listens (default ":17912") - --cert-file string the TLS cert file - --etcd-listen-client-url string A URL to listen on for client traffic (default "http://localhost:2379") - --etcd-listen-peer-url string A URL to listen on for peer traffic (default "http://localhost:2380") - -h, --help help for standalone - --http-addr string listen addr for http (default ":17913") - --http-cert-file string the TLS cert file of http server - --http-grpc-addr string http server redirect grpc requests to this address (default "localhost:17912") - --http-grpc-cert-file string the grpc TLS cert file if grpc server enables tls - --http-key-file string the TLS key file of http server - --http-tls connection uses TLS if true, else plain HTTP - --key-file string the TLS key file - --logging-env string the logging (default "prod") - --logging-level string the root level of logging (default "info") - --logging-levels stringArray the level logging of logging - --logging-modules stringArray the specific module - --max-recv-msg-size bytes the size of max receiving message (default 10.00MiB) - --measure-buffer-size bytes block buffer size (default 4.00MiB) - --measure-encoder-buffer-size bytes block fields buffer size (default 12.00MiB) - --measure-idx-batch-wait-sec int index batch wait in second (default 1) - --measure-root-path string the root path of database (default "/tmp") - --measure-seriesmeta-mem-size bytes series metadata memory size (default 1.00MiB) - --metadata-root-path string the root path of metadata (default "/tmp") - -n, --name string name of this service (default "standalone") - --observability-listener-addr string listen addr for observability (default ":2121") - --pprof-listener-addr string listen addr for pprof (default ":6060") - --show-rungroup-units show rungroup units - --stream-block-buffer-size bytes block buffer size (default 8.00MiB) - --stream-global-index-mem-size bytes global index memory size (default 2.00MiB) - --stream-idx-batch-wait-sec int index batch wait in second (default 1) - --stream-root-path string the root path of database (default "/tmp") - --stream-seriesmeta-mem-size bytes series metadata memory size (default 1.00MiB) - --tls connection uses TLS if true, else plain TCP - -v, --version version for standalone +The standalone node would be listening on the `` if no errors occurred. + +### Setup Role-Based Nodes + +The meta nodes should boot up firstly to provide the metadata service for the whole cluster. The meta node is running as a standalone process by + +```shell +$ ./banyand-server meta +``` + +The meta node would be listening on the `` if no errors occurred. + + +Data nodes, query nodes and liaison nodes are running as independent processes by + +```shell +$ ./banyand-server storage --mode data +$ ./banyand-server storage --mode query +$ ./banyand-server liaison +``` + +The data node, query node and liaison node would be listening on the `` if no errors occurred. + +If you want to use a `mix` mode instead of separate query and data nodes, you can run the banyand-server as processes by + +```shell +$ ./banyand-server storage +$ ./banyand-server liaison ``` diff --git a/pkg/run/run.go b/pkg/run/run.go index c45e59a0d..4630ff07e 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -176,6 +176,42 @@ func (g *Group) Register(units ...Unit) []bool { return hasRegistered } +// Deregister will remove the provided objects implementing the Unit interface +// from the Group. +// The returned array of booleans is of the same size as the amount of provided +// Units, signaling for each provided Unit if it successfully deregistered from +// Group or if it was ignored. +func (g *Group) Deregister(units ...Unit) []bool { + hasDeregistered := make([]bool, len(units)) + for idx := range units { + if c, ok := units[idx].(Config); ok { + for i := range g.c { + if g.c[i] == c { + g.c[i] = nil + hasDeregistered[idx] = true + } + } + } + if p, ok := units[idx].(PreRunner); ok { + for i := range g.p { + if g.p[i] == p { + g.p[i] = nil + hasDeregistered[idx] = true + } + } + } + if s, ok := units[idx].(Service); ok { + for i := range g.s { + if g.s[i] == s { + g.s[i] = nil + hasDeregistered[idx] = true + } + } + } + } + return hasDeregistered +} + // RegisterFlags returns FlagSet contains Flags in all modules. func (g *Group) RegisterFlags() *FlagSet { // run configuration stage diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 1c6dd77a8..7b6544713 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -93,7 +93,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) func() { measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) // Init `Query` module - q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc) httpServer := http.NewService() From c976d23d9ceabef201606b5fbb436bffd52b7d67 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 26 Jul 2023 01:53:36 +0000 Subject: [PATCH 2/2] Separate etcd server from the schema registry Signed-off-by: Gao Hongtao --- banyand/metadata/client.go | 186 +++++++++++++++++++++++- banyand/metadata/embeddedetcd/server.go | 159 ++++++++++++++++++++ banyand/metadata/metadata.go | 181 ----------------------- banyand/metadata/metadata_test.go | 1 + banyand/metadata/schema/etcd.go | 86 ++--------- banyand/metadata/schema/etcd_test.go | 73 +++++----- banyand/metadata/schema/schema.go | 3 - banyand/metadata/server.go | 95 ++++++++++++ 8 files changed, 485 insertions(+), 299 deletions(-) create mode 100644 banyand/metadata/embeddedetcd/server.go create mode 100644 banyand/metadata/server.go diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index f89745aad..cfcfec84f 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -17,9 +17,189 @@ package metadata -import "context" +import ( + "context" + "errors" + "time" + + "go.uber.org/multierr" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +const flagEtcdEndpointsName = "etcd-endpoints" // NewClient returns a new metadata client. -func NewClient(_ context.Context) (Repo, error) { - return nil, nil +func NewClient(_ context.Context) (Service, error) { + return &clientService{closer: run.NewCloser(1)}, nil +} + +type clientService struct { + schemaRegistry schema.Registry + closer *run.Closer + endpoints []string +} + +func (s *clientService) FlagSet() *run.FlagSet { + fs := run.NewFlagSet("metadata") + fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints") + return fs +} + +func (s *clientService) Validate() error { + if s.endpoints == nil { + return errors.New("endpoints is empty") + } + return nil +} + +func (s *clientService) PreRun() error { + var err error + s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints)) + if err != nil { + return err + } + return nil +} + +func (s *clientService) Serve() run.StopNotify { + return s.closer.CloseNotify() +} + +func (s *clientService) GracefulStop() { + s.closer.Done() + s.closer.CloseThenWait() + _ = s.schemaRegistry.Close() +} + +func (s *clientService) SchemaRegistry() schema.Registry { + return s.schemaRegistry +} + +func (s *clientService) StreamRegistry() schema.Stream { + return s.schemaRegistry +} + +func (s *clientService) IndexRuleRegistry() schema.IndexRule { + return s.schemaRegistry +} + +func (s *clientService) IndexRuleBindingRegistry() schema.IndexRuleBinding { + return s.schemaRegistry +} + +func (s *clientService) MeasureRegistry() schema.Measure { + return s.schemaRegistry +} + +func (s *clientService) GroupRegistry() schema.Group { + return s.schemaRegistry +} + +func (s *clientService) TopNAggregationRegistry() schema.TopNAggregation { + return s.schemaRegistry +} + +func (s *clientService) PropertyRegistry() schema.Property { + return s.schemaRegistry +} + +func (s *clientService) Name() string { + return "metadata" +} + +func (s *clientService) IndexRules(ctx context.Context, subject *commonv1.Metadata) ([]*databasev1.IndexRule, error) { + bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: subject.Group}) + if err != nil { + return nil, err + } + now := time.Now() + foundRules := make([]string, 0) + for _, binding := range bindings { + if binding.GetBeginAt().AsTime().After(now) || + binding.GetExpireAt().AsTime().Before(now) { + continue + } + sub := binding.GetSubject() + if sub.Name != subject.Name { + continue + } + foundRules = append(foundRules, binding.Rules...) + } + result := make([]*databasev1.IndexRule, 0, len(foundRules)) + var indexRuleErr error + for _, rule := range foundRules { + r, getErr := s.schemaRegistry.GetIndexRule(ctx, &commonv1.Metadata{ + Name: rule, + Group: subject.Group, + }) + if getErr != nil { + indexRuleErr = multierr.Append(indexRuleErr, err) + continue + } + result = append(result, r) + } + return result, indexRuleErr +} + +func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) { + bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()}) + if err != nil { + return nil, err + } + + now := time.Now() + var subjectErr error + foundSubjects := make([]schema.Spec, 0) + for _, binding := range bindings { + if binding.GetBeginAt().AsTime().After(now) || + binding.GetExpireAt().AsTime().Before(now) { + continue + } + sub := binding.GetSubject() + if sub.GetCatalog() != catalog { + continue + } + + if !contains(binding.GetRules(), indexRule.GetMetadata().GetName()) { + continue + } + + switch catalog { + case commonv1.Catalog_CATALOG_STREAM: + stream, getErr := s.schemaRegistry.GetStream(ctx, &commonv1.Metadata{ + Name: sub.GetName(), + Group: indexRule.GetMetadata().GetGroup(), + }) + if getErr != nil { + subjectErr = multierr.Append(subjectErr, getErr) + } + foundSubjects = append(foundSubjects, stream) + case commonv1.Catalog_CATALOG_MEASURE: + measure, getErr := s.schemaRegistry.GetMeasure(ctx, &commonv1.Metadata{ + Name: sub.GetName(), + Group: indexRule.GetMetadata().GetGroup(), + }) + if getErr != nil { + subjectErr = multierr.Append(subjectErr, getErr) + } + foundSubjects = append(foundSubjects, measure) + default: + continue + } + } + + return foundSubjects, subjectErr +} + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false } diff --git a/banyand/metadata/embeddedetcd/server.go b/banyand/metadata/embeddedetcd/server.go new file mode 100644 index 000000000..d6ac0cb5d --- /dev/null +++ b/banyand/metadata/embeddedetcd/server.go @@ -0,0 +1,159 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package embeddedetcd implements an embedded etcd server. +package embeddedetcd + +import ( + "io" + "net/url" + "os" + "path/filepath" + "time" + + "go.etcd.io/etcd/server/v3/embed" + "go.uber.org/zap" + + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// Server is the interface of etcd server. +type Server interface { + io.Closer + ReadyNotify() <-chan struct{} + StopNotify() <-chan struct{} + StoppingNotify() <-chan struct{} +} + +type server struct { + server *embed.Etcd +} + +// Option is the option for etcd server. +type Option func(*config) + +// RootDir sets the root directory of Registry. +func RootDir(rootDir string) Option { + return func(config *config) { + config.rootDir = rootDir + } +} + +// ConfigureListener sets peer urls of listeners. +func ConfigureListener(lcs, lps []string) Option { + return func(config *config) { + config.listenerClientURLs = lcs + config.listenerPeerURLs = lps + } +} + +type config struct { + // rootDir is the root directory for etcd storage + rootDir string + // listenerClientURLs is the listener for client + listenerClientURLs []string + // listenerPeerURLs is the listener for peer + listenerPeerURLs []string +} + +func (e *server) ReadyNotify() <-chan struct{} { + return e.server.Server.ReadyNotify() +} + +func (e *server) StopNotify() <-chan struct{} { + return e.server.Server.StopNotify() +} + +func (e *server) StoppingNotify() <-chan struct{} { + return e.server.Server.StoppingNotify() +} + +func (e *server) Close() error { + e.server.Close() + return nil +} + +// NewServer returns a new etcd server. +func NewServer(options ...Option) (Server, error) { + conf := &config{ + rootDir: os.TempDir(), + listenerClientURLs: []string{embed.DefaultListenClientURLs}, + listenerPeerURLs: []string{embed.DefaultListenPeerURLs}, + } + for _, opt := range options { + opt(conf) + } + zapCfg := logger.GetLogger("etcd").ToZapConfig() + + var l *zap.Logger + var err error + if l, err = zapCfg.Build(); err != nil { + return nil, err + } + // TODO: allow use cluster setting + embedConfig, err := newEmbedEtcdConfig(conf, l) + if err != nil { + return nil, err + } + e, err := embed.StartEtcd(embedConfig) + if err != nil { + return nil, err + } + if e != nil { + <-e.Server.ReadyNotify() // wait for e.Server to join the cluster + } + reg := &server{ + server: e, + } + return reg, nil +} + +func newEmbedEtcdConfig(config *config, logger *zap.Logger) (*embed.Config, error) { + cfg := embed.NewConfig() + cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger) + cfg.Dir = filepath.Join(config.rootDir, "metadata") + observability.UpdatePath(cfg.Dir) + parseURLs := func(urls []string) ([]url.URL, error) { + uu := make([]url.URL, 0, len(urls)) + for _, u := range urls { + cURL, err := url.Parse(u) + if err != nil { + return nil, err + } + uu = append(uu, *cURL) + } + return uu, nil + } + cURLs, err := parseURLs(config.listenerClientURLs) + if err != nil { + return nil, err + } + pURLs, err := parseURLs(config.listenerPeerURLs) + if err != nil { + return nil, err + } + cfg.Name = "meta" + cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs + cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + + cfg.BackendBatchInterval = 500 * time.Millisecond + cfg.BackendBatchLimit = 10000 + cfg.MaxRequestBytes = 10 * 1024 * 1024 + return cfg, nil +} diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index 86697a8ac..f9ab33d7d 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -21,10 +21,6 @@ package metadata import ( "context" - "errors" - "time" - - "go.uber.org/multierr" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -60,180 +56,3 @@ type Service interface { run.Config SchemaRegistry() schema.Registry } - -type service struct { - schemaRegistry schema.Registry - rootDir string - listenClientURL string - listenPeerURL string -} - -func (s *service) FlagSet() *run.FlagSet { - fs := run.NewFlagSet("metadata") - fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path of metadata") - fs.StringVar(&s.listenClientURL, "etcd-listen-client-url", "http://localhost:2379", "A URL to listen on for client traffic") - fs.StringVar(&s.listenPeerURL, "etcd-listen-peer-url", "http://localhost:2380", "A URL to listen on for peer traffic") - return fs -} - -func (s *service) Validate() error { - if s.rootDir == "" { - return errors.New("rootDir is empty") - } - return nil -} - -func (s *service) PreRun() error { - var err error - s.schemaRegistry, err = schema.NewEtcdSchemaRegistry( - schema.ConfigureListener(s.listenClientURL, s.listenPeerURL), - schema.RootDir(s.rootDir)) - if err != nil { - return err - } - <-s.schemaRegistry.ReadyNotify() - return nil -} - -func (s *service) Serve() run.StopNotify { - return s.schemaRegistry.StoppingNotify() -} - -func (s *service) GracefulStop() { - _ = s.schemaRegistry.Close() - <-s.schemaRegistry.StopNotify() -} - -// NewService returns a new metadata repository Service. -func NewService(_ context.Context) (Service, error) { - return &service{}, nil -} - -func (s *service) SchemaRegistry() schema.Registry { - return s.schemaRegistry -} - -func (s *service) StreamRegistry() schema.Stream { - return s.schemaRegistry -} - -func (s *service) IndexRuleRegistry() schema.IndexRule { - return s.schemaRegistry -} - -func (s *service) IndexRuleBindingRegistry() schema.IndexRuleBinding { - return s.schemaRegistry -} - -func (s *service) MeasureRegistry() schema.Measure { - return s.schemaRegistry -} - -func (s *service) GroupRegistry() schema.Group { - return s.schemaRegistry -} - -func (s *service) TopNAggregationRegistry() schema.TopNAggregation { - return s.schemaRegistry -} - -func (s *service) PropertyRegistry() schema.Property { - return s.schemaRegistry -} - -func (s *service) Name() string { - return "metadata" -} - -func (s *service) IndexRules(ctx context.Context, subject *commonv1.Metadata) ([]*databasev1.IndexRule, error) { - bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: subject.Group}) - if err != nil { - return nil, err - } - now := time.Now() - foundRules := make([]string, 0) - for _, binding := range bindings { - if binding.GetBeginAt().AsTime().After(now) || - binding.GetExpireAt().AsTime().Before(now) { - continue - } - sub := binding.GetSubject() - if sub.Name != subject.Name { - continue - } - foundRules = append(foundRules, binding.Rules...) - } - result := make([]*databasev1.IndexRule, 0, len(foundRules)) - var indexRuleErr error - for _, rule := range foundRules { - r, getErr := s.schemaRegistry.GetIndexRule(ctx, &commonv1.Metadata{ - Name: rule, - Group: subject.Group, - }) - if getErr != nil { - indexRuleErr = multierr.Append(indexRuleErr, err) - continue - } - result = append(result, r) - } - return result, indexRuleErr -} - -func (s *service) Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) { - bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()}) - if err != nil { - return nil, err - } - - now := time.Now() - var subjectErr error - foundSubjects := make([]schema.Spec, 0) - for _, binding := range bindings { - if binding.GetBeginAt().AsTime().After(now) || - binding.GetExpireAt().AsTime().Before(now) { - continue - } - sub := binding.GetSubject() - if sub.GetCatalog() != catalog { - continue - } - - if !contains(binding.GetRules(), indexRule.GetMetadata().GetName()) { - continue - } - - switch catalog { - case commonv1.Catalog_CATALOG_STREAM: - stream, getErr := s.schemaRegistry.GetStream(ctx, &commonv1.Metadata{ - Name: sub.GetName(), - Group: indexRule.GetMetadata().GetGroup(), - }) - if getErr != nil { - subjectErr = multierr.Append(subjectErr, getErr) - } - foundSubjects = append(foundSubjects, stream) - case commonv1.Catalog_CATALOG_MEASURE: - measure, getErr := s.schemaRegistry.GetMeasure(ctx, &commonv1.Metadata{ - Name: sub.GetName(), - Group: indexRule.GetMetadata().GetGroup(), - }) - if getErr != nil { - subjectErr = multierr.Append(subjectErr, getErr) - } - foundSubjects = append(foundSubjects, measure) - default: - continue - } - } - - return foundSubjects, subjectErr -} - -func contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -} diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index 156480df8..56eac46ce 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -47,6 +47,7 @@ func Test_service_RulesBySubject(t *testing.T) { is.NoError(err) err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir}) is.NoError(err) + is.NoError(s.Validate()) err = s.PreRun() is.NoError(err) defer func() { diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index 59056bf80..2c147f30d 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -19,15 +19,11 @@ package schema import ( "context" - "net/url" - "os" - "path/filepath" "sync" "time" "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/server/v3/embed" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/proto" @@ -35,7 +31,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" - "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -61,18 +56,10 @@ type HasMetadata interface { // RegistryOption is the option to create Registry. type RegistryOption func(*etcdSchemaRegistryConfig) -// RootDir sets the root directory of Registry. -func RootDir(rootDir string) RegistryOption { +// ConfigureServerEndpoints sets a list of the server urls. +func ConfigureServerEndpoints(url []string) RegistryOption { return func(config *etcdSchemaRegistryConfig) { - config.rootDir = rootDir - } -} - -// ConfigureListener sets client and peer urls of listeners. -func ConfigureListener(lc, lp string) RegistryOption { - return func(config *etcdSchemaRegistryConfig) { - config.listenerClientURL = lc - config.listenerPeerURL = lp + config.serverEndpoints = url } } @@ -86,7 +73,6 @@ func (eh *eventHandler) interestOf(kind Kind) bool { } type etcdSchemaRegistry struct { - server *embed.Etcd client *clientv3.Client closer *run.Closer handlers []*eventHandler @@ -94,12 +80,7 @@ type etcdSchemaRegistry struct { } type etcdSchemaRegistryConfig struct { - // rootDir is the root directory for etcd storage - rootDir string - // listenerClientURL is the listener for client - listenerClientURL string - // listenerPeerURL is the listener for peer - listenerPeerURL string + serverEndpoints []string } func (e *etcdSchemaRegistry) RegisterHandler(kind Kind, handler EventHandler) { @@ -133,35 +114,21 @@ func (e *etcdSchemaRegistry) notifyDelete(metadata Metadata) { } } -func (e *etcdSchemaRegistry) ReadyNotify() <-chan struct{} { - return e.server.Server.ReadyNotify() -} - -func (e *etcdSchemaRegistry) StopNotify() <-chan struct{} { - return e.server.Server.StopNotify() -} - -func (e *etcdSchemaRegistry) StoppingNotify() <-chan struct{} { - return e.server.Server.StoppingNotify() -} - func (e *etcdSchemaRegistry) Close() error { + e.closer.Done() e.closer.CloseThenWait() - _ = e.client.Close() - e.server.Close() - return nil + return e.client.Close() } // NewEtcdSchemaRegistry returns a Registry powered by Etcd. func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) { - registryConfig := &etcdSchemaRegistryConfig{ - rootDir: os.TempDir(), - listenerClientURL: embed.DefaultListenClientURLs, - listenerPeerURL: embed.DefaultListenPeerURLs, - } + registryConfig := &etcdSchemaRegistryConfig{} for _, opt := range options { opt(registryConfig) } + if registryConfig.serverEndpoints == nil { + return nil, errors.New("server address is not set") + } zapCfg := logger.GetLogger("etcd").ToZapConfig() var l *zap.Logger @@ -169,18 +136,9 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) { if l, err = zapCfg.Build(); err != nil { return nil, err } - // TODO: allow use cluster setting - embedConfig := newStandaloneEtcdConfig(registryConfig, l) - e, err := embed.StartEtcd(embedConfig) - if err != nil { - return nil, err - } - if e != nil { - <-e.Server.ReadyNotify() // wait for e.Server to join the cluster - } config := clientv3.Config{ - Endpoints: []string{e.Config().AdvertiseClientUrls[0].String()}, + Endpoints: registryConfig.serverEndpoints, DialTimeout: 5 * time.Second, DialKeepAliveTime: 30 * time.Second, DialKeepAliveTimeout: 10 * time.Second, @@ -192,9 +150,8 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) { return nil, err } reg := &etcdSchemaRegistry{ - server: e, client: client, - closer: run.NewCloser(0), + closer: run.NewCloser(1), } return reg, nil } @@ -397,22 +354,3 @@ func incrementLastByte(key string) string { bb[len(bb)-1]++ return string(bb) } - -func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger *zap.Logger) *embed.Config { - cfg := embed.NewConfig() - cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger) - cfg.Dir = filepath.Join(config.rootDir, "metadata") - observability.UpdatePath(cfg.Dir) - cURL, _ := url.Parse(config.listenerClientURL) - pURL, _ := url.Parse(config.listenerPeerURL) - - cfg.ClusterState = "new" - cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*cURL}, []url.URL{*cURL} - cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*pURL}, []url.URL{*pURL} - cfg.InitialCluster = ",default=" + pURL.String() - - cfg.BackendBatchInterval = 500 * time.Millisecond - cfg.BackendBatchLimit = 10000 - cfg.MaxRequestBytes = 10 * 1024 * 1024 - return cfg -} diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go index 4b8136a87..4d9a647cd 100644 --- a/banyand/metadata/schema/etcd_test.go +++ b/banyand/metadata/schema/etcd_test.go @@ -34,6 +34,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/pkg/test" ) @@ -118,30 +119,34 @@ func randomTempDir() string { return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", uuid.New().String())) } -func useRandomTempDir() RegistryOption { - return func(config *etcdSchemaRegistryConfig) { - config.rootDir = randomTempDir() +func initServerAndRegister(t *testing.T) (Registry, func()) { + req := require.New(t) + ports, err := test.AllocateFreePorts(2) + if err != nil { + panic("fail to find free ports") } -} - -func useRandomPort() RegistryOption { - return func(config *etcdSchemaRegistryConfig) { - ports, err := test.AllocateFreePorts(2) - if err != nil { - panic("fail to find free ports") - } - config.listenerClientURL, config.listenerPeerURL = fmt.Sprintf("http://127.0.0.1:%d", ports[0]), fmt.Sprintf("http://127.0.0.1:%d", ports[1]) + endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])} + server, err := embeddedetcd.NewServer( + embeddedetcd.ConfigureListener(endpoints, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}), + embeddedetcd.RootDir(randomTempDir())) + req.NoError(err) + req.NotNil(server) + <-server.ReadyNotify() + schemaRegistry, err := NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints)) + req.NoError(err) + req.NotNil(server) + return schemaRegistry, func() { + server.Close() + <-server.StopNotify() + schemaRegistry.Close() } } func Test_Etcd_Entity_Get(t *testing.T) { tester := assert.New(t) - registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir()) - tester.NoError(err) - tester.NotNil(registry) - defer registry.Close() - - err = preloadSchema(registry) + registry, closer := initServerAndRegister(t) + defer closer() + err := preloadSchema(registry) tester.NoError(err) tests := []struct { @@ -228,12 +233,10 @@ func Test_Etcd_Entity_Get(t *testing.T) { func Test_Etcd_Entity_List(t *testing.T) { tester := assert.New(t) - registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir()) - tester.NoError(err) - tester.NotNil(registry) - defer registry.Close() + registry, closer := initServerAndRegister(t) + defer closer() - err = preloadSchema(registry) + err := preloadSchema(registry) tester.NoError(err) tests := []struct { @@ -310,12 +313,10 @@ func Test_Etcd_Entity_List(t *testing.T) { func Test_Etcd_Delete(t *testing.T) { tester := assert.New(t) - registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir()) - tester.NoError(err) - tester.NotNil(registry) - defer registry.Close() + registry, closer := initServerAndRegister(t) + defer closer() - err = preloadSchema(registry) + err := preloadSchema(registry) tester.NoError(err) tests := []struct { @@ -379,12 +380,10 @@ func Test_Etcd_Delete(t *testing.T) { func Test_Notify(t *testing.T) { req := require.New(t) - registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir()) - req.NoError(err) - req.NotNil(registry) - defer registry.Close() + registry, closer := initServerAndRegister(t) + defer closer() - err = preloadSchema(registry) + err := preloadSchema(registry) req.NoError(err) tests := []struct { @@ -524,12 +523,10 @@ func Test_Notify(t *testing.T) { func Test_Etcd_Entity_Update(t *testing.T) { tester := assert.New(t) - registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir()) - tester.NoError(err) - tester.NotNil(registry) - defer registry.Close() + registry, closer := initServerAndRegister(t) + defer closer() - err = preloadSchema(registry) + err := preloadSchema(registry) tester.NoError(err) tests := []struct { diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index e26ad943a..f96149383 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -61,9 +61,6 @@ type ListOpt struct { // Registry allowing depositing resources. type Registry interface { io.Closer - ReadyNotify() <-chan struct{} - StopNotify() <-chan struct{} - StoppingNotify() <-chan struct{} Stream IndexRule IndexRuleBinding diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go new file mode 100644 index 000000000..ce004e3c3 --- /dev/null +++ b/banyand/metadata/server.go @@ -0,0 +1,95 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import ( + "context" + "errors" + "strings" + + "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +type server struct { + Service + metaServer embeddedetcd.Server + rootDir string + listenClientURL []string + listenPeerURL []string +} + +func (s *server) Name() string { + return "metadata" +} + +func (s *server) FlagSet() *run.FlagSet { + fs := run.NewFlagSet("metadata") + fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path of metadata") + fs.StringArrayVar(&s.listenClientURL, "etcd-listen-client-url", []string{"http://localhost:2379"}, "A URL to listen on for client traffic") + fs.StringArrayVar(&s.listenPeerURL, "etcd-listen-peer-url", []string{"http://localhost:2380"}, "A URL to listen on for peer traffic") + return fs +} + +func (s *server) Validate() error { + if s.rootDir == "" { + return errors.New("rootDir is empty") + } + if s.listenClientURL == nil { + return errors.New("listenClientURL is empty") + } + if s.listenPeerURL == nil { + return errors.New("listenPeerURL is empty") + } + if err := s.Service.FlagSet().Set(flagEtcdEndpointsName, + strings.Join(s.listenClientURL, ",")); err != nil { + return err + } + return s.Service.Validate() +} + +func (s *server) PreRun() error { + var err error + s.metaServer, err = embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL)) + if err != nil { + return err + } + <-s.metaServer.ReadyNotify() + return s.Service.PreRun() +} + +func (s *server) Serve() run.StopNotify { + return s.metaServer.StoppingNotify() +} + +func (s *server) GracefulStop() { + s.Service.GracefulStop() + s.metaServer.Close() + <-s.metaServer.StopNotify() +} + +// NewService returns a new metadata repository Service. +func NewService(ctx context.Context) (Service, error) { + s := &server{} + var err error + s.Service, err = NewClient(ctx) + if err != nil { + return nil, err + } + return s, nil +}