Skip to content

Commit

Permalink
Leve up the index
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily committed Jun 26, 2023
1 parent 25fb9cb commit c8b2967
Show file tree
Hide file tree
Showing 44 changed files with 683 additions and 772 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Release Notes.
- Add the Istio scenario stress test based on the data generated by the integration access log.
- Generalize the index's docID to uint64.
- Remove redundant ID tag type.
- Improve granularity of index in `measure` by leveling up from data point to series.

### Bugs

Expand Down
21 changes: 11 additions & 10 deletions banyand/measure/measure_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
if err != nil {
return err
}
series, err := shard.Series().Get(entity, entityValues)
seriesDB := shard.Series()
series, err := seriesDB.Get(entity, entityValues)
if err != nil {
return err
}
Expand All @@ -69,18 +70,18 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
}
return err
}
writeFn := func() (tsdb.Writer, error) {
writeFn := func() error {
builder := wp.WriterBuilder().Time(t)
for fi, family := range value.GetTagFamilies() {
spec := s.schema.GetTagFamilies()[fi]
bb, errMarshal := pbv1.EncodeFamily(spec, family)
if errMarshal != nil {
return nil, errMarshal
return errMarshal
}
builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
}
if len(value.GetFields()) > len(s.schema.GetFields()) {
return nil, errors.Wrap(errMalformedElement, "fields number is more than expected")
return errors.Wrap(errMalformedElement, "fields number is more than expected")
}
for fi, fieldValue := range value.GetFields() {
fieldSpec := s.schema.GetFields()[fi]
Expand All @@ -90,7 +91,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
continue
}
if fType != fieldSpec.GetFieldType() {
return nil, errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
}
data := encodeFieldValue(fieldValue)
if data == nil {
Expand All @@ -101,7 +102,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
}
writer, errWrite := builder.Build()
if errWrite != nil {
return nil, errWrite
return errWrite
}
_, errWrite = writer.Write()
if s.l.Debug().Enabled() {
Expand All @@ -114,15 +115,15 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
Int("shard_id", int(shardID)).
Msg("write measure")
}
return writer, errWrite
return errWrite
}
writer, err := writeFn()
if err != nil {

if err = writeFn(); err != nil {
_ = wp.Close()
return err
}
m := index.Message{
LocalWriter: writer,
IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB),
Value: index.Value{
TagFamilies: value.GetTagFamilies(),
Timestamp: t,
Expand Down
3 changes: 3 additions & 0 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,8 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
repo: repo,
pipeline: pipeline,
stopCh: make(chan struct{}),
dbOpts: tsdb.DatabaseOpts{
IndexGranularity: tsdb.IndexGranularitySeries,
},
}, nil
}
23 changes: 16 additions & 7 deletions banyand/observability/meter_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package observability

import (
"sync"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -30,7 +32,12 @@ import (
"google.golang.org/grpc"
)

var reg = prometheus.NewRegistry()
var (
reg = prometheus.NewRegistry()

once = sync.Once{}
srvMetrics *grpcprom.ServerMetrics
)

func init() {
reg.MustRegister(prometheus.NewGoCollector())
Expand All @@ -48,11 +55,13 @@ func NewMeterProvider(scope meter.Scope) meter.Provider {

// MetricsServerInterceptor returns a server interceptor for metrics.
func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg.MustRegister(srvMetrics)
once.Do(func() {
srvMetrics = grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg.MustRegister(srvMetrics)
})
return srvMetrics.UnaryServerInterceptor(), srvMetrics.StreamServerInterceptor()
}
1 change: 1 addition & 0 deletions banyand/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
pipeline: pipeline,
dbOpts: tsdb.DatabaseOpts{
EnableGlobalIndex: true,
IndexGranularity: tsdb.IndexGranularityBlock,
},
stopCh: make(chan struct{}),
}, nil
Expand Down
5 changes: 3 additions & 2 deletions banyand/stream/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb.
return err
}
m := index.Message{
Scope: tsdb.Entry(s.name),
LocalWriter: writer,
Scope: tsdb.Entry(s.name),
IndexWriter: writer,
GlobalItemID: writer.ItemID(),
Value: index.Value{
TagFamilies: value.GetTagFamilies(),
Timestamp: t,
Expand Down
24 changes: 16 additions & 8 deletions banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type block struct {

type openOpts struct {
tsTableFactory TSTableFactory
inverted inverted.StoreOpts
inverted *inverted.StoreOpts
lsm lsm.StoreOpts
}

Expand Down Expand Up @@ -163,10 +163,12 @@ func options(ctx context.Context, root string, l *logger.Logger) (openOpts, erro
}
options = o.(DatabaseOpts)
var opts openOpts
opts.inverted = inverted.StoreOpts{
Path: path.Join(root, componentSecondInvertedIdx),
Logger: l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
if options.IndexGranularity == IndexGranularityBlock {
opts.inverted = &inverted.StoreOpts{
Path: path.Join(root, componentSecondInvertedIdx),
Logger: l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
}
opts.lsm = lsm.StoreOpts{
Path: path.Join(root, componentSecondLSMIdx),
Expand Down Expand Up @@ -198,13 +200,16 @@ func (b *block) open() (err error) {
return err
}
b.closableLst = append(b.closableLst, b.tsTable)
if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
return err
if b.openOpts.inverted != nil {
if b.invertedIndex, err = inverted.NewStore(*b.openOpts.inverted); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.invertedIndex)
}
if b.lsmIndex, err = lsm.NewStore(b.openOpts.lsm); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
b.closableLst = append(b.closableLst, b.lsmIndex)
b.ref.Store(0)
b.closed.Store(false)
blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...)
Expand Down Expand Up @@ -431,6 +436,9 @@ func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error
}

func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error {
if d.delegate.invertedIndex == nil {
return errors.New("inverted index is not enabled")
}
total := 0
for _, f := range fields {
total += len(f.Marshal())
Expand Down
3 changes: 3 additions & 0 deletions banyand/tsdb/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {

// Close gracefully closes the Buffer and ensures that all pending operations are completed.
func (b *Buffer) Close() error {
if b == nil {
return nil
}
b.closerOnce.Do(func() {
b.entryCloser.Done()
b.entryCloser.CloseThenWait()
Expand Down
15 changes: 8 additions & 7 deletions banyand/tsdb/index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ import (

// Message wraps value and other info to generate relative indices.
type Message struct {
Value Value
LocalWriter tsdb.Writer
BlockCloser io.Closer
Scope tsdb.Entry
IndexWriter tsdb.IndexWriter
BlockCloser io.Closer
Value Value
Scope tsdb.Entry
GlobalItemID tsdb.GlobalItemID
}

// Value represents the input data for generating indices.
Expand Down Expand Up @@ -122,8 +123,8 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {

func (s *Writer) Write(m Message) {
err := multierr.Combine(
s.writeLocalIndex(m.LocalWriter, m.Value),
s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
s.writeLocalIndex(m.IndexWriter, m.Value),
s.writeGlobalIndex(m.Scope, m.GlobalItemID, m.Value),
m.BlockCloser.Close(),
)
if err != nil {
Expand Down Expand Up @@ -191,7 +192,7 @@ func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value
)
}

func (s *Writer) writeLocalIndex(writer tsdb.Writer, value Value) (err error) {
func (s *Writer) writeLocalIndex(writer tsdb.IndexWriter, value Value) (err error) {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(fields []index.Field) error) error {
fields := make([]index.Field, 0)
for _, ruleIndex := range ruleIndexes {
Expand Down
13 changes: 13 additions & 0 deletions banyand/tsdb/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
)

var _ Shard = (*scopedShard)(nil)
Expand Down Expand Up @@ -73,6 +74,18 @@ type scopedSeriesDatabase struct {
scope Entry
}

func (sdd *scopedSeriesDatabase) writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error {
return sdd.delegated.writeInvertedIndex(fields, seriesID)
}

func (sdd *scopedSeriesDatabase) writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error {
return sdd.delegated.writeLSMIndex(fields, seriesID)
}

func (sdd *scopedSeriesDatabase) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) {
return sdd.delegated.Search(ctx, path.prepend(sdd.scope), filter, order)
}

func (sdd *scopedSeriesDatabase) Close() error {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion banyand/tsdb/series_seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type seekerBuilder struct {
seriesSpan *seriesSpan
indexRuleForSorting *databasev1.IndexRule
l *logger.Logger
rangeOptsForSorting index.RangeOpts
order modelv1.Sort
}

Expand Down
5 changes: 3 additions & 2 deletions banyand/tsdb/series_seek_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
var (
errUnspecifiedIndexType = errors.New("Unspecified index type")
emptyFilters = make([]filterFn, 0)
rangeOpts = index.RangeOpts{}
)

func (s *seekerBuilder) OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder {
Expand Down Expand Up @@ -84,9 +85,9 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
}
switch s.indexRuleForSorting.GetType() {
case databasev1.IndexRule_TYPE_TREE:
inner, err = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order)
case databasev1.IndexRule_TYPE_INVERTED:
inner, err = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order)
case databasev1.IndexRule_TYPE_UNSPECIFIED:
return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting)
}
Expand Down
23 changes: 23 additions & 0 deletions banyand/tsdb/series_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,26 @@ func (w *writer) Write() (GlobalItemID, error) {
Term: convert.Int64ToBytes(w.ts.UnixNano()),
}, id.ID)
}

var _ IndexWriter = (*seriesIndexWriter)(nil)

type seriesIndexWriter struct {
seriesDB SeriesDatabase
seriesID common.SeriesID
}

func (s *seriesIndexWriter) WriteInvertedIndex(fields []index.Field) error {
return s.seriesDB.writeInvertedIndex(fields, s.seriesID)
}

func (s *seriesIndexWriter) WriteLSMIndex(fields []index.Field) error {
return s.seriesDB.writeLSMIndex(fields, s.seriesID)
}

// NewSeriesIndexWriter returns a new series index writer.
func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter {
return &seriesIndexWriter{
seriesID: seriesID,
seriesDB: seriesDB,
}
}
Loading

0 comments on commit c8b2967

Please sign in to comment.