// Package ethport implements faces using DPDK Ethernet device as transport.
package ethport

import (
	"errors"
	"fmt"
	"sync"

	"go.uber.org/zap"

	"github.com/usnistgov/ndn-dpdk/core/logging"
	"github.com/usnistgov/ndn-dpdk/dpdk/ethdev"
	"github.com/usnistgov/ndn-dpdk/dpdk/ethdev/ethnetif"
	"github.com/usnistgov/ndn-dpdk/dpdk/pktmbuf"
	"github.com/usnistgov/ndn-dpdk/iface"
	"github.com/usnistgov/ndn-dpdk/ndni"
)
var logger = logging.New("ethport")
var (
ports = map[ethdev.EthDev]*Port{}
portsMutex sync.RWMutex
)

// Limits and defaults.
const (
	DefaultRxQueueSize = 4096
	DefaultTxQueueSize = 4096
	xdpMinDataroom     = 2048 // XDP_UMEM_MIN_CHUNK_SIZE in kernel
)

// Config contains Port creation arguments.
type Config struct {
	// ethnetif.Config specifies how to find or create the EthDev.
	ethnetif.Config

	// EthDev specifies the EthDev. It overrides ethnetif.Config.
	EthDev ethdev.EthDev `json:"-"`

	// AutoClose indicates that the EthDev should be closed when the last face is closed.
	AutoClose bool `json:"-"`

	RxQueueSize int `json:"rxQueueSize,omitempty" gqldesc:"Hardware RX queue capacity."`
	TxQueueSize int `json:"txQueueSize,omitempty" gqldesc:"Hardware TX queue capacity."`
	MTU         int `json:"mtu,omitempty" gqldesc:"Change interface MTU (excluding Ethernet/VLAN headers)."`

	RxFlowQueues int `json:"rxFlowQueues,omitempty" gqldesc:"Enable RxFlow and set maximum queue count."`
}
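
// exampleConfig is an illustrative Config literal, not part of the original file.
// It sketches how zero values interact with applyDefaults; the chosen values are
// assumptions for demonstration only.
var exampleConfig = Config{
	RxQueueSize:  DefaultRxQueueSize, // 0 would also select this default (4096)
	TxQueueSize:  DefaultTxQueueSize,
	MTU:          0, // 0 keeps the EthDev's current MTU
	RxFlowQueues: 0, // 0 disables RxFlow; RX then typically uses the table-based implementation
}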

// ensureEthDev creates the EthDev if it's not set.
func (cfg *Config) ensureEthDev() (e error) {
	if cfg.EthDev != nil {
		return nil
	}
	if cfg.EthDev, e = ethnetif.CreateEthDev(cfg.Config); e != nil {
		return e
	}
	return nil
}

// applyDefaults applies defaults.
// cfg.EthDev must be set before calling this function.
func (cfg *Config) applyDefaults() {
	if cfg.MTU == 0 {
		cfg.MTU = cfg.EthDev.MTU()
	}
	if cfg.RxQueueSize == 0 {
		cfg.RxQueueSize = DefaultRxQueueSize
	}
	if cfg.TxQueueSize == 0 {
		cfg.TxQueueSize = DefaultTxQueueSize
	}
}

// Port organizes EthFaces on an EthDev.
type Port struct {
	mutex        sync.Mutex
	cfg          Config
	logger       *zap.Logger
	dev          ethdev.EthDev
	devInfo      ethdev.DevInfo
	ddpRollback  func() error
	faces        map[iface.ID]*Face
	rxBouncePool *pktmbuf.Pool
	rxImpl       rxImpl
	txl          iface.TxLoop
}

// EthDev returns the Ethernet device.
func (port *Port) EthDev() ethdev.EthDev {
	return port.dev
}

// Faces returns a list of active faces.
func (port *Port) Faces() (list []iface.Face) {
	port.mutex.Lock()
	defer port.mutex.Unlock()
	for _, face := range port.faces {
		list = append(list, face)
	}
	return list
}

// Close closes the port.
func (port *Port) Close() error {
	portsMutex.Lock()
	defer portsMutex.Unlock()
	return port.closeWithPortsMutex()
}

// closeWithPortsMutex closes the port; the caller must hold portsMutex.
func (port *Port) closeWithPortsMutex() error {
	port.mutex.Lock()
	defer port.mutex.Unlock()

	if nFaces := len(port.faces); nFaces > 0 {
		return fmt.Errorf("cannot close Port with %d active faces", nFaces)
	}

	errs := []error{}
	if port.rxImpl != nil {
		errs = append(errs, port.rxImpl.Close(port))
		port.rxImpl = nil
	}
	if port.ddpRollback != nil {
		errs = append(errs, port.ddpRollback())
		port.ddpRollback = nil
	}
	if port.dev != nil {
		errs = append(errs, port.dev.Close())
		delete(ports, port.dev)
		port.dev = nil
	}
	if port.rxBouncePool != nil {
		errs = append(errs, port.rxBouncePool.Close())
		port.rxBouncePool = nil
	}

	if e := errors.Join(errs...); e != nil {
		port.logger.Warn("port closed with errors", zap.Error(e))
	} else {
		port.logger.Info("port closed")
	}
	return nil
}

// startDev configures and starts the EthDev with nRxQueues RX queues and one TX queue.
func (port *Port) startDev(nRxQueues int, promisc bool) (e error) {
	socket := port.dev.NumaSocket()
	rxPool := port.rxBouncePool
	if rxPool == nil {
		rxPool = ndni.PacketMempool.Get(socket)
	}

	// On i40e with RxFlow enabled, attempt to upload the "gtp" DDP profile; failures are ignored.
	if port.cfg.RxFlowQueues > 0 && port.devInfo.Driver() == ethdev.DriverI40e {
		if dp, e := ethdev.OpenDdpProfile("gtp"); e == nil {
			port.ddpRollback, _ = dp.Upload(port.dev)
		}
	}

	cfg := ethdev.Config{
		MTU:     port.cfg.MTU,
		Promisc: promisc,
	}
	cfg.AddRxQueues(nRxQueues, ethdev.RxQueueConfig{
		Capacity: port.cfg.RxQueueSize,
		Socket:   socket,
		RxPool:   rxPool,
	})
	cfg.AddTxQueues(1, ethdev.TxQueueConfig{
		Capacity: port.cfg.TxQueueSize,
		Socket:   socket,
	})
	return port.dev.Start(cfg)
}

// activateTx assigns a face to this port's TX loop, obtaining one via iface.ActivateTxFace if needed.
func (port *Port) activateTx(face iface.Face) {
	if port.txl == nil {
		port.txl = iface.ActivateTxFace(face)
	} else {
		port.txl.Add(face)
	}
}

// deactivateTx detaches a face from TX processing; the TX loop reference is cleared when no faces remain.
func (port *Port) deactivateTx(face iface.Face) {
	iface.DeactivateTxFace(face)
	if len(port.faces) == 0 {
		port.txl = nil
	}
}

// New opens a Port.
func New(cfg Config) (port *Port, e error) {
	portsMutex.Lock()
	defer portsMutex.Unlock()

	if e = cfg.ensureEthDev(); e != nil {
		return nil, e
	}
	if ports[cfg.EthDev] != nil {
		return nil, errors.New("Port already exists")
	}
	cfg.applyDefaults()
	if ndni.PacketMempool.Config().Dataroom < pktmbuf.DefaultHeadroom+cfg.MTU {
		return nil, errors.New("PacketMempool dataroom is too small for requested MTU")
	}

	port = &Port{
		cfg:     cfg,
		logger:  logger.With(cfg.EthDev.ZapField("port")),
		dev:     cfg.EthDev,
		devInfo: cfg.EthDev.DevInfo(),
		faces:   map[iface.ID]*Face{},
	}

	switch port.devInfo.Driver() {
	case ethdev.DriverXDP:
		// AF_XDP needs a dedicated RX mempool whose dataroom satisfies XDP_UMEM_MIN_CHUNK_SIZE.
		if port.rxBouncePool, e = pktmbuf.NewPool(pktmbuf.PoolConfig{
			Capacity: cfg.RxQueueSize + iface.MaxBurstSize,
			Dataroom: max(pktmbuf.DefaultHeadroom+cfg.MTU, xdpMinDataroom),
		}, cfg.EthDev.NumaSocket()); e != nil {
			return nil, e
		}
	case ethdev.DriverMemif:
		port.rxImpl = &rxMemif{}
	}
	// Default RX implementation: RxFlow when RxFlowQueues is set, otherwise the RX table.
	if port.rxImpl == nil {
		if port.cfg.RxFlowQueues > 0 {
			port.rxImpl = &rxFlow{}
		} else {
			port.rxImpl = &rxTable{}
		}
	}
	if e := port.rxImpl.Init(port); e != nil {
		port.logger.Error("rxImpl init error", zap.Error(e))
		port.rxImpl = nil
		// portsMutex is already held here, so close via closeWithPortsMutex to avoid deadlocking on Close.
		port.closeWithPortsMutex()
		return nil, e
	}

	port.logger.Info("port opened", zap.Stringer("rxImpl", port.rxImpl))
	ports[port.dev] = port
	return port, nil
}

// Find finds Port by EthDev.
func Find(dev ethdev.EthDev) *Port {
	if dev == nil {
		return nil
	}
	portsMutex.RLock()
	defer portsMutex.RUnlock()
	return ports[dev]
}
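
// ensurePortSketch is a minimal usage sketch, not part of the original file:
// it reuses an existing Port for dev if one is already open, and otherwise
// opens a new Port with default settings. The helper name is hypothetical.
func ensurePortSketch(dev ethdev.EthDev) (*Port, error) {
	if port := Find(dev); port != nil {
		return port, nil
	}
	return New(Config{EthDev: dev})
}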

// init registers a callback with iface.OnCloseAll that closes every remaining port.
func init() {
	iface.OnCloseAll(func() {
		portsMutex.Lock()
		defer portsMutex.Unlock()
		for _, port := range ports {
			port.closeWithPortsMutex()
		}
	})
}