-
Notifications
You must be signed in to change notification settings - Fork 0
/
efmq.go
185 lines (170 loc) · 4.45 KB
/
efmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Package efmq provides basic MQTT like functionality for message
// publishing and subscriptions within a local area network
package efmq
import (
"encoding/json"
"errors"
"log"
"net"
"github.com/mdlayher/ethernet"
"github.com/mdlayher/raw"
)
// EFQM represents a connection
type EFMQ struct {
netInterface *net.Interface
connection *net.PacketConn
subscription []string
listening bool
Message chan Message
}
type Message struct {
Topic string `json:"tpc"`
Payload string `json:"pyld"`
}
const etherType = 0xcccc
// NewEFMQ is a factory function to create a value of EFMQ type
func NewEFMQ(networkInterface string) (*EFMQ, error) {
mq := new(EFMQ)
mq.Message = make(chan Message)
// set network interface
ni, err := net.InterfaceByName(networkInterface)
if err != nil {
return mq, errors.New("NewEFMQ: could not detect interface " + networkInterface)
}
// create connection/listener
conn, err := connect(ni)
if err != nil {
return mq, err
}
// store in struct
mq.netInterface = ni
mq.connection = &conn
return mq, nil
}
// connect opens network interface to create connection for listening
func connect(ni *net.Interface) (net.PacketConn, error) {
var conn net.PacketConn
conn, err := raw.ListenPacket(ni, etherType)
if err != nil {
return conn, err
}
return conn, nil
}
// Subscribe takes a new subscription and stores it to slice
func (mq *EFMQ) Subscribe(topic string) {
// add topic to subscriptions and start listener
mq.subscription = append(mq.subscription, topic)
}
// Unsubscribe removes subscription from slice store
func (mq *EFMQ) Unsubscribe(topic string) error {
// remove topic from subscriptions
for i, v := range mq.subscription {
if v == topic {
mq.subscription = append(mq.subscription[:i], mq.subscription[i+1:]...)
}
}
return nil
}
// Publish broadcasts a message on the network which comprises topic
// and payload
func (mq *EFMQ) Publish(topic string, payload string) error {
// build a JSON object
message := Message{
Topic: topic,
Payload: payload,
}
// marshal to byte slice of JSON
content, err := json.Marshal(&message)
if err != nil {
return errors.New("Publish: failed to marshal JSON")
}
// pass to despatcher
if err := mq.despatcher(content); err != nil {
return err
}
return nil
}
// despatcher handles the transmission of message over ethernet frames
func (mq *EFMQ) despatcher(content []byte) error {
// configure frame
f := ðernet.Frame{
Destination: ethernet.Broadcast,
Source: mq.netInterface.HardwareAddr,
EtherType: etherType,
Payload: content,
}
// required for linux as mdlayher ethecho
addr := &raw.Addr{
HardwareAddr: ethernet.Broadcast,
}
// prepare
binary, err := f.MarshalBinary()
if err != nil {
return errors.New("despatcher: failed to marshal ethernet frame")
}
// send
conn := *mq.connection
if _, err := conn.WriteTo(binary, addr); err != nil {
return errors.New("despatcher: failed to send message")
}
return nil
}
// Subscriptions returns list of topics currently subscribed to
func (mq *EFMQ) Subscriptions() []string {
return mq.subscription
}
// Listen announces the subscriptions to which we are subscribed
// and then starts listener func in goroutine
func (mq *EFMQ) Listen() {
var subs string
subsLen := len(mq.subscription)
for i, v := range mq.subscription {
subs += v
if i < subsLen-1 {
subs += ", "
} else {
subs += "."
}
}
// listen & log
log.Println("Subscribed to topic(s):", subs, "Now listening...")
go mq.listener()
}
// listener filters messages before presenting to client using topic
func (mq *EFMQ) listener() {
var f ethernet.Frame
var conn net.PacketConn
var subs []string
conn = *mq.connection
subs = mq.subscription
b := make([]byte, mq.netInterface.MTU)
// handle messages indefinitely
for {
n, _, err := conn.ReadFrom(b)
if err != nil {
log.Printf("listener: failed to receive message: %v", err)
}
if err := (&f).UnmarshalBinary(b[:n]); err != nil {
log.Printf("listener: failed to unmarshal ethernet frame: %v", err)
}
// f.Payload could be padded with zeros, need to deal before unmarshal
var payload []byte
for _, v := range f.Payload {
if v != 0 {
payload = append(payload, v)
}
}
// unmarshal JSON
message := new(Message)
err = json.Unmarshal(payload, message)
if err != nil {
log.Println(err)
}
for _, v := range subs {
if message.Topic == v {
// put message on channel if matches a subscription
mq.Message <- *message
}
}
}
}