Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: region association logic of controllers and analyers #7810

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/controller/controller/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func checkAndStartMasterFunctions(
return
}

regionCheck := monitor.NewRegionCheck(cfg)
vtapCheck := vtap.NewVTapCheck(cfg.MonitorCfg, ctx)
vtapRebalanceCheck := vtap.NewRebalanceCheck(cfg.MonitorCfg, ctx)
vtapLicenseAllocation := license.NewVTapLicenseAllocation(cfg.MonitorCfg, ctx)
Expand Down Expand Up @@ -142,6 +143,9 @@ func checkAndStartMasterFunctions(
// 数据节点检查
analyzerCheck.Start(sCtx)

// region check
regionCheck.Start(sCtx)

// vtap check
vtapCheck.Start(sCtx)

Expand Down
92 changes: 81 additions & 11 deletions server/controller/db/mysql/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,75 @@ func CheckORGNumberAndLog() ([]int, error) {
return orgIDs, nil
}

type SyncORGConfig struct {
HardDelete bool
WhereCondition []interface{}
ExcludeFields []string
}

type SyncORGConfigOption func(opts *SyncORGConfig)

func WithHardDelete() SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.HardDelete = true
}
}

func WithWhereCondition(condition ...interface{}) SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.WhereCondition = condition
}
}

func WithExcludeFields(fields []string) SyncORGConfigOption {
return func(opts *SyncORGConfig) {
opts.ExcludeFields = fields
}
}

