Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve granularity of index in measure by leveling up from data point to series #286

Merged
merged 5 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Release Notes.
- Add the Istio scenario stress test based on the data generated by the integration access log.
- Generalize the index's docID to uint64.
- Remove redundant ID tag type.
- Improve granularity of index in `measure` by leveling up from data point to series.

### Bugs

Expand Down
21 changes: 11 additions & 10 deletions banyand/measure/measure_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
if err != nil {
return err
}
series, err := shard.Series().Get(entity, entityValues)
seriesDB := shard.Series()
series, err := seriesDB.Get(entity, entityValues)
if err != nil {
return err
}
Expand All @@ -69,18 +70,18 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
}
return err
}
writeFn := func() (tsdb.Writer, error) {
writeFn := func() error {
builder := wp.WriterBuilder().Time(t)
for fi, family := range value.GetTagFamilies() {
spec := s.schema.GetTagFamilies()[fi]
bb, errMarshal := pbv1.EncodeFamily(spec, family)
if errMarshal != nil {
return nil, errMarshal
return errMarshal
}
builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
}
if len(value.GetFields()) > len(s.schema.GetFields()) {
return nil, errors.Wrap(errMalformedElement, "fields number is more than expected")
return errors.Wrap(errMalformedElement, "fields number is more than expected")
}
for fi, fieldValue := range value.GetFields() {
fieldSpec := s.schema.GetFields()[fi]
Expand All @@ -90,7 +91,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
continue
}
if fType != fieldSpec.GetFieldType() {
return nil, errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
}
data := encodeFieldValue(fieldValue)
if data == nil {
Expand All @@ -101,7 +102,7 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
}
writer, errWrite := builder.Build()
if errWrite != nil {
return nil, errWrite
return errWrite
}
_, errWrite = writer.Write()
if s.l.Debug().Enabled() {
Expand All @@ -114,15 +115,15 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
Int("shard_id", int(shardID)).
Msg("write measure")
}
return writer, errWrite
return errWrite
}
writer, err := writeFn()
if err != nil {

if err = writeFn(); err != nil {
_ = wp.Close()
return err
}
m := index.Message{
LocalWriter: writer,
IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB),
Value: index.Value{
TagFamilies: value.GetTagFamilies(),
Timestamp: t,
Expand Down
3 changes: 3 additions & 0 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,8 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
repo: repo,
pipeline: pipeline,
stopCh: make(chan struct{}),
dbOpts: tsdb.DatabaseOpts{
IndexGranularity: tsdb.IndexGranularitySeries,
},
}, nil
}
23 changes: 16 additions & 7 deletions banyand/observability/meter_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package observability

import (
"sync"

grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -30,7 +32,12 @@ import (
"google.golang.org/grpc"
)

var reg = prometheus.NewRegistry()
var (
reg = prometheus.NewRegistry()

once = sync.Once{}
srvMetrics *grpcprom.ServerMetrics
)

func init() {
reg.MustRegister(prometheus.NewGoCollector())
Expand All @@ -48,11 +55,13 @@ func NewMeterProvider(scope meter.Scope) meter.Provider {

// MetricsServerInterceptor returns a server interceptor for metrics.
func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg.MustRegister(srvMetrics)
once.Do(func() {
srvMetrics = grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg.MustRegister(srvMetrics)
})
return srvMetrics.UnaryServerInterceptor(), srvMetrics.StreamServerInterceptor()
}
1 change: 1 addition & 0 deletions banyand/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
pipeline: pipeline,
dbOpts: tsdb.DatabaseOpts{
EnableGlobalIndex: true,
IndexGranularity: tsdb.IndexGranularityBlock,
},
stopCh: make(chan struct{}),
}, nil
Expand Down
5 changes: 3 additions & 2 deletions banyand/stream/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb.
return err
}
m := index.Message{
Scope: tsdb.Entry(s.name),
LocalWriter: writer,
Scope: tsdb.Entry(s.name),
IndexWriter: writer,
GlobalItemID: writer.ItemID(),
Value: index.Value{
TagFamilies: value.GetTagFamilies(),
Timestamp: t,
Expand Down
24 changes: 16 additions & 8 deletions banyand/tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type block struct {

type openOpts struct {
tsTableFactory TSTableFactory
inverted inverted.StoreOpts
inverted *inverted.StoreOpts
lsm lsm.StoreOpts
}

Expand Down Expand Up @@ -163,10 +163,12 @@ func options(ctx context.Context, root string, l *logger.Logger) (openOpts, erro
}
options = o.(DatabaseOpts)
var opts openOpts
opts.inverted = inverted.StoreOpts{
Path: path.Join(root, componentSecondInvertedIdx),
Logger: l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
if options.IndexGranularity == IndexGranularityBlock {
opts.inverted = &inverted.StoreOpts{
Path: path.Join(root, componentSecondInvertedIdx),
Logger: l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
}
opts.lsm = lsm.StoreOpts{
Path: path.Join(root, componentSecondLSMIdx),
Expand Down Expand Up @@ -198,13 +200,16 @@ func (b *block) open() (err error) {
return err
}
b.closableLst = append(b.closableLst, b.tsTable)
if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
return err
if b.openOpts.inverted != nil {
if b.invertedIndex, err = inverted.NewStore(*b.openOpts.inverted); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.invertedIndex)
}
if b.lsmIndex, err = lsm.NewStore(b.openOpts.lsm); err != nil {
return err
}
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
b.closableLst = append(b.closableLst, b.lsmIndex)
b.ref.Store(0)
b.closed.Store(false)
blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...)
Expand Down Expand Up @@ -431,6 +436,9 @@ func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error
}

func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error {
if d.delegate.invertedIndex == nil {
return errors.New("inverted index is not enabled")
}
total := 0
for _, f := range fields {
total += len(f.Marshal())
Expand Down
3 changes: 3 additions & 0 deletions banyand/tsdb/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {

// Close gracefully closes the Buffer and ensures that all pending operations are completed.
func (b *Buffer) Close() error {
if b == nil {
return nil
}
b.closerOnce.Do(func() {
b.entryCloser.Done()
b.entryCloser.CloseThenWait()
Expand Down
15 changes: 8 additions & 7 deletions banyand/tsdb/index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ import (

// Message wraps value and other info to generate relative indices.
type Message struct {
Value Value
LocalWriter tsdb.Writer
BlockCloser io.Closer
Scope tsdb.Entry
IndexWriter tsdb.IndexWriter
BlockCloser io.Closer
Value Value
Scope tsdb.Entry
GlobalItemID tsdb.GlobalItemID
}

// Value represents the input data for generating indices.
Expand Down Expand Up @@ -122,8 +123,8 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {

func (s *Writer) Write(m Message) {
err := multierr.Combine(
s.writeLocalIndex(m.LocalWriter, m.Value),
s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
s.writeLocalIndex(m.IndexWriter, m.Value),
s.writeGlobalIndex(m.Scope, m.GlobalItemID, m.Value),
m.BlockCloser.Close(),
)
if err != nil {
Expand Down Expand Up @@ -191,7 +192,7 @@ func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value
)
}

func (s *Writer) writeLocalIndex(writer tsdb.Writer, value Value) (err error) {
func (s *Writer) writeLocalIndex(writer tsdb.IndexWriter, value Value) (err error) {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(fields []index.Field) error) error {
fields := make([]index.Field, 0)
for _, ruleIndex := range ruleIndexes {
Expand Down
13 changes: 13 additions & 0 deletions banyand/tsdb/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
)

var _ Shard = (*scopedShard)(nil)
Expand Down Expand Up @@ -73,6 +74,18 @@ type scopedSeriesDatabase struct {
scope Entry
}

func (sdd *scopedSeriesDatabase) writeInvertedIndex(fields []index.Field, seriesID common.SeriesID) error {
return sdd.delegated.writeInvertedIndex(fields, seriesID)
}

func (sdd *scopedSeriesDatabase) writeLSMIndex(fields []index.Field, seriesID common.SeriesID) error {
return sdd.delegated.writeLSMIndex(fields, seriesID)
}

func (sdd *scopedSeriesDatabase) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) {
return sdd.delegated.Search(ctx, path.prepend(sdd.scope), filter, order)
}

func (sdd *scopedSeriesDatabase) Close() error {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion banyand/tsdb/series_seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type seekerBuilder struct {
seriesSpan *seriesSpan
indexRuleForSorting *databasev1.IndexRule
l *logger.Logger
rangeOptsForSorting index.RangeOpts
order modelv1.Sort
}

Expand Down
5 changes: 3 additions & 2 deletions banyand/tsdb/series_seek_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
var (
errUnspecifiedIndexType = errors.New("Unspecified index type")
emptyFilters = make([]filterFn, 0)
rangeOpts = index.RangeOpts{}
)

func (s *seekerBuilder) OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder {
Expand Down Expand Up @@ -84,9 +85,9 @@ func (s *seekerBuilder) buildSeriesByIndex() (series []Iterator, err error) {
}
switch s.indexRuleForSorting.GetType() {
case databasev1.IndexRule_TYPE_TREE:
inner, err = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
inner, err = b.lsmIndexReader().Iterator(fieldKey, rangeOpts, s.order)
case databasev1.IndexRule_TYPE_INVERTED:
inner, err = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
inner, err = b.invertedIndexReader().Iterator(fieldKey, rangeOpts, s.order)
case databasev1.IndexRule_TYPE_UNSPECIFIED:
return nil, errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", s.indexRuleForSorting)
}
Expand Down
23 changes: 23 additions & 0 deletions banyand/tsdb/series_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,26 @@ func (w *writer) Write() (GlobalItemID, error) {
Term: convert.Int64ToBytes(w.ts.UnixNano()),
}, id.ID)
}

var _ IndexWriter = (*seriesIndexWriter)(nil)

type seriesIndexWriter struct {
seriesDB SeriesDatabase
seriesID common.SeriesID
}

func (s *seriesIndexWriter) WriteInvertedIndex(fields []index.Field) error {
return s.seriesDB.writeInvertedIndex(fields, s.seriesID)
}

func (s *seriesIndexWriter) WriteLSMIndex(fields []index.Field) error {
return s.seriesDB.writeLSMIndex(fields, s.seriesID)
}

// NewSeriesIndexWriter returns a new series index writer.
func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter {
return &seriesIndexWriter{
seriesID: seriesID,
seriesDB: seriesDB,
}
}
Loading