Skip to content

Commit

Permalink
packet
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyuyan committed Sep 12, 2017
1 parent 5601632 commit 9627799
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
1 change: 1 addition & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (b *Broker) connectRouter(url, remoteID string) {
route: route,
info: info,
}
c.init()
b.remotes.Store(cid, c)
c.SendConnect()
c.SendInfo()
Expand Down
34 changes: 29 additions & 5 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type client struct {
conn net.Conn
info info
route *route
status int
smu sync.RWMutex
subs map[string]*subscription
rsubs map[string]*subInfo
}
Expand Down Expand Up @@ -69,6 +71,9 @@ var (
)

func (c *client) init() {
c.smu.Lock()
defer c.smu.Unlock()
c.status = Connected
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string]*subInfo)
Expand Down Expand Up @@ -162,8 +167,11 @@ func ProcessMessage(msg *Message) {
}

func (c *client) ProcessPublish(packet *packets.PublishPacket) {
topic := packet.TopicName
if c.status == Disconnected {
return
}

topic := packet.TopicName
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic)
return
Expand Down Expand Up @@ -198,6 +206,9 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}

func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if c.status == Disconnected {
return
}

b := c.broker
if b == nil {
Expand Down Expand Up @@ -282,6 +293,10 @@ func getQueueSubscribeNum(qsubs []*subscription) int {
}

func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if c.status == Disconnected {
return
}

b := c.broker
if b == nil {
return
Expand Down Expand Up @@ -375,6 +390,9 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
}

func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
if c.status == Disconnected {
return
}
b := c.broker
if b == nil {
return
Expand Down Expand Up @@ -437,6 +455,9 @@ func (c *client) unsubscribe(sub *subscription) {
}

func (c *client) ProcessPing() {
if c.status == Disconnected {
return
}
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
Expand All @@ -446,6 +467,13 @@ func (c *client) ProcessPing() {
}

func (c *client) Close() {
c.smu.Lock()
c.status = Disconnected
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.smu.Unlock()
b := c.broker
subs := c.subs
if b != nil {
Expand All @@ -463,10 +491,6 @@ func (c *client) Close() {
b.PublishMessage(c.info.willMsg)
}
}
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
}

func (c *client) WriterPacket(packet packets.ControlPacket) error {
Expand Down
7 changes: 6 additions & 1 deletion broker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
)

func (c *client) SendInfo() {
if c.status == Disconnected {
return
}
url := c.info.localIP + ":" + c.broker.config.Cluster.Port

infoMsg := NewInfo(c.broker.id, url, false)
Expand All @@ -37,7 +40,9 @@ func (c *client) StartPing() {
}

func (c *client) SendConnect() {

if c.status == Disconnected {
return
}
m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)

m.CleanSession = true
Expand Down

0 comments on commit 9627799

Please sign in to comment.