diff --git a/Makefile b/Makefile index aa5449dfa..0d6a17477 100644 --- a/Makefile +++ b/Makefile @@ -186,7 +186,7 @@ RELEASE_SCRIPTS := $(mk_dir)/scripts/release.sh release-binary: release-source ## Package binary archive ${RELEASE_SCRIPTS} -b -release-source: clean ## Package source archive +release-source: ## Package source archive ${RELEASE_SCRIPTS} -s release-sign: ## Sign artifacts @@ -196,7 +196,11 @@ release-sign: ## Sign artifacts release-assembly: release-binary release-sign ## Generate release package +PUSH_RELEASE_SCRIPTS := $(mk_dir)/scripts/push-release.sh +release-push-candidate: ## Push release candidate + ${PUSH_RELEASE_SCRIPTS} + .PHONY: all $(PROJECTS) clean build default nuke .PHONY: lint check tidy format pre-push .PHONY: test test-race test-coverage test-ci diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 1e89b1e9c..f4b6b812e 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -269,6 +269,13 @@ func (pw *partWrapper) ID() uint64 { return pw.p.partMetadata.ID } +func (pw *partWrapper) String() string { + if pw.mp != nil { + return fmt.Sprintf("mem part %v", pw.mp.partMetadata) + } + return fmt.Sprintf("part %v", pw.p.partMetadata) +} + func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { var p part partPath := partPath(root, id) diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index 2f24d5a6c..8b011bd03 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -30,13 +30,10 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/fs" - "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -266,155 +263,6 @@ func Test_tstIter(t *testing.T) { }) } }) - - t.Run("file snapshot", func(t *testing.T) { - tests := []testCtx{ - { - name: "Test with no data points", - dpsList: []*dataPoints{}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 1, - }, - { - name: "Test with single part", - dpsList: []*dataPoints{dpsTS1}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 1, - want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, - }, - }, - { - name: "Test with multiple parts with different ts, the block will be merged", - dpsList: []*dataPoints{dpsTS1, dpsTS2}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 2, - want: []blockMetadata{ - {seriesID: 1, count: 2, uncompressedSizeBytes: 3368}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 64}, - }, - }, - { - name: "Test with multiple parts with same ts, duplicated blocks will be merged", - dpsList: []*dataPoints{dpsTS1, dpsTS1}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 2, - want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 1684}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 32}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Run("merging on the fly", func(t *testing.T) { - tmpPath, defFn := test.Space(require.New(t)) - fileSystem := fs.NewLocalFileSystem() - defer defFn() - - tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting()}, nil) - require.NoError(t, err) - for i, dps := range tt.dpsList { - tst.mustAddDataPoints(dps) - timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration - OUTER: - for { - select { - case <-timeout: - t.Fatalf("timeout waiting for snapshot %d to be introduced", i) - default: - snp := tst.currentSnapshot() - if snp == nil { - t.Logf("waiting for snapshot %d to be introduced", i) - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart { - snp.decRef() - break OUTER - } - t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", - i, snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - } - // wait until some parts are merged - if len(tt.dpsList) > 0 { - timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration - OUTER1: - for { - select { - case <-timeout: - t.Fatalf("timeout waiting for snapshot to be merged") - default: - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == 1 { - snp.decRef() - break OUTER1 - } - t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - } - verify(t, tt, tst) - }) - - t.Run("merging on close", func(t *testing.T) { - t.Skip("the test is flaky due to unpredictable merge loop schedule.") - tmpPath, defFn := test.Space(require.New(t)) - fileSystem := fs.NewLocalFileSystem() - defer defFn() - - tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, &metrics{}) - require.NoError(t, err) - for _, dps := range tt.dpsList { - tst.mustAddDataPoints(dps) - time.Sleep(100 * time.Millisecond) - } - // wait until the introducer is done - if len(tt.dpsList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == len(tt.dpsList) { - snp.decRef() - tst.Close() - break - } - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - // reopen the table - tst, err = newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}, nil) - require.NoError(t, err) - verify(t, tt, tst) - }) - }) - } - }) } var tagProjections = map[int][]model.TagProjection{ diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go index 8c72e7db6..8c87ef029 100644 --- a/banyand/metadata/schema/property.go +++ b/banyand/metadata/schema/property.go @@ -190,7 +190,7 @@ func (e *etcdSchemaRegistry) replaceProperty(ctx context.Context, key string, pr func tagLen(property *propertyv1.Property) (uint32, error) { tagsCount := len(property.Tags) - if tagsCount < 0 || tagsCount > math.MaxUint32 { + if tagsCount < 0 || uint64(tagsCount) > math.MaxUint32 { return 0, errors.New("integer overflow: tags count exceeds uint32 range") } tagsNum := uint32(tagsCount) diff --git a/banyand/stream/part.go b/banyand/stream/part.go index b4ad9d47f..d0dce1f90 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -247,6 +247,13 @@ func (pw *partWrapper) ID() uint64 { return pw.p.partMetadata.ID } +func (pw *partWrapper) String() string { + if pw.mp != nil { + return fmt.Sprintf("mem part %v", pw.mp.partMetadata) + } + return fmt.Sprintf("part %v", pw.p.partMetadata) +} + func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { var p part partPath := partPath(root, id) diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go index 9224e5523..42e2a6eb6 100644 --- a/banyand/stream/tstable_test.go +++ b/banyand/stream/tstable_test.go @@ -31,13 +31,10 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/fs" - "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" - "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/timestamp" "github.com/apache/skywalking-banyandb/pkg/watcher" ) @@ -252,167 +249,6 @@ func Test_tstIter(t *testing.T) { }) } }) - - t.Run("file snapshot", func(t *testing.T) { - tests := []testCtx{ - { - name: "Test with no data points", - esList: []*elements{}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 1, - }, - { - name: "Test with single part", - esList: []*elements{esTS1}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 1, - want: []blockMetadata{ - {seriesID: 1, count: 1, uncompressedSizeBytes: 889}, - {seriesID: 2, count: 1, uncompressedSizeBytes: 63}, - {seriesID: 3, count: 1, uncompressedSizeBytes: 16}, - }, - }, - { - name: "Test with multiple parts with different ts, the block will be merged", - esList: []*elements{esTS1, esTS2, esTS2}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 2, - want: []blockMetadata{ - {seriesID: 1, count: 3, uncompressedSizeBytes: 2667}, - {seriesID: 2, count: 3, uncompressedSizeBytes: 189}, - {seriesID: 3, count: 3, uncompressedSizeBytes: 48}, - }, - }, - { - name: "Test with multiple parts with same ts, duplicated blocks will be merged", - esList: []*elements{esTS1, esTS1}, - sids: []common.SeriesID{1, 2, 3}, - minTimestamp: 1, - maxTimestamp: 2, - want: []blockMetadata{ - {seriesID: 1, count: 2, uncompressedSizeBytes: 1778}, - {seriesID: 2, count: 2, uncompressedSizeBytes: 126}, - {seriesID: 3, count: 2, uncompressedSizeBytes: 32}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Run("merging on the fly", func(t *testing.T) { - tmpPath, defFn := test.Space(require.New(t)) - fileSystem := fs.NewLocalFileSystem() - defer defFn() - - tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, elementIndexFlushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting()}, nil) - require.NoError(t, err) - for i, es := range tt.esList { - tst.mustAddElements(es) - timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration - OUTER: - for { - select { - case <-timeout: - t.Fatalf("timeout waiting for snapshot %d to be introduced", i) - default: - snp := tst.currentSnapshot() - if snp == nil { - t.Logf("waiting for snapshot %d to be introduced", i) - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator != snapshotCreatorMemPart { - snp.decRef() - break OUTER - } - t.Logf("waiting for snapshot %d to be flushed or merged: current creator:%d, parts: %+v", - i, snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - } - // wait until some parts are merged - if len(tt.esList) > 0 { - timeout := time.After(flags.EventuallyTimeout) // Set the timeout duration - OUTER1: - for { - select { - case <-timeout: - t.Fatalf("timeout waiting for snapshot to be merged") - default: - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == 1 || len(snp.parts) < len(tt.esList) { - snp.decRef() - break OUTER1 - } - t.Logf("waiting for snapshot to be merged: current creator:%d, parts: %+v", snp.creator, snp.parts) - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } - } - verify(t, tt, tst) - }) - - t.Run("merging on close", func(t *testing.T) { - t.Skip("the test is flaky due to unpredictable merge loop schedule.") - tmpPath, defFn := test.Space(require.New(t)) - fileSystem := fs.NewLocalFileSystem() - defer defFn() - - tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, - option{ - flushTimeout: defaultFlushTimeout, - elementIndexFlushTimeout: defaultFlushTimeout, - mergePolicy: newDefaultMergePolicyForTesting(), - }, nil) - require.NoError(t, err) - for _, es := range tt.esList { - tst.mustAddElements(es) - time.Sleep(100 * time.Millisecond) - } - // wait until the introducer is done - if len(tt.esList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if len(snp.parts) == len(tt.esList) { - snp.decRef() - tst.Close() - break - } - snp.decRef() - time.Sleep(100 * time.Millisecond) - } - } else { - tst.Close() - } - // reopen the table - tst, err = newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, - option{ - flushTimeout: defaultFlushTimeout, - elementIndexFlushTimeout: defaultFlushTimeout, - mergePolicy: newDefaultMergePolicyForTesting(), - }, nil) - require.NoError(t, err) - verify(t, tt, tst) - }) - }) - } - }) } var tagProjectionAll = []model.TagProjection{ diff --git a/docs/release.md b/docs/release.md index 4201b70e4..561c9696e 100644 --- a/docs/release.md +++ b/docs/release.md @@ -29,16 +29,12 @@ The `skywalking-banyandb-${VERSION}-bin.tgz`, `skywalking-banyandb-${VERSION}-sr ## Upload to Apache svn ```shell -svn co https://dist.apache.org/repos/dist/dev/skywalking/ -mkdir -p skywalking/banyandb/"$VERSION" -cp skywalking-banyandb/build/skywalking-banyandb*.tgz skywalking/banyandb/"$VERSION" -cp skywalking-banyandb/build/skywalking-banyandb*.tgz.asc skywalking/banyandb/"$VERSION" -cp skywalking-banyandb/build/skywalking-banyandb*.tgz.sha512 skywalking/banyandb/"$VERSION" - -cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking BanyanDB release $VERSION" +make release-push-candidate ``` -## Call for vote in dev@ mailing list +The script will upload the source code package and binary package to the Apache SVN staging repository. The script will ask for your Apache ID and password. **You can do this only if you are a PMC member**. + +After the script is finished, you will get the vote mail subject and content based on the following template. Call for vote in `dev@skywalking.apache.org` diff --git a/go.mod b/go.mod index 2c79c9aa0..dde09b8f6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/apache/skywalking-banyandb -go 1.23.0 +go 1.23 toolchain go1.23.1 diff --git a/pkg/index/inverted/metrics.go b/pkg/index/inverted/metrics.go index 6816b90e8..98b6d7a3d 100644 --- a/pkg/index/inverted/metrics.go +++ b/pkg/index/inverted/metrics.go @@ -112,6 +112,7 @@ func (s *store) CollectMetrics(labelValues ...string) { if s.metrics == nil { return } + // fixme: data race here status := s.writer.Status() s.metrics.totalUpdates.Set(float64(status.TotUpdates), labelValues...) s.metrics.totalDeletes.Set(float64(status.TotDeletes), labelValues...) diff --git a/scripts/build/build.mk b/scripts/build/build.mk index 647f0a89d..899dfef0c 100644 --- a/scripts/build/build.mk +++ b/scripts/build/build.mk @@ -86,7 +86,7 @@ clean-build: ## Clean all artifacts # be rebuilt again. $(BUILD_LOCK): @echo "cleaning up stale build artifacts..." - $(MAKE) clean + $(MAKE) clean-build mkdir -p $(BUILD_DIR) touch $@ diff --git a/scripts/push-release.sh b/scripts/push-release.sh new file mode 100755 index 000000000..0a0ed328e --- /dev/null +++ b/scripts/push-release.sh @@ -0,0 +1,95 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -ex + +if [ "$VERSION" == "" ]; then + echo "VERSION environment variable not found, Please setting the VERSION." + echo "For example: export VERSION=1.0.0" + exit 1 +fi + +VERSION=${VERSION} +TAG_NAME=v${VERSION} +PRODUCT_NAME="skywalking-banyandb-${VERSION}" + +echo "Release version "${VERSION} +echo "Source tag "${TAG_NAME} + +SCRIPTDIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +ROOTDIR=${SCRIPTDIR}/.. +BUILDDIR=${ROOTDIR}/build + +pushd ${BUILDDIR} +trap 'popd' EXIT + +rm -rf skywalking + +svn co https://dist.apache.org/repos/dist/dev/skywalking/ +mkdir -p skywalking/banyandb/"$VERSION" +cp ${PRODUCT_NAME}-*.tgz skywalking/banyandb/"$VERSION" +cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION" +cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION" + +cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking BanyanDB release $VERSION" + +cat << EOF +========================================================================= +Subject: [VOTE] Release Apache SkyWalking BanyanDB version $VERSION + +Content: + +Hi the SkyWalking Community: +This is a call for vote to release Apache SkyWalking BanyanDB version $VERSION. + +Release notes: + + * https://github.com/apache/skywalking-banyandb/blob/v$VERSION/CHANGES.md + +Release Candidate: + + * https://dist.apache.org/repos/dist/dev/skywalking/banyandb/$VERSION + * sha512 checksums + - $(cat ${PRODUCT_NAME}-src.tgz.sha512) + - $(cat ${PRODUCT_NAME}-banyand.tgz.sha512) + - $(cat ${PRODUCT_NAME}-bydbctl.tgz.sha512) + +Release Tag : + + * (Git Tag) $TAG_NAME + +Release Commit Hash : + + * https://github.com/apache/skywalking-banyandb/tree/$(git rev-list -n 1 "$TAG_NAME") + +Keys to verify the Release Candidate : + + * https://dist.apache.org/repos/dist/release/skywalking/KEYS + +Guide to build the release from source : + + * https://github.com/apache/skywalking-banyandb/blob/v$VERSION/docs/installation/binaries.md#Build-From-Source + +Voting will start now and will remain open for at least 72 hours, all PMC members are required to give their votes. + +[ ] +1 Release this package. +[ ] +0 No opinion. +[ ] -1 Do not release this package because.... + +Thanks. + +[1] https://github.com/apache/skywalking/blob/master/docs/en/guides/How-to-release.md#vote-check +EOF diff --git a/scripts/release.sh b/scripts/release.sh index 8e16d117c..ab619fa48 100755 --- a/scripts/release.sh +++ b/scripts/release.sh @@ -24,7 +24,8 @@ BUILDDIR=${ROOTDIR}/build RELEASE_TAG=$(git describe --tags $(git rev-list --tags --max-count=1)) RELEASE_VERSION=${RELEASE_TAG#"v"} -SOURCE_FILE=${BUILDDIR}/skywalking-banyandb-${RELEASE_VERSION}-src.tgz +SOURCE_FILE_NAME=skywalking-banyandb-${RELEASE_VERSION}-src.tgz +SOURCE_FILE=${BUILDDIR}/${SOURCE_FILE_NAME} binary(){ if [ ! -f "${SOURCE_FILE}" ]; then @@ -34,8 +35,9 @@ binary(){ tmpdir=`mktemp -d` trap "rm -rf ${tmpdir}" EXIT pushd ${tmpdir} + trap 'popd' EXIT tar -xvf ${SOURCE_FILE} - make generate + make generate && make -C ui build TARGET_OS=linux PLATFORMS=linux/amd64,linux/arm64 make -C banyand release bindir=./build mkdir -p ${bindir}/bin @@ -57,7 +59,6 @@ binary(){ copy_binaries bydbctl # Package tar -czf ${BUILDDIR}/skywalking-banyandb-${RELEASE_VERSION}-bydbctl.tgz -C ${bindir} . - popd } copy_binaries() { @@ -73,7 +74,8 @@ copy_binaries() { source(){ # Package - mkdir -p ${BUILDDIR} + tmpdir=`mktemp -d` + trap "rm -rf ${tmpdir}" EXIT rm -rf ${SOURCE_FILE} pushd ${ROOTDIR} echo "RELEASE_VERSION=${RELEASE_VERSION}" > .env @@ -85,8 +87,11 @@ source(){ --exclude=".idea" \ --exclude=".vscode" \ --exclude="bin" \ - -czf ${SOURCE_FILE} \ + -czf ${tmpdir}/${SOURCE_FILE_NAME} \ . + + mkdir -p ${BUILDDIR} + mv ${tmpdir}/${SOURCE_FILE_NAME} ${BUILDDIR} popd }