-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsend.go
46 lines (43 loc) · 1.13 KB
/
send.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main
import (
"go_emqx_exhook/channelx"
"go_emqx_exhook/conf"
"go_emqx_exhook/emqx.io/grpc/exhook"
"go_emqx_exhook/provider"
"log"
"time"
)
// Queue forwards messages received on ch to producer in batches.
//
// It builds a channelx aggregator configured from conf.Config.Queue, starts
// it, and enqueues every message read from ch. Each aggregated batch is handed
// to producer.BatchSend.
//
// Queue blocks until ch is closed. When that happens the receive loop ends,
// the aggregator is stopped via SafeStop, and Queue returns.
func Queue(producer provider.MessageProvider, ch chan *exhook.Message) {
	queue := conf.Config.Queue

	aggregator := channelx.NewAggregator[*exhook.Message](
		// Batch handler: pass the whole batch to the producer. It always
		// reports success to the aggregator.
		func(messages []*exhook.Message) error {
			producer.BatchSend(messages)
			return nil
		},
		// Option hook: override the aggregator options with the queue settings.
		func(option channelx.AggregatorOption[*exhook.Message]) channelx.AggregatorOption[*exhook.Message] {
			option.BatchSize = queue.BatchSize
			option.Workers = queue.Workers
			// The internal buffer is sized to two batches.
			option.ChannelBufferSize = option.BatchSize * 2
			// queue.LingerTime is interpreted as a number of seconds.
			option.LingerTime = time.Duration(queue.LingerTime) * time.Second
			log.Printf("channelx option : %v \n", option)
			return option
		},
	)
	aggregator.Start()
	defer aggregator.SafeStop()

	// Range over the channel instead of polling with the comma-ok receive:
	// a receive on a closed channel returns immediately, so an unconditional
	// for-loop around it would spin forever and the deferred SafeStop would
	// never run.
	for sourceMessage := range ch {
		// NOTE(review): the result of TryEnqueue is not checked; presumably it
		// reports whether the message was accepted — confirm whether rejected
		// messages should be logged or retried.
		aggregator.TryEnqueue(sourceMessage)
	}
}
// Direct forwards messages received on ch to producer one at a time.
//
// Every message read from ch is passed to producer.SingleSend. Direct blocks
// until ch is closed and then returns.
func Direct(producer provider.MessageProvider, ch chan *exhook.Message) {
	// Range over the channel instead of polling with the comma-ok receive:
	// a receive on a closed channel returns immediately, so an unconditional
	// for-loop around it would busy-spin forever once ch is closed.
	for sourceMessage := range ch {
		producer.SingleSend(sourceMessage)
	}
}