forked from libp2p/go-libp2p-daemon
-
Notifications
You must be signed in to change notification settings - Fork 4
/
p2pclient.go
138 lines (112 loc) · 3.37 KB
/
p2pclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package p2pclient
import (
"errors"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"
multiaddr "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var log = logging.Logger("p2pclient")
// MessageSizeMax is cribbed from github.com/libp2p/go-libp2p-net
const MessageSizeMax = 1 << 22 // 4 MB
// Client is the struct that manages a connection to a libp2p daemon.
type Client struct {
controlMaddr multiaddr.Multiaddr
listenMaddr multiaddr.Multiaddr
listener manet.Listener
mhandlers sync.Mutex
handlers map[string]StreamHandlerFunc
openPersistentConn sync.Once
persistentConnWriter ggio.WriteCloser
// callID (uuid.UUID) -> persistentConnectionFuture
callFutures sync.Map
unaryHandlers sync.Map
}
// NewClient creates a new libp2p daemon client, connecting to a daemon
// listening on a multi-addr at controlMaddr, and establishing an inbound
// listening multi-address at listenMaddr
func NewClient(controlMaddr, listenMaddr multiaddr.Multiaddr) (*Client, error) {
client := &Client{
controlMaddr: controlMaddr,
handlers: make(map[string]StreamHandlerFunc),
}
if err := client.listen(listenMaddr); err != nil {
return nil, err
}
return client, nil
}
func (c *Client) newControlConn() (manet.Conn, error) {
return manet.Dial(c.controlMaddr)
}
// Identify queries the daemon for its peer ID and listen addresses.
func (c *Client) Identify() (peer.ID, []multiaddr.Multiaddr, error) {
control, err := c.newControlConn()
if err != nil {
return peer.ID(""), nil, err
}
defer control.Close()
r := ggio.NewDelimitedReader(control, MessageSizeMax)
w := ggio.NewDelimitedWriter(control)
req := &pb.Request{Type: pb.Request_IDENTIFY.Enum()}
if err = w.WriteMsg(req); err != nil {
return peer.ID(""), nil, err
}
res := &pb.Response{}
if err = r.ReadMsg(res); err != nil {
return peer.ID(""), nil, err
}
if reserr := res.GetError(); reserr != nil {
return peer.ID(""), nil, errors.New(reserr.GetMsg())
}
idres := res.GetIdentify()
id, err := peer.IDFromBytes(idres.Id)
if err != nil {
return peer.ID(""), nil, err
}
addrs := make([]multiaddr.Multiaddr, 0, len(idres.Addrs))
for i, addrbytes := range idres.Addrs {
addr, err := multiaddr.NewMultiaddrBytes(addrbytes)
if err != nil {
log.Errorf("failed to parse multiaddr in position %d in response to identify request", i)
continue
}
addrs = append(addrs, addr)
}
return id, addrs, nil
}
// Connect establishes a connection to a peer after populating the Peerstore
// entry for said peer with a list of addresses.
func (c *Client) Connect(p peer.ID, addrs []multiaddr.Multiaddr) error {
control, err := c.newControlConn()
if err != nil {
return err
}
defer control.Close()
r := ggio.NewDelimitedReader(control, MessageSizeMax)
w := ggio.NewDelimitedWriter(control)
addrbytes := make([][]byte, len(addrs))
for i, addr := range addrs {
addrbytes[i] = addr.Bytes()
}
req := &pb.Request{
Type: pb.Request_CONNECT.Enum(),
Connect: &pb.ConnectRequest{
Peer: []byte(p),
Addrs: addrbytes,
},
}
if err := w.WriteMsg(req); err != nil {
return err
}
res := &pb.Response{}
if err := r.ReadMsg(res); err != nil {
return err
}
if err := res.GetError(); err != nil {
return errors.New(err.GetMsg())
}
return nil
}