From ac086ede84dfa1e122ea58ee4d3e5005ff733b69 Mon Sep 17 00:00:00 2001 From: humingcheng Date: Tue, 8 Oct 2024 18:12:00 +0800 Subject: [PATCH] Syncer supports to enable rbac (#1491) --- .github/workflows/eventbase-ci.yml | 4 +- .github/workflows/static_check.yml | 2 +- docker-compose.yml | 2 +- etc/conf/syncer.yaml | 2 + scripts/integration_test.sh | 2 +- scripts/ut_test_in_docker.sh | 4 +- syncer/config/config.go | 13 ++- syncer/rpc/auth.go | 63 ++++++++++++ syncer/rpc/auth_test.go | 130 ++++++++++++++++++++++++ syncer/rpc/server.go | 38 ++++++- syncer/rpc/server_test.go | 85 ++++++++++++++++ syncer/server/server.go | 14 ++- syncer/service/admin/health.go | 55 ++++++---- syncer/service/replicator/replicator.go | 33 ++++-- 14 files changed, 405 insertions(+), 42 deletions(-) create mode 100644 syncer/rpc/auth.go create mode 100644 syncer/rpc/auth_test.go create mode 100644 syncer/rpc/server_test.go diff --git a/.github/workflows/eventbase-ci.yml b/.github/workflows/eventbase-ci.yml index b0918b7a3..87dfd75c3 100644 --- a/.github/workflows/eventbase-ci.yml +++ b/.github/workflows/eventbase-ci.yml @@ -13,7 +13,7 @@ jobs: uses: actions/checkout@v1 - name: UT test run: | - sudo docker-compose -f ./scripts/docker-compose.yaml up -d + sudo docker compose -f ./scripts/docker-compose.yaml up -d sleep 20 export TEST_DB_MODE=mongo export TEST_DB_URI=mongodb://127.0.0.1:27017 @@ -31,7 +31,7 @@ jobs: uses: actions/checkout@v1 - name: UT for etcd run: | - time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 + time docker run -d -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379 while ! nc -z 127.0.0.1 2379; do sleep 1 done diff --git a/.github/workflows/static_check.yml b/.github/workflows/static_check.yml index fe95ef40c..00b4a8e49 100644 --- a/.github/workflows/static_check.yml +++ b/.github/workflows/static_check.yml @@ -40,7 +40,7 @@ jobs: uses: actions/checkout@v1 - name: UT-MONGO run: | - sudo docker-compose -f ./scripts/docker-compose.yaml up -d + sudo docker compose -f ./scripts/docker-compose.yaml up -d sleep 20 bash -x scripts/ut_test_in_docker.sh mongo local: diff --git a/docker-compose.yml b/docker-compose.yml index c7be231ec..e50d3d990 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,7 +17,7 @@ version: '3' services: etcd: - image: 'quay.io/coreos/etcd:latest' + image: 'quay.io/coreos/etcd:v3.5.15' # restart: always #ports: # - "2379:2379" diff --git a/etc/conf/syncer.yaml b/etc/conf/syncer.yaml index 784f86a4d..11b65a648 100644 --- a/etc/conf/syncer.yaml +++ b/etc/conf/syncer.yaml @@ -1,11 +1,13 @@ sync: enableOnStart: false + rbacEnabled: false peers: - name: dc kind: servicecomb endpoints: ["127.0.0.1:30105"] # only allow mode implemented in incremental approach like push, watch(such as pub/sub, long polling) mode: [push] + token: tombstone: retire: # use linux crontab not Quartz cron diff --git a/scripts/integration_test.sh b/scripts/integration_test.sh index 6b4c4a92a..a5984f73f 100755 --- a/scripts/integration_test.sh +++ b/scripts/integration_test.sh @@ -41,7 +41,7 @@ set +e docker rm -f etcd kill -9 $(ps aux | grep 'service-center' | awk '{print $2}') set -e -sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new +sudo docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new while ! nc -z 127.0.0.1 2379; do echo "Waiting Etcd to launch on 2379..." sleep 1 diff --git a/scripts/ut_test_in_docker.sh b/scripts/ut_test_in_docker.sh index 9e76d121c..357d0e6f6 100644 --- a/scripts/ut_test_in_docker.sh +++ b/scripts/ut_test_in_docker.sh @@ -31,7 +31,7 @@ echo "${green}Starting Unit Testing for Service Center${reset}" if [ "${db_name}" == "etcd" ];then echo "${green}Starting etcd in docker${reset}" - docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new + docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new while ! nc -z 127.0.0.1 2379; do echo "Waiting Etcd to launch on 2379..." sleep 1 @@ -45,7 +45,7 @@ elif [ ${db_name} == "mongo" ];then echo "${green}mongodb is running......${reset}" elif [ ${db_name} == "local" ];then echo "${green}Starting etcd in docker${reset}" - docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new + docker run -d -v /usr/share/ca-certificates/:/etc/ssl/certs -p 40010:40010 -p 23800:23800 -p 2379:2379 --name etcd quay.io/coreos/etcd:v3.5.15 etcd -name etcd0 -advertise-client-urls http://127.0.0.1:2379,http://127.0.0.1:40010 -listen-client-urls http://0.0.0.0:2379,http://0.0.0.0:40010 -initial-advertise-peer-urls http://127.0.0.1:23800 -listen-peer-urls http://0.0.0.0:23800 -initial-cluster-token etcd-cluster-1 -initial-cluster etcd0=http://127.0.0.1:23800 -initial-cluster-state new while ! nc -z 127.0.0.1 2379; do echo "Waiting Etcd to launch on 2379..." sleep 1 diff --git a/syncer/config/config.go b/syncer/config/config.go index de01f5ece..a568d5c98 100644 --- a/syncer/config/config.go +++ b/syncer/config/config.go @@ -21,9 +21,10 @@ import ( "fmt" "path/filepath" + "github.com/go-chassis/go-archaius" + "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" - "github.com/go-chassis/go-archaius" ) var config Config @@ -33,8 +34,12 @@ type Config struct { } type Sync struct { - EnableOnStart bool `yaml:"enableOnStart"` - Peers []*Peer `yaml:"peers"` + EnableOnStart bool `yaml:"enableOnStart"` + // When RbacEnabled is true, syncer's API requires the rbac token, + // and service-center also provides the rbac token to communicate with peer. + // At the same time, service-center rbac must be enabled. + RbacEnabled bool `yaml:"rbacEnabled"` + Peers []*Peer `yaml:"peers"` } type Peer struct { @@ -42,6 +47,8 @@ type Peer struct { Kind string `yaml:"kind"` Endpoints []string `yaml:"endpoints"` Mode []string `yaml:"mode"` + // The token to communicate with peer, this takes effect only when RbacEnabled is true + Token string `yaml:"token"` } func Init() error { diff --git a/syncer/rpc/auth.go b/syncer/rpc/auth.go new file mode 100644 index 000000000..9f3ce607a --- /dev/null +++ b/syncer/rpc/auth.go @@ -0,0 +1,63 @@ +package rpc + +import ( + "context" + "fmt" + "strings" + + "github.com/go-chassis/cari/rbac" + "github.com/go-chassis/go-chassis/v2/security/authr" + "github.com/go-chassis/go-chassis/v2/server/restful" + "google.golang.org/grpc/metadata" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/syncer/config" +) + +var errWrongAccountNorRole = fmt.Errorf("account should be %s, and roles should contain %s", RbacAllowedAccountName, RbacAllowedRoleName) + +func auth(ctx context.Context) error { + if !config.GetConfig().Sync.RbacEnabled { + return nil + } + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return rbac.NewError(rbac.ErrNoAuthHeader, "") + } + + authHeader := md.Get(restful.HeaderAuth) + if len(authHeader) == 0 { + return rbac.NewError(rbac.ErrNoAuthHeader, fmt.Sprintf("header %s not found nor content empty", restful.HeaderAuth)) + } + + s := strings.Split(authHeader[0], " ") + if len(s) != 2 { + return rbac.ErrInvalidHeader + } + to := s[1] + + claims, err := authr.Authenticate(ctx, to) + if err != nil { + return err + } + m, ok := claims.(map[string]interface{}) + if !ok { + log.Error("claims convert failed", rbac.ErrConvert) + return rbac.ErrConvert + } + account, err := rbac.GetAccount(m) + if err != nil { + log.Error("get account from token failed", err) + return err + } + + if account.Name != RbacAllowedAccountName { + return errWrongAccountNorRole + } + for _, role := range account.Roles { + if role == RbacAllowedRoleName { + return nil + } + } + return errWrongAccountNorRole +} diff --git a/syncer/rpc/auth_test.go b/syncer/rpc/auth_test.go new file mode 100644 index 000000000..2cc1bcba7 --- /dev/null +++ b/syncer/rpc/auth_test.go @@ -0,0 +1,130 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/go-chassis/cari/pkg/errsvc" + "github.com/go-chassis/cari/rbac" + "github.com/go-chassis/go-chassis/v2/security/authr" + "github.com/go-chassis/go-chassis/v2/server/restful" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/metadata" + + "github.com/apache/servicecomb-service-center/syncer/config" +) + +type testAuth struct{} + +func (testAuth) Login(ctx context.Context, user string, password string, opts ...authr.LoginOption) (string, error) { + return "", nil +} + +func (testAuth) Authenticate(ctx context.Context, token string) (interface{}, error) { + var claim map[string]interface{} + return claim, json.Unmarshal([]byte(token), &claim) +} + +func Test_auth(t *testing.T) { + // use the custom auth plugin + authr.Install("test", func(opts *authr.Options) (authr.Authenticator, error) { + return testAuth{}, nil + }) + assert.NoError(t, authr.Init(authr.WithPlugin("test"))) + + type args struct { + ctx context.Context + } + tests := []struct { + name string + preDo func() + args args + wantErr assert.ErrorAssertionFunc + }{ + { + name: "sync rbac disables", + preDo: func() { + config.SetConfig(config.Config{ + Sync: &config.Sync{ + RbacEnabled: false, + }}) + }, + args: args{ + ctx: context.Background(), // rbac disabled, empty ctx should pass the auth + }, + wantErr: assert.NoError, + }, + { + name: "no header", + preDo: func() { + config.SetConfig(config.Config{ + Sync: &config.Sync{ + RbacEnabled: true, + }}) + }, + args: args{ + ctx: context.Background(), // rbac enabled, empty ctx should not pass the auth + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + var errSvcErr *errsvc.Error + ok := errors.As(err, &errSvcErr) + assert.True(t, ok) + + return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code) + }, + }, + { + name: "with header but no auth header", + args: args{ + ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"fake": "fake"})), + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + var errSvcErr *errsvc.Error + ok := errors.As(err, &errSvcErr) + assert.True(t, ok) + + return assert.Equal(t, rbac.ErrNoAuthHeader, errSvcErr.Code) + }, + }, + { + name: "auth header format error", + args: args{ + ctx: metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{restful.HeaderAuth: "fake"})), + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, rbac.ErrInvalidHeader, err) + }, + }, + { + name: "wrong account nor role", + args: args{ + ctx: metadata.NewIncomingContext(context.Background(), + metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"x","roles":["x"]}`})), + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, errWrongAccountNorRole, err) + }, + }, + { + name: "valid token", + args: args{ + ctx: metadata.NewIncomingContext(context.Background(), + metadata.New(map[string]string{restful.HeaderAuth: `Bear {"account":"sync-user","roles":["sync-admin"]}`})), + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.NoError(t, err) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.preDo != nil { + tt.preDo() + } + tt.wantErr(t, auth(tt.args.ctx), fmt.Sprintf("auth(%v)", tt.args.ctx)) + }) + } +} diff --git a/syncer/rpc/server.go b/syncer/rpc/server.go index 990f78946..dc6efee0e 100644 --- a/syncer/rpc/server.go +++ b/syncer/rpc/server.go @@ -22,18 +22,21 @@ import ( "fmt" "time" - "github.com/apache/servicecomb-service-center/syncer/service/replicator" - "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" - "github.com/apache/servicecomb-service-center/pkg/log" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" "github.com/apache/servicecomb-service-center/syncer/config" + "github.com/apache/servicecomb-service-center/syncer/service/replicator" + "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" ) const ( HealthStatusConnected = "CONNECTED" HealthStatusAbnormal = "ABNORMAL" HealthStatusClose = "CLOSE" + HealthStatusAuthFail = "AuthFail" + + RbacAllowedAccountName = "sync-user" + RbacAllowedRoleName = "sync-admin" ) func NewServer() *Server { @@ -49,6 +52,12 @@ type Server struct { } func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Results, error) { + err := auth(ctx) + if err != nil { + log.Error("auth failed", err) + return generateFailedResults(events, err) + } + log.Info(fmt.Sprintf("start sync: %s", events.Flag())) res := s.replicator.Persist(ctx, events) @@ -56,6 +65,20 @@ func (s *Server) Sync(ctx context.Context, events *v1sync.EventList) (*v1sync.Re return s.toResults(res), nil } +func generateFailedResults(events *v1sync.EventList, err error) (*v1sync.Results, error) { + if events == nil || len(events.Events) == 0 { + return &v1sync.Results{Results: map[string]*v1sync.Result{}}, nil + } + rsts := make(map[string]*v1sync.Result, len(events.Events)) + for _, evt := range events.Events { + rsts[evt.Id] = &v1sync.Result{ + Code: resource.Fail, + Message: err.Error(), + } + } + return &v1sync.Results{Results: rsts}, nil +} + func (s *Server) toResults(results []*resource.Result) *v1sync.Results { syncResult := make(map[string]*v1sync.Result, len(results)) for _, r := range results { @@ -69,11 +92,18 @@ func (s *Server) toResults(results []*resource.Result) *v1sync.Results { } } -func (s *Server) Health(_ context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) { +func (s *Server) Health(ctx context.Context, _ *v1sync.HealthRequest) (*v1sync.HealthReply, error) { resp := &v1sync.HealthReply{ Status: HealthStatusConnected, LocalTimestamp: time.Now().UnixNano(), } + err := auth(ctx) + if err != nil { + resp.Status = HealthStatusAuthFail + log.Error("auth failed", err) + return resp, nil + } + // TODO enable to close syncer if !config.GetConfig().Sync.EnableOnStart { resp.Status = HealthStatusClose diff --git a/syncer/rpc/server_test.go b/syncer/rpc/server_test.go new file mode 100644 index 000000000..4123c1ac6 --- /dev/null +++ b/syncer/rpc/server_test.go @@ -0,0 +1,85 @@ +package rpc + +import ( + "context" + "reflect" + "testing" + + "github.com/go-chassis/cari/rbac" + "github.com/stretchr/testify/assert" + + v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" + "github.com/apache/servicecomb-service-center/syncer/config" + "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" +) + +type testReplicator struct{} + +func (testReplicator) Replicate(ctx context.Context, el *v1sync.EventList) (*v1sync.Results, error) { + return &v1sync.Results{Results: map[string]*v1sync.Result{ + "constant_id": &v1sync.Result{ + Code: resource.Success, + }, + }}, nil +} + +func (testReplicator) Persist(ctx context.Context, el *v1sync.EventList) []*resource.Result { + return []*resource.Result{ + &resource.Result{ + EventID: "constant_id", + Status: resource.Success, + }, + } +} + +func TestServer_Sync(t *testing.T) { + s := NewServer() + + // rbac enabled, should sync failed and return auth failed message + config.SetConfig(config.Config{ + Sync: &config.Sync{ + RbacEnabled: true, + }}) + events := &v1sync.EventList{Events: []*v1sync.Event{ + { + Id: "evt1", + }, + { + Id: "evt2", + }, + }} + + expectedRst := map[string]*v1sync.Result{ + "evt1": &v1sync.Result{ + Code: resource.Fail, + Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(), + }, + + "evt2": &v1sync.Result{ + Code: resource.Fail, + Message: rbac.NewError(rbac.ErrNoAuthHeader, "").Error(), + }, + } + rst, err := s.Sync(context.Background(), events) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(expectedRst, rst.Results)) + + rst, err = s.Sync(context.Background(), nil) // nil input + assert.NoError(t, err) + assert.Equal(t, 0, len(rst.Results)) + + // rbac disabled, should sync success(with the mock replicator) + config.SetConfig(config.Config{ + Sync: &config.Sync{ + RbacEnabled: false, + }}) + expectedRst = map[string]*v1sync.Result{ + "constant_id": &v1sync.Result{ + Code: resource.Success, + }, + } + s.replicator = &testReplicator{} + rst, err = s.Sync(context.Background(), events) + assert.NoError(t, err) + assert.True(t, reflect.DeepEqual(expectedRst, rst.Results)) +} diff --git a/syncer/server/server.go b/syncer/server/server.go index c880d55fd..095ae615a 100644 --- a/syncer/server/server.go +++ b/syncer/server/server.go @@ -18,6 +18,9 @@ package server import ( + "github.com/go-chassis/go-chassis/v2" + chassisServer "github.com/go-chassis/go-chassis/v2/core/server" + "github.com/apache/servicecomb-service-center/pkg/log" syncv1 "github.com/apache/servicecomb-service-center/syncer/api/v1" "github.com/apache/servicecomb-service-center/syncer/config" @@ -25,8 +28,6 @@ import ( "github.com/apache/servicecomb-service-center/syncer/rpc" "github.com/apache/servicecomb-service-center/syncer/service/admin" "github.com/apache/servicecomb-service-center/syncer/service/sync" - "github.com/go-chassis/go-chassis/v2" - chassisServer "github.com/go-chassis/go-chassis/v2/core/server" ) // Run register chassis schema and run syncer services before chassis.Run() @@ -40,6 +41,15 @@ func Run() { return } + if len(config.GetConfig().Sync.Peers) <= 0 { + log.Warn("peers parameter configuration is empty") + return + } + + if config.GetConfig().Sync.RbacEnabled { + log.Info("syncer rbac enabled") + } + chassis.RegisterSchema("grpc", rpc.NewServer(), chassisServer.WithRPCServiceDesc(&syncv1.EventService_ServiceDesc)) diff --git a/syncer/service/admin/health.go b/syncer/service/admin/health.go index 99c9db79a..cdba73582 100644 --- a/syncer/service/admin/health.go +++ b/syncer/service/admin/health.go @@ -20,13 +20,17 @@ package admin import ( "context" "errors" + "fmt" "time" + "github.com/go-chassis/go-chassis/v2/server/restful" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "github.com/apache/servicecomb-service-center/client" "github.com/apache/servicecomb-service-center/pkg/log" pkgrpc "github.com/apache/servicecomb-service-center/pkg/rpc" + "github.com/apache/servicecomb-service-center/server/plugin/security/cipher" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" syncerclient "github.com/apache/servicecomb-service-center/syncer/client" "github.com/apache/servicecomb-service-center/syncer/config" @@ -59,25 +63,15 @@ type Peer struct { Mode []string `json:"mode"` Endpoints []string `json:"endpoints"` Status string `json:"status"` + Token string `json:"-"` } func Init() { cfg := config.GetConfig() - if cfg.Sync == nil { - log.Warn("sync config is empty") - return - } - if !cfg.Sync.EnableOnStart { - log.Info("syncer is disabled") - return - } - if len(cfg.Sync.Peers) <= 0 { - log.Warn("peers parameter configuration is empty") - return - } peerInfos = make([]*PeerInfo, 0, len(cfg.Sync.Peers)) for _, c := range cfg.Sync.Peers { if len(c.Endpoints) <= 0 { + log.Warn("no endpoints of peer: " + c.Name) continue } p := &Peer{ @@ -86,10 +80,21 @@ func Init() { Mode: c.Mode, Endpoints: c.Endpoints, } + if config.GetConfig().Sync.RbacEnabled { + plainToken, err := cipher.Decrypt(c.Token) + if err != nil { + log.Error(fmt.Sprintf("decrypt token of peer %s failed, use original content", c.Name), err) + plainToken = c.Token + } + p.Token = plainToken + } + conn, err := newRPCConn(p.Endpoints) - if err == nil { - peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn}) + if err != nil { + log.Error(fmt.Sprintf("new client failed for peer: %s", c.Name), err) + continue } + peerInfos = append(peerInfos, &PeerInfo{Peer: p, ClientConn: conn}) } } @@ -103,7 +108,7 @@ func Health() (*Resp, error) { if len(peerInfo.Peer.Endpoints) <= 0 { continue } - status := getPeerStatus(peerInfo.Peer.Name, peerInfo.ClientConn) + status := getPeerStatus(peerInfo) resp.Peers = append(resp.Peers, &Peer{ Name: peerInfo.Peer.Name, Kind: peerInfo.Peer.Kind, @@ -117,19 +122,25 @@ func Health() (*Resp, error) { return resp, nil } -func getPeerStatus(peerName string, clientConn *grpc.ClientConn) string { - if clientConn == nil { +func getPeerStatus(peerInfo *PeerInfo) string { + if peerInfo.ClientConn == nil { log.Warn("clientConn is nil") return rpc.HealthStatusAbnormal } local := time.Now().UnixNano() - set := client.NewSet(clientConn) - reply, err := set.EventServiceClient.Health(context.Background(), &v1sync.HealthRequest{}) + set := client.NewSet(peerInfo.ClientConn) + ctx := context.Background() + if config.GetConfig().Sync.RbacEnabled { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + restful.HeaderAuth: "Bearer " + peerInfo.Peer.Token, + })) + } + reply, err := set.EventServiceClient.Health(ctx, &v1sync.HealthRequest{}) if err != nil || reply == nil { log.Error("get peer health failed", err) return rpc.HealthStatusAbnormal } - reportClockDiff(peerName, local, reply.LocalTimestamp) + reportClockDiff(peerInfo.Peer.Name, local, reply.LocalTimestamp) return reply.Status } @@ -159,3 +170,7 @@ func newRPCConn(endpoints []string) (*grpc.ClientConn, error) { TLSConfig: syncerclient.RPClientConfig(), }) } + +func Peers() []*PeerInfo { + return peerInfos +} diff --git a/syncer/service/replicator/replicator.go b/syncer/service/replicator/replicator.go index d7f1e3d49..52adfc2df 100644 --- a/syncer/service/replicator/replicator.go +++ b/syncer/service/replicator/replicator.go @@ -21,16 +21,20 @@ import ( "context" "fmt" + "github.com/go-chassis/foundation/gopool" + "github.com/go-chassis/go-chassis/v2/server/restful" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "github.com/apache/servicecomb-service-center/client" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/rpc" "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/plugin/security/cipher" v1sync "github.com/apache/servicecomb-service-center/syncer/api/v1" syncerclient "github.com/apache/servicecomb-service-center/syncer/client" "github.com/apache/servicecomb-service-center/syncer/config" "github.com/apache/servicecomb-service-center/syncer/service/replicator/resource" - "github.com/go-chassis/foundation/gopool" - "google.golang.org/grpc" ) const ( @@ -48,7 +52,8 @@ var ( ) var ( - conn *grpc.ClientConn + conn *grpc.ClientConn + peerToken = "" ) func Work() error { @@ -68,8 +73,7 @@ func Work() error { } func InitSyncClient() error { - cfg := config.GetConfig() - peer := cfg.Sync.Peers[0] + peer := config.GetConfig().Sync.Peers[0] log.Info(fmt.Sprintf("peer is %v", peer)) var err error conn, err = rpc.GetRoundRobinLbConn(&rpc.Config{ @@ -78,7 +82,19 @@ func InitSyncClient() error { ServiceName: serviceName, TLSConfig: syncerclient.RPClientConfig(), }) - return err + if err != nil { + log.Error("get rpc client failed", err) + return err + } + if !config.GetConfig().Sync.RbacEnabled { + return nil + } + peerToken, err = cipher.Decrypt(peer.Token) + if err != nil { + log.Error("decrypt peer token failed, use original content", err) + peerToken = peer.Token + } + return nil } func Close() { @@ -161,6 +177,11 @@ func (r *replicatorManager) replicate(ctx context.Context, el *v1sync.EventList) } log.Info(fmt.Sprintf("page count %d to sync", len(els))) + if config.GetConfig().Sync.RbacEnabled { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{ + restful.HeaderAuth: "Bearer " + peerToken, + })) + } for _, in := range els { res, err := set.EventServiceClient.Sync(ctx, in)