diff --git a/CHANGES.md b/CHANGES.md index dc7ab55bc..15a8a8ae1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. - List all properties in a group. - Implement Write-ahead Logging - Document the clustering. +- Support multiple roles for banyand server. ### Bugs diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go new file mode 100644 index 000000000..6c8834680 --- /dev/null +++ b/banyand/internal/cmd/liaison.go @@ -0,0 +1,108 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/liaison" + "github.com/apache/skywalking-banyandb/banyand/liaison/http" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var liaisonGroup = run.NewGroup("liaison") + +func newLiaisonCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + repo, err := discovery.NewServiceRepo(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate service repository") + } + // nolint: staticcheck + pipeline, err := queue.NewQueue(ctx, repo) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate data pipeline") + } + metaSvc, err := metadata.NewClient(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + httpServer := http.NewService() + + units := []run.Unit{ + new(signal.Handler), + repo, + pipeline, + tcp, + httpServer, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + liaisonGroup.Register(units...) + logging := logger.Logging{} + liaisonCmd := &cobra.Command{ + Use: "liaison", + Version: version.Build(), + Short: "Run as the liaison server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a liaison server") + // Spawn our go routines and wait for shutdown. + if err := liaisonGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", liaisonGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet) + return liaisonCmd +} diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go new file mode 100644 index 000000000..df7530d26 --- /dev/null +++ b/banyand/internal/cmd/meta.go @@ -0,0 +1,87 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var metaGroup = run.NewGroup("meta") + +func newMetaCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + metaSvc, err := metadata.NewService(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + + units := []run.Unit{ + new(signal.Handler), + metaSvc, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + metaGroup.Register(units...) + logging := logger.Logging{} + metaCmd := &cobra.Command{ + Use: "meta", + Version: version.Build(), + Short: "Run as the meta server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a meta server") + // Spawn our go routines and wait for shutdown. + if err := metaGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", metaGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet) + return metaCmd +} diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go index c5dd966a1..da67766da 100644 --- a/banyand/internal/cmd/root.go +++ b/banyand/internal/cmd/root.go @@ -44,5 +44,8 @@ BanyanDB, as an observability database, aims to ingest, analyze and store Metric `, } cmd.AddCommand(newStandaloneCmd()) + cmd.AddCommand(newMetaCmd()) + cmd.AddCommand(newStorageCmd()) + cmd.AddCommand(newLiaisonCmd()) return cmd } diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go index 7a91cd777..027e0407d 100644 --- a/banyand/internal/cmd/standalone.go +++ b/banyand/internal/cmd/standalone.go @@ -40,7 +40,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/version" ) -var g = run.NewGroup("standalone") +var standaloneGroup = run.NewGroup("standalone") func newStandaloneCmd() *cobra.Command { l := logger.GetLogger("bootstrap") @@ -65,7 +65,7 @@ func newStandaloneCmd() *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } @@ -93,12 +93,12 @@ func newStandaloneCmd() *cobra.Command { units = append(units, metricSvc) } // Meta the run Group units. - g.Register(units...) + standaloneGroup.Register(units...) logging := logger.Logging{} standaloneCmd := &cobra.Command{ Use: "standalone", Version: version.Build(), - Short: "Run as the standalone mode", + Short: "Run as the standalone server", PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { if err = config.Load("logging", cmd.Flags()); err != nil { return err @@ -109,8 +109,8 @@ func newStandaloneCmd() *cobra.Command { fmt.Print(logo) logger.GetLogger().Info().Msg("starting as a standalone server") // Spawn our go routines and wait for shutdown. - if err := g.Run(); err != nil { - logger.GetLogger().Error().Err(err).Stack().Str("name", g.Name()).Msg("Exit") + if err := standaloneGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", standaloneGroup.Name()).Msg("Exit") os.Exit(-1) } return nil @@ -121,6 +121,6 @@ func newStandaloneCmd() *cobra.Command { standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") standaloneCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") - standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet) + standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet) return standaloneCmd } diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go new file mode 100644 index 000000000..80c3311b9 --- /dev/null +++ b/banyand/internal/cmd/storage.go @@ -0,0 +1,141 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cmd + +import ( + "context" + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/discovery" + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/banyand/query" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/pkg/config" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/signal" + "github.com/apache/skywalking-banyandb/pkg/version" +) + +var storageGroup = run.NewGroup("storage") + +const ( + storageModeData = "data" + storageModeQuery = "query" + storageModeMix = "mix" +) + +var flagStorageMode string + +func newStorageCmd() *cobra.Command { + l := logger.GetLogger("bootstrap") + ctx := context.Background() + repo, err := discovery.NewServiceRepo(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate service repository") + } + // nolint: staticcheck + pipeline, err := queue.NewQueue(ctx, repo) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate data pipeline") + } + metaSvc, err := metadata.NewClient(ctx) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate metadata service") + } + streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate stream service") + } + measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate measure service") + } + // TODO: remove streamSVC and measureSvc from query processor. To use metaSvc instead. + q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate query processor") + } + profSvc := observability.NewProfService() + metricSvc := observability.NewMetricService() + + units := []run.Unit{ + new(signal.Handler), + repo, + pipeline, + measureSvc, + streamSvc, + q, + profSvc, + } + if metricSvc != nil { + units = append(units, metricSvc) + } + // Meta the run Group units. + storageGroup.Register(units...) + logging := logger.Logging{} + storageCmd := &cobra.Command{ + Use: "storage", + Version: version.Build(), + Short: "Run as the storage server", + PersistentPreRunE: func(cmd *cobra.Command, args []string) (err error) { + if err = config.Load("logging", cmd.Flags()); err != nil { + return err + } + return logger.Init(logging) + }, + PreRun: func(cmd *cobra.Command, args []string) { + if flagStorageMode == storageModeMix { + return + } + switch flagStorageMode { + case storageModeData: + storageGroup.Deregister(q) + case storageModeQuery: + storageGroup.Deregister(streamSvc) + storageGroup.Deregister(measureSvc) + default: + l.Fatal().Str("mode", flagStorageMode).Msg("unknown storage mode") + } + }, + RunE: func(cmd *cobra.Command, args []string) (err error) { + fmt.Print(logo) + logger.GetLogger().Info().Msg("starting as a storage server") + // Spawn our go routines and wait for shutdown. + if err := storageGroup.Run(); err != nil { + logger.GetLogger().Error().Err(err).Stack().Str("name", storageGroup.Name()).Msg("Exit") + os.Exit(-1) + } + return nil + }, + } + + storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the logging") + storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the root level of logging") + storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", nil, "the specific module") + storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, "the level logging of logging") + storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", storageModeMix, "the storage mode, one of [data, query, mix]") + storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet) + return storageCmd +} diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 51b0eba7e..05a542f8a 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -26,7 +26,7 @@ import ( type propertyServer struct { propertyv1.UnimplementedPropertyServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) { diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index 87927e7f1..93e49138c 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -32,7 +32,7 @@ import ( type streamRegistryServer struct { databasev1.UnimplementedStreamRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *streamRegistryServer) Create(ctx context.Context, @@ -123,7 +123,7 @@ func groupExist(ctx context.Context, errResource error, metadata *commonv1.Metad type indexRuleBindingRegistryServer struct { databasev1.UnimplementedIndexRuleBindingRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context, @@ -205,7 +205,7 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req *databa type indexRuleRegistryServer struct { databasev1.UnimplementedIndexRuleRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) ( @@ -281,7 +281,7 @@ func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req *databasev1.In type measureRegistryServer struct { databasev1.UnimplementedMeasureRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) ( @@ -355,7 +355,7 @@ func (rs *measureRegistryServer) Exist(ctx context.Context, req *databasev1.Meas type groupRegistryServer struct { databasev1.UnimplementedGroupRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) ( @@ -429,7 +429,7 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupR type topNAggregationRegistryServer struct { databasev1.UnimplementedTopNAggregationRegistryServiceServer - schemaRegistry metadata.Service + schemaRegistry metadata.Repo } func (ts *topNAggregationRegistryServer) Create(ctx context.Context, diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index a92ff7a64..9ead16c37 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -85,7 +85,7 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit { +func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Repo) run.Unit { streamSVC := &streamService{ discoveryService: newDiscoveryService(pipeline), } diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go index bb59f228a..539d2f45e 100644 --- a/banyand/liaison/liaison.go +++ b/banyand/liaison/liaison.go @@ -29,6 +29,6 @@ import ( ) // NewEndpoint return a new endpoint which is the entry point for the database server. -func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) (run.Unit, error) { +func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Repo) (run.Unit, error) { return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil } diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go new file mode 100644 index 000000000..f89745aad --- /dev/null +++ b/banyand/metadata/client.go @@ -0,0 +1,25 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package metadata + +import "context" + +// NewClient returns a new metadata client. +func NewClient(_ context.Context) (Repo, error) { + return nil, nil +} diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 12c7a1066..2884f8d2c 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -27,7 +27,6 @@ import ( "github.com/apache/skywalking-banyandb/api/data" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" - "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -54,8 +53,7 @@ var ( type queryService struct { log *logger.Logger // TODO: remove the metaService once https://github.com/apache/skywalking/issues/10121 is fixed. - metaService metadata.Service - serviceRepo discovery.ServiceRepo + metaService metadata.Repo pipeline queue.Queue sqp *streamQueryProcessor mqp *measureQueryProcessor diff --git a/banyand/query/query.go b/banyand/query/query.go index 3e4eab1e9..fbff50306 100644 --- a/banyand/query/query.go +++ b/banyand/query/query.go @@ -21,7 +21,6 @@ package query import ( "context" - "github.com/apache/skywalking-banyandb/banyand/discovery" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -31,11 +30,10 @@ import ( // NewService return a new query service. func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, - metaService metadata.Service, serviceRepo discovery.ServiceRepo, pipeline queue.Queue, + metaService metadata.Repo, pipeline queue.Queue, ) (run.Unit, error) { svc := &queryService{ metaService: metaService, - serviceRepo: serviceRepo, pipeline: pipeline, } // measure query processor diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md index 82b5859d8..733f14e8d 100644 --- a/docs/concept/clustering.md +++ b/docs/concept/clustering.md @@ -35,6 +35,14 @@ BanyanDB integrates multiple roles into a single process in the standalone mode, In this mode, the single process performs the roles of the Liaison Node, Query Node, Data Node, and Meta Node. It receives requests, maintains metadata, processes queries, and handles data, all within a unified setup. +### 1.6 Mix Mode in Storage Nodes + +Query nodes and data nodes are implemented by a same executable binary, Storage Node. With the flag "mode", the storage node can be started as a query node or a data node. The default mode is "mix", which means the storage node is both a query node and a data node. + +If the workload of query is high, you can start more storage nodes with the flag "mode" set to "query". If the workload of write is high, you can start more storage nodes with the flag "mode" set to "data". + +Or you can start storage nodes with the flag "mode" set to "mix" to balance the workload of query and write. + ## 2. Communication within a Cluster All nodes within a BanyanDB cluster communicate with other nodes according to their roles: diff --git a/docs/installation.md b/docs/installation.md index 6e3ef2d2e..d29ab2132 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -92,7 +92,10 @@ Usage: Available Commands: completion generate the autocompletion script for the specified shell help Help about any command - standalone Run as the standalone mode + liaison Run as the liaison server + meta Run as the meta server + standalone Run as the standalone server + storage Run as the storage server Flags: -h, --help help for this command @@ -101,7 +104,7 @@ Flags: Use " [command] --help" for more information about a command. ``` -Banyand is running as a standalone process by +Banyand is running as a standalone server by ```shell $ ./banyand-server standalone @@ -119,46 +122,44 @@ $ ./banyand-server standalone The banyand-server would be listening on the `0.0.0.0:17912` if no errors occurred. -To discover more options to configure the banyand by +## Setup Multiple Banyand as Cluster(TBD) + +### Setup Standalone Nodes + +The standalone node is running as a standalone process by ```shell -$ ./banyand-server standalone -h -Usage: - standalone [flags] +$ ./banyand-server standalone -id n1 +$ ./banyand-server standalone -id n2 +$ ./banyand-server standalone -id n3 +``` -Flags: - --addr string the address of banyand listens (default ":17912") - --cert-file string the TLS cert file - --etcd-listen-client-url string A URL to listen on for client traffic (default "http://localhost:2379") - --etcd-listen-peer-url string A URL to listen on for peer traffic (default "http://localhost:2380") - -h, --help help for standalone - --http-addr string listen addr for http (default ":17913") - --http-cert-file string the TLS cert file of http server - --http-grpc-addr string http server redirect grpc requests to this address (default "localhost:17912") - --http-grpc-cert-file string the grpc TLS cert file if grpc server enables tls - --http-key-file string the TLS key file of http server - --http-tls connection uses TLS if true, else plain HTTP - --key-file string the TLS key file - --logging-env string the logging (default "prod") - --logging-level string the root level of logging (default "info") - --logging-levels stringArray the level logging of logging - --logging-modules stringArray the specific module - --max-recv-msg-size bytes the size of max receiving message (default 10.00MiB) - --measure-buffer-size bytes block buffer size (default 4.00MiB) - --measure-encoder-buffer-size bytes block fields buffer size (default 12.00MiB) - --measure-idx-batch-wait-sec int index batch wait in second (default 1) - --measure-root-path string the root path of database (default "/tmp") - --measure-seriesmeta-mem-size bytes series metadata memory size (default 1.00MiB) - --metadata-root-path string the root path of metadata (default "/tmp") - -n, --name string name of this service (default "standalone") - --observability-listener-addr string listen addr for observability (default ":2121") - --pprof-listener-addr string listen addr for pprof (default ":6060") - --show-rungroup-units show rungroup units - --stream-block-buffer-size bytes block buffer size (default 8.00MiB) - --stream-global-index-mem-size bytes global index memory size (default 2.00MiB) - --stream-idx-batch-wait-sec int index batch wait in second (default 1) - --stream-root-path string the root path of database (default "/tmp") - --stream-seriesmeta-mem-size bytes series metadata memory size (default 1.00MiB) - --tls connection uses TLS if true, else plain TCP - -v, --version version for standalone +The standalone node would be listening on the `` if no errors occurred. + +### Setup Role-Based Nodes + +The meta nodes should boot up firstly to provide the metadata service for the whole cluster. The meta node is running as a standalone process by + +```shell +$ ./banyand-server meta +``` + +The meta node would be listening on the `` if no errors occurred. + + +Data nodes, query nodes and liaison nodes are running as independent processes by + +```shell +$ ./banyand-server storage --mode data +$ ./banyand-server storage --mode query +$ ./banyand-server liaison +``` + +The data node, query node and liaison node would be listening on the `` if no errors occurred. + +If you want to use a `mix` mode instead of separate query and data nodes, you can run the banyand-server as processes by + +```shell +$ ./banyand-server storage +$ ./banyand-server liaison ``` diff --git a/pkg/run/run.go b/pkg/run/run.go index c45e59a0d..4630ff07e 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -176,6 +176,42 @@ func (g *Group) Register(units ...Unit) []bool { return hasRegistered } +// Deregister will remove the provided objects implementing the Unit interface +// from the Group. +// The returned array of booleans is of the same size as the amount of provided +// Units, signaling for each provided Unit if it successfully deregistered from +// Group or if it was ignored. +func (g *Group) Deregister(units ...Unit) []bool { + hasDeregistered := make([]bool, len(units)) + for idx := range units { + if c, ok := units[idx].(Config); ok { + for i := range g.c { + if g.c[i] == c { + g.c[i] = nil + hasDeregistered[idx] = true + } + } + } + if p, ok := units[idx].(PreRunner); ok { + for i := range g.p { + if g.p[i] == p { + g.p[i] = nil + hasDeregistered[idx] = true + } + } + } + if s, ok := units[idx].(Service); ok { + for i := range g.s { + if g.s[i] == s { + g.s[i] = nil + hasDeregistered[idx] = true + } + } + } + } + return hasDeregistered +} + // RegisterFlags returns FlagSet contains Flags in all modules. func (g *Group) RegisterFlags() *FlagSet { // run configuration stage diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 1c6dd77a8..7b6544713 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -93,7 +93,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) func() { measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) // Init `Query` module - q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline) + q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc) httpServer := http.NewService()