From 03d2629635113f7a7b4e593012def7e65d95f5a1 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 2 Oct 2024 02:07:54 +0000 Subject: [PATCH 1/6] Fix some nits Signed-off-by: Gao Hongtao --- banyand/Dockerfile | 3 ++ banyand/measure/query.go | 29 +++++++++++++++---- banyand/measure/write.go | 8 +---- banyand/metadata/schema/etcd.go | 6 ++-- banyand/queue/sub/sub.go | 9 ++++-- banyand/stream/query.go | 20 ++++++++++--- banyand/stream/write.go | 8 +---- pkg/index/inverted/inverted.go | 2 +- pkg/index/inverted/inverted_series.go | 9 ++++-- .../stream/stream_plan_indexscan_local.go | 5 ++++ test/docker/Dockerfile | 2 ++ 11 files changed, 69 insertions(+), 32 deletions(-) diff --git a/banyand/Dockerfile b/banyand/Dockerfile index 42db44e58..5f21348f6 100644 --- a/banyand/Dockerfile +++ b/banyand/Dockerfile @@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand" FROM build-${TARGETOS} AS final +ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR +ENV GRPC_GO_LOG_FORMATTER=json + EXPOSE 17912 EXPOSE 17913 EXPOSE 6060 diff --git a/banyand/measure/query.go b/banyand/measure/query.go index ac1ccfa0e..d9d199184 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -41,7 +41,8 @@ import ( ) const ( - preloadSize = 100 + preloadSize = 100 + checkDoneEvery = 128 ) // Query allow to retrieve measure data points. @@ -252,12 +253,16 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] if tstIter.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } + var hit int for tstIter.nextBlock() { - select { - case <-ctx.Done(): - return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool)) - default: + if hit%checkDoneEvery == 0 { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(result.data), len(tstIter.piHeap), len(tstIter.piPool)) + default: + } } + hit++ bc := generateBlockCursor() p := tstIter.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -426,6 +431,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { type queryResult struct { ctx context.Context + hit int sidToIndex map[common.SeriesID]int storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue tagProjection []model.TagProjection @@ -438,6 +444,13 @@ type queryResult struct { } func (qr *queryResult) Pull() *model.MeasureResult { + select { + case <-qr.ctx.Done(): + return &model.MeasureResult{ + Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit), + } + default: + } if !qr.loaded { if len(qr.data) == 0 { return nil @@ -446,6 +459,12 @@ func (qr *queryResult) Pull() *model.MeasureResult { cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { go func(i int) { + select { + case <-qr.ctx.Done(): + cursorChan <- i + return + default: + } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) if !qr.data[i].loadData(tmpBlock) { diff --git a/banyand/measure/write.go b/banyand/measure/write.go index da5ba404e..64362ab2d 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -238,7 +238,7 @@ func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPoi return dpt, nil } -func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if 
!ok { w.l.Warn().Msg("invalid event data type") @@ -250,12 +250,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. } groups := make(map[string]*dataPointsInGroup) for i := range events { - select { - case <-ctx.Done(): - w.l.Warn().Msgf("context is done, handled %d events", i) - break - default: - } var writeEvent *measurev1.InternalWriteRequest switch e := events[i].(type) { case *measurev1.InternalWriteRequest: diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go index b9a7e6013..0e1120399 100644 --- a/banyand/metadata/schema/etcd.go +++ b/banyand/metadata/schema/etcd.go @@ -521,10 +521,8 @@ func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) { ctx, cancel := context.WithTimeout(context.Background(), leaseDuration) defer cancel() _, err := e.client.Lease.Revoke(ctx, lease.ID) - if err != nil { - if !errors.Is(err, context.DeadlineExceeded) { - e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID) - } + if err != nil && e.l.Debug().Enabled() { + e.l.Debug().Err(err).Msgf("failed to revoke lease %d", lease.ID) } } diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index 0b3f2ae9e..46db41747 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -39,12 +39,11 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { reply := func(writeEntity *clusterv1.SendRequest, err error, message string) { s.log.Error().Stringer("request", writeEntity).Err(err).Msg(message) s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic) - s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic) if errResp := stream.Send(&clusterv1.SendResponse{ MessageId: writeEntity.MessageId, Error: message, }); errResp != nil { - s.log.Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response") + s.log.Error().Err(errResp).AnErr("original", err).Stringer("request", writeEntity).Msg("failed to send error response") s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic) } } @@ -153,6 +152,12 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { case proto.Message: message = d case common.Error: + select { + case <-ctx.Done(): + s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic) + return ctx.Err() + default: + } reply(writeEntity, nil, d.Msg()) continue default: diff --git a/banyand/stream/query.go b/banyand/stream/query.go index ee335c3ca..d96e39ee9 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -42,6 +42,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/model" ) +const checkDoneEvery = 128 + func (s *stream) Query(ctx context.Context, sqo model.StreamQueryOptions) (sqr model.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") @@ -198,12 +200,16 @@ func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error { if ti.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", ti.Error()) } + var hit int for ti.nextBlock() { - select { - case <-ctx.Done(): - return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) - default: + if hit%checkDoneEvery == 0 { + select { + case <-ctx.Done(): + return errors.WithMessagef(ctx.Err(), "interrupt: scanned %d blocks, remained %d/%d parts to scan", len(qr.data), len(ti.piHeap), len(ti.piPool)) + default: + } } + hit++ bc := 
generateBlockCursor() p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) @@ -229,6 +235,12 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { go func(i int) { + select { + case <-ctx.Done(): + cursorChan <- i + return + default: + } tmpBlock := generateBlock() defer releaseBlock(tmpBlock) if !qr.data[i].loadData(tmpBlock) { diff --git a/banyand/stream/write.go b/banyand/stream/write.go index 1349eecad..610a67819 100644 --- a/banyand/stream/write.go +++ b/banyand/stream/write.go @@ -219,7 +219,7 @@ func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *stre return dst, nil } -func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { +func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) { events, ok := message.Data().([]any) if !ok { w.l.Warn().Msg("invalid event data type") @@ -232,12 +232,6 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. groups := make(map[string]*elementsInGroup) var builder strings.Builder for i := range events { - select { - case <-ctx.Done(): - w.l.Warn().Msgf("context is done, handled %d events", i) - break - default: - } var writeEvent *streamv1.InternalWriteRequest switch e := events[i].(type) { case *streamv1.InternalWriteRequest: diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 4987ddb5c..5d35d6aa6 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -380,7 +380,7 @@ func (bmi *blugeMatchIterator) Next() bool { bmi.err = io.EOF return false } - bmi.hit++ + bmi.hit = match.HitNumber for i := range bmi.current.Values { bmi.current.Values[i] = nil } diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index f9b700f4f..7288dd315 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -113,12 +113,17 @@ func (s *store) Search(ctx context.Context, func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey) ([]index.SeriesDocument, error) { result := make([]index.SeriesDocument, 0, 10) next, err := dmi.Next() + if err != nil { + return nil, errors.WithMessage(err, "iterate document match iterator") + } docIDMap := make(map[uint64]struct{}) fields := make([]string, 0, len(loadedFields)) for i := range loadedFields { fields = append(fields, loadedFields[i].Marshal()) } + var hitNumber int for err == nil && next != nil { + hitNumber = next.HitNumber var doc index.SeriesDocument if len(loadedFields) > 0 { doc.Fields = make(map[string][]byte) @@ -144,7 +149,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey return true }) if err != nil { - return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", len(result)) + return nil, errors.WithMessagef(err, "visit stored fields, hit: %d", hitNumber) } if doc.Key.ID > 0 { result = append(result, doc) @@ -152,7 +157,7 @@ func parseResult(dmi search.DocumentMatchIterator, loadedFields []index.FieldKey next, err = dmi.Next() } if err != nil { - return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", len(result)) + return nil, errors.WithMessagef(err, "iterate document match iterator, hit: %d", hitNumber) } return result, nil } diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 49723340f..f7db768a1 
100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -73,6 +73,11 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) { } func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } if i.result != nil { return BuildElementsFromStreamResult(ctx, i.result), nil } diff --git a/test/docker/Dockerfile b/test/docker/Dockerfile index a8b068c48..4ce236a51 100644 --- a/test/docker/Dockerfile +++ b/test/docker/Dockerfile @@ -26,6 +26,8 @@ COPY banyand/build/bin/linux/${TARGETARCH}/banyand-server-static /banyand COPY bydbctl/build/bin/linux/${TARGETARCH}/bydbctl-cli-static /bydbctl COPY --from=certs /etc/ssl/certs /etc/ssl/certs +ENV GRPC_GO_LOG_SEVERITY_LEVEL=INFO +ENV GRPC_GO_LOG_FORMATTER=json EXPOSE 17912 EXPOSE 17913 From cb82eb10650a15eb7e38606d95bce32b163b91b8 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Fri, 4 Oct 2024 05:07:42 +0000 Subject: [PATCH 2/6] Add doc Signed-off-by: Gao Hongtao --- docs/concept/tsdb.md | 25 ++++++++++++++++++++++--- docs/menu.yml | 2 +- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/docs/concept/tsdb.md b/docs/concept/tsdb.md index 2aa9cbd59..024773234 100644 --- a/docs/concept/tsdb.md +++ b/docs/concept/tsdb.md @@ -1,10 +1,10 @@ -# TimeSeries Database(TSDB) +# TimeSeries Database (TSDB) v1.1.0 TSDB is a time-series storage engine designed to store and query large volumes of time-series data. One of its key features is the ability to automatically manage data storage over time, optimize performance, and ensure that the system can scale to handle large workloads. TSDB underpins the `Measure` and `Stream` data models. In TSDB, the data in a group is partitioned based on the time range of the data. The segment size is determined by the `segment_interval` of a group. The number of segments in a group is determined by the `ttl` of a group. A new segment is created when the written data exceeds the time range of the current segment. The expired segment will be deleted after the `ttl` of the group. -![tsdb](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/tsdb.png) +![tsdb](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/tsdb-hierarchy.png) ## Segment In each segment, the data is spread into shards based on `entity`. The series in @@ -16,10 +16,29 @@ In each segment, the data is spread into shards based on `entity`. The series in the same shard are stored together. Each shard is assigned to a specific set of storage nodes, and those nodes store and process the data within that shard. This allows BanyanDB to scale horizontally by adding more storage nodes to the cluster as needed. -Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a batch of data, BanyanDB writes this batch of data into a new part. For data of the `Stream` type, the inverted indexes generated based on the indexing rules are also stored in the segment. Since BanyanDB adopts a snapshot approach for data read and write operations, the segment also needs to maintain additional snapshot information to record the validity of the parts. +Each shard is composed of multiple [parts](#Part). Whenever SkyWalking sends a batch of data, BanyanDB writes this batch of data into a new part. For data of the `Stream` type, the inverted indexes generated based on the indexing rules are also stored in the segment. + +Since BanyanDB adopts a snapshot approach for data read and write operations, the segment also needs to maintain additional snapshot information to record the validity of the parts.
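For illustration, a shard directory inside a segment might look like the layout below. The part and snapshot names are hypothetical placeholders; only the structure matters:

```text
shard-0/
├── 0000000000000001/       # part no longer referenced by the snapshot
├── 0000000000000002/       # live part
├── 0000000000000003/       # live part
└── 000000000000000a.snp    # snapshot listing the currently valid parts
```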
The shard contains a snapshot file `xxxxxxx.snp` to record the validity of parts. In the chart, `0000000000000001` is removed from the snapshot file, which means the part is invalid. It will be cleaned up in the next flush or merge operation. ![shard](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/shard.png) +## Inverted Index + +The inverted index is used to locate the data in the shard. For `measure`, it is a mapping from the term to the series id. For `stream`, it is a mapping from the term to the timestamp. + +The inverted index stores a snapshot file `xxxxxxx.snp` to record the validity of segments. In the chart, `0000000000000001.seg` is removed from the snapshot file, which means the segment is invalid. It will be cleaned up in the next flush or merge operation. + +The segment file `xxxxxxxx.seg` contains the inverted index data. It includes four parts: + +- **Tags**: The mapping from the tag name to the dictionary location. +- **Dictionary**: It's an FST (Finite State Transducer) dictionary that maps tag values to posting lists. +- **Posting List**: The mapping from the tag value to the series id or timestamp. It also contains the location of the stored tag value. +- **Stored Tag Value**: The stored tag value. If the tag spec sets `indexed_only=true`, the tag value will not be stored here. + +![inverted-index](https://skywalking.apache.org/doc-graph/banyandb/v0.7.0/inverted-index.png) + +If you want to search for `Tag1=Value1`, the index will first search the `Tags` part to find the dictionary location of `Tag1`. Then, it will search the `Dictionary` part to find the posting list location of `Value1`. Finally, it will search the `Posting List` part to find the series id or timestamp. If you want to fetch the tag value, it will search the `Stored Tag Value` part to find the tag value. A simplified sketch of this lookup flow is shown after the Part section below. + ## Part Within a part, data is split into multiple files in a columnar manner. The timestamps are stored in the `timestamps.bin` file, tags are organized in persistent tag families as various files with the `.tf` suffix, and fields are stored separately in the `fields.bin` file.
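The lookup flow described in the Inverted Index section above can be made concrete with a small Go sketch. This is a deliberately simplified model of the four sections: plain maps stand in for the FST dictionary and the on-disk locations, so every type and name here is illustrative rather than the actual BanyanDB implementation:

```go
package main

import "fmt"

// Illustrative stand-ins for the four sections of a segment file. The real
// index uses an FST dictionary and on-disk offsets; plain maps are used here
// only to make the lookup order visible.
type segment struct {
	tags       map[string]int         // tag name -> dictionary location
	dictionary map[int]map[string]int // dictionary location -> tag value -> posting list location
	postings   map[int][]uint64       // posting list location -> series ids (or timestamps)
	stored     map[int][]byte         // posting list location -> stored tag value
}

// search follows the flow described above: Tags -> Dictionary -> Posting List,
// and finally Stored Tag Value when the caller needs the value itself.
func (s *segment) search(tagName, tagValue string) ([]uint64, []byte, bool) {
	dictLoc, ok := s.tags[tagName]
	if !ok {
		return nil, nil, false
	}
	postLoc, ok := s.dictionary[dictLoc][tagValue]
	if !ok {
		return nil, nil, false
	}
	return s.postings[postLoc], s.stored[postLoc], true
}

func main() {
	seg := &segment{
		tags:       map[string]int{"Tag1": 0},
		dictionary: map[int]map[string]int{0: {"Value1": 7}},
		postings:   map[int][]uint64{7: {101, 102}},
		stored:     map[int][]byte{7: []byte("Value1")},
	}
	if ids, val, ok := seg.search("Tag1", "Value1"); ok {
		fmt.Printf("series ids: %v, stored value: %s\n", ids, val)
	}
}
```

The indirection keeps the bulky stored values off the search path: a query only touches the `Stored Tag Value` section when the caller actually projects the tag.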
diff --git a/docs/menu.yml b/docs/menu.yml index bc31ebd20..16ee424d8 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -126,7 +126,7 @@ catalog: - name: "File Format" catalog: - name: "v1.1.0" - path: "" + path: "/concept/tsdb.md" - name: "Concepts" catalog: - name: "Clustering" From 3b0e6619c2a228d911dec458cac53d54ad35f140 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Fri, 4 Oct 2024 05:07:42 +0000 Subject: [PATCH 3/6] Add doc Signed-off-by: Gao Hongtao --- docs/menu.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/menu.yml b/docs/menu.yml index 16ee424d8..19ec17330 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -126,7 +126,7 @@ catalog: - name: "File Format" catalog: - name: "v1.1.0" - path: "/concept/tsdb.md" + path: "/concept/tsdb" - name: "Concepts" catalog: - name: "Clustering" From 55cf02c658b9f4898b03736a567c7d2c42468361 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Sat, 5 Oct 2024 06:16:09 +0000 Subject: [PATCH 4/6] Append different tag and field structure Signed-off-by: Gao Hongtao --- banyand/measure/block.go | 216 +++++++++++++++++---- banyand/measure/block_test.go | 356 ++++++++++++++++++++++++++++++++++ banyand/measure/query.go | 2 +- banyand/stream/block.go | 137 ++++++++++--- banyand/stream/block_test.go | 214 ++++++++++++++++++++ 5 files changed, 864 insertions(+), 61 deletions(-) diff --git a/banyand/measure/block.go b/banyand/measure/block.go index d6808bd3b..282c0b22b 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -18,6 +18,7 @@ package measure import ( + "fmt" "slices" "sort" @@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) { bi.append(b, len(b.timestamps)) } +var log = logger.GetLogger("measure").Named("block") + func (bi *blockPointer) append(b *blockPointer, offset int) { if offset <= b.idx { return } if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 { - for _, tf := range b.tagFamilies { - tagFamily := columnFamily{name: tf.name} - for _, c := range tf.columns { - col := column{name: c.name, valueType: c.valueType} - assertIdxAndOffset(col.name, len(c.values), b.idx, offset) - col.values = append(col.values, c.values[b.idx:offset]...) - tagFamily.columns = append(tagFamily.columns, col) + fullTagAppend(bi, b, offset) + } else { + if err := fastTagAppend(bi, b, offset); err != nil { + if log.Debug().Enabled() { + log.Debug().Msgf("fastTagAppend failed: %v; falling back to fullTagAppend", err) } - bi.tagFamilies = append(bi.tagFamilies, tagFamily) + fullTagAppend(bi, b, offset) } + } + + if len(bi.field.columns) == 0 && len(b.field.columns) > 0 { + fullFieldAppend(bi, b, offset) } else { - if len(bi.tagFamilies) != len(b.tagFamilies) { - logger.Panicf("unexpected number of tag families: got %d; want %d", len(bi.tagFamilies), len(b.tagFamilies)) + if err := fastFieldAppend(bi, b, offset); err != nil { + if log.Debug().Enabled() { + log.Debug().Msgf("fastFieldAppend failed: %v; falling back to fullFieldAppend", err) + } + fullFieldAppend(bi, b, offset) + } + } + + assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) + bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) + assertIdxAndOffset("versions", len(b.versions), bi.idx, offset) + bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
+} + +func fastTagAppend(bi, b *blockPointer, offset int) error { + if len(bi.tagFamilies) != len(b.tagFamilies) { + return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies)) + } + for i := range bi.tagFamilies { + if bi.tagFamilies[i].name != b.tagFamilies[i].name { + return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name) } - for i := range bi.tagFamilies { - if bi.tagFamilies[i].name != b.tagFamilies[i].name { - logger.Panicf("unexpected tag family name: got %q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name) + if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) { + return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d", + bi.tagFamilies[i].name, len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns)) + } + for j := range bi.tagFamilies[i].columns { + if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name { + return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q", + bi.tagFamilies[i].name, b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name) } - if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) { - logger.Panicf("unexpected number of tags for tag family %q: got %d; want %d", bi.tagFamilies[i].name, len(bi.tagFamilies[i].columns), len(b.tagFamilies[i].columns)) + assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset) + bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...) + } + } + return nil +} + +func fullTagAppend(bi, b *blockPointer, offset int) { + existDataSize := len(bi.timestamps) + appendTagFamilies := func(tf columnFamily) { + tagFamily := columnFamily{name: tf.name} + for i := range tf.columns { + assertIdxAndOffset(tf.columns[i].name, len(tf.columns[i].values), b.idx, offset) + col := column{name: tf.columns[i].name, valueType: tf.columns[i].valueType} + for j := 0; j < existDataSize; j++ { + col.values = append(col.values, nil) + } + col.values = append(col.values, tf.columns[i].values[b.idx:offset]...) + tagFamily.columns = append(tagFamily.columns, col) + } + bi.tagFamilies = append(bi.tagFamilies, tagFamily) + } + if len(bi.tagFamilies) == 0 { + for _, tf := range b.tagFamilies { + appendTagFamilies(tf) + } + return + } + + tagFamilyMap := make(map[string]*columnFamily) + for i := range bi.tagFamilies { + tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i] + } + + for _, tf := range b.tagFamilies { + if existingTagFamily, exists := tagFamilyMap[tf.name]; exists { + columnMap := make(map[string]*column) + for i := range existingTagFamily.columns { + columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i] } - for j := range bi.tagFamilies[i].columns { - if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name { - logger.Panicf("unexpected tag name for tag family %q: got %q; want %q", bi.tagFamilies[i].name, bi.tagFamilies[i].columns[j].name, b.tagFamilies[i].columns[j].name) + + for _, c := range tf.columns { + if existingColumn, exists := columnMap[c.name]; exists { + assertIdxAndOffset(c.name, len(c.values), b.idx, offset) + existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...) 
+ } else { + assertIdxAndOffset(c.name, len(c.values), b.idx, offset) + col := column{name: c.name, valueType: c.valueType} + for j := 0; j < existDataSize; j++ { + col.values = append(col.values, nil) + } + col.values = append(col.values, c.values[b.idx:offset]...) + existingTagFamily.columns = append(existingTagFamily.columns, col) } - assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset) - bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...) } + } else { + appendTagFamilies(tf) } } + for k := range tagFamilyMap { + delete(tagFamilyMap, k) + } + for i := range b.tagFamilies { + tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i] + } + emptySize := offset - b.idx + for _, tf := range bi.tagFamilies { + if _, exists := tagFamilyMap[tf.name]; !exists { + for i := range tf.columns { + for j := 0; j < emptySize; j++ { + tf.columns[i].values = append(tf.columns[i].values, nil) + } + } + } else { + existingTagFamily := tagFamilyMap[tf.name] + columnMap := make(map[string]*column) + for i := range existingTagFamily.columns { + columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i] + } + for i := range tf.columns { + if _, exists := columnMap[tf.columns[i].name]; !exists { + for j := 0; j < emptySize; j++ { + tf.columns[i].values = append(tf.columns[i].values, nil) + } + } + } + } + } +} - if len(bi.field.columns) == 0 && len(b.field.columns) > 0 { - for _, c := range b.field.columns { - col := column{name: c.name, valueType: c.valueType} - assertIdxAndOffset(col.name, len(c.values), b.idx, offset) - col.values = append(col.values, c.values[b.idx:offset]...) - bi.field.columns = append(bi.field.columns, col) +func fastFieldAppend(bi, b *blockPointer, offset int) error { + if len(bi.field.columns) != len(b.field.columns) { + return fmt.Errorf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns)) + } + for i := range bi.field.columns { + if bi.field.columns[i].name != b.field.columns[i].name { + return fmt.Errorf("unexpected field name: got %q; want %q", b.field.columns[i].name, bi.field.columns[i].name) } - } else { - if len(bi.field.columns) != len(b.field.columns) { - logger.Panicf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns)) + assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset) + bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...) + } + return nil +} + +func fullFieldAppend(bi, b *blockPointer, offset int) { + existDataSize := len(bi.timestamps) + appendFields := func(c column) { + col := column{name: c.name, valueType: c.valueType} + for j := 0; j < existDataSize; j++ { + col.values = append(col.values, nil) } - for i := range bi.field.columns { - assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset) - bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...) + col.values = append(col.values, c.values[b.idx:offset]...) + bi.field.columns = append(bi.field.columns, col) + } + if len(bi.field.columns) == 0 { + for _, c := range b.field.columns { + appendFields(c) } + return } - assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) - bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) 
- assertIdxAndOffset("versions", len(b.versions), bi.idx, offset) - bi.versions = append(bi.versions, b.versions[b.idx:offset]...) + fieldMap := make(map[string]*column) + for i := range bi.field.columns { + fieldMap[bi.field.columns[i].name] = &bi.field.columns[i] + } + + for _, c := range b.field.columns { + if existingField, exists := fieldMap[c.name]; exists { + assertIdxAndOffset(c.name, len(c.values), b.idx, offset) + existingField.values = append(existingField.values, c.values[b.idx:offset]...) + } else { + appendFields(c) + } + } + for k := range fieldMap { + delete(fieldMap, k) + } + for i := range b.field.columns { + fieldMap[b.field.columns[i].name] = &b.field.columns[i] + } + + emptySize := offset - b.idx + for i := range bi.field.columns { + if _, exists := fieldMap[bi.field.columns[i].name]; !exists { + for j := 0; j < emptySize; j++ { + bi.field.columns[i].values = append(bi.field.columns[i].values, nil) + } + } + } } func assertIdxAndOffset(name string, length int, idx int, offset int) { diff --git a/banyand/measure/block_test.go b/banyand/measure/block_test.go index 5d41f236f..cabaea10d 100644 --- a/banyand/measure/block_test.go +++ b/banyand/measure/block_test.go @@ -716,6 +716,362 @@ func Test_blockPointer_append(t *testing.T) { idx: 0, }, }, + { + name: "Test append with missing tag family", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}}, + }, + }, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + nil, nil, + }, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4"), []byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with additional tag family", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}}, + }, + }, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), 
marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + nil, nil, + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + }, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4"), []byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with missing tag", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}}, + }, + }, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{}, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + nil, nil, + }, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4"), []byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with additional tag", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{}, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}}, + }, + }, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + 
block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{ + { + name: "arrTag", + columns: []column{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + nil, nil, + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + }, + }, + }, + }, + }, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4"), []byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with missing field", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4")}}, + }, + }, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{}, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field3"), []byte("field4"), nil, nil}}, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with additional field", + fields: fields{ + timestamps: []int64{1, 2}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{}, + partID: 1, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + versions: []int64{1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + versions: []int64{1, 1, 1, 1}, + tagFamilies: []columnFamily{}, + field: columnFamily{ + columns: []column{ + {name: "strField", valueType: pbv1.ValueTypeStr, values: [][]byte{nil, nil, []byte("field5"), []byte("field6")}}, + }, + }, + }, + idx: 0, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index d9d199184..e4b1cb3d6 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -431,13 +431,13 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue { type queryResult struct { ctx context.Context - hit int sidToIndex map[common.SeriesID]int storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue tagProjection []model.TagProjection data []*blockCursor snapshots []*snapshot segments []storage.Segment[*tsTable, option] + hit int loaded bool orderByTS bool ascTS bool diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 3b65e88e5..956c1bf28 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -18,6 +18,7 @@ package stream import ( + "fmt" "sort" "golang.org/x/exp/slices" @@ -603,45 +604,133 @@ func (bi *blockPointer) appendAll(b *blockPointer) { bi.append(b, len(b.timestamps)) } +var log = logger.GetLogger("stream").Named("block") + func (bi *blockPointer) append(b *blockPointer, offset int) { if offset <= b.idx { return } if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 { + 
fullTagAppend(bi, b, offset) + } else { + if err := fastTagAppend(bi, b, offset); err != nil { + if log.Debug().Enabled() { + log.Debug().Msgf("fastTagAppend failed: %v; falling back to fullTagAppend", err) + } + fullTagAppend(bi, b, offset) + } + } + + assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) + bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) + bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...) +} + +func fastTagAppend(bi, b *blockPointer, offset int) error { + if len(bi.tagFamilies) != len(b.tagFamilies) { + return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies)) + } + for i := range bi.tagFamilies { + if bi.tagFamilies[i].name != b.tagFamilies[i].name { + return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name) + } + if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) { + return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d", + bi.tagFamilies[i].name, len(b.tagFamilies[i].tags), len(bi.tagFamilies[i].tags)) + } + for j := range bi.tagFamilies[i].tags { + if bi.tagFamilies[i].tags[j].name != b.tagFamilies[i].tags[j].name { + return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q", + bi.tagFamilies[i].name, b.tagFamilies[i].tags[j].name, bi.tagFamilies[i].tags[j].name) + } + assertIdxAndOffset(b.tagFamilies[i].tags[j].name, len(b.tagFamilies[i].tags[j].values), b.idx, offset) + bi.tagFamilies[i].tags[j].values = append(bi.tagFamilies[i].tags[j].values, b.tagFamilies[i].tags[j].values[b.idx:offset]...) + } + } + return nil +} + +func fullTagAppend(bi, b *blockPointer, offset int) { + existDataSize := len(bi.timestamps) + appendTagFamilies := func(tf tagFamily) { + tfv := tagFamily{name: tf.name} + for i := range tf.tags { + assertIdxAndOffset(tf.tags[i].name, len(tf.tags[i].values), b.idx, offset) + col := tag{name: tf.tags[i].name, valueType: tf.tags[i].valueType} + for j := 0; j < existDataSize; j++ { + col.values = append(col.values, nil) + } + col.values = append(col.values, tf.tags[i].values[b.idx:offset]...) + tfv.tags = append(tfv.tags, col) + } + bi.tagFamilies = append(bi.tagFamilies, tfv) + } + if len(bi.tagFamilies) == 0 { for _, tf := range b.tagFamilies { - tFamily := tagFamily{name: tf.name} + appendTagFamilies(tf) + } + return + } + + tagFamilyMap := make(map[string]*tagFamily) + for i := range bi.tagFamilies { + tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i] + } + + for _, tf := range b.tagFamilies { + if existingTagFamily, exists := tagFamilyMap[tf.name]; exists { + columnMap := make(map[string]*tag) + for i := range existingTagFamily.tags { + columnMap[existingTagFamily.tags[i].name] = &existingTagFamily.tags[i] + } + for _, c := range tf.tags { - col := tag{name: c.name, valueType: c.valueType} - assertIdxAndOffset(col.name, len(c.values), b.idx, offset) - col.values = append(col.values, c.values[b.idx:offset]...) - tFamily.tags = append(tFamily.tags, col) + if existingColumn, exists := columnMap[c.name]; exists { + assertIdxAndOffset(c.name, len(c.values), b.idx, offset) + existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...) + } else { + assertIdxAndOffset(c.name, len(c.values), b.idx, offset) + col := tag{name: c.name, valueType: c.valueType} + for j := 0; j < existDataSize; j++ { + col.values = append(col.values, nil) + } + col.values = append(col.values, c.values[b.idx:offset]...)
+ existingTagFamily.tags = append(existingTagFamily.tags, col) + } } - bi.tagFamilies = append(bi.tagFamilies, tFamily) + } else { + appendTagFamilies(tf) } - } else { - if len(bi.tagFamilies) != len(b.tagFamilies) { - logger.Panicf("unexpected number of tag families: got %d; want %d", len(bi.tagFamilies), len(b.tagFamilies)) - } - for i := range bi.tagFamilies { - if bi.tagFamilies[i].name != b.tagFamilies[i].name { - logger.Panicf("unexpected tag family name: got %q; want %q", bi.tagFamilies[i].name, b.tagFamilies[i].name) + } + for k := range tagFamilyMap { + delete(tagFamilyMap, k) + } + for i := range b.tagFamilies { + tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i] + } + emptySize := offset - b.idx + for _, tf := range bi.tagFamilies { + if _, exists := tagFamilyMap[tf.name]; !exists { + for i := range tf.tags { + for j := 0; j < emptySize; j++ { + tf.tags[i].values = append(tf.tags[i].values, nil) + } } - if len(bi.tagFamilies[i].tags) != len(b.tagFamilies[i].tags) { - logger.Panicf("unexpected number of tags for tag family %q: got %d; want %d", bi.tagFamilies[i].name, len(bi.tagFamilies[i].tags), len(b.tagFamilies[i].tags)) + } else { + existingTagFamily := tagFamilyMap[tf.name] + columnMap := make(map[string]*tag) + for i := range existingTagFamily.tags { + columnMap[existingTagFamily.tags[i].name] = &existingTagFamily.tags[i] } - for j := range bi.tagFamilies[i].tags { - if bi.tagFamilies[i].tags[j].name != b.tagFamilies[i].tags[j].name { - logger.Panicf("unexpected tag name for tag family %q: got %q; want %q", bi.tagFamilies[i].name, bi.tagFamilies[i].tags[j].name, b.tagFamilies[i].tags[j].name) + for i := range tf.tags { + if _, exists := columnMap[tf.tags[i].name]; !exists { + for j := 0; j < emptySize; j++ { + tf.tags[i].values = append(tf.tags[i].values, nil) + } } - assertIdxAndOffset(b.tagFamilies[i].tags[j].name, len(b.tagFamilies[i].tags[j].values), b.idx, offset) - bi.tagFamilies[i].tags[j].values = append(bi.tagFamilies[i].tags[j].values, b.tagFamilies[i].tags[j].values[b.idx:offset]...) } } } - - assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset) - bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...) - bi.elementIDs = append(bi.elementIDs, b.elementIDs[b.idx:offset]...) 
} func assertIdxAndOffset(name string, length int, idx int, offset int) { diff --git a/banyand/stream/block_test.go b/banyand/stream/block_test.go index ed57f48de..3632d638d 100644 --- a/banyand/stream/block_test.go +++ b/banyand/stream/block_test.go @@ -609,6 +609,220 @@ func Test_blockPointer_append(t *testing.T) { idx: 0, }, }, + { + name: "Test append with missing tag family", + fields: fields{ + timestamps: []int64{1, 2}, + elementIDs: []uint64{0, 1}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + elementIDs: []uint64{2, 3}, + tagFamilies: []tagFamily{}, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + elementIDs: []uint64{0, 1, 2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + nil, nil, + }, + }, + }, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with additional tag family", + fields: fields{ + timestamps: []int64{1, 2}, + elementIDs: []uint64{0, 1}, + tagFamilies: []tagFamily{}, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + elementIDs: []uint64{2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + elementIDs: []uint64{0, 1, 2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + nil, nil, + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + }, + }, + }, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with missing tag", + fields: fields{ + timestamps: []int64{1, 2}, + elementIDs: []uint64{0, 1}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + elementIDs: []uint64{2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{}, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + elementIDs: []uint64{0, 1, 2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + nil, nil, + }, + }, + }, + }, + }, + }, + idx: 0, + }, + }, + { + name: "Test append with additional tag", + fields: fields{ + 
timestamps: []int64{1, 2}, + elementIDs: []uint64{0, 1}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{}, + }, + }, + }, + args: args{ + b: &blockPointer{ + block: block{ + timestamps: []int64{4, 5}, + elementIDs: []uint64{2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), marshalStrArr([][]byte{[]byte("value7"), []byte("value8")})}, + }, + }, + }, + }, + }, + idx: 0, + }, + offset: 2, + }, + want: &blockPointer{ + block: block{ + timestamps: []int64{1, 2, 4, 5}, + elementIDs: []uint64{0, 1, 2, 3}, + tagFamilies: []tagFamily{ + { + name: "arrTag", + tags: []tag{ + { + name: "strArrTag", valueType: pbv1.ValueTypeStrArr, + values: [][]byte{ + nil, nil, + marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}), + marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}), + }, + }, + }, + }, + }, + }, + idx: 0, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 4f0c1aa2c0318d2442925b93b284eb5acd727694 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Sun, 6 Oct 2024 01:35:43 +0000 Subject: [PATCH 5/6] Fix the bug that accesses restored data Signed-off-by: Gao Hongtao --- banyand/measure/query.go | 19 ++++++++++--------- banyand/stream/query.go | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/banyand/measure/query.go b/banyand/measure/query.go index e4b1cb3d6..299402fe1 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -480,17 +480,18 @@ func (qr *queryResult) Pull() *model.MeasureResult { blankCursorList := []int{} for completed := 0; completed < len(qr.data); completed++ { - select { - case <-qr.ctx.Done(): - return &model.MeasureResult{ - Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: loaded %d/%d cursors", completed, len(qr.data)), - } - case result := <-cursorChan: - if result != -1 { - blankCursorList = append(blankCursorList, result) - } + result := <-cursorChan + if result != -1 { + blankCursorList = append(blankCursorList, result) } } + select { + case <-qr.ctx.Done(): + return &model.MeasureResult{ + Error: errors.WithMessagef(qr.ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)), + } + default: + } sort.Slice(blankCursorList, func(i, j int) bool { return blankCursorList[i] > blankCursorList[j] }) diff --git a/banyand/stream/query.go b/banyand/stream/query.go index d96e39ee9..d8e14b286 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -296,16 +296,17 @@ func (qr *queryResult) load(ctx context.Context, qo queryOptions) *model.StreamR blankCursorList := []int{} for completed := 0; completed < len(qr.data); completed++ { - select { - case <-ctx.Done(): - return &model.StreamResult{ - Error: errors.WithMessagef(ctx.Err(), "interrupt: loaded %d/%d cursors", completed, len(qr.data)), - } - case result := <-cursorChan: - if result != -1 { - blankCursorList = append(blankCursorList, result) - } + result := <-cursorChan + if result != -1 { + blankCursorList = append(blankCursorList, result) + } + } + select { + case <-ctx.Done(): + return &model.StreamResult{ + Error: errors.WithMessagef(ctx.Err(), "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data)), } + default: } sort.Slice(blankCursorList, func(i, j int) bool { return blankCursorList[i] > blankCursorList[j] From fffe9f4ee57370b5ad3f9495c76408b8a88e0d65 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: 
Sun, 6 Oct 2024 12:28:16 +0000 Subject: [PATCH 6/6] Update changes Signed-off-by: Gao Hongtao --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 652d735f4..bb78d51da 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -47,6 +47,7 @@ Release Notes. - Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order. - Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase. - Fix the bug that the long running query doesn't stop when the context is canceled. +- Fix the panic when merging blocks with different tags or fields. ### Documentation
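As background for the last bug-fix entry above: before PATCH 4/6 the merge path panicked when two blocks carried different tag or field sets, and the patch instead falls back to a full merge that aligns columns by name and pads the gaps with `nil`. The following is a minimal sketch of that padding idea only, using simplified hypothetical types rather than the real `blockPointer`, `columnFamily`, and `tag` structures:

```go
package main

import "fmt"

// column is a simplified, hypothetical stand-in for a tag or field column;
// the real code in banyand/measure/block.go and banyand/stream/block.go
// operates on blockPointer, columnFamily, and tag types instead.
type column struct {
	name   string
	values [][]byte
}

// mergeColumns appends src onto dst, aligning columns by name and padding
// missing values with nil, mirroring the fullTagAppend/fullFieldAppend
// fallback introduced in PATCH 4/6. dstRows and srcRows are the row counts
// of the two blocks being merged.
func mergeColumns(dst, src []column, dstRows, srcRows int) []column {
	// Extend the columns dst already has; pad the ones src lacks.
	for i := range dst {
		extended := false
		for j := range src {
			if src[j].name == dst[i].name {
				dst[i].values = append(dst[i].values, src[j].values...)
				extended = true
				break
			}
		}
		if !extended {
			for k := 0; k < srcRows; k++ {
				dst[i].values = append(dst[i].values, nil)
			}
		}
	}
	existing := make(map[string]bool, len(dst))
	for i := range dst {
		existing[dst[i].name] = true
	}
	// Columns that only src has get nil padding for the pre-existing rows.
	for _, c := range src {
		if existing[c.name] {
			continue
		}
		padded := column{name: c.name, values: make([][]byte, dstRows)}
		padded.values = append(padded.values, c.values...)
		dst = append(dst, padded)
	}
	return dst
}

func main() {
	left := []column{{name: "region", values: [][]byte{[]byte("us"), []byte("eu")}}}
	right := []column{{name: "zone", values: [][]byte{[]byte("z1")}}}
	for _, c := range mergeColumns(left, right, 2, 1) {
		fmt.Printf("%s: %q\n", c.name, c.values)
	}
}
```

Running this prints `region: ["us" "eu" ""]` and `zone: ["" "" "z1"]`: each column keeps exactly one value per merged row, which is the invariant the fallback paths preserve.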