Skip to content

Commit

Permalink
SCB-993 search sc cluster when key mismatch with aggregator cache (#493)
Browse files Browse the repository at this point in the history
* SCB-993 search sc cluster when key mismatch with aggregator cache

* SCB-993 New revision mechanism

* SCB-993 Add global flag in query api

* SCB-993 Prevent aggregator to enter infinite recursion

* SCB-993 Optimize codes
  • Loading branch information
little-cui authored Nov 21, 2018
1 parent 0a0b1fd commit 4942829
Show file tree
Hide file tree
Showing 47 changed files with 851 additions and 474 deletions.
8 changes: 4 additions & 4 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
type level1 struct {
}

func (l *level1) Name(ctx context.Context) string {
func (l *level1) Name(ctx context.Context, _ *Node) string {
return ctx.Value("key1").(string)
}

func (l *level1) Init(ctx context.Context, parent *Node) (node *Node, err error) {
p := l.Name(ctx)
p := l.Name(ctx, parent)
if p == "err" {
return nil, fmt.Errorf("wrong logic")
}
Expand All @@ -51,7 +51,7 @@ type level2 struct {
changed string
}

func (l *level2) Name(ctx context.Context) string {
func (l *level2) Name(ctx context.Context, _ *Node) string {
return ctx.Value("key2").(string)
}

Expand All @@ -60,7 +60,7 @@ func (l *level2) Init(ctx context.Context, parent *Node) (node *Node, err error)
return
}

p := l.Name(ctx)
p := l.Name(ctx, parent)
if p == "err" {
return nil, fmt.Errorf("wrong logic")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ package cache
import "golang.org/x/net/context"

type Filter interface {
Name(ctx context.Context) string
Name(ctx context.Context, parent *Node) string
Init(ctx context.Context, parent *Node) (*Node, error)
}
6 changes: 3 additions & 3 deletions pkg/cache/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (t *Tree) Remove(ctx context.Context) {
return
}

t.roots.Delete(t.filters[0].Name(ctx))
t.roots.Delete(t.filters[0].Name(ctx, nil))
}

func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) {
Expand All @@ -86,7 +86,7 @@ func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) {
}

filter := t.filters[0]
name := filter.Name(ctx)
name := filter.Name(ctx, nil)
item, err := t.roots.Fetch(name, t.Config.TTL(), func() (interface{}, error) {
node, err := t.getOrCreateNode(ctx, 0, nil)
if err != nil {
Expand All @@ -108,7 +108,7 @@ func (t *Tree) getOrCreateRoot(ctx context.Context) (node *Node, err error) {

func (t *Tree) getOrCreateNode(ctx context.Context, idx int, parent *Node) (node *Node, err error) {
filter := t.filters[idx]
name := t.nodeFullName(filter.Name(ctx), parent)
name := t.nodeFullName(filter.Name(ctx, parent), parent)

if parent == nil {
// new a temp node
Expand Down
132 changes: 102 additions & 30 deletions pkg/client/sc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,22 @@ import (
scerr "github.com/apache/servicecomb-service-center/server/error"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
"github.com/apache/servicecomb-service-center/version"
"golang.org/x/net/context"
"io/ioutil"
"net/http"
)

const (
apiVersionURL = "/version"
apiDumpURL = "/v4/default/admin/dump"
apiClustersURL = "/v4/default/admin/clusters"
apiHealthURL = "/v4/default/registry/health"
apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas"
apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s"
apiVersionURL = "/version"
apiDumpURL = "/v4/default/admin/dump"
apiClustersURL = "/v4/default/admin/clusters"
apiHealthURL = "/v4/default/registry/health"
apiSchemasURL = "/v4/%s/registry/microservices/%s/schemas"
apiSchemaURL = "/v4/%s/registry/microservices/%s/schemas/%s"
apiInstancesURL = "/v4/%s/registry/microservices/%s/instances"
apiInstanceURL = "/v4/%s/registry/microservices/%s/instances/%s"

QueryGlobal = "global"
)

func (c *SCClient) toError(body []byte) *scerr.Error {
Expand All @@ -47,8 +52,18 @@ func (c *SCClient) toError(body []byte) *scerr.Error {
return message
}

func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) {
resp, err := c.RestDo(http.MethodGet, apiVersionURL, c.CommonHeaders(), nil)
func (c *SCClient) parseQuery(ctx context.Context) (q string) {
switch {
case ctx.Value(QueryGlobal) == "1":
q += "global=true"
default:
q += "global=false"
}
return
}

func (c *SCClient) GetScVersion(ctx context.Context) (*version.VersionSet, *scerr.Error) {
resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiVersionURL, c.CommonHeaders(ctx), nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
Expand All @@ -66,18 +81,17 @@ func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) {
v := &version.VersionSet{}
err = json.Unmarshal(body, v)
if err != nil {
fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return v, nil
}

func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) {
headers := c.CommonHeaders()
func (c *SCClient) GetScCache(ctx context.Context) (*model.Cache, *scerr.Error) {
headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
resp, err := c.RestDo(http.MethodGet, apiDumpURL, headers, nil)
resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiDumpURL, headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
Expand All @@ -95,19 +109,18 @@ func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) {
dump := &model.DumpResponse{}
err = json.Unmarshal(body, dump)
if err != nil {
fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return dump.Cache, nil
}

func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) {
func (c *SCClient) GetSchemasByServiceId(ctx context.Context, domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
headers := c.CommonHeaders()
headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
resp, err := c.RestDo(http.MethodGet,
fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1",
resp, err := c.RestDoWithContext(ctx, http.MethodGet,
fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1&"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
Expand All @@ -126,19 +139,18 @@ func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb
schemas := &pb.GetAllSchemaResponse{}
err = json.Unmarshal(body, schemas)
if err != nil {
fmt.Println(util.BytesToStringWithNoCopy(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return schemas.Schemas, nil
}

func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) {
func (c *SCClient) GetSchemaBySchemaId(ctx context.Context, domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
headers := c.CommonHeaders()
headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
resp, err := c.RestDo(http.MethodGet,
fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId),
resp, err := c.RestDoWithContext(ctx, http.MethodGet,
fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId)+"?"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
Expand All @@ -157,7 +169,6 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string
schema := &pb.GetSchemaResponse{}
err = json.Unmarshal(body, schema)
if err != nil {
fmt.Println(util.BytesToStringWithNoCopy(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

Expand All @@ -168,11 +179,11 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string
}, nil
}

func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) {
headers := c.CommonHeaders()
func (c *SCClient) GetClusters(ctx context.Context) (registry.Clusters, *scerr.Error) {
headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
resp, err := c.RestDo(http.MethodGet, apiClustersURL, headers, nil)
resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiClustersURL, headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
Expand All @@ -190,18 +201,17 @@ func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) {
clusters := &model.ClustersResponse{}
err = json.Unmarshal(body, clusters)
if err != nil {
fmt.Println(string(body))
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return clusters.Clusters, nil
}

func (c *SCClient) HealthCheck() *scerr.Error {
headers := c.CommonHeaders()
func (c *SCClient) HealthCheck(ctx context.Context) *scerr.Error {
headers := c.CommonHeaders(ctx)
// only default domain has admin permission
headers.Set("X-Domain-Name", "default")
resp, err := c.RestDo(http.MethodGet, apiHealthURL, headers, nil)
resp, err := c.RestDoWithContext(ctx, http.MethodGet, apiHealthURL, headers, nil)
if err != nil {
return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
}
Expand All @@ -217,3 +227,65 @@ func (c *SCClient) HealthCheck() *scerr.Error {
}
return nil
}

func (c *SCClient) GetInstancesByServiceId(ctx context.Context, domainProject, providerId, consumerId string) ([]*pb.MicroServiceInstance, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
headers.Set("X-ConsumerId", consumerId)
resp, err := c.RestDoWithContext(ctx, http.MethodGet,
fmt.Sprintf(apiInstancesURL, project, providerId)+"?"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

if resp.StatusCode != http.StatusOK {
return nil, c.toError(body)
}

instancesResp := &pb.GetInstancesResponse{}
err = json.Unmarshal(body, instancesResp)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return instancesResp.Instances, nil
}

func (c *SCClient) GetInstanceByInstanceId(ctx context.Context, domainProject, providerId, instanceId, consumerId string) (*pb.MicroServiceInstance, *scerr.Error) {
domain, project := core.FromDomainProject(domainProject)
headers := c.CommonHeaders(ctx)
headers.Set("X-Domain-Name", domain)
headers.Set("X-ConsumerId", consumerId)
resp, err := c.RestDoWithContext(ctx, http.MethodGet,
fmt.Sprintf(apiInstanceURL, project, providerId, instanceId)+"?"+c.parseQuery(ctx),
headers, nil)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

if resp.StatusCode != http.StatusOK {
return nil, c.toError(body)
}

instanceResp := &pb.GetOneInstanceResponse{}
err = json.Unmarshal(body, instanceResp)
if err != nil {
return nil, scerr.NewError(scerr.ErrInternal, err.Error())
}

return instanceResp.Instance, nil
}
4 changes: 3 additions & 1 deletion pkg/client/sc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package sc

import (
"golang.org/x/net/context"
"net/http"
)

Expand All @@ -32,8 +33,9 @@ type SCClient struct {
Cfg Config
}

func (c *SCClient) CommonHeaders() http.Header {
func (c *SCClient) CommonHeaders(ctx context.Context) http.Header {
var headers = make(http.Header)
// TODO overwrote by context values
if len(c.Cfg.Token) > 0 {
headers.Set("X-Auth-Token", c.Cfg.Token)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/client/sc/client_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/apache/servicecomb-service-center/pkg/lb"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
"golang.org/x/net/context"
"net/http"
)

Expand All @@ -44,9 +45,9 @@ func (c *LBClient) Next() string {
return c.LB.Next()
}

func (c *LBClient) RestDo(method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
func (c *LBClient) RestDoWithContext(ctx context.Context, method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) {
for i := 0; i < c.Retries; i++ {
resp, err = c.HttpDo(method, c.Next()+api, headers, body)
resp, err = c.HttpDoWithContext(ctx, method, c.Next()+api, headers, body)
if err != nil {
util.GetBackoff().Delay(i)
continue
Expand Down
8 changes: 7 additions & 1 deletion pkg/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/apache/servicecomb-service-center/pkg/tlsutil"
"github.com/apache/servicecomb-service-center/pkg/util"
"golang.org/x/net/context"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -64,7 +65,7 @@ type URLClient struct {
Cfg URLClientOption
}

func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
func (client *URLClient) HttpDoWithContext(ctx context.Context, method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
if strings.HasPrefix(rawURL, "https") {
if transport, ok := client.Client.Transport.(*http.Transport); ok {
transport.TLSClientConfig = client.TLS
Expand Down Expand Up @@ -93,6 +94,7 @@ func (client *URLClient) HttpDo(method string, rawURL string, headers http.Heade
if err != nil {
return nil, errors.New(fmt.Sprintf("create request failed: %s", err.Error()))
}
req = req.WithContext(ctx)
req.Header = headers

resp, err = client.Client.Do(req)
Expand Down Expand Up @@ -122,6 +124,10 @@ func (client *URLClient) HttpDo(method string, rawURL string, headers http.Heade
return resp, nil
}

func (client *URLClient) HttpDo(method string, rawURL string, headers http.Header, body []byte) (resp *http.Response, err error) {
return client.HttpDoWithContext(context.Background(), method, rawURL, headers, body)
}

func setOptionDefaultValue(o *URLClientOption) URLClientOption {
if o == nil {
return defaultURLClientOption
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,11 @@ func ResetTimer(timer *time.Timer, d time.Duration) {
}
timer.Reset(d)
}

func StringTRUE(s string) bool {
s = strings.ToLower(strings.TrimSpace(s))
if s == "1" || s == "true" {
return true
}
return false
}
2 changes: 1 addition & 1 deletion scctl/pkg/plugin/diagnose/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func DiagnoseCommandFunc(_ *cobra.Command, args []string) {
}

// query sc
cache, scErr := scClient.GetScCache()
cache, scErr := scClient.GetScCache(context.Background())
if scErr != nil {
cmd.StopAndExit(cmd.ExitError, scErr)
}
Expand Down
Loading

0 comments on commit 4942829

Please sign in to comment.