Skip to content

Commit

Permalink
Merge elementIDs.bin and timestamps.bin
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily committed Aug 1, 2024
1 parent 3a43587 commit 672dd60
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 199 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Release Notes.
- Move the series index into segment.
- Swap the segment and the shard.
- Move indexed values in a measure from data files to index files.
- Merge elementIDs.bin and timestamps.bin into a single file.

### Features

Expand Down
28 changes: 12 additions & 16 deletions banyand/measure/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,25 +380,21 @@ func mustSeqReadTimestampsFrom(timestamps, versions []int64, tm *timestampsMetad

func mustDecodeTimestampsWithVersions(timestamps, versions []int64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []int64) {
var err error
if t := encoding.GetCommonType(tm.encodeType); t != encoding.EncodeTypeUnknown {
if tm.size < tm.versionOffset {
logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset)
}
timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err)
}
versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal versions: %v", path, err)
}
return timestamps, versions
t := encoding.GetCommonType(tm.encodeType)
if t == encoding.EncodeTypeUnknown {
logger.Panicf("unexpected encodeType %d", tm.encodeType)
}
if tm.size < tm.versionOffset {
logger.Panicf("size %d must be greater than versionOffset %d", tm.size, tm.versionOffset)
}
timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.versionOffset], t, tm.min, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal timestamps with versions: %v", path, err)
}
timestamps, err = encoding.BytesToInt64List(timestamps, src, tm.encodeType, tm.min, count)
versions, err = encoding.BytesToInt64List(versions, src[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err)
logger.Panicf("%s: cannot unmarshal versions: %v", path, err)
}
versions = encoding.ExtendInt64ListCapacity(versions, count)
return timestamps, versions
}

Expand Down
100 changes: 24 additions & 76 deletions banyand/stream/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
Expand Down Expand Up @@ -128,8 +127,7 @@ func (b *block) mustWriteTo(sid common.SeriesID, bm *blockMetadata, ww *writers)
bm.uncompressedSizeBytes = b.uncompressedSizeBytes()
bm.count = uint64(b.Len())

mustWriteTimestampsTo(&bm.timestamps, b.timestamps, &ww.timestampsWriter)
mustWriteElementIDsTo(&bm.elementIDs, b.elementIDs, &ww.elementIDsWriter)
mustWriteTimestampsTo(&bm.timestamps, b.timestamps, b.elementIDs, &ww.timestampsWriter)

for ti := range b.tagFamilies {
b.marshalTagFamily(b.tagFamilies[ti], bm, ww)
Expand Down Expand Up @@ -256,8 +254,7 @@ func (b *block) uncompressedSizeBytes() uint64 {
func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm blockMetadata) {
b.reset()

b.timestamps = mustReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), p.timestamps)
b.elementIDs = mustReadElementIDsFrom(b.elementIDs, &bm.elementIDs, int(bm.count), p.elementIDs)
b.timestamps, b.elementIDs = mustReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), p.timestamps)

_ = b.resizeTagFamilies(len(bm.tagProjection))
for i := range bm.tagProjection {
Expand All @@ -275,8 +272,7 @@ func (b *block) mustReadFrom(decoder *encoding.BytesBlockDecoder, p *part, bm bl
func (b *block) mustSeqReadFrom(decoder *encoding.BytesBlockDecoder, seqReaders *seqReaders, bm blockMetadata) {
b.reset()

b.timestamps = mustSeqReadTimestampsFrom(b.timestamps, &bm.timestamps, int(bm.count), &seqReaders.timestamps)
b.elementIDs = mustSeqReadElementIDsFrom(b.elementIDs, &bm.elementIDs, int(bm.count), &seqReaders.elementIDs)
b.timestamps, b.elementIDs = mustSeqReadTimestampsFrom(b.timestamps, b.elementIDs, &bm.timestamps, int(bm.count), &seqReaders.timestamps)

_ = b.resizeTagFamilies(len(bm.tagFamilies))
keys := make([]string, 0, len(bm.tagFamilies))
Expand All @@ -298,104 +294,56 @@ func (b *block) sortTagFamilies() {
})
}

func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, timestampsWriter *writer) {
func mustWriteTimestampsTo(tm *timestampsMetadata, timestamps []int64, elementIDs []uint64, timestampsWriter *writer) {
tm.reset()

bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf, tm.encodeType, tm.min = encoding.Int64ListToBytes(bb.Buf[:0], timestamps)
if len(bb.Buf) > maxTimestampsBlockSize {
logger.Panicf("too big block with timestamps: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxTimestampsBlockSize)
}
tm.max = timestamps[len(timestamps)-1]
tm.offset = timestampsWriter.bytesWritten
tm.size = uint64(len(bb.Buf))
tm.elementIDsOffset = uint64(len(bb.Buf))
timestampsWriter.MustWrite(bb.Buf)
bb.Buf = encoding.VarUint64sToBytes(bb.Buf[:0], elementIDs)
tm.size = tm.elementIDsOffset + uint64(len(bb.Buf))
timestampsWriter.MustWrite(bb.Buf)
}

func mustReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader fs.Reader) []int64 {
func mustReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader fs.Reader) ([]int64, []uint64) {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
fs.MustReadData(reader, int64(tm.offset), bb.Buf)
var err error
dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err)
}
return dst
return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf)
}

