Skip to content

Commit

Permalink
Merge branch 'main' into wal
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng authored Jun 25, 2023
2 parents 19583a5 + 25fb9cb commit 2a55c01
Show file tree
Hide file tree
Showing 100 changed files with 109,438 additions and 628 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
*.jpg binary
*.jpeg binary
*.ico binary
*.gz binary

go.sum merge=union
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ Release Notes.
- Add units to memory flags
- Introduce TSTable to customize the block's structure
- Add `/system` endpoint to the monitoring server that displays a list of nodes' system information.
- Enhance the `liaison` module by implementing access logging.
- Add the Istio scenario stress test based on the data generated by the integration access log.
- Generalize the index's docID to uint64.
- Remove redundant ID tag type.

### Bugs

Expand Down
1 change: 0 additions & 1 deletion api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ enum TagType {
TAG_TYPE_STRING_ARRAY = 3;
TAG_TYPE_INT_ARRAY = 4;
TAG_TYPE_DATA_BINARY = 5;
TAG_TYPE_ID = 6;
}

message TagFamilySpec {
Expand Down
5 changes: 0 additions & 5 deletions api/proto/banyandb/model/v1/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import "google/protobuf/struct.proto";
option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1";
option java_package = "org.apache.skywalking.banyandb.model.v1";

message ID {
string value = 1;
}

message Str {
string value = 1;
}
Expand Down Expand Up @@ -56,7 +52,6 @@ message TagValue {
Int int = 4;
IntArray int_array = 5;
bytes binary_data = 6;
ID id = 7;
}
}

