Skip to content

Commit

Permalink
Merge pull request #693 from WeBankPartners/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
pobu168 authored Jan 13, 2021
2 parents 1d0f17e + 230c13a commit e386cbe
Show file tree
Hide file tree
Showing 34 changed files with 436 additions and 155 deletions.
6 changes: 4 additions & 2 deletions build/conf/archive_mysql_tool.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"user": "{{MONITOR_ARCHIVE_MYSQL_USER}}",
"password": "{{MONITOR_ARCHIVE_MYSQL_PWD}}",
"database_prefix": "prometheus_archive_",
"max_open": 100,
"max_open": 150,
"max_idle": 10,
"timeout": 60
},
Expand All @@ -33,7 +33,9 @@
},
"trans": {
"max_unit_speed": 5,
"five_min_start_day": 90
"five_min_start_day": 90,
"concurrent_insert_num": 100,
"retry_wait_second": 60
},
"http": {
"enable": true,
Expand Down
1 change: 1 addition & 0 deletions monitor-agent/agent_manager/funcs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func LoadDeployProcess() {
log.Printf("process start error : %v \n", err)
} else {
GlobalProcessMap[p.Guid] = &p
deployGuidStatus[p.Guid] = p.Status
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions monitor-agent/archive_mysql_tool/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"user": "root",
"password": "wecube",
"database_prefix": "prometheus_archive_",
"max_open": 100,
"max_open": 150,
"max_idle": 10,
"timeout": 60
},
Expand All @@ -33,7 +33,9 @@
},
"trans": {
"max_unit_speed": 5,
"five_min_start_day": 90
"five_min_start_day": 90,
"concurrent_insert_num": 100,
"retry_wait_second": 60
},
"http": {
"enable": true,
Expand Down
2 changes: 2 additions & 0 deletions monitor-agent/archive_mysql_tool/funcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type MonitorConfig struct {
type TransConfig struct {
MaxUnitSpeed int `json:"max_unit_speed"`
FiveMinStartDay int64 `json:"five_min_start_day"`
ConcurrentInsertNum int `json:"concurrent_insert_num"`
RetryWaitSecond int `json:"retry_wait_second"`
}

type HttpConfig struct {
Expand Down
41 changes: 35 additions & 6 deletions monitor-agent/archive_mysql_tool/funcs/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ var (
monitorMysqlEngine *xorm.Engine
databaseSelect string
hostIp string
maxUnitNum int
concurrentInsertNum int
retryWaitSecond int
)

func InitDbEngine(databaseName string) (err error) {
Expand Down Expand Up @@ -66,23 +69,49 @@ func InitMonitorDbEngine() (err error) {

func insertMysql(rows []*ArchiveTable,tableName string) error {
var sqlList []string
var rowCountList []int
tmpCount := 0
sqlString := fmt.Sprintf("INSERT INTO %s VALUES ", tableName)
for i,v := range rows {
tmpCount += 1
sqlString += fmt.Sprintf("('%s','%s','%s',%d,%.3f,%.3f,%.3f,%.3f)", v.Endpoint, v.Metric, v.Tags, v.UnixTime, v.Avg, v.Min, v.Max, v.P95)
if (i+1)%100 == 0 || i == len(rows)-1 {
if (i+1)%concurrentInsertNum == 0 || i == len(rows)-1 {
rowCountList = append(rowCountList, tmpCount)
tmpCount = 0
sqlList = append(sqlList, sqlString)
sqlString = fmt.Sprintf("INSERT INTO %s VALUES ", tableName)
}else{
sqlString += ","
}
}
for _,v := range sqlList {
_,err := mysqlEngine.Exec(v)
if err != nil {
return err
gErrMessage := ""
for sqlIndex, v := range sqlList {
var tmpErr error
for i:=0;i<3;i++ {
_, err := mysqlEngine.Exec(v)
if err != nil {
tmpErr = err
} else {
tmpErr = nil
break
}
if i < 2 {
time.Sleep(time.Duration(retryWaitSecond) * time.Second)
}
}
if tmpErr != nil {
tmpErrorString := tmpErr.Error()
if len(tmpErrorString) > 200 {
tmpErrorString = tmpErrorString[:200]
}
gErrMessage += fmt.Sprintf("fail with rows length:%d error:%s \n", rowCountList[sqlIndex], tmpErrorString)
}
}
return nil
if gErrMessage == "" {
return nil
}else{
return fmt.Errorf(gErrMessage)
}
}

func createTable(start int64,isFiveArchive bool) (err error, tableName string) {
Expand Down
68 changes: 55 additions & 13 deletions monitor-agent/archive_mysql_tool/funcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,26 @@ package funcs
import (
"log"
"sort"
"sync"
"time"
"fmt"
)

var jobChannelList chan ArchiveActionList

func StartCronJob() {
concurrentInsertNum = 50
if Config().Trans.ConcurrentInsertNum > 0 {
concurrentInsertNum = Config().Trans.ConcurrentInsertNum
}
maxUnitNum = 5
if Config().Trans.MaxUnitSpeed > 0 {
maxUnitNum = Config().Trans.MaxUnitSpeed
}
retryWaitSecond = 60
if Config().Trans.RetryWaitSecond > 0 {
retryWaitSecond = Config().Trans.RetryWaitSecond
}
jobChannelList = make(chan ArchiveActionList, Config().Prometheus.MaxHttpOpen)
go consumeJob()
t,_ := time.Parse("2006-01-02 15:04:05 MST", fmt.Sprintf("%s 00:00:00 CST", time.Now().Format("2006-01-02")))
Expand Down Expand Up @@ -55,25 +68,15 @@ func CreateJob(dateString string) {
log.Printf("try to create table:%s error:%v \n", tableName, err)
return
}
var unitPerJob int
unitCount := 0
for _,v := range MonitorObjList {
unitPerJob += len(v.Metrics)
}
unitPerJob = unitPerJob/Config().Prometheus.MaxHttpOpen
if unitPerJob > Config().Trans.MaxUnitSpeed {
unitPerJob = Config().Trans.MaxUnitSpeed
}
if unitPerJob == 0 {
unitPerJob = 1
}
actionParamObjLength := maxUnitNum*Config().Prometheus.MaxHttpOpen
var actionParamList []*ArchiveActionList
var tmpActionParamObjList []*ArchiveActionParamObj
for _,v := range MonitorObjList {
for _,vv := range v.Metrics {
unitCount++
tmpActionParamObjList = append(tmpActionParamObjList, &ArchiveActionParamObj{Endpoint:v.Endpoint, Metric:vv.Metric, PromQl:vv.PromQl, TableName:tableName, Start:start, End:end})
if unitCount == unitPerJob {
if unitCount == actionParamObjLength {
tmpArchiveActionList := ArchiveActionList{}
for _,vvv := range tmpActionParamObjList {
tmpArchiveActionList = append(tmpArchiveActionList, vvv)
Expand All @@ -84,6 +87,13 @@ func CreateJob(dateString string) {
}
}
}
if len(tmpActionParamObjList) > 0 {
tmpArchiveActionList := ArchiveActionList{}
for _,vvv := range tmpActionParamObjList {
tmpArchiveActionList = append(tmpArchiveActionList, vvv)
}
actionParamList = append(actionParamList, &tmpArchiveActionList)
}
go checkJobStatus()
for _,v := range actionParamList {
jobChannelList <- *v
Expand All @@ -93,7 +103,38 @@ func CreateJob(dateString string) {
func consumeJob() {
for {
param := <- jobChannelList
go archiveAction(param)
if len(param) == 0 {
continue
}
tmpUnixCount := 0
var concurrentJobList []ArchiveActionList
tmpJobList := ArchiveActionList{}
for _,v := range param {
tmpUnixCount ++
tmpJobList = append(tmpJobList, v)
if tmpUnixCount >= maxUnitNum {
concurrentJobList = append(concurrentJobList, tmpJobList)
tmpJobList = ArchiveActionList{}
tmpUnixCount = 0
}
}
if len(tmpJobList) > 0 {
concurrentJobList = append(concurrentJobList, tmpJobList)
}
log.Printf("start consume job,length:%d ,concurrent:%d \n", len(param), len(concurrentJobList))
startTime := time.Now()
wg := sync.WaitGroup{}
for _,job := range concurrentJobList {
wg.Add(1)
go func(jobList ArchiveActionList) {
archiveAction(jobList)
wg.Done()
}(job)
}
wg.Wait()
endTime := time.Now()
useTime := float64(endTime.Sub(startTime).Nanoseconds()) / 1e6
log.Printf("done with consume job,use time: %.3f ms", useTime)
}
}

Expand All @@ -105,6 +146,7 @@ func checkJobStatus() {
log.Printf("archive job done \n")
break
}
time.Sleep(10*time.Second)
}
}

Expand Down
2 changes: 1 addition & 1 deletion monitor-agent/node_exporter/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.10.0(base1.0.0)
1.10.0(base1.0.1)
10 changes: 3 additions & 7 deletions monitor-agent/node_exporter/collector/log_monitor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,10 @@ func (c *logCollectorObj) getRows(keyword string,value,lastValue float64) []logK
c.Lock.RLock()
for _,v := range c.Rule {
if v.Keyword == keyword {
for _,vv := range v.FetchRow {
if vv.Index > lastValue && vv.Index <= value {
data = append(data, logKeywordFetchObj{Content: vv.Content})
}
//if (nowTimestamp-vv.Timestamp) <= 10 {
// data = append(data, logKeywordFetchObj{Timestamp: vv.Timestamp, Content: vv.Content})
//}
if len(v.FetchRow) == 0 {
continue
}
data = append(data, logKeywordFetchObj{Content: v.FetchRow[len(v.FetchRow)-1].Content})
}
}
c.Lock.RUnlock()
Expand Down
20 changes: 10 additions & 10 deletions monitor-server/api/api.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package api

import (
"github.com/gin-gonic/gin"
m "github.com/WeBankPartners/open-monitor/monitor-server/models"
"github.com/gin-contrib/cors"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/user"
"net/http"
"github.com/swaggo/gin-swagger"
"github.com/swaggo/gin-swagger/swaggerFiles"
"fmt"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/dashboard"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/agent"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/alarm"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/dashboard"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/user"
_ "github.com/WeBankPartners/open-monitor/monitor-server/docs"
"time"
"github.com/WeBankPartners/open-monitor/monitor-server/middleware/log"
m "github.com/WeBankPartners/open-monitor/monitor-server/models"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/swaggo/gin-swagger"
"github.com/swaggo/gin-swagger/swaggerFiles"
"net/http"
"time"
)

func InitHttpServer(exportAgent bool) {
Expand Down Expand Up @@ -225,6 +225,6 @@ func httpLogHandle() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
log.Logger.Info("request", log.String("url", c.Request.RequestURI), log.String("method",c.Request.Method), log.Int("code",c.Writer.Status()), log.String("ip",c.ClientIP()), log.Float64("cost_second",time.Now().Sub(start).Seconds()))
log.Logger.Info("request", log.String("url", c.Request.RequestURI), log.String("method", c.Request.Method), log.Int("code", c.Writer.Status()), log.String("operator", c.GetString("operatorName")), log.String("ip", c.ClientIP()), log.Int64("cost_time", time.Now().Sub(start).Milliseconds()))
}
}
27 changes: 13 additions & 14 deletions monitor-server/api/v1/agent/export.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
package agent

import (
"github.com/gin-gonic/gin"
"strings"
"encoding/json"
"fmt"
mid "github.com/WeBankPartners/open-monitor/monitor-server/middleware"
"github.com/WeBankPartners/open-monitor/monitor-server/middleware/log"
m "github.com/WeBankPartners/open-monitor/monitor-server/models"
"net/http"
"fmt"
"github.com/WeBankPartners/open-monitor/monitor-server/services/db"
"github.com/gin-gonic/gin"
"io/ioutil"
"encoding/json"
"github.com/WeBankPartners/open-monitor/monitor-server/api/v1/alarm"
"net/http"
"strconv"
"github.com/WeBankPartners/open-monitor/monitor-server/middleware/log"
"strings"
)

type resultObj struct {
Expand Down Expand Up @@ -263,19 +262,19 @@ func autoAddAppPathConfig(param m.RegisterParamNew, paths string) error {
if hostEndpoint.Id <= 0 {
return fmt.Errorf("Host endpoint with ip:%s can not find,please register this host first ", param.Ip)
}
var businessTables []*m.BusinessMonitorTable
var businessTables []*m.BusinessUpdatePathObj
for _,v := range tmpPathList {
businessTables = append(businessTables, &m.BusinessMonitorTable{EndpointId:hostEndpoint.Id, Path:v, OwnerEndpoint:fmt.Sprintf("%s_%s_%s", param.Name, param.Ip, param.Type)})
businessTables = append(businessTables, &m.BusinessUpdatePathObj{Path:v, OwnerEndpoint:fmt.Sprintf("%s_%s_%s", param.Name, param.Ip, param.Type)})
}
err := db.UpdateAppendBusiness(m.BusinessUpdateDto{EndpointId:hostEndpoint.Id, PathList:[]*m.BusinessUpdatePathObj{}})
err := db.UpdateAppendBusiness(m.BusinessUpdateDto{EndpointId:hostEndpoint.Id, PathList:businessTables})
if err != nil {
log.Logger.Error("Update endpoint business table error", log.Error(err))
return err
}
err = alarm.UpdateNodeExporterBusinessConfig(hostEndpoint.Id)
if err != nil {
log.Logger.Error("Update business config error", log.Error(err))
}
//err = alarm.UpdateNodeExporterBusinessConfig(hostEndpoint.Id)
//if err != nil {
// log.Logger.Error("Update business config error", log.Error(err))
//}
return err
}

Expand Down
Loading

0 comments on commit e386cbe

Please sign in to comment.