-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpeers.go
118 lines (108 loc) · 2.64 KB
/
peers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"errors"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/schollz/peerdiscovery"
)
// peers holds every currently known peer, keyed by "IP:port", where the port
// is the webserver port the peer advertised in its discovery payload.
//
// The map is created at declaration time so that inserting into it can never
// panic with "assignment to entry in nil map". All access must hold peersMu.
var peers = make(map[string]*peer)

// peersMu guards the peers map as well as the mutable fields (Name, lastSeen)
// of every *peer stored in it.
var peersMu = &sync.RWMutex{}

// peer describes a single discovered peer.
type peer struct {
	ip       string    // Address the discovery ping was received from
	Name     string    // Display name; defaults to ip until the real name is fetched
	lastSeen time.Time // When the most recent discovery ping from this peer arrived
}
// startPeerServices launches the background peer services and returns
// immediately. Three long-running goroutines are started, none of which ever
// stop:
//
//   - Discovery: broadcasts serverPort as the discovery payload and records
//     every peer that is heard from in peers, refreshing its lastSeen time.
//   - Timeout: every 5 seconds, drops peers that have not been seen for more
//     than 5 ping intervals.
//   - Names: every 5 seconds, re-fetches the name of every known peer using at
//     most 8 concurrent requests.
//
// serverPort is the port number of the local webserver, as a string.
func startPeerServices(serverPort string) {
	// Discovery service
	go peerdiscovery.Discover(
		peerdiscovery.Settings{
			Port:      discoveryPort,
			Payload:   []byte(serverPort), // Payload holds the webserver port
			TimeLimit: -1,                 // Never stop finding peers
			Delay:     pingInterval,
			Notify: func(d peerdiscovery.Discovered) {
				// Handle the notification in its own goroutine so that the
				// callback returns without waiting on peersMu.
				go func() {
					peersMu.Lock()
					defer peersMu.Unlock()

					key := d.Address + ":" + string(d.Payload)
					p, ok := peers[key]
					if !ok {
						// This peer isn't added yet.
						// Default first name is just the IP address...
						p = &peer{
							ip:   d.Address,
							Name: d.Address,
						}
						peers[key] = p
						// ...but get their real name in the background.
						go updatePeerName(key, p)
					}
					// Update lastSeen every time
					p.lastSeen = time.Now()
				}()
			},
		},
	)

	// Timeout service - remove peers after they haven't been seen for 5 times
	// the amount of time they're supposed to ping
	go func() {
		for {
			// Runs every 5 seconds
			time.Sleep(5 * time.Second)

			peersMu.Lock()
			for addr, p := range peers {
				if time.Since(p.lastSeen) > 5*pingInterval {
					// Deleting the current key while ranging over a map is safe in Go
					delete(peers, addr)
				}
			}
			peersMu.Unlock()
		}
	}()

	// Name service - updates the names of peers
	go func() {
		type target struct {
			addr string
			p    *peer
		}

		for {
			// Runs every 5 seconds
			time.Sleep(5 * time.Second)

			// Snapshot the peers while holding the read lock. Ranging over the
			// map without the lock races with the discovery and timeout
			// services, which insert and delete concurrently. The lock must be
			// released before dispatching: the workers need the write lock, and
			// the semaphore send below would block while it is still held.
			peersMu.RLock()
			targets := make([]target, 0, len(peers))
			for addr, p := range peers {
				targets = append(targets, target{addr: addr, p: p})
			}
			peersMu.RUnlock()

			// Get peer names and update them, concurrently
			sem := make(chan struct{}, 8) // Limit workers; semaphore
			for _, t := range targets {
				sem <- struct{}{}
				go func(addr string, p *peer) {
					defer func() { <-sem }()
					updatePeerName(addr, p)
				}(t.addr, t.p)
			}
		}
	}()
}
// peerNameClient is the shared HTTP client for peer name lookups. The timeout
// covers the whole exchange (connect, headers and body), so an unresponsive
// peer cannot keep a lookup goroutine alive forever the way the timeout-less
// http.DefaultClient would.
var peerNameClient = &http.Client{Timeout: 5 * time.Second}

// updatePeerName fetches the name of the peer reachable at addr ("IP:port")
// from its /.api/getName endpoint and stores it in p.Name under peersMu.
//
// Outcomes:
//   - The request fails: p.Name is left as it was.
//   - The peer answers with a non-200 status: p.Name is reset to p.ip.
//   - The body is longer than 64 bytes or cannot be read: p.Name is left as
//     it was.
//   - Otherwise: p.Name becomes the body, with invalid UTF-8 sequences
//     replaced by U+FFFD.
func updatePeerName(addr string, p *peer) {
	// Longest name, in bytes, that is accepted from a peer
	const maxNameLen = 64

	resp, err := peerNameClient.Get("http://" + addr + "/.api/getName")
	// Ignore errors, leaving the name as what it was before
	if err != nil {
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Reset peer name to default of IP address
		peersMu.Lock()
		p.Name = p.ip
		peersMu.Unlock()
		return
	}

	// Ask for one byte more than the limit: CopyN only reports io.EOF when the
	// body ended early, meaning the name fits. A nil error means the body
	// holds at least maxNameLen+1 bytes and is too long.
	var newName strings.Builder
	_, err = io.CopyN(&newName, resp.Body, maxNameLen+1)
	if !errors.Is(err, io.EOF) {
		// Too long or some other error
		return
	}

	// Change name
	peersMu.Lock()
	p.Name = strings.ToValidUTF8(newName.String(), "\uFFFD")
	peersMu.Unlock()
}