Skip to content

Commit

Permalink
feat(hmq):Provide handler StartUnixSocketClientListening used to hand…
Browse files Browse the repository at this point in the history
…le the Unix communications (#198)

Co-authored-by: wei_lilitw <[email protected]>
  • Loading branch information
xinkonglili and wei_lilitw authored Apr 15, 2024
1 parent 6c75361 commit 8ddca9b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 16 deletions.
72 changes: 72 additions & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"net/http"
"os"
"sync"
"time"

Expand Down Expand Up @@ -160,6 +161,11 @@ func (b *Broker) Start() {
go b.StartClientListening(false)
}

//listen client over unix
if b.config.Port == "" && b.config.UnixFilePath != "" {
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
}

//listen for cluster
if b.config.Cluster.Port != "" {
go b.StartClusterListening()
Expand Down Expand Up @@ -268,6 +274,60 @@ func (b *Broker) StartClientListening(Tls bool) {
}
}

func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) {
var err error
var l net.Listener
for {
if UnixSocket {
if FileExist(socketPath) {
if err != nil {
log.Error("Remove Unix socketPath ", zap.Error(err))
}
}
conn, _ := net.ResolveUnixAddr("unix", socketPath)
l, err = net.ListenUnix("unix", conn)
log.Info("Start Listening client on Unix socket", zap.String("socketPath", socketPath))
}
if err == nil {
break // successfully listening
}

log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}

tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)

time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}

tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
if err != nil {
conn.Close()
}
}()
}
}

func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", zap.String("hp", hp))
Expand Down Expand Up @@ -743,3 +803,15 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64

b.PublishMessage(packet)
}

func FileExist(name string) bool {
_, err := os.Stat(name)
if err == nil {
return true
} else if os.IsNotExist(err) {
return false
} else {
panic(err)
}

}
34 changes: 18 additions & 16 deletions broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Config struct {
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
}

type Plugins struct {
Expand Down Expand Up @@ -87,8 +88,9 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "port", "", "Port to listen on.")
fs.StringVar(&config.Port, "p", "", "Port to listen on.")
fs.StringVar(&config.UnixFilePath, "unixfilepath", "", "unix sock to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
Expand Down

0 comments on commit 8ddca9b

Please sign in to comment.