Skip to content

Commit

Permalink
Merge branch 'master' of chowyu08.github.com:fhmq/hmq
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyy committed Jan 8, 2024
2 parents 1c2d20e + de2dd52 commit 5ba8038
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 67 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This workflow will build a golang project
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go

name: Go

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19

- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ log/*
.vscode/settings.json
.pre-commit-config.yaml
hmq.exe
*.sw*
*.swo
*.swp
*.swn
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ COPY . .
RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' .


FROM alpine
FROM alpine:3.17.3
WORKDIR /
COPY --from=builder /go/src/github.com/fhmq/hmq/hmq .
EXPOSE 1883
Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
![build](https://img.shields.io/github/workflow/status/fhmq/hmq/Ubuntu%20build?label=Ubuntu&style=for-the-badge)
![build](https://img.shields.io/github/workflow/status/fhmq/hmq/MacOS%20build?label=MacOS&style=for-the-badge)
![build](https://img.shields.io/github/workflow/status/fhmq/hmq/Windows%20build?label=Windows&style=for-the-badge)

Free and High Performance MQTT Broker
============
Expand Down
106 changes: 73 additions & 33 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package broker

import (
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
encJson "encoding/json"

"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
Expand Down Expand Up @@ -203,7 +205,10 @@ func (b *Broker) StartWebsocketListening() {
func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws)
ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws)
err:=b.handleConnection(CLIENT, ws)
if err!=nil{
ws.Close()
}
}

func (b *Broker) StartClientListening(Tls bool) {
Expand Down Expand Up @@ -254,7 +259,12 @@ func (b *Broker) StartClientListening(Tls bool) {
}

tmpDelay = ACCEPT_MIN_SLEEP
go b.handleConnection(CLIENT, conn)
go func(){
err :=b.handleConnection(CLIENT, conn)
if err!=nil{
conn.Close()
}
}()
}
}

Expand Down Expand Up @@ -291,7 +301,12 @@ func (b *Broker) StartClusterListening() {
}
tmpDelay = ACCEPT_MIN_SLEEP

go b.handleConnection(ROUTER, conn)
go func(){
err :=b.handleConnection(ROUTER, conn)
if err!=nil{
conn.Close()
}
}()
}
}

Expand All @@ -307,22 +322,18 @@ func (b *Broker) DisConnClientByClientId(clientId string) {
conn.Close()
}

func (b *Broker) handleConnection(typ int, conn net.Conn) {
func (b *Broker) handleConnection(typ int, conn net.Conn) error{
//process connect packet
packet, err := packets.ReadPacket(conn)
if err != nil {
log.Error("read connect packet error", zap.Error(err))
conn.Close()
return
return errors.New(fmt.Sprintln("read connect packet error:%v",err))

Check failure on line 329 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}
if packet == nil {
log.Error("received nil packet")
return
return errors.New("received nil packet")
}
msg, ok := packet.(*packets.ConnectPacket)
if !ok {
log.Error("received msg that was not Connect")
return
return errors.New("received msg that was not Connect")
}

log.Info("read connect from ", getAdditionalLogFields(msg.ClientIdentifier, conn)...)
Expand All @@ -332,29 +343,22 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
connack.ReturnCode = msg.Validate()

if connack.ReturnCode != packets.Accepted {
func() {
defer conn.Close()
if err := connack.Write(conn); err != nil {
log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...)
}
}()
return
if err := connack.Write(conn); err != nil {
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))

Check failure on line 347 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}
return errors.New(fmt.Sprintln("connect packet validate failed with connack.ReturnCode%v",connack.ReturnCode))

Check failure on line 349 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}

if typ == CLIENT && !b.CheckConnectAuth(msg.ClientIdentifier, msg.Username, string(msg.Password)) {
connack.ReturnCode = packets.ErrRefusedNotAuthorised
func() {
defer conn.Close()
if err := connack.Write(conn); err != nil {
log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...)
}
}()
return
if err := connack.Write(conn); err != nil {
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))

Check failure on line 355 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}
return errors.New(fmt.Sprintln("connect packet CheckConnectAuth failed with connack.ReturnCode%v",connack.ReturnCode))

Check failure on line 357 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}

if err := connack.Write(conn); err != nil {
log.Error("send connack error", getAdditionalLogFields(msg.ClientIdentifier, conn, zap.Error(err))...)
return
return errors.New(fmt.Sprintln("send connack error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))

Check failure on line 361 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}

willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
Expand Down Expand Up @@ -385,8 +389,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
c.init()

if err := b.getSession(c, msg, connack); err != nil {
log.Error("get session error", getAdditionalLogFields(c.info.clientID, conn, zap.Error(err))...)
return
return errors.New(fmt.Sprintln("get session error:%v,clientID:%v,conn:%v",err,msg.ClientIdentifier,conn))

Check failure on line 392 in broker/broker.go

View workflow job for this annotation

GitHub Actions / build

fmt.Sprintln call has possible formatting directive %v
}

cid := c.info.clientID
Expand All @@ -405,7 +408,21 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
}
b.clients.Store(cid, c)

b.OnlineOfflineNotification(cid, true)
var pubPack = PubPacket{}
if willmsg != nil {
pubPack.TopicName = info.willMsg.TopicName
pubPack.Payload = info.willMsg.Payload
}

pubInfo := Info{
ClientID: info.clientID,
Username: info.username,
Password: info.password,
Keepalive: info.keepalive,
WillMsg: pubPack,
}

b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime)
{
b.Publish(&bridge.Elements{
ClientID: msg.ClientIdentifier,
Expand All @@ -426,6 +443,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
}

c.readLoop()
return nil
}

func (b *Broker) ConnectToDiscovery() {
Expand Down Expand Up @@ -695,11 +713,33 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) {
b.BroadcastSubOrUnsubMessage(unsub)
}

func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {
type OnlineOfflineMsg struct {
ClientID string `json:"clientID"`
Online bool `json:"online"`
Timestamp string `json:"timestamp"`
ClientInfo Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}

func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) {
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = "$SYS/broker/connection/clients/" + clientID
packet.TopicName = "$SYS/broker/connection/clients/" + info.ClientID
packet.Qos = 0
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339)))

msg := OnlineOfflineMsg{
ClientID: info.ClientID,
Online: online,
Timestamp: time.Now().UTC().Format(time.RFC3339),
ClientInfo: info,
LastMsgTime: lastMsg,
}

if b, err := encJson.Marshal(msg); err != nil {
//This is a TERRIBLE situation, falling back to legacy format to not break API Contract
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, info.ClientID, online, time.Now().UTC().Format(time.RFC3339)))
} else {
packet.Payload = b
}

b.PublishMessage(packet)
}
35 changes: 33 additions & 2 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type client struct {
mqueue *queue.Queue
retryTimer *time.Timer
retryTimerLock sync.Mutex
lastMsgTime int64
}

type InflightStatus uint8
Expand Down Expand Up @@ -111,6 +112,19 @@ type info struct {
remoteIP string
}

type PubPacket struct {
TopicName string `json:"topicName"`
Payload []byte `json:"payload"`
}

type Info struct {
ClientID string `json:"clientId"`
Username string `json:"username"`
Password []byte `json:"password"`
Keepalive uint16 `json:"keepalive"`
WillMsg PubPacket `json:"willMsg"`
}

type route struct {
remoteID string
remoteUrl string
Expand All @@ -122,6 +136,7 @@ var (
)

func (c *client) init() {
c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged
c.status = Connected
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
remoteAddr := c.conn.RemoteAddr()
Expand Down Expand Up @@ -185,6 +200,8 @@ func (c *client) readLoop() {
if _, isDisconnect := packet.(*packets.DisconnectPacket); isDisconnect {
c.info.willMsg = nil
c.cancelFunc()
} else {
c.lastMsgTime = time.Now().Unix()
}

msg := &Message{
Expand Down Expand Up @@ -842,8 +859,22 @@ func (c *client) Close() {

if c.typ == CLIENT {
b.BroadcastUnSubscribe(unSubTopics)

var pubPack = PubPacket{}
if c.info.willMsg != nil {
pubPack.TopicName = c.info.willMsg.TopicName
pubPack.Payload = c.info.willMsg.Payload
}

pubInfo := Info{
ClientID: c.info.clientID,
Username: c.info.username,
Password: c.info.password,
Keepalive: c.info.keepalive,
WillMsg: pubPack,
}
//offline notification
b.OnlineOfflineNotification(c.info.clientID, false)
b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime)
}

if c.info.willMsg != nil {
Expand All @@ -864,7 +895,7 @@ func (c *client) Close() {
func (c *client) WriterPacket(packet packets.ControlPacket) error {
defer func() {
if err := recover(); err != nil {
log.Error("recover error, ", zap.Any("recover", r))
log.Error("recover error, ", zap.Any("recover", err))
}
}()
if c.status == Disconnected {
Expand Down
49 changes: 44 additions & 5 deletions broker/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,24 @@ import (
"github.com/gin-gonic/gin"
)

const (
CONNECTIONS = "api/v1/connections"
)

type ConnClient struct {
Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}

type resp struct {
Code int `json:"code,omitempty"`
Clients []ConnClient `json:"clients,omitempty"`
}

func InitHTTPMoniter(b *Broker) {
gin.SetMode(gin.ReleaseMode)
router := gin.Default()
router.DELETE("api/v1/connections/:clientid", func(c *gin.Context) {
router.DELETE(CONNECTIONS + "/:clientid", func(c *gin.Context) {
clientid := c.Param("clientid")
cli, ok := b.clients.Load(clientid)
if ok {
Expand All @@ -16,10 +30,35 @@ func InitHTTPMoniter(b *Broker) {
conn.Close()
}
}
resp := map[string]int{
"code": 0,
}
c.JSON(200, &resp)
r := resp{Code: 0}
c.JSON(200, &r)
})
router.GET(CONNECTIONS, func(c *gin.Context) {
conns := make([]ConnClient, 0)
b.clients.Range(func (k, v interface{}) bool {
cl, _ := v.(*client)
var pubPack = PubPacket{}
if cl.info.willMsg != nil {
pubPack.TopicName = cl.info.willMsg.TopicName
pubPack.Payload = cl.info.willMsg.Payload
}

msg := ConnClient{
Info: Info{
ClientID: cl.info.clientID,
Username: cl.info.username,
Password: cl.info.password,
Keepalive: cl.info.keepalive,
WillMsg: pubPack,
},
LastMsgTime: cl.lastMsgTime,
}

conns = append(conns, msg)
return true
})
r := resp{Clients: conns}
c.JSON(200, &r)
})

router.Run(":" + b.config.HTTPPort)
Expand Down
Loading

0 comments on commit 5ba8038

Please sign in to comment.