Skip to content

Commit

Permalink
Add watcher to sync data in etcd to local cache (#319)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Aug 23, 2023
1 parent c37bc25 commit 3e589ab
Show file tree
Hide file tree
Showing 30 changed files with 593 additions and 444 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Release Notes.
- Fix timer not released
- BanyanDB ui misses fields when creating a group
- Fix data duplicate writing
- Syncing metadata change events from etcd instead of a local channel.

### Chores

Expand Down
2 changes: 1 addition & 1 deletion api/proto/banyandb/database/v1/database.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ enum Role {
}

message Node {
string name = 1;
common.v1.Metadata metadata = 1;
repeated Role roles = 2;
string grpc_address = 3;
string http_address = 4;
Expand Down
4 changes: 2 additions & 2 deletions banyand/liaison/grpc/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (ds *discoveryService) initialize(ctx context.Context) error {
return fmt.Errorf("unsupported kind: %d", ds.kind)
}
}
ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
ds.metadataRepo.RegisterHandler("liaison", schema.KindShard, ds.shardRepo)
ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo)
return nil
}

Expand Down
5 changes: 0 additions & 5 deletions banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
})
cancel()
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Expand All @@ -107,7 +106,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
defer cancel()
subjects, err := sr.metadata.Subjects(ctx, metadata.Spec.(*databasev1.IndexRule), commonv1.Catalog_CATALOG_MEASURE)
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subjects(measure)")
return
}
for _, sub := range subjects {
Expand All @@ -121,7 +119,6 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
// createOrUpdate TopN schemas in advance
_, err := createOrUpdateTopNMeasure(context.Background(), sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation))
if err != nil {
sr.l.Error().Err(err).Msg("fail to create/update topN measure")
return
}
// reload source measure
Expand Down Expand Up @@ -231,7 +228,6 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
Group: metadata.Group,
})
if err != nil {
sr.l.Error().Err(err).Msg("fail to get subject")
return
}
// we should update instead of delete
Expand All @@ -245,7 +241,6 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
case schema.KindTopNAggregation:
err := sr.removeTopNMeasure(metadata.Spec.(*databasev1.TopNAggregation).GetSourceMeasure())
if err != nil {
sr.l.Error().Err(err).Msg("fail to remove topN measure")
return
}
// we should update instead of delete
Expand Down
64 changes: 3 additions & 61 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ package measure
import (
"context"
"path"
"time"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
Expand Down Expand Up @@ -107,81 +105,25 @@ func (s *service) Role() databasev1.Role {
return databasev1.Role_ROLE_DATA
}

func (s *service) PreRun(ctx context.Context) error {
func (s *service) PreRun(_ context.Context) error {
s.l = logger.GetLogger(s.Name())
ctxGroup, cancelGroup := context.WithTimeout(ctx, 5*time.Second)
groups, err := s.metadata.GroupRegistry().ListGroup(ctxGroup)
cancelGroup()
if err != nil {
return err
}
path := path.Join(s.root, s.Name())
observability.UpdatePath(path)
s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts,
s.l, s.pipeline, int64(s.BlockEncoderBufferSize), int64(s.BlockBufferSize))
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
continue
}
gp, innerErr := s.schemaRepo.StoreGroup(g.Metadata)
if innerErr != nil {
return innerErr
}
ctxMeasure, cancelMeasure := context.WithTimeout(ctx, 5*time.Second)
allMeasureSchemas, innerErr := s.metadata.MeasureRegistry().
ListMeasure(ctxMeasure, schema.ListOpt{Group: gp.GetSchema().GetMetadata().GetName()})
cancelMeasure()
if innerErr != nil {
return innerErr
}
for _, measureSchema := range allMeasureSchemas {
// sanity check before calling StoreResource
// since StoreResource may be called inside the event loop
if checkErr := s.sanityCheck(ctx, gp, measureSchema); checkErr != nil {
return checkErr
}
if _, innerErr := gp.StoreResource(ctx, measureSchema); innerErr != nil {
return innerErr
}
}
}
// run a serial watcher
go s.schemaRepo.Watcher()
s.metadata.
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
RegisterHandler("measure", schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
&s.schemaRepo)

s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
if err != nil {
return err
}
return nil
}

func (s *service) sanityCheck(ctx context.Context, group resourceSchema.Group, measureSchema *databasev1.Measure) error {
var topNAggrs []*databasev1.TopNAggregation
ctxLocal, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctxLocal, measureSchema.GetMetadata())
if err != nil || len(topNAggrs) == 0 {
return err
}

for _, topNAggr := range topNAggrs {
topNMeasure, innerErr := createOrUpdateTopNMeasure(ctx, s.metadata.MeasureRegistry(), topNAggr)
err = multierr.Append(err, innerErr)
if topNMeasure != nil {
_, storeErr := group.StoreResource(ctx, topNMeasure)
if storeErr != nil {
err = multierr.Append(err, storeErr)
}
}
}

return err
}

func (s *service) Serve() run.StopNotify {
return s.schemaRepo.StopCh()
}
Expand Down
10 changes: 6 additions & 4 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func (s *clientService) PreRun(ctx context.Context) error {
ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
Name: node.NodeID,
Metadata: &commonv1.Metadata{
Name: node.NodeID,
},
GrpcAddress: node.GrpcAddress,
HttpAddress: node.HTTPAddress,
Roles: nodeRoles,
Expand All @@ -92,7 +94,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
return err
}
s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator"))
s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode, s.alc)
s.schemaRegistry.RegisterHandler("shard-allocator", schema.KindGroup|schema.KindNode, s.alc)
return nil
}

Expand All @@ -106,8 +108,8 @@ func (s *clientService) GracefulStop() {
_ = s.schemaRegistry.Close()
}

func (s *clientService) RegisterHandler(kind schema.Kind, handler schema.EventHandler) {
s.schemaRegistry.RegisterHandler(kind, handler)
func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
s.schemaRegistry.RegisterHandler(name, kind, handler)
}

func (s *clientService) StreamRegistry() schema.Stream {
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Repo interface {
TopNAggregationRegistry() schema.TopNAggregation
PropertyRegistry() schema.Property
ShardRegistry() schema.Shard
RegisterHandler(schema.Kind, schema.EventHandler)
RegisterHandler(string, schema.Kind, schema.EventHandler)
}

// Service is the metadata repository.
Expand Down
Loading

0 comments on commit 3e589ab

Please sign in to comment.