-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.go
110 lines (102 loc) · 3.17 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"go_emqx_exhook/conf"
"go_emqx_exhook/emqx.io/grpc/exhook"
"go_emqx_exhook/impl"
"go_emqx_exhook/provider"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io"
"log"
"net"
"os"
"strings"
)
// main runs the EMQX exhook gRPC bridge and exits with a non-zero status when
// the server cannot be started or stops with an error.
//
// All work happens in runExhookServer so that its deferred cleanups run before
// log.Fatal terminates the process (log.Fatal calls os.Exit, which skips any
// deferred functions that are still pending).
func main() {
	if err := runExhookServer(); err != nil {
		log.Fatal(err)
	}
}

// runExhookServer wires the configured message provider to the exhook gRPC
// service and serves until the gRPC server stops.
//
// It reads conf.Config, builds the provider selected by MqType (RocketMQ is the
// fallback for any unrecognised value), starts the Queue or Direct sender
// goroutine on a buffered channel, and then serves the HookProvider service on
// the configured TCP port. It returns a non-nil error when listening or serving
// fails; provider resources are released through deferred calls on every return
// path.
func runExhookServer() error {
	appConf := conf.Config
	rule := appConf.BridgeRule

	// connClose releases a provider resource. A failure is only logged: exiting
	// here would skip the cleanups that are still queued behind this one.
	connClose := func(conn io.Closer) {
		if err := conn.Close(); err != nil {
			log.Printf("close message provider resource: %v", err)
		}
	}

	// Create the message provider. strings.EqualFold is already
	// case-insensitive, so one comparison per provider type is enough.
	var msgProvider provider.MessageProvider
	switch {
	case strings.EqualFold(appConf.MqType, "rabbitmq"):
		rabbit := provider.BuildRabbitmqMessageProvider(appConf.RabbitmqConfig)
		defer rabbit.RabbitProducer.Close()
		defer connClose(rabbit.RabbitmqConn)
		msgProvider = rabbit
	case strings.EqualFold(appConf.MqType, "rabbitmqStream"):
		rabbitMQStream := provider.BuildRabbitmqStreamMessageProvider(appConf.RabbitmqStreamConfig)
		defer connClose(rabbitMQStream.RabbitStreamProducer)
		msgProvider = rabbitMQStream
	case strings.EqualFold(appConf.MqType, "kafka"):
		kafka := provider.BuildKafkaMessageProvider(appConf.KafkaConfig)
		defer connClose(kafka.KafkaProducer)
		msgProvider = kafka
	case strings.EqualFold(appConf.MqType, "redis"):
		redisMq := provider.BuildRedisMessageProvider(appConf.RedisConfig)
		defer connClose(redisMq.RedisClient)
		msgProvider = redisMq
	default:
		rmq := provider.BuildRocketmqMessageProvider(appConf.RocketmqConfig)
		defer func(p rocketmq.Producer) {
			if err := p.Shutdown(); err != nil {
				log.Printf("shutdown rocketmq producer: %v", err)
			}
		}(rmq.RmqProducer)
		msgProvider = rmq
	}

	ch := make(chan *exhook.Message, appConf.ChanBufferSize)
	// Send method: "queue" or direct (anything else).
	if appConf.SendMethod == "queue" {
		go Queue(msgProvider, ch)
	} else {
		go Direct(msgProvider, ch)
	}

	var grpcServerOptions []grpc.ServerOption
	tlsCfg := appConf.Tls
	if tlsCfg.Enable {
		grpcServerOptions = append(grpcServerOptions, grpc.Creds(getServerCred(tlsCfg)))
	}
	srv := grpc.NewServer(grpcServerOptions...)

	// Register the EMQX exhook gRPC service.
	exhook.RegisterHookProviderServer(srv, &impl.HookProviderServerImpl{
		SourceTopics: rule.Topics,
		Receive:      ch,
	})

	// Listen for TCP connections on the configured port.
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", appConf.Port))
	if err != nil {
		return fmt.Errorf("listen on port %d: %w", appConf.Port, err)
	}
	// grpc's Serve closes lis itself before returning, so this is only a safety
	// net; the "already closed" error it normally yields is expected and is
	// deliberately ignored.
	defer func() { _ = lis.Close() }()

	log.Printf("%s [%s] %s => grpc server listen port : %d \n", appConf.AppName, appConf.SendMethod, appConf.MqType, appConf.Port)
	if err := srv.Serve(lis); err != nil {
		return fmt.Errorf("grpc serve: %w", err)
	}
	return nil
}
// getServerCred builds the gRPC server transport credentials for mutual TLS.
//
// It loads the server certificate/key pair from tlsCfg.CertFile and
// tlsCfg.KeyFile and uses the PEM certificates in tlsCfg.CaFile as the pool
// against which client certificates are verified
// (tls.RequireAndVerifyClientCert).
//
// The function has no error return, so any unreadable or invalid file aborts
// the process with a descriptive message. Continuing with an empty certificate
// or an empty CA pool would produce a server on which every handshake fails
// without any hint at the cause.
func getServerCred(tlsCfg conf.TlsConfig) credentials.TransportCredentials {
	cert, err := tls.LoadX509KeyPair(tlsCfg.CertFile, tlsCfg.KeyFile)
	if err != nil {
		log.Fatalf("load tls key pair (cert %q, key %q): %v", tlsCfg.CertFile, tlsCfg.KeyFile, err)
	}

	ca, err := os.ReadFile(tlsCfg.CaFile)
	if err != nil {
		log.Fatalf("read tls ca file %q: %v", tlsCfg.CaFile, err)
	}

	certPool := x509.NewCertPool()
	// AppendCertsFromPEM reports false when no certificate could be parsed.
	if !certPool.AppendCertsFromPEM(ca) {
		log.Fatalf("tls ca file %q contains no valid PEM certificate", tlsCfg.CaFile)
	}

	return credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		ClientAuth:   tls.RequireAndVerifyClientCert,
		ClientCAs:    certPool,
	})
}