From a13ea489785a4fa8563f0f69d7d14470243789b9 Mon Sep 17 00:00:00 2001 From: wei_lilitw Date: Mon, 15 Apr 2024 17:45:50 +0800 Subject: [PATCH] feat(hmq):Provide handler StartUnixSocketClientListening used to handle the Unix communications --- broker/broker.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++ broker/config.go | 34 ++++++++++++----------- 2 files changed, 90 insertions(+), 16 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 8d51929..be2cd0c 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "net/http" + "os" "sync" "time" @@ -160,6 +161,11 @@ func (b *Broker) Start() { go b.StartClientListening(false) } + //listen client over unix + if b.config.Port == "" && b.config.UnixFilePath != "" { + go b.StartUnixSocketClientListening(b.config.UnixFilePath, true) + } + //listen for cluster if b.config.Cluster.Port != "" { go b.StartClusterListening() @@ -268,6 +274,60 @@ func (b *Broker) StartClientListening(Tls bool) { } } +func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) { + var err error + var l net.Listener + for { + if UnixSocket { + if FileExist(socketPath) { + if err != nil { + log.Error("Remove Unix socketPath ", zap.Error(err)) + } + } + conn, _ := net.ResolveUnixAddr("unix", socketPath) + l, err = net.ListenUnix("unix", conn) + log.Info("Start Listening client on Unix socket", zap.String("socketPath", socketPath)) + } + if err == nil { + break // successfully listening + } + + log.Error("Error listening on ", zap.Error(err)) + time.Sleep(1 * time.Second) + } + + tmpDelay := 10 * ACCEPT_MIN_SLEEP + for { + conn, err := l.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + log.Error( + "Temporary Client Accept Error(%v), sleeping %dms", + zap.Error(ne), + zap.Duration("sleeping", tmpDelay/time.Millisecond), + ) + + time.Sleep(tmpDelay) + tmpDelay *= 2 + if tmpDelay > ACCEPT_MAX_SLEEP { + tmpDelay = ACCEPT_MAX_SLEEP + } + } else { + log.Error("Accept error", zap.Error(err)) + } + continue + } + + tmpDelay = ACCEPT_MIN_SLEEP + go func() { + err := b.handleConnection(CLIENT, conn) + if err != nil { + conn.Close() + } + }() + } +} + func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port log.Info("Start Listening cluster on ", zap.String("hp", hp)) @@ -743,3 +803,15 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64 b.PublishMessage(packet) } + +func FileExist(name string) bool { + _, err := os.Stat(name) + if err == nil { + return true + } else if os.IsNotExist(err) { + return false + } else { + panic(err) + } + +} diff --git a/broker/config.go b/broker/config.go index 2aa22e2..9bec4df 100644 --- a/broker/config.go +++ b/broker/config.go @@ -19,20 +19,21 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary type Config struct { - Worker int `json:"workerNum"` - HTTPPort string `json:"httpPort"` - Host string `json:"host"` - Port string `json:"port"` - Cluster RouteInfo `json:"cluster"` - Router string `json:"router"` - TlsHost string `json:"tlsHost"` - TlsPort string `json:"tlsPort"` - WsPath string `json:"wsPath"` - WsPort string `json:"wsPort"` - WsTLS bool `json:"wsTLS"` - TlsInfo TLSInfo `json:"tlsInfo"` - Debug bool `json:"debug"` - Plugin Plugins `json:"plugins"` + Worker int `json:"workerNum"` + HTTPPort string `json:"httpPort"` + Host string `json:"host"` + Port string `json:"port"` + Cluster RouteInfo `json:"cluster"` + Router string `json:"router"` + TlsHost string `json:"tlsHost"` + TlsPort string `json:"tlsPort"` + WsPath string `json:"wsPath"` + WsPort string `json:"wsPort"` + WsTLS bool `json:"wsTLS"` + TlsInfo TLSInfo `json:"tlsInfo"` + Debug bool `json:"debug"` + Plugin Plugins `json:"plugins"` + UnixFilePath string `json:"unixFilePath"` } type Plugins struct { @@ -87,8 +88,9 @@ func ConfigureConfig(args []string) (*Config, error) { fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.") fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.") - fs.StringVar(&config.Port, "port", "1883", "Port to listen on.") - fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") + fs.StringVar(&config.Port, "port", "", "Port to listen on.") + fs.StringVar(&config.Port, "p", "", "Port to listen on.") + fs.StringVar(&config.UnixFilePath, "unixfilepath", "", "unix sock to listen on.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")