// SyncDefaultOrgData synchronizes a slice of data items of any type T to all organization databases except the default one.
// It assumes each data item has an "ID" field (with a json tag "ID") serving as the primary key. During upsertion,
// fields are updated based on their "gorm" tags, and empty string values are converted to null in the database.
//
// Parameters:
// - uniqueKey: A unique identifier for the data items.
// - data: A slice of data items of any type T to be synchronized. The type T must have an "ID" field tagged as the primary key.
func SyncDefaultOrgData[T any](data []T, excludeFields []string) error {
func SyncDefaultORGData[T any](uniqueKey string, data []T, options ...SyncORGConfigOption) error {
if len(data) == 0 {
return nil
}

cfg := &SyncORGConfig{}
for _, option := range options {
option(cfg)
}

excludeFieldsMap := make(map[string]bool)
for _, field := range excludeFields {
for _, field := range cfg.ExcludeFields {
excludeFieldsMap[field] = true
}

// get fields to update
dataType := reflect.TypeOf(data[0])
var fields []string
var structFieldName string
for i := 0; i < dataType.NumField(); i++ {
field := dataType.Field(i)
dbTag := field.Tag.Get("gorm")
if dbTag != "" {
columnName := GetColumnNameFromTag(dbTag)
if columnName == uniqueKey {
structFieldName = field.Name
}
if columnName != "" && !excludeFieldsMap[columnName] {
fields = append(fields, columnName)
}
}
}
if structFieldName == "" {
return fmt.Errorf("no struct field found for unique key: %s", uniqueKey)
}
log.Infof("weiqiang fields to update: %v", fields)

orgIDs, err := GetORGIDs()
if err != nil {
Expand All @@ -125,27 +165,57 @@ func SyncDefaultOrgData[T any](data []T, excludeFields []string) error {
db := dbInfo.DB
err = db.Transaction(func(tx *gorm.DB) error {
// delete
var existingIDs []int
var existingKeys []interface{}
var t T
if err := tx.Model(&t).Pluck("id", &existingIDs).Error; err != nil {
query := tx.Model(&t)
if len(cfg.WhereCondition) == 1 {
query = query.Where(cfg.WhereCondition[0])
} else if len(cfg.WhereCondition) > 1 {
query = query.Where(cfg.WhereCondition[0], cfg.WhereCondition[1:]...)
}
if err := query.Model(&t).Pluck(uniqueKey, &existingKeys).Error; err != nil {
return err
}
existingIDMap := make(map[int]bool)
for _, id := range existingIDs {
existingIDMap[id] = true
existingKeyMap := make(map[interface{}]bool)
for _, key := range existingKeys {
switch v := key.(type) {
case []byte:
existingKeyMap[string(v)] = true
default:
existingKeyMap[v] = true
}
}
log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap)
for _, item := range data {
id := reflect.ValueOf(item).FieldByName("ID").Int()
existingIDMap[int(id)] = false
val := reflect.ValueOf(item)
field := val.FieldByName(structFieldName)
if !field.IsValid() {
return fmt.Errorf("field %s not found in the struct", structFieldName)
}
key := field.Interface()
switch v := key.(type) {
case []byte:
existingKeyMap[string(v)] = false
default:
existingKeyMap[v] = false
}
}
for id, exists := range existingIDMap {
log.Infof("weiqiang org(%v) existingKeyMap: %v", orgID, existingKeyMap)
for key, exists := range existingKeyMap {
if exists {
if err := tx.Where("id = ?", id).Delete(&t).Error; err != nil {
if cfg.HardDelete {
err = tx.Unscoped().Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error
} else {
err = tx.Where(fmt.Sprintf("%s = ?", uniqueKey), key).Delete(&t).Error
}
if err != nil {
return err
}
}
}

log.Infof("weiqiang org(%v) save data: %v", orgID, data)

// add or update
if err := tx.Clauses(clause.OnConflict{
DoUpdates: clause.AssignmentColumns(fields), // `UpdateAll: true,` can not update time
Expand Down
9 changes: 8 additions & 1 deletion server/controller/http/router/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ func updateAnalyzer(m *monitor.AnalyzerCheck, cfg *config.ControllerConfig) gin.
patchMap := map[string]interface{}{}
c.ShouldBindBodyWith(&patchMap, binding.JSON)

lcuuid := c.Param("lcuuid")
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
if _, ok := patchMap["REGION"]; ok {
if orgID.(int) != common.DEFAULT_ORG_ID {
StatusForbiddenResponse(c, "only default orgination can modify region")
return
}
}

lcuuid := c.Param("lcuuid")
data, err := service.UpdateAnalyzer(orgID.(int), lcuuid, patchMap, m, cfg)
if err != nil {
err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error())
Expand Down
9 changes: 8 additions & 1 deletion server/controller/http/router/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,15 @@ func updateController(m *monitor.ControllerCheck, cfg *config.ControllerConfig)
patchMap := map[string]interface{}{}
c.ShouldBindBodyWith(&patchMap, binding.JSON)

lcuuid := c.Param("lcuuid")
orgID, _ := c.Get(common.HEADER_KEY_X_ORG_ID)
if _, ok := patchMap["REGION"]; ok {
if orgID.(int) != common.DEFAULT_ORG_ID {
StatusForbiddenResponse(c, "only default orgination can modify region")
return
}
}

lcuuid := c.Param("lcuuid")
data, err := service.UpdateController(orgID.(int), lcuuid, patchMap, m, cfg)
if err != nil {
err = fmt.Errorf("org id(%d), %s", orgID.(int), err.Error())
Expand Down
11 changes: 11 additions & 0 deletions server/controller/http/router/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,14 @@ func AdminPermissionVerificationMiddleware() gin.HandlerFunc {
ctx.Next()
}
}

func DefaultORGVerificationMiddleware() gin.HandlerFunc {
return func(ctx *gin.Context) {
orgID, _ := ctx.Get(common.HEADER_KEY_X_ORG_ID)
if orgID.(int) != common.DEFAULT_ORG_ID {
routercommon.StatusForbiddenResponse(ctx, fmt.Sprintf("only default orginazation can operate"))
ctx.Abort()
}
ctx.Next()
}
}
2 changes: 2 additions & 0 deletions server/controller/http/service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func UpdateController(
// - 增加 az_controller_connection 逻辑
// - addConnAzs = newConnAzs - oldConnAzs
if _, ok := controllerUpdate["REGION"]; ok {


if oldConnAzs.Contains("ALL") {
delConnAzs.Add("ALL")
for _, az := range dbAzs {
Expand Down
4 changes: 3 additions & 1 deletion server/controller/monitor/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@ func (c *AnalyzerCheck) SyncDefaultOrgData() {
if err := mysql.DefaultDB.Find(&analyzers).Error; err != nil {
log.Error(err)
}
if err := mysql.SyncDefaultOrgData(analyzers, SyncAnalyzerExcludeField); err != nil {
if err := mysql.SyncDefaultORGData("id", analyzers,
mysql.WithExcludeFields(SyncAnalyzerExcludeField),
); err != nil {
log.Error(err)
}
}
4 changes: 3 additions & 1 deletion server/controller/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,9 @@ func (c *ControllerCheck) SyncDefaultOrgData() {
if err := mysql.DefaultDB.Find(&controllers).Error; err != nil {
log.Error(err)
}
if err := mysql.SyncDefaultOrgData(controllers, SyncControllerExcludeField); err != nil {
if err := mysql.SyncDefaultORGData("id", controllers,
mysql.WithExcludeFields(SyncControllerExcludeField),
); err != nil {
log.Error(err)
}
}
69 changes: 69 additions & 0 deletions server/controller/monitor/region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monitor

import (
"context"
"time"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/config"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
mconfig "github.com/deepflowio/deepflow/server/controller/monitor/config"
)

type RegionCheck struct {
cfg mconfig.MonitorConfig
}

func NewRegionCheck(cfg *config.ControllerConfig) *RegionCheck {
return &RegionCheck{
cfg: cfg.MonitorCfg,
}
}

func (c *RegionCheck) Start(ctx context.Context) {
log.Info("region check start")
go func() {
ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.SyncDefaultOrgData()
case <-ctx.Done():
return
}
}
}()
}

func (c *RegionCheck) SyncDefaultOrgData() {
log.Infof("weiqiang sync default org data")
var regions []mysql.Region
if err := mysql.DefaultDB.Where("create_method = ?", common.CREATE_METHOD_USER_DEFINE).Find(&regions).Error; err != nil {
log.Error(err)
}

if err := mysql.SyncDefaultORGData("name", regions,
mysql.WithHardDelete(),
mysql.WithWhereCondition("create_method = ?", common.CREATE_METHOD_USER_DEFINE),
); err != nil {
log.Error(err)
}
}
Loading