func mustWriteElementIDsTo(em *elementIDsMetadata, elementIDs []uint64, elementIDsWriter *writer) {
em.reset()

bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
elementIDsByteSlice := make([][]byte, len(elementIDs))
for i, elementID := range elementIDs {
elementIDsByteSlice[i] = convert.Uint64ToBytes(elementID)
}
bb.Buf = encoding.EncodeBytesBlock(bb.Buf, elementIDsByteSlice)
if len(bb.Buf) > maxElementIDsBlockSize {
logger.Panicf("too big block with elementIDs: %d bytes; the maximum supported size is %d bytes", len(bb.Buf), maxElementIDsBlockSize)
func mustDecodeTimestampsWithVersions(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, path string, src []byte) ([]int64, []uint64) {
if tm.size < tm.elementIDsOffset {
logger.Panicf("size %d must be greater than elementIDsOffset %d", tm.size, tm.elementIDsOffset)
}
em.encodeType = encoding.EncodeTypeUnknown
em.offset = elementIDsWriter.bytesWritten
em.size = uint64(len(bb.Buf))
elementIDsWriter.MustWrite(bb.Buf)
}

func mustReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader fs.Reader) []uint64 {
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size))
fs.MustReadData(reader, int64(em.offset), bb.Buf)
decoder := encoding.BytesBlockDecoder{}
var elementIDsByteSlice [][]byte
elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, uint64(count))
var err error
timestamps, err = encoding.BytesToInt64List(timestamps, src[:tm.elementIDsOffset], tm.encodeType, tm.min, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err)
logger.Panicf("%s: cannot unmarshal timestamps: %v", path, err)
}
for _, elementID := range elementIDsByteSlice {
dst = append(dst, convert.BytesToUint64(elementID))
elementIDs = encoding.ExtendListCapacity(elementIDs, count)
elementIDs = elementIDs[:count]
_, err = encoding.BytesToVarUint64s(elementIDs, src[tm.elementIDsOffset:])
if err != nil {
logger.Panicf("%s: cannot unmarshal element ids: %v", path, err)
}
return dst
return timestamps, elementIDs
}

func mustSeqReadTimestampsFrom(dst []int64, tm *timestampsMetadata, count int, reader *seqReader) []int64 {
func mustSeqReadTimestampsFrom(timestamps []int64, elementIDs []uint64, tm *timestampsMetadata, count int, reader *seqReader) ([]int64, []uint64) {
if tm.offset != reader.bytesRead {
logger.Panicf("offset %d must be equal to bytesRead %d", tm.offset, reader.bytesRead)
}
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(tm.size))
reader.mustReadFull(bb.Buf)
var err error
dst, err = encoding.BytesToInt64List(dst, bb.Buf, tm.encodeType, tm.min, count)
if err != nil {
logger.Panicf("%s: cannot unmarshal timestamps: %v", reader.Path(), err)
}
return dst
}

func mustSeqReadElementIDsFrom(dst []uint64, em *elementIDsMetadata, count int, reader *seqReader) []uint64 {
if em.offset != reader.bytesRead {
logger.Panicf("offset %d must be equal to bytesRead %d", em.offset, reader.bytesRead)
}
bb := bigValuePool.Generate()
defer bigValuePool.Release(bb)
bb.Buf = bytes.ResizeExact(bb.Buf, int(em.size))
reader.mustReadFull(bb.Buf)
decoder := encoding.BytesBlockDecoder{}
var elementIDsByteSlice [][]byte
elementIDsByteSlice, err := decoder.Decode(elementIDsByteSlice, bb.Buf, uint64(count))
if err != nil {
logger.Panicf("%s: cannot unmarshal elementIDs: %v", reader.Path(), err)
}
for _, elementID := range elementIDsByteSlice {
dst = append(dst, convert.BytesToUint64(elementID))
}
return dst
return mustDecodeTimestampsWithVersions(timestamps, elementIDs, tm, count, reader.Path(), bb.Buf)
}

