Fix bugs and add docs #548

Merged
merged 6 commits into from
Oct 6, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -47,6 +47,7 @@ Release Notes.
- Fix panic when reading a disordered measure block, i.e. one whose versions are not sorted in descending order.
- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase.
- Fix the bug that the long running query doesn't stop when the context is canceled.
- Fix the bug that merging blocks with different tags or fields causes a panic.

### Documentation

3 changes: 3 additions & 0 deletions banyand/Dockerfile
@@ -34,6 +34,9 @@ COPY build/bin/windows/${TARGETARCH}/banyand-server-static "/banyand"

FROM build-${TARGETOS} AS final

ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
ENV GRPC_GO_LOG_FORMATTER=json

EXPOSE 17912
EXPOSE 17913
EXPOSE 6060
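
The two ENV lines above configure grpc-go's default logger for the processes run from this image: GRPC_GO_LOG_SEVERITY_LEVEL=ERROR drops INFO and WARNING output, and GRPC_GO_LOG_FORMATTER=json asks for JSON-formatted log lines. For context only, a sketch of the programmatic alternative (not what this PR does) using grpc-go's grpclog package:

```go
// For context only: a process can install its own grpc-go logger instead of
// relying on the environment variables baked into the image above.
package main

import (
	"io"
	"os"

	"google.golang.org/grpc/grpclog"
)

func main() {
	// Discard INFO and WARNING, keep ERROR on stderr -- roughly what
	// GRPC_GO_LOG_SEVERITY_LEVEL=ERROR does for the default logger.
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, os.Stderr))
}
```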
216 changes: 180 additions & 36 deletions banyand/measure/block.go
@@ -18,6 +18,7 @@
package measure

import (
	"fmt"
	"slices"
	"sort"

@@ -787,63 +788,206 @@ func (bi *blockPointer) appendAll(b *blockPointer) {
	bi.append(b, len(b.timestamps))
}

var log = logger.GetLogger("measure").Named("block")

func (bi *blockPointer) append(b *blockPointer, offset int) {
	if offset <= b.idx {
		return
	}
	if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
		fullTagAppend(bi, b, offset)
	} else {
		if err := fastTagAppend(bi, b, offset); err != nil {
			if log.Debug().Enabled() {
				log.Debug().Msgf("fastTagMerge failed: %v; falling back to fullTagMerge", err)
			}
			fullTagAppend(bi, b, offset)
		}
	}

	if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
		fullFieldAppend(bi, b, offset)
	} else {
		if err := fastFieldAppend(bi, b, offset); err != nil {
			if log.Debug().Enabled() {
				log.Debug().Msgf("fastFieldAppend failed: %v; falling back to fullFieldAppend", err)
			}
			fullFieldAppend(bi, b, offset)
		}
	}

	assertIdxAndOffset("timestamps", len(b.timestamps), bi.idx, offset)
	bi.timestamps = append(bi.timestamps, b.timestamps[b.idx:offset]...)
	assertIdxAndOffset("versions", len(b.versions), bi.idx, offset)
	bi.versions = append(bi.versions, b.versions[b.idx:offset]...)
}

func fastTagAppend(bi, b *blockPointer, offset int) error {
	if len(bi.tagFamilies) != len(b.tagFamilies) {
		return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(b.tagFamilies), len(bi.tagFamilies))
	}
	for i := range bi.tagFamilies {
		if bi.tagFamilies[i].name != b.tagFamilies[i].name {
			return fmt.Errorf("unexpected tag family name: got %q; want %q", b.tagFamilies[i].name, bi.tagFamilies[i].name)
		}
		if len(bi.tagFamilies[i].columns) != len(b.tagFamilies[i].columns) {
			return fmt.Errorf("unexpected number of tags for tag family %q: got %d; want %d",
				bi.tagFamilies[i].name, len(b.tagFamilies[i].columns), len(bi.tagFamilies[i].columns))
		}
		for j := range bi.tagFamilies[i].columns {
			if bi.tagFamilies[i].columns[j].name != b.tagFamilies[i].columns[j].name {
				return fmt.Errorf("unexpected tag name for tag family %q: got %q; want %q",
					bi.tagFamilies[i].name, b.tagFamilies[i].columns[j].name, bi.tagFamilies[i].columns[j].name)
			}
			assertIdxAndOffset(b.tagFamilies[i].columns[j].name, len(b.tagFamilies[i].columns[j].values), b.idx, offset)
			bi.tagFamilies[i].columns[j].values = append(bi.tagFamilies[i].columns[j].values, b.tagFamilies[i].columns[j].values[b.idx:offset]...)
		}
	}
	return nil
}
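
The fast path only appends column-by-column when both blocks carry an identical tag schema: the same tag families with the same tags in the same order. Any mismatch is returned as an error so that append can fall back to fullTagAppend. The snippet below is a minimal, self-contained sketch of that precondition check, using simplified hypothetical types rather than the package's blockPointer and columnFamily:

```go
// Hypothetical, simplified types -- not the package's real blockPointer/columnFamily.
package main

import "fmt"

type tagFamily struct {
	name string
	tags []string
}

// sameTagSchema mirrors fastTagAppend's precondition: identical families and
// tags, in the same order. Any mismatch means the caller must take the full merge.
func sameTagSchema(dst, src []tagFamily) error {
	if len(dst) != len(src) {
		return fmt.Errorf("unexpected number of tag families: got %d; want %d", len(src), len(dst))
	}
	for i := range dst {
		if dst[i].name != src[i].name {
			return fmt.Errorf("unexpected tag family name: got %q; want %q", src[i].name, dst[i].name)
		}
		if len(dst[i].tags) != len(src[i].tags) {
			return fmt.Errorf("tag family %q: got %d tags; want %d", dst[i].name, len(src[i].tags), len(dst[i].tags))
		}
		for j := range dst[i].tags {
			if dst[i].tags[j] != src[i].tags[j] {
				return fmt.Errorf("tag family %q: got tag %q; want %q", dst[i].name, src[i].tags[j], dst[i].tags[j])
			}
		}
	}
	return nil
}

func main() {
	left := []tagFamily{{name: "default", tags: []string{"service", "instance"}}}
	right := []tagFamily{{name: "default", tags: []string{"service"}}}
	fmt.Println(sameTagSchema(left, left))  // <nil>: fast append is safe
	fmt.Println(sameTagSchema(left, right)) // error: fall back to fullTagAppend
}
```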

func fullTagAppend(bi, b *blockPointer, offset int) {
	existDataSize := len(bi.timestamps)
	appendTagFamilies := func(tf columnFamily) {
		tagFamily := columnFamily{name: tf.name}
		for i := range tf.columns {
			assertIdxAndOffset(tf.columns[i].name, len(tf.columns[i].values), b.idx, offset)
			col := column{name: tf.columns[i].name, valueType: tf.columns[i].valueType}
			for j := 0; j < existDataSize; j++ {
				col.values = append(col.values, nil)
			}
			col.values = append(col.values, tf.columns[i].values[b.idx:offset]...)
			tagFamily.columns = append(tagFamily.columns, col)
		}
		bi.tagFamilies = append(bi.tagFamilies, tagFamily)
	}
	if len(bi.tagFamilies) == 0 {
		for _, tf := range b.tagFamilies {
			appendTagFamilies(tf)
		}
		return
	}

	tagFamilyMap := make(map[string]*columnFamily)
	for i := range bi.tagFamilies {
		tagFamilyMap[bi.tagFamilies[i].name] = &bi.tagFamilies[i]
	}

	for _, tf := range b.tagFamilies {
		if existingTagFamily, exists := tagFamilyMap[tf.name]; exists {
			columnMap := make(map[string]*column)
			for i := range existingTagFamily.columns {
				columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
			}

			for _, c := range tf.columns {
				if existingColumn, exists := columnMap[c.name]; exists {
					assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
					existingColumn.values = append(existingColumn.values, c.values[b.idx:offset]...)
				} else {
					assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
					col := column{name: c.name, valueType: c.valueType}
					for j := 0; j < existDataSize; j++ {
						col.values = append(col.values, nil)
					}
					col.values = append(col.values, c.values[b.idx:offset]...)
					existingTagFamily.columns = append(existingTagFamily.columns, col)
				}
			}
		} else {
			appendTagFamilies(tf)
		}
	}
	for k := range tagFamilyMap {
		delete(tagFamilyMap, k)
	}
	for i := range b.tagFamilies {
		tagFamilyMap[b.tagFamilies[i].name] = &b.tagFamilies[i]
	}
	emptySize := offset - b.idx
	for _, tf := range bi.tagFamilies {
		if _, exists := tagFamilyMap[tf.name]; !exists {
			for i := range tf.columns {
				for j := 0; j < emptySize; j++ {
					tf.columns[i].values = append(tf.columns[i].values, nil)
				}
			}
		} else {
			existingTagFamily := tagFamilyMap[tf.name]
			columnMap := make(map[string]*column)
			for i := range existingTagFamily.columns {
				columnMap[existingTagFamily.columns[i].name] = &existingTagFamily.columns[i]
			}
			for i := range tf.columns {
				if _, exists := columnMap[tf.columns[i].name]; !exists {
					for j := 0; j < emptySize; j++ {
						tf.columns[i].values = append(tf.columns[i].values, nil)
					}
				}
			}
		}
	}
}

func fastFieldAppend(bi, b *blockPointer, offset int) error {
	if len(bi.field.columns) != len(b.field.columns) {
		return fmt.Errorf("unexpected number of fields: got %d; want %d", len(bi.field.columns), len(b.field.columns))
	}
	for i := range bi.field.columns {
		if bi.field.columns[i].name != b.field.columns[i].name {
			return fmt.Errorf("unexpected field name: got %q; want %q", b.field.columns[i].name, bi.field.columns[i].name)
		}
		assertIdxAndOffset(b.field.columns[i].name, len(b.field.columns[i].values), b.idx, offset)
		bi.field.columns[i].values = append(bi.field.columns[i].values, b.field.columns[i].values[b.idx:offset]...)
	}
	return nil
}

func fullFieldAppend(bi, b *blockPointer, offset int) {
	existDataSize := len(bi.timestamps)
	appendFields := func(c column) {
		col := column{name: c.name, valueType: c.valueType}
		for j := 0; j < existDataSize; j++ {
			col.values = append(col.values, nil)
		}
		col.values = append(col.values, c.values[b.idx:offset]...)
		bi.field.columns = append(bi.field.columns, col)
	}
	if len(bi.field.columns) == 0 {
		for _, c := range b.field.columns {
			appendFields(c)
		}
		return
	}

	fieldMap := make(map[string]*column)
	for i := range bi.field.columns {
		fieldMap[bi.field.columns[i].name] = &bi.field.columns[i]
	}

	for _, c := range b.field.columns {
		if existingField, exists := fieldMap[c.name]; exists {
			assertIdxAndOffset(c.name, len(c.values), b.idx, offset)
			existingField.values = append(existingField.values, c.values[b.idx:offset]...)
		} else {
			appendFields(c)
		}
	}
	for k := range fieldMap {
		delete(fieldMap, k)
	}
	for i := range b.field.columns {
		fieldMap[b.field.columns[i].name] = &b.field.columns[i]
	}

	emptySize := offset - b.idx
	for i := range bi.field.columns {
		if _, exists := fieldMap[bi.field.columns[i].name]; !exists {
			for j := 0; j < emptySize; j++ {
				bi.field.columns[i].values = append(bi.field.columns[i].values, nil)
			}
		}
	}
}
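
Both full-merge paths preserve one invariant: after the merge, every tag and field column is exactly as long as the merged timestamps, with nil filling the rows that one of the two blocks never had. Below is a minimal, self-contained sketch of that padding, using a plain map of hypothetical field names instead of the package's column type:

```go
package main

import "fmt"

// fields is a hypothetical stand-in for a block's field columns: one value
// slice per field name, always kept as long as the block's row count.
type fields map[string][]string

// mergeFields appends src's rows to dst, padding with "" (nil values in the
// real code) wherever a field exists in only one of the two blocks, so every
// column stays aligned with the merged timestamps.
func mergeFields(dst fields, dstRows int, src fields, srcRows int) fields {
	out := fields{}
	for name, vals := range dst {
		out[name] = append([]string(nil), vals...)
	}
	for name, vals := range src {
		if _, ok := out[name]; !ok {
			out[name] = make([]string, dstRows) // rows dst never had for this field
		}
		out[name] = append(out[name], vals...)
	}
	for name := range dst {
		if _, ok := src[name]; !ok {
			out[name] = append(out[name], make([]string, srcRows)...) // rows src never had
		}
	}
	return out
}

func main() {
	dst := fields{"cpu": {"1", "2"}}
	src := fields{"cpu": {"3"}, "mem": {"9"}}
	fmt.Println(mergeFields(dst, 2, src, 1))
	// cpu -> [1 2 3]; mem -> two padded rows, then "9"
}
```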

func assertIdxAndOffset(name string, length int, idx int, offset int) {