Skip to content

Commit

Permalink
feat(hmq):add windows pipe socket.Use the open source project npipe, …
Browse files Browse the repository at this point in the history
…which nicely encapsulates the operations of windows pipe and returns the connection type net.Conn.
  • Loading branch information
wei_lilitw committed Apr 18, 2024
1 parent 2ceb61a commit d504645
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 17 deletions.
60 changes: 58 additions & 2 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
encJson "encoding/json"
"errors"
"fmt"
"github.com/natefinch/npipe"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -165,6 +166,10 @@ func (b *Broker) Start() {
if b.config.Port == "" && b.config.UnixFilePath != "" {
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
}
//listen client over windows pipe
if b.config.Port == "" && b.config.UnixFilePath == "" && b.config.WindowsPipeName != "" {
go b.StartPipeSocketListening(b.config.WindowsPipeName, true)
}

//listen for cluster
if b.config.Cluster.Port != "" {
Expand Down Expand Up @@ -274,11 +279,11 @@ func (b *Broker) StartClientListening(Tls bool) {
}
}

func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) {
func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool) {
var err error
var l net.Listener
for {
if UnixSocket {
if unixSocket {
if FileExist(socketPath) {
if err != nil {
log.Error("Remove Unix socketPath ", zap.Error(err))
Expand Down Expand Up @@ -328,6 +333,57 @@ func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bo
}
}

// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener

Check failure on line 339 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.PipeListener

Check failure on line 339 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.PipeListener

Check failure on line 339 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.PipeListener

for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)

Check failure on line 344 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.Listen

Check failure on line 344 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.Listen

Check failure on line 344 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

undefined: npipe.Listen
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}

tmpDelay := 10 * ACCEPT_MIN_SLEEP

for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)

time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}

tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
if err != nil {
conn.Close()
}
}()
}
}

func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", zap.String("hp", hp))
Expand Down
31 changes: 16 additions & 15 deletions broker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type Config struct {
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
WindowsPipeName string `json:"windowsPipeName"`
}

type Plugins struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce h1:TqjP/BTDrwN7zP9xyXVuLsMBXYMt6LLYi55PlrIcq8U=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:ifHPsLndGGzvgzcaXUvzmt6LxKT4pJ+uzEhtnMt+f7A=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
Expand Down

0 comments on commit d504645

Please sign in to comment.