-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmessenger_in.go
91 lines (80 loc) · 1.93 KB
/
messenger_in.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package msngr
import (
"bufio"
"context"
"errors"
"io"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)
// streamIn handles inbound streams from StreamHandler registered on the Host.
//
// The stream is handed over to processIn through newStreamsIn. If the Messenger's
// context is already done, nobody will ever take ownership of the stream, so it is
// reset here instead of being silently dropped.
func (m *Messenger[M]) streamIn(s inet.Stream) {
	select {
	case m.newStreamsIn <- s:
	case <-m.ctx.Done():
		// Best-effort: we are shutting down, so a failed reset has no one to report to.
		s.Reset()
	}
}
// processIn runs the bookkeeping loop for inbound streams until the Messenger's
// context is done.
//
// It reacts to two kinds of events:
//   - a new stream arriving on newStreamsIn: any streams already tracked for the
//     same remote peer are cancelled and forgotten (only the most recent stream is
//     used), then a msgsIn reader goroutine is started for the new stream;
//   - a stream reported on deadStreamsIn: its context is released and it is
//     removed from the tracking map.
//
// On exit, all per-peer entries are removed from streamsIn.
func (m *Messenger[M]) processIn() {
	defer func() {
		for p := range m.streamsIn {
			delete(m.streamsIn, p)
		}
	}()
	for {
		select {
		case s := <-m.newStreamsIn:
			p := s.Conn().RemotePeer()
			log.Debugw("new stream", "from", p.ShortString())
			ss, ok := m.streamsIn[p]
			if !ok {
				ss = make(map[inet.Stream]context.CancelFunc)
				m.streamsIn[p] = ss
			} else if len(ss) != 0 {
				// duplicate? we use the recent stream only
				for old, cancel := range ss {
					cancel()
					// A cancelled reader may exit through ctx.Done() without ever
					// reporting on deadStreamsIn, so drop the entry right away;
					// otherwise it would linger and trigger a bogus duplicate
					// warning on the peer's next stream.
					delete(ss, old)
				}
				log.Warnw("duplicate stream", "from", p.ShortString())
			}
			ctx, cancel := context.WithCancel(m.ctx)
			go m.msgsIn(ctx, s)
			ss[s] = cancel
		case s := <-m.deadStreamsIn:
			ss := m.streamsIn[s.Conn().RemotePeer()]
			if cancel, ok := ss[s]; ok {
				// Release the child context; without this it stays registered
				// with m.ctx until the whole Messenger is shut down.
				cancel()
				delete(ss, s)
			}
		case <-m.ctx.Done():
			return
		}
	}
}
// msgsIn owns a single inbound stream for its whole lifetime: it decodes messages
// from the stream one at a time and offers each to the inbound chan. The stream is
// closed when the function returns.
//
// The loop ends when ctx is done or when reading fails. A read failure is reported
// on deadStreamsIn (unless ctx is done first); failures other than io.EOF and
// inet.ErrReset are additionally logged and the stream is reset.
func (m *Messenger[M]) msgsIn(ctx context.Context, s inet.Stream) {
	defer s.Close()

	reader := bufio.NewReader(s)
	remote := s.Conn().RemotePeer()
	local := s.Conn().LocalPeer()

	for {
		msg := m.new(remote, local)
		_, err := serde.Read(reader, msg)
		if err == nil {
			// Hand-off never blocks: if nobody is ready to receive, the message is dropped.
			select {
			case <-ctx.Done():
				return
			case m.inbound <- msg:
			default:
				log.Warnw("message dropped (slow Receive reader)", "from", remote.ShortString())
			}
			continue
		}

		// Reading failed: report the stream as dead, unless we were cancelled meanwhile.
		select {
		case <-ctx.Done():
			return
		case m.deadStreamsIn <- s:
		}

		switch {
		case errors.Is(err, io.EOF), errors.Is(err, inet.ErrReset):
			// The stream ended or was reset; nothing worth logging.
		default:
			log.Errorw("reading message", "from", remote.ShortString(), "err", err)
			s.Reset()
		}
		return
	}
}