diff --git a/server/controller/controller/master.go b/server/controller/controller/master.go index ce695f87668..d5dc0a728fe 100644 --- a/server/controller/controller/master.go +++ b/server/controller/controller/master.go @@ -93,6 +93,7 @@ func checkAndStartMasterFunctions( return } + regionCheck := monitor.NewRegionCheck(cfg) vtapCheck := vtap.NewVTapCheck(cfg.MonitorCfg, ctx) vtapRebalanceCheck := vtap.NewRebalanceCheck(cfg.MonitorCfg, ctx) vtapLicenseAllocation := license.NewVTapLicenseAllocation(cfg.MonitorCfg, ctx) @@ -142,6 +143,9 @@ func checkAndStartMasterFunctions( // 数据节点检查 analyzerCheck.Start(sCtx) + // region check + regionCheck.Start(sCtx) + // vtap check vtapCheck.Start(sCtx) diff --git a/server/controller/db/mysql/org.go b/server/controller/db/mysql/org.go index 2a461325984..b0edce3d343 100644 --- a/server/controller/db/mysql/org.go +++ b/server/controller/db/mysql/org.go @@ -77,35 +77,75 @@ func CheckORGNumberAndLog() ([]int, error) { return orgIDs, nil } +type SyncORGConfig struct { + HardDelete bool + WhereCondition []interface{} + ExcludeFields []string +} + +type SyncORGConfigOption func(opts *SyncORGConfig) + +func WithHardDelete() SyncORGConfigOption { + return func(opts *SyncORGConfig) { + opts.HardDelete = true + } +} + +func WithWhereCondition(condition ...interface{}) SyncORGConfigOption { + return func(opts *SyncORGConfig) { + opts.WhereCondition = condition + } +} + +func WithExcludeFields(fields []string) SyncORGConfigOption { + return func(opts *SyncORGConfig) { + opts.ExcludeFields = fields + } +} + // SyncDefaultOrgData synchronizes a slice of data items of any type T to all organization databases except the default one. // It assumes each data item has an "ID" field (with a json tag "ID") serving as the primary key. During upsertion, // fields are updated based on their "gorm" tags, and empty string values are converted to null in the database. // // Parameters: +// - uniqueKey: A unique identifier for the data items. // - data: A slice of data items of any type T to be synchronized. The type T must have an "ID" field tagged as the primary key. -func SyncDefaultOrgData[T any](data []T, excludeFields []string) error { +func SyncDefaultORGData[T any](uniqueKey string, data []T, options ...SyncORGConfigOption) error { if len(data) == 0 { return nil } + cfg := &SyncORGConfig{} + for _, option := range options { + option(cfg) + } + excludeFieldsMap := make(map[string]bool) - for _, field := range excludeFields { + for _, field := range cfg.ExcludeFields { excludeFieldsMap[field] = true } // get fields to update dataType := reflect.TypeOf(data[0]) var fields []string + var structFieldName string for i := 0; i < dataType.NumField(); i++ { field := dataType.Field(i) dbTag := field.Tag.Get("gorm") if dbTag != "" { columnName := GetColumnNameFromTag(dbTag) + if columnName == uniqueKey { + structFieldName = field.Name + } if columnName != "" && !excludeFieldsMap[columnName] { fields = append(fields, columnName) } } } + if structFieldName == "" { + return fmt.Errorf("no struct field found for unique key: %s", uniqueKey) + } + log.Infof("weiqiang fields to update: %v", fields) orgIDs, err := GetORGIDs() if err != nil { @@ -125,27 +165,57 @@ func SyncDefaultOrgData[T any](data []T, excludeFields []string) error { db := dbInfo.DB err = db.Transaction(func(tx *gorm.DB) error { // delete - var existingIDs []int + var existingKeys []interface{} var t T - if err := tx.Model(&t).Pluck("id", &existingIDs).Error; err != nil { + query := tx.Model(&t) + if len(cfg.WhereCondition) == 1 { + query = query.Where(cfg.WhereCondition[0]) + } else if len(cfg.WhereCondition) > 1 { + query = query.Where(cfg.WhereCondition[0], cfg.WhereCondition[1:]...) + } + if err := query.Model(&t).Pluck(uniqueKey, &existingKeys).Error; err != nil { return err } - existingIDMap := make(map[int]bool) - for _, id := range existingIDs { - existingIDMap[id] = true + existingKeyMap := make(map[interface{}]bool) + for _, key := range existingKeys { + switch v := key.(type) { + case []byte: + existingKeyMap[string(v)] = true + default: + existingKeyMap[v] = true + } } + log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap) for _, item := range data { - id := reflect.ValueOf(item).FieldByName("ID").Int() - existingIDMap[int(id)] = false + val := reflect.ValueOf(item) + field := val.FieldByName(structFieldName) + if !field.IsValid() { + return fmt.Errorf("field %s not found in the struct", structFieldName) + } + key := field.Interface() + switch v := key.(type) { + case []byte: + existingKeyMap[string(v)] = false + default: + existingKeyMap[v] = false + } } - for id, exists := range existingIDMap { + log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap) + for key, exists := range existingKeyMap { if exists { - if err := tx.Where("id = ?", id).Delete(&t).Error; err != nil { + if cfg.HardDelete { + err = tx.Unscoped().Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error + } else { + err = tx.Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error + } + if err != nil { return err } } } + log.Infof("weiqiang org(%v) save data: %v", orgID, data) + // add or update if err := tx.Clauses(clause.OnConflict{ DoUpdates: clause.AssignmentColumns(fields), // `UpdateAll: true,` can not update time diff --git a/server/controller/http/router/analyzer.go b/server/controller/http/router/analyzer.go index 26516623605..cb89eeded33 100644 --- a/server/controller/http/router/analyzer.go +++ b/server/controller/http/router/analyzer.go @@ -109,8 +109,15 @@ func updateAnalyzer(m *monitor.AnalyzerCheck, cfg *config.ControllerConfig) gin. patchMap := map[string]interface{}{} c.ShouldBindBodyWith(&patchMap, binding.JSON) - lcuuid := c.Param("lcuuid") orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID) + if _, ok := patchMap["REGION"]; ok { + if orgID.(int) != common.DEFAULT_ORG_ID { + StatusForbiddenResponse(c, "only default orgination can modify region") + return + } + } + + lcuuid := c.Param("lcuuid") data, err := service.UpdateAnalyzer(orgID.(int), lcuuid, patchMap, m, cfg) if err != nil { err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error()) diff --git a/server/controller/http/router/controller.go b/server/controller/http/router/controller.go index 55cc3cbc691..3c211078d80 100644 --- a/server/controller/http/router/controller.go +++ b/server/controller/http/router/controller.go @@ -120,8 +120,15 @@ func updateController(m *monitor.ControllerCheck, cfg *config.ControllerConfig) patchMap := map[string]interface{}{} c.ShouldBindBodyWith(&patchMap, binding.JSON) - lcuuid := c.Param("lcuuid") orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID) + if _, ok := patchMap["REGION"]; ok { + if orgID.(int) != common.DEFAULT_ORG_ID { + StatusForbiddenResponse(c, "only default orgination can modify region") + return + } + } + + lcuuid := c.Param("lcuuid") data, err := service.UpdateController(orgID.(int), lcuuid, patchMap, m, cfg) if err != nil { err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error()) diff --git a/server/controller/http/router/middleware.go b/server/controller/http/router/middleware.go index 1b32264a00d..5e5b817dcbd 100644 --- a/server/controller/http/router/middleware.go +++ b/server/controller/http/router/middleware.go @@ -35,3 +35,14 @@ func AdminPermissionVerificationMiddleware() gin.HandlerFunc { ctx.Next() } } + +func DefaultORGVerificationMiddleware() gin.HandlerFunc { + return func(ctx *gin.Context) { + orgID, _ := ctx.Get(common.HEADER_KEY_X_ORG_ID) + if orgID.(int) != common.DEFAULT_ORG_ID { + routercommon.StatusForbiddenResponse(ctx, fmt.Sprintf("only default orginazation can operate")) + ctx.Abort() + } + ctx.Next() + } +} diff --git a/server/controller/http/service/controller.go b/server/controller/http/service/controller.go index 62058f59fa1..e171bd5d66b 100644 --- a/server/controller/http/service/controller.go +++ b/server/controller/http/service/controller.go @@ -334,6 +334,8 @@ func UpdateController( // - 增加 az_controller_connection 逻辑 // - addConnAzs = newConnAzs - oldConnAzs if _, ok := controllerUpdate["REGION"]; ok { + + if oldConnAzs.Contains("ALL") { delConnAzs.Add("ALL") for _, az := range dbAzs { diff --git a/server/controller/monitor/analyzer.go b/server/controller/monitor/analyzer.go index cf5202f5f8f..2ddabe10d13 100644 --- a/server/controller/monitor/analyzer.go +++ b/server/controller/monitor/analyzer.go @@ -461,7 +461,9 @@ func (c *AnalyzerCheck) SyncDefaultOrgData() { if err := mysql.DefaultDB.Find(&analyzers).Error; err != nil { log.Error(err) } - if err := mysql.SyncDefaultOrgData(analyzers, SyncAnalyzerExcludeField); err != nil { + if err := mysql.SyncDefaultORGData("id", analyzers, + mysql.WithExcludeFields(SyncAnalyzerExcludeField), + ); err != nil { log.Error(err) } } diff --git a/server/controller/monitor/controller.go b/server/controller/monitor/controller.go index 38c520e6706..385edefabc5 100644 --- a/server/controller/monitor/controller.go +++ b/server/controller/monitor/controller.go @@ -462,7 +462,9 @@ func (c *ControllerCheck) SyncDefaultOrgData() { if err := mysql.DefaultDB.Find(&controllers).Error; err != nil { log.Error(err) } - if err := mysql.SyncDefaultOrgData(controllers, SyncControllerExcludeField); err != nil { + if err := mysql.SyncDefaultORGData("id", controllers, + mysql.WithExcludeFields(SyncControllerExcludeField), + ); err != nil { log.Error(err) } } diff --git a/server/controller/monitor/region.go b/server/controller/monitor/region.go new file mode 100644 index 00000000000..31cdbf76818 --- /dev/null +++ b/server/controller/monitor/region.go @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monitor + +import ( + "context" + "time" + + "github.com/deepflowio/deepflow/server/controller/common" + "github.com/deepflowio/deepflow/server/controller/config" + "github.com/deepflowio/deepflow/server/controller/db/mysql" + mconfig "github.com/deepflowio/deepflow/server/controller/monitor/config" +) + +type RegionCheck struct { + cfg mconfig.MonitorConfig +} + +func NewRegionCheck(cfg *config.ControllerConfig) *RegionCheck { + return &RegionCheck{ + cfg: cfg.MonitorCfg, + } +} + +func (c *RegionCheck) Start(ctx context.Context) { + log.Info("region check start") + go func() { + ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.SyncDefaultOrgData() + case <-ctx.Done(): + return + } + } + }() +} + +func (c *RegionCheck) SyncDefaultOrgData() { + log.Infof("weiqiang sync default org data") + var regions []mysql.Region + if err := mysql.DefaultDB.Where("create_method = ?", common.CREATE_METHOD_USER_DEFINE).Find(®ions).Error; err != nil { + log.Error(err) + } + + if err := mysql.SyncDefaultORGData("name", regions, + mysql.WithHardDelete(), + mysql.WithWhereCondition("create_method = ?", common.CREATE_METHOD_USER_DEFINE), + ); err != nil { + log.Error(err) + } +}