diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go index 66cc4908d..a0096989f 100644 --- a/datasource/etcd/ms.go +++ b/datasource/etcd/ms.go @@ -39,6 +39,7 @@ import ( "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore" esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync" eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util" + serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util" "github.com/apache/servicecomb-service-center/datasource/schema" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/util" @@ -1569,3 +1570,55 @@ func (ds *MetadataManager) UnregisterService(ctx context.Context, request *pb.De func (ds *MetadataManager) Statistics(ctx context.Context, withShared bool) (*pb.Statistics, error) { return statistics(ctx, withShared) } + +func (ds *MetadataManager) UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error { + resp, _ := ds.ListManyInstances(ctx, &pb.GetAllInstancesRequest{}) + instances := resp.Instances + if len(instances) == 0 { + return nil + } + options := make([]etcdadpt.OpOptions, 0) + cmps := make([]etcdadpt.CmpOptions, 0) + + domainProject := util.ParseDomainProject(ctx) + + for _, instance := range instances { + var t = true + for k, v := range match.Properties { + value, ok := instance.Properties[k] + if ok { + if value != v { + t = false + break + } + } else { + t = false + break + } + } + if t { + key := path.GenerateInstanceKey(domainProject, instance.ServiceId, instance.InstanceId) + //更新状态 + instance.Status = status + data, _ := json.Marshal(instance) + leaseID, err := serviceUtil.GetLeaseID(ctx, domainProject, instance.ServiceId, instance.InstanceId) + if err != nil { + log.Error(fmt.Sprintf("get leaseId %s error", instance.InstanceId), err) + continue + } + options = append(options, etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data), etcdadpt.WithLease(leaseID)))...) + cmps = append(cmps, etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, instance.ServiceId), 0))...) + } + } + _, err := etcdadpt.TxnWithCmp(ctx, + options, + cmps, + nil) + + if err != nil { + log.Error("UpdateManyInstanceStatus error", err) + + return pb.NewError(pb.ErrUnavailableBackend, err.Error()) + } + return nil +} diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go index 3fed25803..78451de80 100644 --- a/datasource/mongo/ms.go +++ b/datasource/mongo/ms.go @@ -1677,3 +1677,7 @@ func formatRevision(consumerServiceID string, instances []*discovery.MicroServic func (ds *MetadataManager) Statistics(ctx context.Context, withShared bool) (*discovery.Statistics, error) { return statistics(ctx, withShared) } + +func (ds *MetadataManager) UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error { + return nil +} diff --git a/datasource/ms.go b/datasource/ms.go index 52a6e29ef..371759c0e 100644 --- a/datasource/ms.go +++ b/datasource/ms.go @@ -38,6 +38,10 @@ var ( ErrModifySchemaNotAllow = errors.New("schema already exist, can not be changed request") ) +type MatchPolicy struct { + Properties map[string]string `json:"properties,omitempty"` +} + // Attention: request validation must be finished before the following interface being invoked!!! // MetadataManager contains the CRUD of cache metadata type MetadataManager interface { @@ -93,4 +97,6 @@ type MetadataManager interface { RetireService(ctx context.Context, plan *RetirePlan) error Statistics(ctx context.Context, withShared bool) (*pb.Statistics, error) + + UpdateManyInstanceStatus(ctx context.Context, match *MatchPolicy, status string) error } diff --git a/server/resource/disco/instance_resource.go b/server/resource/disco/instance_resource.go index 7a0129c87..ec336b467 100644 --- a/server/resource/disco/instance_resource.go +++ b/server/resource/disco/instance_resource.go @@ -25,11 +25,11 @@ import ( "github.com/go-chassis/go-chassis/v2/pkg/codec" - discosvc "github.com/apache/servicecomb-service-center/server/service/disco" - + "github.com/apache/servicecomb-service-center/datasource" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/pkg/rest" "github.com/apache/servicecomb-service-center/pkg/util" + discosvc "github.com/apache/servicecomb-service-center/server/service/disco" pb "github.com/go-chassis/cari/discovery" ) @@ -51,6 +51,7 @@ func (s *InstanceResource) URLPatterns() []rest.Route { {Method: http.MethodPut, Path: "/v4/:project/registry/microservices/:serviceId/instances/:instanceId/heartbeat", Func: s.SendHeartbeat}, {Method: http.MethodPut, Path: "/v4/:project/registry/heartbeats", Func: s.SendManyHeartbeat}, + {Method: http.MethodPut, Path: "/v4/:project/registry/instances/status", Func: s.UpdateManyInstanceStatus}, } } func (s *InstanceResource) LegacyRegisterInstance(w http.ResponseWriter, r *http.Request) { @@ -311,3 +312,26 @@ func (s *InstanceResource) PutInstanceProperties(w http.ResponseWriter, r *http. } rest.WriteResponse(w, r, nil, nil) } + +func (s *InstanceResource) UpdateManyInstanceStatus(w http.ResponseWriter, r *http.Request) { + request := &UpdateManyInstanceStatusRequest{} + message, _ := io.ReadAll(r.Body) + err := codec.Decode(message, request) + if err != nil { + log.Error(fmt.Sprintf("invalid json: %s", util.BytesToStringWithNoCopy(message)), err) + rest.WriteError(w, pb.ErrInvalidParams, "Unmarshal error") + return + } + err = discosvc.UpdateManyInstanceStatus(r.Context(), &request.Matches, request.Status) + if err != nil { + log.Error("can not update instance properties", err) + rest.WriteServiceError(w, err) + return + } + rest.WriteResponse(w, r, nil, nil) +} + +type UpdateManyInstanceStatusRequest struct { + Matches datasource.MatchPolicy `json:"matches,omitempty"` + Status string `json:"status,omitempty"` +} diff --git a/server/service/disco/instance.go b/server/service/disco/instance.go index 4fa3689e4..45b8cc4de 100644 --- a/server/service/disco/instance.go +++ b/server/service/disco/instance.go @@ -485,3 +485,8 @@ func InstanceUsage(ctx context.Context, request *pb.GetServiceCountRequest) (int } return resp.Count, nil } + +func UpdateManyInstanceStatus(ctx context.Context, match *datasource.MatchPolicy, status string) error { + err := datasource.GetMetadataManager().UpdateManyInstanceStatus(ctx, match, status) + return err +}