Skip to content

Commit

Permalink
refactor: region association logic of controllers and analyers
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye committed Aug 16, 2024
1 parent 259333c commit d580b10
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 15 deletions.
4 changes: 4 additions & 0 deletions server/controller/controller/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func checkAndStartMasterFunctions(
return
}

regionCheck := monitor.NewRegionCheck(cfg)
vtapCheck := vtap.NewVTapCheck(cfg.MonitorCfg, ctx)
vtapRebalanceCheck := vtap.NewRebalanceCheck(cfg.MonitorCfg, ctx)
vtapLicenseAllocation := license.NewVTapLicenseAllocation(cfg.MonitorCfg, ctx)
Expand Down Expand Up @@ -142,6 +143,9 @@ func checkAndStartMasterFunctions(
// 数据节点检查
analyzerCheck.Start(sCtx)

// region check
regionCheck.Start(sCtx)

// vtap check
vtapCheck.Start(sCtx)

Expand Down
92 changes: 81 additions & 11 deletions server/controller/db/mysql/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,75 @@ func CheckORGNumberAndLog() ([]int, error) {
return orgIDs, nil
}

type SyncORGConfig struct {
HardDelete bool
WhereCondition []interface{}
ExcludeFields []string
}

type SyncORGConfigOption func(opts *SyncORGConfig)

func WithHardDelete() SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.HardDelete = true
}
}

func WithWhereCondition(condition ...interface{}) SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.WhereCondition = condition
}
}

func WithExcludeFields(fields []string) SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.ExcludeFields = fields
}
}