func generateBlock() *block {
Expand Down
20 changes: 15 additions & 5 deletions banyand/stream/block_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,44 +240,54 @@ func releaseBlockMetadataArray(bma *blockMetadataArray) {

type timestampsMetadata struct {
dataBlock
min int64
max int64
encodeType encoding.EncodeType
min int64
max int64
elementIDsOffset uint64
encodeType encoding.EncodeType
}

func (tm *timestampsMetadata) reset() {
tm.dataBlock.reset()
tm.min = 0
tm.max = 0
tm.encodeType = 0
tm.elementIDsOffset = 0
}

func (tm *timestampsMetadata) copyFrom(src *timestampsMetadata) {
tm.dataBlock.copyFrom(&src.dataBlock)
tm.min = src.min
tm.max = src.max
tm.encodeType = src.encodeType
tm.elementIDsOffset = src.elementIDsOffset
}

func (tm *timestampsMetadata) marshal(dst []byte) []byte {
dst = tm.dataBlock.marshal(dst)
dst = encoding.Uint64ToBytes(dst, uint64(tm.min))
dst = encoding.Uint64ToBytes(dst, uint64(tm.max))
dst = append(dst, byte(tm.encodeType))
dst = encoding.VarUint64ToBytes(dst, tm.elementIDsOffset)
return dst
}

func (tm *timestampsMetadata) unmarshal(src []byte) ([]byte, error) {
src, err := tm.dataBlock.unmarshal(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal dataBlock: %w", err)
return nil, fmt.Errorf("cannot unmarshal ts blockData: %w", err)
}
tm.min = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.max = int64(encoding.BytesToUint64(src))
src = src[8:]
tm.encodeType = encoding.EncodeType(src[0])
return src[1:], nil
src = src[1:]
src, n, err := encoding.BytesToVarUint64(src)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal ts offset: %w", err)
}
tm.elementIDsOffset = n
return src, nil
}

type elementIDsMetadata struct {
Expand Down
31 changes: 19 additions & 12 deletions banyand/stream/block_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ func Test_timestampsMetadata_reset(t *testing.T) {
offset: 1,
size: 1,
},
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
elementIDsOffset: 1,
}

tm.reset()
Expand All @@ -90,6 +91,7 @@ func Test_timestampsMetadata_reset(t *testing.T) {
assert.Equal(t, uint64(0), tm.dataBlock.size)
assert.Equal(t, int64(0), tm.min)
assert.Equal(t, int64(0), tm.max)
assert.Equal(t, uint64(0), tm.elementIDsOffset)
assert.Equal(t, encoding.EncodeTypeUnknown, tm.encodeType)
}

Expand All @@ -99,19 +101,21 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
offset: 1,
size: 1,
},
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
elementIDsOffset: 1,
}

dest := &timestampsMetadata{
dataBlock: dataBlock{
offset: 2,
size: 2,
},
min: 2,
max: 2,
encodeType: encoding.EncodeTypeDelta,
min: 2,
max: 2,
encodeType: encoding.EncodeTypeDelta,
elementIDsOffset: 2,
}

dest.copyFrom(src)
Expand All @@ -121,6 +125,7 @@ func Test_timestampsMetadata_copyFrom(t *testing.T) {
assert.Equal(t, src.min, dest.min)
assert.Equal(t, src.max, dest.max)
assert.Equal(t, src.encodeType, dest.encodeType)
assert.Equal(t, src.elementIDsOffset, dest.elementIDsOffset)
}

func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {
Expand All @@ -129,9 +134,10 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {
offset: 1,
size: 1,
},
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
min: 1,
max: 1,
encodeType: encoding.EncodeTypeConst,
elementIDsOffset: 1,
}

marshaled := original.marshal(nil)
Expand All @@ -146,6 +152,7 @@ func Test_timestampsMetadata_marshal_unmarshal(t *testing.T) {
assert.Equal(t, original.min, unmarshaled.min)
assert.Equal(t, original.max, unmarshaled.max)
assert.Equal(t, original.encodeType, unmarshaled.encodeType)
assert.Equal(t, original.elementIDsOffset, unmarshaled.elementIDsOffset)
}

func Test_blockMetadata_marshal_unmarshal(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions banyand/stream/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ type seqReaders struct {
tagFamilies map[string]*seqReader
primary seqReader
timestamps seqReader
elementIDs seqReader
}

func (sr *seqReaders) reset() {
sr.primary.reset()
sr.timestamps.reset()
sr.elementIDs.reset()
if sr.tagFamilyMetadata != nil {
for k, r := range sr.tagFamilyMetadata {
releaseSeqReader(r)
Expand All @@ -112,7 +110,6 @@ func (sr *seqReaders) init(p *part) {
sr.reset()
sr.primary.init(p.primary)
sr.timestamps.init(p.timestamps)
sr.elementIDs.init(p.elementIDs)
if sr.tagFamilies == nil {
sr.tagFamilies = make(map[string]*seqReader)
sr.tagFamilyMetadata = make(map[string]*seqReader)
Expand Down
Loading

0 comments on commit 672dd60

Please sign in to comment.