Skip to content

Commit

Permalink
Merge branch 'main' into concept-fs
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui committed Jul 8, 2023
2 parents 3a77d64 + c1d3271 commit e00fe4e
Show file tree
Hide file tree
Showing 33 changed files with 1,458 additions and 683 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Release Notes.

- List all properties in a group.

### Chores

- Bump several dependencies and tools.

## 0.4.0

### Features
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ include scripts/build/ginkgo.mk
test-ci: $(GINKGO) ## Run the unit tests in CI
$(GINKGO) --race \
-ldflags \
"-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=info" \
"-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
$(TEST_CI_OPTS) \
./...

Expand Down
64 changes: 16 additions & 48 deletions api/proto/banyandb/database/v1/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import "protoc-gen-openapiv2/options/annotations.proto";

option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
option java_package = "org.apache.skywalking.banyandb.database.v1";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
base_path: "/api"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_path: "/api"};

message StreamRegistryServiceCreateRequest {
banyandb.database.v1.Stream stream = 1;
Expand Down Expand Up @@ -91,21 +89,15 @@ service StreamRegistryService {
}

rpc Delete(StreamRegistryServiceDeleteRequest) returns (StreamRegistryServiceDeleteResponse) {
option (google.api.http) = {
delete: "/v1/stream/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {delete: "/v1/stream/schema/{metadata.group}/{metadata.name}"};
}

rpc Get(StreamRegistryServiceGetRequest) returns (StreamRegistryServiceGetResponse) {
option (google.api.http) = {
get: "/v1/stream/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {get: "/v1/stream/schema/{metadata.group}/{metadata.name}"};
}

rpc List(StreamRegistryServiceListRequest) returns (StreamRegistryServiceListResponse) {
option (google.api.http) = {
get: "/v1/stream/schema/lists/{group}"
};
option (google.api.http) = {get: "/v1/stream/schema/lists/{group}"};
}

// Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
Expand Down Expand Up @@ -173,21 +165,15 @@ service IndexRuleBindingRegistryService {
}

rpc Delete(IndexRuleBindingRegistryServiceDeleteRequest) returns (IndexRuleBindingRegistryServiceDeleteResponse) {
option (google.api.http) = {
delete: "/v1/index-rule-binding/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {delete: "/v1/index-rule-binding/schema/{metadata.group}/{metadata.name}"};
}

rpc Get(IndexRuleBindingRegistryServiceGetRequest) returns (IndexRuleBindingRegistryServiceGetResponse) {
option (google.api.http) = {
get: "/v1/index-rule-binding/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {get: "/v1/index-rule-binding/schema/{metadata.group}/{metadata.name}"};
}

rpc List(IndexRuleBindingRegistryServiceListRequest) returns (IndexRuleBindingRegistryServiceListResponse) {
option (google.api.http) = {
get: "/v1/index-rule-binding/schema/lists/{group}"
};
option (google.api.http) = {get: "/v1/index-rule-binding/schema/lists/{group}"};
}

// Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
Expand Down Expand Up @@ -255,21 +241,15 @@ service IndexRuleRegistryService {
}

rpc Delete(IndexRuleRegistryServiceDeleteRequest) returns (IndexRuleRegistryServiceDeleteResponse) {
option (google.api.http) = {
delete: "/v1/index-rule/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {delete: "/v1/index-rule/schema/{metadata.group}/{metadata.name}"};
}

rpc Get(IndexRuleRegistryServiceGetRequest) returns (IndexRuleRegistryServiceGetResponse) {
option (google.api.http) = {
get: "/v1/index-rule/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {get: "/v1/index-rule/schema/{metadata.group}/{metadata.name}"};
}

rpc List(IndexRuleRegistryServiceListRequest) returns (IndexRuleRegistryServiceListResponse) {
option (google.api.http) = {
get: "/v1/index-rule/schema/lists/{group}"
};
option (google.api.http) = {get: "/v1/index-rule/schema/lists/{group}"};
}

// Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
Expand Down Expand Up @@ -337,21 +317,15 @@ service MeasureRegistryService {
}

rpc Delete(MeasureRegistryServiceDeleteRequest) returns (MeasureRegistryServiceDeleteResponse) {
option (google.api.http) = {
delete: "/v1/measure/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {delete: "/v1/measure/schema/{metadata.group}/{metadata.name}"};
}

rpc Get(MeasureRegistryServiceGetRequest) returns (MeasureRegistryServiceGetResponse) {
option (google.api.http) = {
get: "/v1/measure/schema/{metadata.group}/{metadata.name}"
};
option (google.api.http) = {get: "/v1/measure/schema/{metadata.group}/{metadata.name}"};
}

rpc List(MeasureRegistryServiceListRequest) returns (MeasureRegistryServiceListResponse) {
option (google.api.http) = {
get: "/v1/measure/schema/lists/{group}"
};
option (google.api.http) = {get: "/v1/measure/schema/lists/{group}"};
}

// Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
Expand Down Expand Up @@ -416,21 +390,15 @@ service GroupRegistryService {
}

rpc Delete(GroupRegistryServiceDeleteRequest) returns (GroupRegistryServiceDeleteResponse) {
option (google.api.http) = {
delete: "/v1/group/schema/{group}"
};
option (google.api.http) = {delete: "/v1/group/schema/{group}"};
}

rpc Get(GroupRegistryServiceGetRequest) returns (GroupRegistryServiceGetResponse) {
option (google.api.http) = {
get: "/v1/group/schema/{group}"
};
option (google.api.http) = {get: "/v1/group/schema/{group}"};
}

rpc List(GroupRegistryServiceListRequest) returns (GroupRegistryServiceListResponse) {
option (google.api.http) = {
get: "/v1/group/schema/lists"
};
option (google.api.http) = {get: "/v1/group/schema/lists"};
}

// Exist doesn't expose an HTTP endpoint. Please use HEAD method to touch Get instead
Expand Down
4 changes: 1 addition & 3 deletions api/proto/banyandb/measure/v1/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import "protoc-gen-openapiv2/options/annotations.proto";

option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1";
option java_package = "org.apache.skywalking.banyandb.measure.v1";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
base_path: "/api"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_path: "/api"};

service MeasureService {
rpc Query(banyandb.measure.v1.QueryRequest) returns (banyandb.measure.v1.QueryResponse) {
Expand Down
16 changes: 4 additions & 12 deletions api/proto/banyandb/property/v1/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import "validate/validate.proto";

option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1";
option java_package = "org.apache.skywalking.banyandb.property.v1";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
base_path: "/api"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_path: "/api"};

message ApplyRequest {
banyandb.property.v1.Property property = 1 [(validate.rules).message.required = true];
Expand Down Expand Up @@ -88,23 +86,17 @@ service PropertyService {
}

rpc Delete(DeleteRequest) returns (DeleteResponse) {
option (google.api.http) = {
delete: "/v1/property/{metadata.container.group}/{metadata.container.name}/{metadata.id}/{tags}"
};
option (google.api.http) = {delete: "/v1/property/{metadata.container.group}/{metadata.container.name}/{metadata.id}/{tags}"};
}

rpc Get(GetRequest) returns (GetResponse) {
option (google.api.http) = {
get: "/v1/property/{metadata.container.group}/{metadata.container.name}/{metadata.id}/{tags}"
};
option (google.api.http) = {get: "/v1/property/{metadata.container.group}/{metadata.container.name}/{metadata.id}/{tags}"};
}

rpc List(ListRequest) returns (ListResponse) {
option (google.api.http) = {
get: "/v1/property/lists/{container.group}/{container.name}/{ids}/{tags}"
additional_bindings {
get: "/v1/property/lists/{container.group}"
}
additional_bindings {get: "/v1/property/lists/{container.group}"}
};
}
}
4 changes: 1 addition & 3 deletions api/proto/banyandb/stream/v1/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import "protoc-gen-openapiv2/options/annotations.proto";

option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1";
option java_package = "org.apache.skywalking.banyandb.stream.v1";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {
base_path: "/api"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_path: "/api"};

service StreamService {
rpc Query(banyandb.stream.v1.QueryRequest) returns (banyandb.stream.v1.QueryResponse) {
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/measure_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"context"
"testing"

"github.com/golang/mock/gomock"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"go.uber.org/mock/gomock"

"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
Expand Down
6 changes: 3 additions & 3 deletions banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
}

config := clientv3.Config{
Endpoints: []string{e.Config().ACUrls[0].String()},
Endpoints: []string{e.Config().AdvertiseClientUrls[0].String()},
DialTimeout: 5 * time.Second,
DialKeepAliveTime: 30 * time.Second,
DialKeepAliveTimeout: 10 * time.Second,
Expand Down Expand Up @@ -407,8 +407,8 @@ func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger *zap.Logge
pURL, _ := url.Parse(config.listenerPeerURL)

cfg.ClusterState = "new"
cfg.LCUrls, cfg.ACUrls = []url.URL{*cURL}, []url.URL{*cURL}
cfg.LPUrls, cfg.APUrls = []url.URL{*pURL}, []url.URL{*pURL}
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*cURL}, []url.URL{*cURL}
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*pURL}, []url.URL{*pURL}
cfg.InitialCluster = ",default=" + pURL.String()

cfg.BackendBatchInterval = 500 * time.Millisecond
Expand Down
2 changes: 1 addition & 1 deletion banyand/stream/stream_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"context"
"testing"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.uber.org/mock/gomock"

"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
Expand Down
24 changes: 12 additions & 12 deletions banyand/tsdb/bucket/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync"
"time"

"github.com/hashicorp/golang-lru/simplelru"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/robfig/cron/v3"

"github.com/apache/skywalking-banyandb/pkg/logger"
Expand All @@ -45,7 +45,7 @@ type Queue interface {
Remove(id fmt.Stringer)
Len() int
Volume() int
All() []interface{}
All() []fmt.Stringer
}

const (
Expand All @@ -59,9 +59,9 @@ const (
var errInvalidSize = errors.New("invalid size")

type lruQueue struct {
recent simplelru.LRUCache
frequent simplelru.LRUCache
recentEvict simplelru.LRUCache
recent simplelru.LRUCache[fmt.Stringer, any]
frequent simplelru.LRUCache[fmt.Stringer, any]
recentEvict simplelru.LRUCache[fmt.Stringer, any]
l *logger.Logger
evictFn EvictFn
size int
Expand All @@ -79,15 +79,15 @@ func NewQueue(l *logger.Logger, size int, maxSize int, scheduler *timestamp.Sche
recentSize := int(float64(size) * defaultRecentRatio)
evictSize := maxSize - size

recent, err := simplelru.NewLRU(size, nil)
recent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
if err != nil {
return nil, err
}
frequent, err := simplelru.NewLRU(size, nil)
frequent, err := simplelru.NewLRU[fmt.Stringer, any](size, nil)
if err != nil {
return nil, err
}
recentEvict, err := simplelru.NewLRU(evictSize, nil)
recentEvict, err := simplelru.NewLRU[fmt.Stringer, any](evictSize, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,10 +201,10 @@ func (q *lruQueue) Volume() int {
return q.size + q.recentSize + q.evictSize
}

func (q *lruQueue) All() []interface{} {
func (q *lruQueue) All() []fmt.Stringer {
q.lock.RLock()
defer q.lock.RUnlock()
all := make([]interface{}, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
all := make([]fmt.Stringer, q.recent.Len()+q.frequent.Len()+q.recentEvict.Len())
copy(all, q.recent.Keys())
copy(all[q.recent.Len():], q.frequent.Keys())
copy(all[q.recent.Len()+q.frequent.Len():], q.recentEvict.Keys())
Expand Down Expand Up @@ -237,7 +237,7 @@ func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
return q.removeOldest(ctx, q.frequent)
}

func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size int, id interface{}) error {
func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache[fmt.Stringer, any], size int, id fmt.Stringer) error {
if lst.Len() < size {
lst.Add(id, nil)
return nil
Expand All @@ -249,7 +249,7 @@ func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size int,
return nil
}

func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) error {
func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache[fmt.Stringer, any]) error {
oldestID, _, ok := lst.GetOldest()
if ok && q.evictFn != nil {
if err := q.evictFn(ctx, oldestID); err != nil {
Expand Down
Loading

0 comments on commit e00fe4e

Please sign in to comment.