Skip to content

Commit

Permalink
perf: improve the performance of ClickHouse table changes under multi…
Browse files Browse the repository at this point in the history
…ple organizations
  • Loading branch information
lzf575 authored and sharang committed Aug 13, 2024
1 parent 938434e commit 539f709
Showing 1 changed file with 92 additions and 37 deletions.
129 changes: 92 additions & 37 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"

"database/sql"
Expand All @@ -39,7 +40,10 @@ const (
INTERVAL_HOUR = 60
INTERVAL_DAY = 1440
DEFAULT_TTL = 168
RETRY_COUNT = 2
RETRY_COUNT = 1

MAX_JOB_COUNT = 10
MIN_ORG_PER_JOB = 8
)

type Issu struct {
Expand Down Expand Up @@ -703,7 +707,8 @@ func (i *Issu) renameColumnWithAddNewColumn(connect *sql.DB, cr *ColumnRename) e
sql = fmt.Sprintf("ALTER TABLE %s.`%s` update %s=%s WHERE 1",
cr.Db, cr.Table, cr.NewColumnName, cr.OldColumnName)
log.Info("rename copy column: ", sql)
_, err = Exec(connect, sql)
// the returned error value can be ignored
Exec(connect, sql)

return err
}
Expand Down Expand Up @@ -940,6 +945,11 @@ func Exec(connect *sql.DB, sql string) (sql.Result, error) {
result, err := connect.Exec(sql)
retryTimes := RETRY_COUNT
for err != nil && retryTimes > 0 {
if strings.Contains(err.Error(), "already exists") ||
strings.Contains(err.Error(), "does not exist") {
log.Infof("Exec SQL (%s) result: %s", sql, err)
return result, nil
}
log.Warningf("Exec SQL (%s) failed: %s, will retry", sql, err)
time.Sleep(100 * time.Millisecond)
result, err = connect.Exec(sql)
Expand Down Expand Up @@ -973,7 +983,7 @@ func getColumnRenames(columnRenamess []*ColumnRenames) []*ColumnRename {
return renames
}

func (i *Issu) renameColumns(index int, orgIDPrefix string) ([]*ColumnRename, error) {
func (i *Issu) renameColumns(index int, orgIDPrefix string, connect *sql.DB) ([]*ColumnRename, error) {
dones := []*ColumnRename{}
for _, renameColumn := range i.columnRenames {
renameColumn.Db = getOrgDatabase(renameColumn.Db, orgIDPrefix)
Expand All @@ -982,7 +992,7 @@ func (i *Issu) renameColumns(index int, orgIDPrefix string) ([]*ColumnRename, er
continue
}

if err := i.renameColumn(i.Connections[index], renameColumn); err != nil {
if err := i.renameColumn(connect, renameColumn); err != nil {
return dones, err
}
dones = append(dones, renameColumn)
Expand All @@ -991,15 +1001,15 @@ func (i *Issu) renameColumns(index int, orgIDPrefix string) ([]*ColumnRename, er
return dones, nil
}

func (i *Issu) modColumns(index int, orgIDPrefix string) ([]*ColumnMod, error) {
func (i *Issu) modColumns(index int, orgIDPrefix string, connect *sql.DB) ([]*ColumnMod, error) {
dones := []*ColumnMod{}
for _, modColumn := range i.columnMods {
modColumn.Db = getOrgDatabase(modColumn.Db, orgIDPrefix)
version, _ := i.getTableVersion(index, modColumn.Db, modColumn.Table)
if version == common.CK_VERSION {
continue
}
if err := i.modColumn(i.Connections[index], modColumn); err != nil {
if err := i.modColumn(connect, modColumn); err != nil {
return dones, err
}
dones = append(dones, modColumn)
Expand All @@ -1008,34 +1018,34 @@ func (i *Issu) modColumns(index int, orgIDPrefix string) ([]*ColumnMod, error) {
return dones, nil
}

func (i *Issu) dropColumns(index int, orgIDPrefix string) ([]*ColumnDrop, error) {
func (i *Issu) dropColumns(index int, orgIDPrefix string, connect *sql.DB) ([]*ColumnDrop, error) {
dones := []*ColumnDrop{}
for _, dropColumn := range i.columnDrops {
dropColumn.Db = getOrgDatabase(dropColumn.Db, orgIDPrefix)
version, _ := i.getTableVersion(index, dropColumn.Db, dropColumn.Table)
if version == common.CK_VERSION {
continue
}
if err := i.dropColumn(i.Connections[index], dropColumn); err != nil {
if err := i.dropColumn(connect, dropColumn); err != nil {
return dones, err
}
dones = append(dones, dropColumn)
}
return dones, nil
}

func (i *Issu) modTableTTLs(index int, orgIDPrefix string) error {
func (i *Issu) modTableTTLs(index int, orgIDPrefix string, connect *sql.DB) error {
for _, modTTL := range i.modTTLs {
modTTL.Db = getOrgDatabase(modTTL.Db, orgIDPrefix)
version, _ := i.getTableVersion(index, modTTL.Db, modTTL.Table)
if version == common.CK_VERSION {
continue
}
if err := i.modTTL(i.Connections[index], modTTL); err != nil {
if err := i.modTTL(connect, modTTL); err != nil {
log.Error(err)
return err
} else {
if err := i.setTableVersion(i.Connections[index], modTTL.Db, modTTL.Table); err != nil {
if err := i.setTableVersion(connect, modTTL.Db, modTTL.Table); err != nil {
log.Error(err)
return err
}
Expand Down Expand Up @@ -1117,8 +1127,7 @@ func getColumnDatasourceAdds(columnDatasourceAddss []*ColumnDatasourceAdds) []*C
return adds
}

func (i *Issu) addColumns(index int, orgIDPrefix string) ([]*ColumnAdd, error) {
connect := i.Connections[index]
func (i *Issu) addColumns(index int, orgIDPrefix string, connect *sql.DB) ([]*ColumnAdd, error) {
dones := []*ColumnAdd{}
for _, add := range i.columnAdds {
add.Db = getOrgDatabase(add.Db, orgIDPrefix)
Expand Down Expand Up @@ -1154,7 +1163,7 @@ func (i *Issu) addColumns(index int, orgIDPrefix string) ([]*ColumnAdd, error) {
return dones, nil
}

func (i *Issu) addIndexs(index int, orgIDPrefix string) ([]*IndexAdd, error) {
func (i *Issu) addIndexs(index int, orgIDPrefix string, connect *sql.DB) ([]*IndexAdd, error) {
dones := []*IndexAdd{}
for _, add := range i.indexAdds {
add.Db = getOrgDatabase(add.Db, orgIDPrefix)
Expand All @@ -1163,7 +1172,7 @@ func (i *Issu) addIndexs(index int, orgIDPrefix string) ([]*IndexAdd, error) {
log.Infof("db (%s) table (%s) already updated", add.Db, add.Table)
continue
}
if err := i.addIndex(i.Connections[index], add); err != nil {
if err := i.addIndex(connect, add); err != nil {
log.Warningf("db (%s) table (%s) add index failed.err: %s", add.Db, add.Table, err)
continue
}
Expand Down Expand Up @@ -1198,28 +1207,27 @@ func genKey(db, table string) string {
return db + "-" + table
}

func (i *Issu) startOrg(index int, orgIDPrefix string) error {
connect := i.Connections[index]
renames, errRenames := i.renameColumns(index, orgIDPrefix)
func (i *Issu) startOrg(index int, orgIDPrefix string, connect *sql.DB) error {
renames, errRenames := i.renameColumns(index, orgIDPrefix, connect)
if errRenames != nil {
return errRenames
}
mods, errMods := i.modColumns(index, orgIDPrefix)
mods, errMods := i.modColumns(index, orgIDPrefix, connect)
if errMods != nil {
return errMods
}

adds, errAdds := i.addColumns(index, orgIDPrefix)
adds, errAdds := i.addColumns(index, orgIDPrefix, connect)
if errAdds != nil {
return errAdds
}

addIndexs, errAddIndexs := i.addIndexs(index, orgIDPrefix)
addIndexs, errAddIndexs := i.addIndexs(index, orgIDPrefix, connect)
if errAddIndexs != nil {
log.Warning(errAddIndexs)
}

drops, errDrops := i.dropColumns(index, orgIDPrefix)
drops, errDrops := i.dropColumns(index, orgIDPrefix, connect)
if errDrops != nil {
return errDrops
}
Expand Down Expand Up @@ -1270,11 +1278,11 @@ func (i *Issu) startOrg(index int, orgIDPrefix string) error {
}
versionSetted[genKey(cr.Db, cr.Table)] = struct{}{}
}
go i.modTableTTLs(index, orgIDPrefix)
go i.modTableTTLs(index, orgIDPrefix, connect)
return nil
}

func (i *Issu) getOrgIDPrefixs(connect *sql.DB) ([]string, error) {
func (i *Issu) getOrgIDPrefixsWithoutDefault(connect *sql.DB) ([]string, error) {
checkOrgDatabase := "event"
sql := fmt.Sprintf("SELECT name FROM system.databases WHERE name like '%%%s%%'", checkOrgDatabase)
rows, err := Query(connect, sql)
Expand All @@ -1291,6 +1299,9 @@ func (i *Issu) getOrgIDPrefixs(connect *sql.DB) ([]string, error) {
}

orgPrefix, _ := parseOrgDatabase(db)
if orgPrefix == "" {
continue
}
orgIDPrefixs = append(orgIDPrefixs, orgPrefix)
}
return orgIDPrefixs, nil
Expand All @@ -1312,24 +1323,68 @@ func (i *Issu) Start() error {
i.VersionMaps[idx] = m
}

errCount := 0
var err error
orgIDPrefixs := make([][]string, len(i.Connections))
// update default organization databases first
for index, connect := range i.Connections {
orgIDPrefixs, err := i.getOrgIDPrefixs(connect)
err = i.startOrg(index, "", connect)
if err != nil {
log.Error(err)
return err
}
orgIDPrefixs[index], err = i.getOrgIDPrefixsWithoutDefault(connect)
if err != nil {
return fmt.Errorf("get orgIDs failed, err: %s", err)
}
for _, orgIDPrefix := range orgIDPrefixs {
err := i.startOrg(index, orgIDPrefix)
if err != nil {
errCount++
err = fmt.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, err)
log.Error(err)
// if more than 1 Org upgrade fails, or the Default Org upgrade fails, an error will be returned and restarted.
if errCount > 1 || orgIDPrefix == "" {
log.Error(err)
return err
}
}

var wg sync.WaitGroup
for index, prefixes := range orgIDPrefixs {
orgCount := len(prefixes)
if orgCount == 0 {
continue
}

jobNum := orgCount/MIN_ORG_PER_JOB + 1
if jobNum > MAX_JOB_COUNT {
jobNum = MAX_JOB_COUNT
}
orgPerJob := (orgCount + jobNum - 1) / jobNum

errCount := 0
var err error
for j := 0; j < jobNum; j++ {
minIndex := j * orgPerJob
maxIndex := j*orgPerJob + orgPerJob
if maxIndex > orgCount {
maxIndex = orgCount
}

wg.Add(1)
go func(orgPrefixs []string) {
defer wg.Done()
log.Infof("begin ckissu %+v", orgPrefixs)
connect, e := common.NewCKConnection(i.Addrs[index], i.username, i.password)
if err != nil {
err = e
return
}
defer connect.Close()
for _, orgIDPrefix := range orgPrefixs {
if e := i.startOrg(index, orgIDPrefix, connect); e != nil {
err = fmt.Errorf("orgIDPrefix %s run issu failed, err: %s", orgIDPrefix, e)
log.Error(err)
errCount++
}
}
log.Infof("end ckissu %+v", orgPrefixs)
}(prefixes[minIndex:maxIndex])
}
wg.Wait()

// When only 1 non-default organization issu exception occurs, the error is ignored
if errCount > 1 {
return err
}
}

Expand Down

0 comments on commit 539f709

Please sign in to comment.