Expand Down
21 changes: 20 additions & 1 deletion banyand/liaison/grpc/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
Expand All @@ -38,13 +39,22 @@ import (
type measureService struct {
measurev1.UnimplementedMeasureServiceServer
*discoveryService
sampled *logger.Logger
sampled *logger.Logger
ingestionAccessLog accesslog.Log
}

func (ms *measureService) setLogger(log *logger.Logger) {
ms.sampled = log.Sampled(10)
}

func (ms *measureService) activeIngestionAccessLog(root string) (err error) {
if ms.ingestionAccessLog, err = accesslog.
NewFileLog(root, "measure-ingest-%s", 10*time.Minute, ms.log); err != nil {
return err
}
return nil
}

func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
reply := func(measure measurev1.MeasureService_WriteServer, logger *logger.Logger) {
if errResp := measure.Send(&measurev1.WriteResponse{}); errResp != nil {
Expand Down Expand Up @@ -78,6 +88,11 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
reply(measure, ms.sampled)
continue
}
if ms.ingestionAccessLog != nil {
if errAccessLog := ms.ingestionAccessLog.Write(writeRequest); errAccessLog != nil {
ms.sampled.Error().Err(errAccessLog).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to write access log")
}
}
iwr := &measurev1.InternalWriteRequest{
Request: writeRequest,
ShardId: uint32(shardID),
Expand Down Expand Up @@ -144,3 +159,7 @@ func (ms *measureService) TopN(_ context.Context, topNRequest *measurev1.TopNReq
}
return nil, nil
}

func (ms *measureService) Close() error {
return ms.ingestionAccessLog.Close()
}
81 changes: 56 additions & 25 deletions banyand/liaison/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,46 +51,52 @@ import (
const defaultRecvSize = 10 << 20

var (
errServerCert = errors.New("invalid server cert file")
errServerKey = errors.New("invalid server key file")
errNoAddr = errors.New("no address")
errQueryMsg = errors.New("invalid query message")
errServerCert = errors.New("invalid server cert file")
errServerKey = errors.New("invalid server key file")
errNoAddr = errors.New("no address")
errQueryMsg = errors.New("invalid query message")
errAccessLogRootPath = errors.New("access log root path is required")
)

type server struct {
pipeline queue.Queue
creds credentials.TransportCredentials
repo discovery.ServiceRepo
stopCh chan struct{}
*measureRegistryServer
log *logger.Logger
ser *grpclib.Server
*indexRuleRegistryServer
measureSVC *measureService
log *logger.Logger
ser *grpclib.Server
*propertyServer
*topNAggregationRegistryServer
*groupRegistryServer
*indexRuleRegistryServer
streamSVC *streamService
measureSVC *measureService
stopCh chan struct{}
streamSVC *streamService
*measureRegistryServer
*streamRegistryServer
*indexRuleBindingRegistryServer
addr string
keyFile string
certFile string
maxRecvMsgSize run.Bytes
tls bool
addr string
keyFile string
certFile string
accessLogRootPath string
accessLogRecorders []accessLogRecorder
maxRecvMsgSize run.Bytes
tls bool
enableIngestionAccessLog bool
}

// NewServer returns a new gRPC server.
func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit {
return &server{
pipeline: pipeline,
repo: repo,
streamSVC: &streamService{
discoveryService: newDiscoveryService(pipeline),
},
measureSVC: &measureService{
discoveryService: newDiscoveryService(pipeline),
},
streamSVC := &streamService{
discoveryService: newDiscoveryService(pipeline),
}
measureSVC := &measureService{
discoveryService: newDiscoveryService(pipeline),
}
s := &server{
pipeline: pipeline,
repo: repo,
streamSVC: streamSVC,
measureSVC: measureSVC,
streamRegistryServer: &streamRegistryServer{
schemaRegistry: schemaRegistry,
},
Expand All @@ -113,6 +119,8 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
schemaRegistry: schemaRegistry,
},
}
s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC}
return s
}

func (s *server) PreRun() error {
Expand Down Expand Up @@ -146,6 +154,14 @@ func (s *server) PreRun() error {
return err
}
}

if s.enableIngestionAccessLog {
for _, alr := range s.accessLogRecorders {
if err := alr.activeIngestionAccessLog(s.accessLogRootPath); err != nil {
return err
}
}
}
return nil
}

Expand All @@ -161,13 +177,18 @@ func (s *server) FlagSet() *run.FlagSet {
fs.StringVarP(&s.certFile, "cert-file", "", "", "the TLS cert file")
fs.StringVarP(&s.keyFile, "key-file", "", "", "the TLS key file")
fs.StringVarP(&s.addr, "addr", "", ":17912", "the address of banyand listens")
fs.BoolVarP(&s.enableIngestionAccessLog, "enable-ingestion-access-log", "", false, "enable ingestion access log")
fs.StringVarP(&s.accessLogRootPath, "access-log-root-path", "", "", "access log root path")
return fs
}

func (s *server) Validate() error {
if s.addr == "" {
return errNoAddr
}
if s.enableIngestionAccessLog && s.accessLogRootPath == "" {
return errAccessLogRootPath
}
observability.UpdateAddress("grpc", s.addr)
if !s.tls {
return nil
Expand Down Expand Up @@ -254,6 +275,11 @@ func (s *server) GracefulStop() {
stopped := make(chan struct{})
go func() {
s.ser.GracefulStop()
if s.enableIngestionAccessLog {
for _, alr := range s.accessLogRecorders {
_ = alr.Close()
}
}
close(stopped)
}()

Expand All @@ -267,3 +293,8 @@ func (s *server) GracefulStop() {
s.log.Info().Msg("stopped gracefully")
}
}

type accessLogRecorder interface {
activeIngestionAccessLog(root string) error
Close() error
}
21 changes: 20 additions & 1 deletion banyand/liaison/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
Expand All @@ -38,13 +39,22 @@ import (
type streamService struct {
streamv1.UnimplementedStreamServiceServer
*discoveryService
sampled *logger.Logger
sampled *logger.Logger
ingestionAccessLog accesslog.Log
}

func (s *streamService) setLogger(log *logger.Logger) {
s.sampled = log.Sampled(10)
}

func (s *streamService) activeIngestionAccessLog(root string) (err error) {
if s.ingestionAccessLog, err = accesslog.
NewFileLog(root, "stream-ingest-%s", 10*time.Minute, s.log); err != nil {
return err
}
return nil
}

func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply := func(stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil {
Expand Down Expand Up @@ -78,6 +88,11 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
reply(stream, s.sampled)
continue
}
if s.ingestionAccessLog != nil {
if errAccessLog := s.ingestionAccessLog.Write(writeEntity); errAccessLog != nil {
s.sampled.Error().Err(errAccessLog).Msg("failed to write ingestion access log")
}
}
iwr := &streamv1.InternalWriteRequest{
Request: writeEntity,
ShardId: uint32(shardID),
Expand Down Expand Up @@ -126,3 +141,7 @@ func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) (*s
}
return nil, nil
}

func (s *streamService) Close() error {
return s.ingestionAccessLog.Close()
}
6 changes: 2 additions & 4 deletions banyand/measure/measure_topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
Tags: append([]*modelv1.TagValue{
// MeasureID
{
Value: &modelv1.TagValue_Id{
Id: &modelv1.ID{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: measureID,
},
},
Expand Down Expand Up @@ -484,8 +484,6 @@ func stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
case *modelv1.TagValue_Str:
return v.Str.GetValue()
case *modelv1.TagValue_Id:
return v.Id.GetValue()
case *modelv1.TagValue_Int:
return strconv.FormatInt(v.Int.GetValue(), 10)
case *modelv1.TagValue_BinaryData:
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema
Tags: append([]*databasev1.TagSpec{
{
Name: "measure_id",
Type: databasev1.TagType_TAG_TYPE_ID,
Type: databasev1.TagType_TAG_TYPE_STRING,
},
}, seriesSpecs...),
},
Expand Down
4 changes: 0 additions & 4 deletions banyand/metadata/schema/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,3 @@ func BadRequest(field, desc string) error {
st, _ := statusGRPCInvalidArgument.WithDetails(br)
return st.Err()
}

func isNotFound(err error) bool {
return errors.Is(err, ErrGRPCResourceNotFound)
}
4 changes: 4 additions & 0 deletions banyand/metadata/schema/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,5 +410,9 @@ func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger *zap.Logge
cfg.LCUrls, cfg.ACUrls = []url.URL{*cURL}, []url.URL{*cURL}
cfg.LPUrls, cfg.APUrls = []url.URL{*pURL}, []url.URL{*pURL}
cfg.InitialCluster = ",default=" + pURL.String()

cfg.BackendBatchInterval = 500 * time.Millisecond
cfg.BackendBatchLimit = 10000
cfg.MaxRequestBytes = 10 * 1024 * 1024
return cfg
}
Loading

0 comments on commit 2a55c01

Please sign in to comment.