Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update release related files #535

Merged
merged 6 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ RELEASE_SCRIPTS := $(mk_dir)/scripts/release.sh
release-binary: release-source ## Package binary archive
${RELEASE_SCRIPTS} -b

release-source: clean ## Package source archive
release-source: ## Package source archive
${RELEASE_SCRIPTS} -s

release-sign: ## Sign artifacts
Expand All @@ -196,7 +196,11 @@ release-sign: ## Sign artifacts

release-assembly: release-binary release-sign ## Generate release package

PUSH_RELEASE_SCRIPTS := $(mk_dir)/scripts/push-release.sh

release-push-candidate: ## Push release candidate
${PUSH_RELEASE_SCRIPTS}

.PHONY: all $(PROJECTS) clean build default nuke
.PHONY: lint check tidy format pre-push
.PHONY: test test-race test-coverage test-ci
Expand Down
7 changes: 7 additions & 0 deletions banyand/measure/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ func (pw *partWrapper) ID() uint64 {
return pw.p.partMetadata.ID
}

func (pw *partWrapper) String() string {
	// Prefer describing the in-memory part when one is attached;
	// otherwise fall back to the on-disk part's metadata.
	if mp := pw.mp; mp != nil {
		return fmt.Sprintf("mem part %v", mp.partMetadata)
	}
	return fmt.Sprintf("part %v", pw.p.partMetadata)
}

func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
var p part
partPath := partPath(root, id)
Expand Down
152 changes: 0 additions & 152 deletions banyand/measure/tstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)

Expand Down Expand Up @@ -266,155 +263,6 @@ func Test_tstIter(t *testing.T) {
})
}
})

t.Run("file snapshot", func(t *testing.T) {
tests := []testCtx{
{
name: "Test with no data points",
dpsList: []*dataPoints{},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 1,
},
{
name: "Test with single part",
dpsList: []*dataPoints{dpsTS1},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 1,
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
{
name: "Test with multiple parts with different ts, the block will be merged",
dpsList: []*dataPoints{dpsTS1, dpsTS2},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
{seriesID: 1, count: 2, uncompressedSizeBytes: 3368},
{seriesID: 2, count: 2, uncompressedSizeBytes: 126},
{seriesID: 3, count: 2, uncompressedSizeBytes: 64},
},
},
{
name: "Test with multiple parts with same ts, duplicated blocks will be merged",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
sids: []common.SeriesID{1, 2, 3},
minTimestamp: 1,
maxTimestamp: 2,
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes: 1684},
{seriesID: 2, count: 1, uncompressedSizeBytes: 63},
{seriesID: 3, count: 1, uncompressedSizeBytes: 32},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run("merging on the fly", func(t *testing.T) {
tmpPath, defFn := test.Space(require.New(t))
fileSystem := fs.NewLocalFileSystem()
defer defFn()

tst, err := newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting()}, nil)
require.NoError(t, err)
for i, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
OUTER:
for {
select {
case <-timeout:
t.Fatalf("timeout waiting for snapshot %d to be introduced", i)
default:
snp := tst.currentSnapshot()
if snp == nil {
t.Logf("waiting for snapshot %d to be introduced", i)
time.Sleep(100 * time.Millisecond)
continue
}
if snp.creator != snapshotCreatorMemPart {
snp.decRef()
break OUTER
}
t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v",
i, snp.creator, snp.parts)
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
}
// wait until some parts are merged
if len(tt.dpsList) > 0 {
timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration
OUTER1:
for {
select {
case <-timeout:
t.Fatalf("timeout waiting for snapshot to be merged")
default:
snp := tst.currentSnapshot()
if snp == nil {
time.Sleep(100 * time.Millisecond)
continue
}
if len(snp.parts) == 1 {
snp.decRef()
break OUTER1
}
t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts)
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
}
verify(t, tt, tst)
})

t.Run("merging on close", func(t *testing.T) {
t.Skip("the test is flaky due to unpredictable merge loop schedule.")
tmpPath, defFn := test.Space(require.New(t))
fileSystem := fs.NewLocalFileSystem()
defer defFn()

tst, err := newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, &metrics{})
require.NoError(t, err)
for _, dps := range tt.dpsList {
tst.mustAddDataPoints(dps)
time.Sleep(100 * time.Millisecond)
}
// wait until the introducer is done
if len(tt.dpsList) > 0 {
for {
snp := tst.currentSnapshot()
if snp == nil {
time.Sleep(100 * time.Millisecond)
continue
}
if len(snp.parts) == len(tt.dpsList) {
snp.decRef()
tst.Close()
break
}
snp.decRef()
time.Sleep(100 * time.Millisecond)
}
}
// reopen the table
tst, err = newTSTable(fileSystem, tmpPath, common.Position{},
logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, nil)
require.NoError(t, err)
verify(t, tt, tst)
})
})
}
})
}

var tagProjections = map[int][]model.TagProjection{
Expand Down
2 changes: 1 addition & 1 deletion banyand/metadata/schema/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (e *etcdSchemaRegistry) replaceProperty(ctx context.Context, key string, pr

func tagLen(property *propertyv1.Property) (uint32, error) {
tagsCount := len(property.Tags)
if tagsCount < 0 || tagsCount > math.MaxUint32 {
if tagsCount < 0 || uint64(tagsCount) > math.MaxUint32 {
return 0, errors.New("integer overflow: tags count exceeds uint32 range")
}
tagsNum := uint32(tagsCount)
Expand Down
7 changes: 7 additions & 0 deletions banyand/stream/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ func (pw *partWrapper) ID() uint64 {
return pw.p.partMetadata.ID
}

func (pw *partWrapper) String() string {
	// Prefer describing the in-memory part when one is attached;
	// otherwise fall back to the on-disk part's metadata.
	if mp := pw.mp; mp != nil {
		return fmt.Sprintf("mem part %v", mp.partMetadata)
	}
	return fmt.Sprintf("part %v", pw.p.partMetadata)
}

func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
var p part
partPath := partPath(root, id)
Expand Down
Loading
Loading