Skip to content

Commit

Permalink
feat: 给消息处理通道增加容量,有利于处理大量数据
Browse files Browse the repository at this point in the history
  • Loading branch information
koch3092 committed Apr 9, 2023
1 parent ac7074e commit efc9888
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
2 changes: 2 additions & 0 deletions initialize/amqp_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ func (am *AmqpManager) StartReceiveMessage(ctx context.Context, sdRcvMsg chan<-
aRcvMsg <- message
err := message.Accept()
if err != nil {
am.Logger.Error(fmt.Sprintf("amqp receive data accept error:%s", err))
childDone()
return
}
am.Logger.Info("amqp receive data accept ok.")
} else {
fmt.Println("amqp receive data error:", err)

Expand Down
2 changes: 2 additions & 0 deletions initialize/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ProcessMessage:
for {
select {
case message := <-rcvMessage:
h.Logger.Debug(fmt.Sprintf("Save data to be processed: %d", len(rcvMessage)))
var dataType model.DataType
errJson := json.Unmarshal(message.GetData(), &dataType)
if errJson != nil {
Expand Down Expand Up @@ -135,6 +136,7 @@ ProcessMessage:
for {
select {
case message := <-rcvMessage:
h.Logger.Debug(fmt.Sprintf("Measurement data to be processed: %d", len(rcvMessage)))
// 读取公共数据
var tdMetricBase *model.TdMetricBase
errTd := json.Unmarshal(message.GetData(), &tdMetricBase)
Expand Down
2 changes: 1 addition & 1 deletion initialize/orm_tdengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
func InitTdengine() (*sql.DB, error) {
m := global.CONFIG.TDengine
dsn := m.Dsn()
global.Logger.Debug("Open TDengin dsn: " + dsn)
global.Logger.Debug("Open TDengine dsn: " + dsn)
tdengine, err := sql.Open("taosSql", dsn)
if err != nil {
global.Logger.Debug("Open TDengine failed: " + err.Error())
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func main() {
ctx := context.Background()

// 用于转发消息的Channel
sdRcvMsg := make(chan *amqp.Message) // 保存原始数据用
mRcvMsg := make(chan *amqp.Message) // 保存Measurement数据用
aRcvMsg := make(chan *amqp.Message) // 告警用
sdRcvMsg := make(chan *amqp.Message, 200) // 保存原始数据用
mRcvMsg := make(chan *amqp.Message, 200) // 保存Measurement数据用
aRcvMsg := make(chan *amqp.Message, 200) // 告警用

// 阿里云AMQP凭证对象
aliCred := global.CONFIG.AliAmqpCred
Expand Down

0 comments on commit efc9888

Please sign in to comment.