-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmessenger_out.go
177 lines (156 loc) · 4.32 KB
/
messenger_out.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package msngr
import (
"context"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)
// streamOut opens a new outbound stream to peer p over the messenger's protocol IDs
// and hands it to the processOut loop through newStreamsOut.
// If the messenger context ends before the stream is accepted, the stream is reset.
func (m *Messenger[M]) streamOut(p peer.ID) {
	stream, openErr := m.host.NewStream(m.ctx, p, m.pids...)
	if openErr != nil {
		// Not necessarily a failure: a peer we are connected to may simply not speak our protocol.
		log.Debugw("error opening new stream to peer", "peer", p.ShortString(), "err", openErr)
		return
	}

	select {
	case <-m.ctx.Done():
		// Nobody is going to pick the stream up anymore, so do not leave it half-open.
		stream.Reset()
	case m.newStreamsOut <- stream:
	}
}
// processOut is the event loop for everything related to outbound data.
//
// It handles, one event at a time:
//   - broadcast:      enqueue a message to every known peer and report who got it via broadcastPeers;
//   - outbound:       enqueue a message to a single peer, connecting to it if there is no stream yet;
//   - newStreamsOut:  register a freshly opened stream and start a msgsOut writer for it;
//   - deadStreamsOut: unregister a finished stream and reconnect if messages are still pending;
//   - peersReqs:      answer with the list of peers that currently have an outbound queue.
//
// The loop runs until m.ctx is done; on exit both the streamsOut and peersOut maps are emptied.
//
// Enqueueing never blocks the loop. Per-peer queues are drained only by msgsOut goroutines,
// which this very loop starts, so a blocking send into a full queue of a peer with no
// live stream could never complete and would stall every other event. A message that does not
// fit into a peer's queue is therefore dropped with a warning, for broadcast and unicast alike.
func (m *Messenger[M]) processOut() {
	// outBufSize is the capacity of every per-peer outbound queue.
	const outBufSize = 32

	// peerOut returns the outbound queue of peer p, creating and registering it on first use.
	peerOut := func(p peer.ID) chan M {
		out, ok := m.peersOut[p]
		if !ok {
			out = make(chan M, outBufSize)
			m.peersOut[p] = out
		}
		return out
	}

	defer func() {
		for p := range m.streamsOut {
			delete(m.streamsOut, p)
		}
		for p := range m.peersOut {
			delete(m.peersOut, p)
		}
	}()

	for {
		select {
		case msg := <-m.broadcast:
			sentTo := make(peer.IDSlice, 0, len(m.peersOut))
			for id, out := range m.peersOut {
				// Non-blocking on purpose, see the function comment: a full queue of one peer
				// must not freeze the loop for all the others.
				select {
				case out <- msg:
					sentTo = append(sentTo, id)
				default:
					log.Warnw("dropped message - full buffer", "to", id.ShortString())
				}
			}

			// Report only the peers whose queue actually accepted the message.
			select {
			case m.broadcastPeers <- sentTo:
			case <-m.ctx.Done():
				return
			}
		case msg := <-m.outbound:
			to := msg.To()
			out := peerOut(to)

			if len(m.streamsOut[to]) == 0 {
				// connect if there is no stream
				// new connection will trigger new stream
				log.Debugw("connect", "to", to.ShortString())
				go m.connect(to)
			}

			select {
			case out <- msg:
			default:
				log.Warnw("dropped message - full buffer", "to", to.ShortString())
			}
		case s := <-m.newStreamsOut:
			p := s.Conn().RemotePeer()
			log.Debugw("new stream", "to", p.ShortString())

			ss, ok := m.streamsOut[p]
			if !ok {
				ss = make(map[inet.Stream]context.CancelFunc)
				m.streamsOut[p] = ss
			}

			if len(ss) != 0 {
				// duplicate? we use the recent stream only
				// The cancelled writers stay registered here; their entries are removed
				// when the streams are reported through deadStreamsOut.
				for _, cancel := range ss {
					cancel()
				}
				log.Warnw("duplicate stream", "to", p.ShortString())
			}

			// Every stream of a peer shares the same queue, so messages enqueued before
			// the stream existed are picked up by the new writer.
			out := peerOut(p)
			ctx, cancel := context.WithCancel(m.ctx)
			go m.msgsOut(ctx, s, out)
			ss[s] = cancel
		case s := <-m.deadStreamsOut:
			p := s.Conn().RemotePeer()
			ss := m.streamsOut[p]
			delete(ss, s) // no-op when the peer has no registered streams
			if len(ss) != 0 {
				// cleanup of an original stream in case of a duplicate
				continue
			}

			// if no more streams, but there are msgs to send - trigger reconnect
			if len(m.peersOut[p]) > 0 {
				log.Warnw("reconnect", "to", p.ShortString())
				go m.reconnect(p)
			}
			// TODO: This is the place where we could also cleanup outbound chan,
			//  but the reason for peer being dead might be a short term disconnect,
			//  so instead of dropping all the pending messages in the chan, we should give them some time to live
			//  and only after the time passes - drop.
			//
			// NOTE: There is a chance for a first out message to be dropped due to reconnect,
			//  as msgsOut will read from the out chan and fail with msg reset. For this to be fixed more advanced
			//  queue should be used instead of native Go chan.
		case req := <-m.peersReqs:
			out := make([]peer.ID, 0, len(m.peersOut))
			for p := range m.peersOut {
				out = append(out, p)
			}
			// Expected not to block — presumably the requester hands over a buffered channel;
			// TODO confirm against the code sending into peersReqs.
			req <- out
		case <-m.ctx.Done():
			return
		}
	}
}
// msgsOut owns the lifecycle of a single outbound stream s and writes to it every message
// received from out, the peer's queue filled by processOut.
//
// It returns, closing the stream, when ctx is done, when a write fails (the stream is reset
// first), or when the watcher goroutine it starts observes that reading from s has unblocked.
// In any case the watcher then reports s on deadStreamsOut unless the messenger context is done.
func (m *Messenger[M]) msgsOut(ctx context.Context, s inet.Stream, out <-chan M) {
	// streamGone is closed by the watcher once the Read below returns.
	streamGone := make(chan struct{})

	// watch relies on a valid trick to learn that the stream is closed/reset:
	// the remote side is not supposed to send anything here, so once Read unblocks
	// the stream is closed/reset.
	watch := func() {
		if _, readErr := serde.Read(s, &serde.PlainMessage{}); readErr == nil {
			// The peer actually sent something on an outbound-only stream; give up on it.
			s.Reset()
			log.Warnw("totally unexpected message", "from", s.Conn().RemotePeer().ShortString())
		}
		close(streamGone)

		select {
		case <-m.ctx.Done():
		case m.deadStreamsOut <- s:
			log.Warnw("dead stream", "to", s.Conn().RemotePeer().ShortString())
		}
	}
	go watch()

	defer s.Close()
	for {
		select {
		case <-ctx.Done():
			return
		case <-streamGone:
			return
		case next := <-out: // out is not going to be closed, thus no 'ok' check
			if _, writeErr := serde.Write(s, next); writeErr != nil {
				log.Errorw("writing message", "to", next.To().ShortString(), "err", writeErr)
				s.Reset()
				return
			}
		}
	}
}