// SyncDefaultOrgData synchronizes a slice of data items of any type T to all organization databases except the default one.
// It assumes each data item has an "ID" field (with a json tag "ID") serving as the primary key. During upsertion,
// fields are updated based on their "gorm" tags, and empty string values are converted to null in the database.
//
// Parameters:
// - uniqueKey: A unique identifier for the data items.
// - data: A slice of data items of any type T to be synchronized. The type T must have an "ID" field tagged as the primary key.
func SyncDefaultOrgData[T any](data []T, excludeFields []string) error {
func SyncDefaultORGData[T any](uniqueKey string, data []T, options ...SyncORGConfigOption) error {
if len(data) == 0 {
return nil
}

cfg := &SyncORGConfig{}
for _, option := range options {
option(cfg)
}

excludeFieldsMap := make(map[string]bool)
for _, field := range excludeFields {
for _, field := range cfg.ExcludeFields {
excludeFieldsMap[field] = true
}

// get fields to update
dataType := reflect.TypeOf(data[0])
var fields []string
var structFieldName string
for i := 0; i < dataType.NumField(); i++ {
field := dataType.Field(i)
dbTag := field.Tag.Get("gorm")
if dbTag != "" {
columnName := GetColumnNameFromTag(dbTag)
if columnName == uniqueKey {
structFieldName = field.Name
}
if columnName != "" && !excludeFieldsMap[columnName] {
fields = append(fields, columnName)
}
}
}
if structFieldName == "" {
return fmt.Errorf("no struct field found for unique key: %s", uniqueKey)
}
log.Infof("weiqiang fields to update: %v", fields)

orgIDs, err := GetORGIDs()
if err != nil {
Expand All @@ -125,27 +165,57 @@ func SyncDefaultOrgData[T any](data []T, excludeFields []string) error {
db := dbInfo.DB
err = db.Transaction(func(tx *gorm.DB) error {
// delete
var existingIDs []int
var existingKeys []interface{}
var t T
if err := tx.Model(&t).Pluck("id", &existingIDs).Error; err != nil {
query := tx.Model(&t)
if len(cfg.WhereCondition) == 1 {
query = query.Where(cfg.WhereCondition[0])
} else if len(cfg.WhereCondition) > 1 {
query = query.Where(cfg.WhereCondition[0], cfg.WhereCondition[1:]...)
}
if err := query.Model(&t).Pluck(uniqueKey, &existingKeys).Error; err != nil {
return err
}
existingIDMap := make(map[int]bool)
for _, id := range existingIDs {
existingIDMap[id] = true
existingKeyMap := make(map[interface{}]bool)
for _, key := range existingKeys {
switch v := key.(type) {
case []byte:
existingKeyMap[string(v)] = true
default:
existingKeyMap[v] = true
}
}
log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap)
for _, item := range data {
id := reflect.ValueOf(item).FieldByName("ID").Int()
existingIDMap[int(id)] = false
val := reflect.ValueOf(item)
field := val.FieldByName(structFieldName)
if !field.IsValid() {
return fmt.Errorf("field %s not found in the struct", structFieldName)
}
key := field.Interface()
switch v := key.(type) {
case []byte:
existingKeyMap[string(v)] = false
default:
existingKeyMap[v] = false
}
}
for id, exists := range existingIDMap {
log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap)
for key, exists := range existingKeyMap {
if exists {
if err := tx.Where("id = ?", id).Delete(&t).Error; err != nil {
if cfg.HardDelete {
err = tx.Unscoped().Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error
} else {
err = tx.Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error
}
if err != nil {
return err
}
}
}

log.Infof("weiqiang org(%v) save data: %v", orgID, data)

// add or update
if err := tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns(fields), // `UpdateAll: true,` can not update time
Expand Down
9 changes: 8 additions & 1 deletion server/controller/http/router/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ func updateAnalyzer(m *monitor.AnalyzerCheck, cfg *config.ControllerConfig) gin.
patchMap := map[string]interface{}{}
c.ShouldBindBodyWith(&patchMap, binding.JSON)

lcuuid := c.Param("lcuuid")
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
if _, ok := patchMap["REGION"]; ok {
if orgID.(int) != common.DEFAULT_ORG_ID {
StatusForbiddenResponse(c, "only default orgination can modify region")
return
}
}

lcuuid := c.Param("lcuuid")
data, err := service.UpdateAnalyzer(orgID.(int), lcuuid, patchMap, m, cfg)
if err != nil {
err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error())
Expand Down
9 changes: 8 additions & 1 deletion server/controller/http/router/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,15 @@ func updateController(m *monitor.ControllerCheck, cfg *config.ControllerConfig)
patchMap := map[string]interface{}{}
c.ShouldBindBodyWith(&patchMap, binding.JSON)

lcuuid := c.Param("lcuuid")
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
if _, ok := patchMap["REGION"]; ok {
if orgID.(int) != common.DEFAULT_ORG_ID {
StatusForbiddenResponse(c, "only default orgination can modify region")
return
}
}

lcuuid := c.Param("lcuuid")
data, err := service.UpdateController(orgID.(int), lcuuid, patchMap, m, cfg)
if err != nil {
err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error())
Expand Down
11 changes: 11 additions & 0 deletions server/controller/http/router/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ func AdminPermissionVerificationMiddleware() gin.HandlerFunc {
ctx.Next()
}
}

func DefaultORGVerificationMiddleware() gin.HandlerFunc {
return func(ctx *gin.Context) {
orgID, _ := ctx.Get(common.HEADER_KEY_X_ORG_ID)
if orgID.(int) != common.DEFAULT_ORG_ID {
routercommon.StatusForbiddenResponse(ctx, fmt.Sprintf("only default orginazation can operate"))
ctx.Abort()
}
ctx.Next()
}
}
2 changes: 2 additions & 0 deletions server/controller/http/service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func UpdateController(
// - 增加 az_controller_connection 逻辑
// - addConnAzs = newConnAzs - oldConnAzs
if _, ok := controllerUpdate["REGION"]; ok {


if oldConnAzs.Contains("ALL") {
delConnAzs.Add("ALL")
for _, az := range dbAzs {
Expand Down
4 changes: 3 additions & 1 deletion server/controller/monitor/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@ func (c *AnalyzerCheck) SyncDefaultOrgData() {
if err := mysql.DefaultDB.Find(&analyzers).Error; err != nil {
log.Error(err)
}
if err := mysql.SyncDefaultOrgData(analyzers, SyncAnalyzerExcludeField); err != nil {
if err := mysql.SyncDefaultORGData("id", analyzers,
mysql.WithExcludeFields(SyncAnalyzerExcludeField),
); err != nil {
log.Error(err)
}
}
4 changes: 3 additions & 1 deletion server/controller/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,9 @@ func (c *ControllerCheck) SyncDefaultOrgData() {
if err := mysql.DefaultDB.Find(&controllers).Error; err != nil {
log.Error(err)
}
if err := mysql.SyncDefaultOrgData(controllers, SyncControllerExcludeField); err != nil {
if err := mysql.SyncDefaultORGData("id", controllers,
mysql.WithExcludeFields(SyncControllerExcludeField),
); err != nil {
log.Error(err)
}
}
69 changes: 69 additions & 0 deletions server/controller/monitor/region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monitor

import (
"context"
"time"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
mconfig "github.com/deepflowio/deepflow/server/controller/monitor/config"
)

type RegionCheck struct {
cfg mconfig.MonitorConfig
}

func NewRegionCheck(cfg *config.ControllerConfig) *RegionCheck {
return &RegionCheck{
cfg: cfg.MonitorCfg,
}
}

func (c *RegionCheck) Start(ctx context.Context) {
log.Info("region check start")
go func() {
ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.SyncDefaultOrgData()
case <-ctx.Done():
return
}
}
}()
}

func (c *RegionCheck) SyncDefaultOrgData() {
log.Infof("weiqiang sync default org data")
var regions []mysql.Region
if err := mysql.DefaultDB.Where("create_method = ?", common.CREATE_METHOD_USER_DEFINE).Find(&regions).Error; err != nil {
log.Error(err)
}

if err := mysql.SyncDefaultORGData("name", regions,
mysql.WithHardDelete(),
mysql.WithWhereCondition("create_method = ?", common.CREATE_METHOD_USER_DEFINE),
); err != nil {
log.Error(err)
}
}

0 comments on commit d580b10

Please sign in to comment.