Skip to content

Commit

Permalink
multi instances change status by properties (#1425)
Browse files Browse the repository at this point in the history
* multi instances change status by properties

* multi instances change status by properties

* multi instances change status by properties

---------

Co-authored-by: qiuqi (C) <[email protected]>
  • Loading branch information
qiuqi06 and qiuqi (C) authored Aug 26, 2023
1 parent 8df058b commit 429da63
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 2 deletions.
53 changes: 53 additions & 0 deletions datasource/etcd/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/datasource/schema"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
Expand Down Expand Up @@ -1569,3 +1570,55 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De
func (ds *MetadataManager) Statistics(ctx context.Context, withShared bool) (*pb.Statistics, error) {
return statistics(ctx, withShared)
}

func (ds *MetadataManager) UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error {
resp, _ := ds.ListManyInstances(ctx, &pb.GetAllInstancesRequest{})
instances := resp.Instances
if len(instances) == 0 {
return nil
}
options := make([]etcdadpt.OpOptions, 0)
cmps := make([]etcdadpt.CmpOptions, 0)

domainProject := util.ParseDomainProject(ctx)

for _, instance := range instances {
var t = true
for k, v := range match.Properties {
value, ok := instance.Properties[k]
if ok {
if value != v {
t = false
break
}
} else {
t = false
break
}
}
if t {
key := path.GenerateInstanceKey(domainProject, instance.ServiceId, instance.InstanceId)
//更新状态
instance.Status = status
data, _ := json.Marshal(instance)
leaseID, err := serviceUtil.GetLeaseID(ctx, domainProject, instance.ServiceId, instance.InstanceId)
if err != nil {
log.Error(fmt.Sprintf("get leaseId %s error", instance.InstanceId), err)
continue
}
options = append(options, etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data), etcdadpt.WithLease(leaseID)))...)
cmps = append(cmps, etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, instance.ServiceId), 0))...)
}
}
_, err := etcdadpt.TxnWithCmp(ctx,
options,
cmps,
nil)

if err != nil {
log.Error("UpdateManyInstanceStatus error", err)

return pb.NewError(pb.ErrUnavailableBackend, err.Error())
}
return nil
}
4 changes: 4 additions & 0 deletions datasource/mongo/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -1677,3 +1677,7 @@ func formatRevision(consumerServiceID string, instances []*discovery.MicroServic
func (ds *MetadataManager) Statistics(ctx context.Context, withShared bool) (*discovery.Statistics, error) {
return statistics(ctx, withShared)
}

func (ds *MetadataManager) UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error {
return nil
}
6 changes: 6 additions & 0 deletions datasource/ms.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ var (
ErrModifySchemaNotAllow = errors.New("schema already exist, can not be changed request")
)

type MatchPolicy struct {
Properties map[string]string `json:"properties,omitempty"`
}

// Attention: request validation must be finished before the following interface being invoked!!!
// MetadataManager contains the CRUD of cache metadata
type MetadataManager interface {
Expand Down Expand Up @@ -93,4 +97,6 @@ type MetadataManager interface {
RetireService(ctx context.Context, plan *RetirePlan) error

Statistics(ctx context.Context, withShared bool) (*pb.Statistics, error)

UpdateManyInstanceStatus(ctx context.Context, match *MatchPolicy, status string) error
}
28 changes: 26 additions & 2 deletions server/resource/disco/instance_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (

"github.com/go-chassis/go-chassis/v2/pkg/codec"

discosvc "github.com/apache/servicecomb-service-center/server/service/disco"

"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/rest"
"github.com/apache/servicecomb-service-center/pkg/util"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
pb "github.com/go-chassis/cari/discovery"
)

Expand All @@ -51,6 +51,7 @@ func (s *InstanceResource) URLPatterns() []rest.Route {
{Method: http.MethodPut, Path: "/v4/:project/registry/microservices/:serviceId/instances/:instanceId/heartbeat",
Func: s.SendHeartbeat},
{Method: http.MethodPut, Path: "/v4/:project/registry/heartbeats", Func: s.SendManyHeartbeat},
{Method: http.MethodPut, Path: "/v4/:project/registry/instances/status", Func: s.UpdateManyInstanceStatus},
}
}
func (s *InstanceResource) LegacyRegisterInstance(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -311,3 +312,26 @@ func (s *InstanceResource) PutInstanceProperties(w http.ResponseWriter, r *http.
}
rest.WriteResponse(w, r, nil, nil)
}

func (s *InstanceResource) UpdateManyInstanceStatus(w http.ResponseWriter, r *http.Request) {
request := &UpdateManyInstanceStatusRequest{}
message, _ := io.ReadAll(r.Body)
err := codec.Decode(message, request)
if err != nil {
log.Error(fmt.Sprintf("invalid json: %s", util.BytesToStringWithNoCopy(message)), err)
rest.WriteError(w, pb.ErrInvalidParams, "Unmarshal error")
return
}
err = discosvc.UpdateManyInstanceStatus(r.Context(), &request.Matches, request.Status)
if err != nil {
log.Error("can not update instance properties", err)
rest.WriteServiceError(w, err)
return
}
rest.WriteResponse(w, r, nil, nil)
}

type UpdateManyInstanceStatusRequest struct {
Matches datasource.MatchPolicy `json:"matches,omitempty"`
Status string `json:"status,omitempty"`
}
5 changes: 5 additions & 0 deletions server/service/disco/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,3 +485,8 @@ func InstanceUsage(ctx context.Context, request *pb.GetServiceCountRequest) (int
}
return resp.Count, nil
}

func UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error {
err := datasource.GetMetadataManager().UpdateManyInstanceStatus(ctx, match, status)
return err
}

0 comments on commit 429da63

Please sign in to comment.