Skip to content

Commit a216ccf

Browse files
committed
pdump: EthPortSource
1 parent 808655e commit a216ccf

24 files changed

+630
-244
lines changed

app/pdump/enum.go

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ const (
1818
// NgTypeIDB is PCAPNG enhanced packet block type.
1919
NgTypeEPB = 0x00000006
2020

21+
// MbufTypeRaw indicates mbuf should be written unchanged.
22+
MbufTypeRaw = 0xF0010000
23+
24+
// MbufTypeSLL indicates mbuf should be written with SLL header.
25+
MbufTypeSLL = 0xF0020000
26+
2127
_ = "enumgen::Pdump"
2228
)
2329

app/pdump/ethport.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package pdump
2+
3+
/*
4+
#include "../../csrc/pdump/source.h"
5+
#include "../../csrc/ethface/rxtable.h"
6+
*/
7+
import "C"
8+
import (
9+
"errors"
10+
"fmt"
11+
"unsafe"
12+
13+
"github.com/google/gopacket/layers"
14+
"github.com/google/gopacket/pcapgo"
15+
"github.com/usnistgov/ndn-dpdk/core/urcu"
16+
"github.com/usnistgov/ndn-dpdk/dpdk/eal"
17+
"github.com/usnistgov/ndn-dpdk/dpdk/ethdev"
18+
"github.com/usnistgov/ndn-dpdk/dpdk/pktmbuf"
19+
"github.com/usnistgov/ndn-dpdk/iface"
20+
"github.com/usnistgov/ndn-dpdk/iface/ethport"
21+
"go.uber.org/multierr"
22+
"go.uber.org/zap"
23+
)
24+
25+
var ethPortSources = map[*ethport.Port]*EthPortSource{}
26+
27+
// EthPortConfig contains EthPortSource configuration.
28+
type EthPortConfig struct {
29+
Writer *Writer
30+
Port *ethport.Port
31+
32+
rxt *C.EthRxTable
33+
}
34+
35+
func (cfg *EthPortConfig) validate() error {
36+
errs := []error{}
37+
38+
if cfg.Writer == nil {
39+
errs = append(errs, errors.New("writer not found"))
40+
}
41+
42+
if cfg.Port == nil {
43+
errs = append(errs, errors.New("port not found"))
44+
} else if cfg.rxt = (*C.EthRxTable)(ethport.RxTablePtrFromPort(cfg.Port)); cfg.rxt == nil {
45+
errs = append(errs, errors.New("port is not using RxTable"))
46+
}
47+
48+
return multierr.Combine(errs...)
49+
}
50+
51+
// EthPortSource is a packet dump source attached to EthRxTable.
52+
// It can capture incoming packets not matched to an existing face.
53+
type EthPortSource struct {
54+
EthPortConfig
55+
logger *zap.Logger
56+
c *C.PdumpSource
57+
}
58+
59+
func (s *EthPortSource) setRef(expected, newPtr *C.PdumpSource) {
60+
setSourceRef(&s.rxt.pdumpUnmatched, expected, newPtr)
61+
}
62+
63+
// Close detaches the dump source.
64+
func (s *EthPortSource) Close() error {
65+
sourcesLock.Lock()
66+
defer sourcesLock.Unlock()
67+
return s.closeImpl()
68+
}
69+
70+
func (s *EthPortSource) closeImpl() error {
71+
s.logger.Info("EthPortSource close")
72+
s.setRef(s.c, nil)
73+
delete(ethPortSources, s.Port)
74+
75+
go func() {
76+
urcu.Synchronize()
77+
s.Writer.stopSource()
78+
s.logger.Info("EthPortSource freed")
79+
eal.Free(s.c)
80+
}()
81+
return nil
82+
}
83+
84+
// NewEthPortSource creates a EthPortSource.
85+
func NewEthPortSource(cfg EthPortConfig) (s *EthPortSource, e error) {
86+
if e := cfg.validate(); e != nil {
87+
return nil, e
88+
}
89+
90+
sourcesLock.Lock()
91+
defer sourcesLock.Unlock()
92+
93+
s = &EthPortSource{
94+
EthPortConfig: cfg,
95+
}
96+
if _, ok := ethPortSources[s.Port]; ok {
97+
return nil, errors.New("another EthPortSource is attached to this port")
98+
}
99+
dev := s.Port.EthDev()
100+
id, socket := dev.ID(), dev.NumaSocket()
101+
102+
s.logger = logger.With(dev.ZapField("port"))
103+
s.c = (*C.PdumpSource)(eal.Zmalloc("PdumpSource", C.sizeof_PdumpSource, socket))
104+
*s.c = C.PdumpSource{
105+
directMp: (*C.struct_rte_mempool)(pktmbuf.Direct.Get(socket).Ptr()),
106+
queue: s.Writer.c.queue,
107+
filter: nil,
108+
mbufType: MbufTypeRaw,
109+
mbufPort: C.uint16_t(id),
110+
mbufCopy: false,
111+
}
112+
113+
s.Writer.defineIntf(id, pcapgo.NgInterface{
114+
Name: fmt.Sprintf("port%d", id),
115+
Description: dev.Name(),
116+
LinkType: layers.LinkTypeEthernet,
117+
})
118+
s.Writer.startSource()
119+
s.setRef(nil, s.c)
120+
121+
ethPortSources[s.Port] = s
122+
s.logger.Info("EthPortSource open",
123+
zap.Uintptr("dumper", uintptr(unsafe.Pointer(s.c))),
124+
zap.Uintptr("queue", uintptr(unsafe.Pointer(s.Writer.c.queue))),
125+
)
126+
return s, nil
127+
}
128+
129+
func init() {
130+
if ethdev.MaxEthDevs > iface.MinID {
131+
panic("FaceID and EthDevID must not overlap")
132+
}
133+
}

app/pdump/face.go

+60-59
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package pdump
22

33
/*
4-
#include "../../csrc/pdump/face.h"
54
#include "../../csrc/pdump/format.h"
5+
#include "../../csrc/pdump/source.h"
66
#include "../../csrc/iface/face.h"
77
*/
88
import "C"
@@ -15,6 +15,8 @@ import (
1515
"sync"
1616
"unsafe"
1717

18+
"github.com/google/gopacket/layers"
19+
"github.com/google/gopacket/pcapgo"
1820
"github.com/usnistgov/ndn-dpdk/core/urcu"
1921
"github.com/usnistgov/ndn-dpdk/dpdk/eal"
2022
"github.com/usnistgov/ndn-dpdk/dpdk/pktmbuf"
@@ -36,15 +38,15 @@ const (
3638

3739
var dirImpls = map[Direction]struct {
3840
sllType C.rte_be16_t
39-
getRef func(faceC *C.Face) *C.PdumpFaceRef
41+
getRef func(faceC *C.Face) *C.PdumpSourceRef
4042
}{
4143
DirIncoming: {
4244
C.SLLIncoming,
43-
func(faceC *C.Face) *C.PdumpFaceRef { return &faceC.impl.rx.pdump },
45+
func(faceC *C.Face) *C.PdumpSourceRef { return &faceC.impl.rx.pdump },
4446
},
4547
DirOutgoing: {
4648
C.SLLOutgoing,
47-
func(faceC *C.Face) *C.PdumpFaceRef { return &faceC.impl.tx.pdump },
49+
func(faceC *C.Face) *C.PdumpSourceRef { return &faceC.impl.tx.pdump },
4850
},
4951
}
5052

@@ -64,24 +66,23 @@ func parseFaceDir(input string) (fd faceDir, e error) {
6466

6567
var (
6668
faceSources = map[faceDir]*FaceSource{}
67-
faceSourcesLock sync.Mutex
6869
faceClosingOnce sync.Once
6970
)
7071

7172
func handleFaceClosing(id iface.ID) {
72-
faceSourcesLock.Lock()
73-
defer faceSourcesLock.Unlock()
73+
sourcesLock.Lock()
74+
defer sourcesLock.Unlock()
7475

7576
for dir := range dirImpls {
76-
fs, ok := faceSources[faceDir{id, dir}]
77+
s, ok := faceSources[faceDir{id, dir}]
7778
if !ok {
7879
continue
7980
}
80-
fs.closeImpl()
81+
s.closeImpl()
8182
}
8283
}
8384

84-
// FaceConfig contains face dumper configuration.
85+
// FaceConfig contains FaceSource configuration.
8586
type FaceConfig struct {
8687
Writer *Writer
8788
Face iface.Face
@@ -129,90 +130,90 @@ type FaceSource struct {
129130
FaceConfig
130131
key faceDir
131132
logger *zap.Logger
132-
c *C.PdumpFace
133+
c *C.PdumpFaceSource
133134
}
134135

135-
func (fs *FaceSource) setPdumpFaceRef(expected, newPtr *C.PdumpFace) {
136-
ref := dirImpls[fs.Dir].getRef((*C.Face)(fs.Face.Ptr()))
137-
if old := C.PdumpFaceRef_Set(ref, newPtr); old != expected {
138-
fs.logger.Panic("PdumpFaceRef pointer mismatch",
139-
zap.Uintptr("new", uintptr(unsafe.Pointer(newPtr))),
140-
zap.Uintptr("old", uintptr(unsafe.Pointer(old))),
141-
zap.Uintptr("expected", uintptr(unsafe.Pointer(expected))),
142-
)
143-
}
136+
func (s *FaceSource) setRef(expected, newPtr *C.PdumpSource) {
137+
ref := dirImpls[s.Dir].getRef((*C.Face)(s.Face.Ptr()))
138+
setSourceRef(ref, expected, newPtr)
144139
}
145140

146141
// Close detaches the dump source.
147-
func (fs *FaceSource) Close() error {
148-
faceSourcesLock.Lock()
149-
defer faceSourcesLock.Unlock()
150-
return fs.closeImpl()
142+
func (s *FaceSource) Close() error {
143+
sourcesLock.Lock()
144+
defer sourcesLock.Unlock()
145+
return s.closeImpl()
151146
}
152147

153-
func (fs *FaceSource) closeImpl() error {
154-
fs.logger.Info("PdumpFace close")
155-
fs.setPdumpFaceRef(fs.c, nil)
156-
delete(faceSources, fs.key)
148+
func (s *FaceSource) closeImpl() error {
149+
s.logger.Info("FaceSource close")
150+
s.setRef(&s.c.base, nil)
151+
delete(faceSources, s.key)
157152

158153
go func() {
159154
urcu.Synchronize()
160-
fs.Writer.stopSource()
161-
fs.logger.Info("PdumpFace freed")
162-
eal.Free(fs.c)
155+
s.Writer.stopSource()
156+
s.logger.Info("FaceSource freed")
157+
eal.Free(s.c)
163158
}()
164159
return nil
165160
}
166161

167162
// NewFaceSource creates a FaceSource.
168-
func NewFaceSource(cfg FaceConfig) (fs *FaceSource, e error) {
163+
func NewFaceSource(cfg FaceConfig) (s *FaceSource, e error) {
169164
if e := cfg.validate(); e != nil {
170165
return nil, e
171166
}
172167

173-
faceSourcesLock.Lock()
174-
defer faceSourcesLock.Unlock()
168+
sourcesLock.Lock()
169+
defer sourcesLock.Unlock()
175170

176-
fs = &FaceSource{
171+
s = &FaceSource{
177172
FaceConfig: cfg,
178173
key: faceDir{cfg.Face.ID(), cfg.Dir},
179174
}
180-
if _, ok := faceSources[fs.key]; ok {
181-
return nil, errors.New("another PdumpFace is attached to this face and direction")
175+
if _, ok := faceSources[s.key]; ok {
176+
return nil, errors.New("another FaceSource is attached to this face and direction")
182177
}
183-
socket := cfg.Face.NumaSocket()
178+
socket := s.Face.NumaSocket()
184179

185-
fs.logger = logger.With(cfg.Face.ID().ZapField("face"), zap.String("dir", string(cfg.Dir)))
186-
fs.c = (*C.PdumpFace)(eal.Zmalloc("PdumpFace", C.sizeof_PdumpFace, socket))
187-
*fs.c = C.PdumpFace{
180+
s.logger = logger.With(s.Face.ID().ZapField("face"), zap.String("dir", string(s.Dir)))
181+
s.c = (*C.PdumpFaceSource)(eal.Zmalloc("PdumpFaceSource", C.sizeof_PdumpFaceSource, socket))
182+
s.c.base = C.PdumpSource{
188183
directMp: (*C.struct_rte_mempool)(pktmbuf.Direct.Get(socket).Ptr()),
189-
queue: fs.Writer.c.queue,
190-
sllType: dirImpls[cfg.Dir].sllType,
184+
queue: s.Writer.c.queue,
185+
filter: C.PdumpSource_Filter(C.PdumpFaceSource_Filter),
186+
mbufType: MbufTypeSLL | C.uint32_t(dirImpls[s.Dir].sllType),
187+
mbufPort: C.uint16_t(s.Face.ID()),
188+
mbufCopy: true,
191189
}
192-
C.pcg32_srandom_r(&fs.c.rng, C.uint64_t(rand.Uint64()), C.uint64_t(rand.Uint64()))
190+
C.pcg32_srandom_r(&s.c.rng, C.uint64_t(rand.Uint64()), C.uint64_t(rand.Uint64()))
193191

194192
// sort by decending name length for longest prefix match
195-
sort.Slice(cfg.Names, func(i, j int) bool { return len(cfg.Names[i].Name) > len(cfg.Names[j].Name) })
196-
prefixes := ndni.NewLNamePrefixFilterBuilder(unsafe.Pointer(&fs.c.nameL), unsafe.Sizeof(fs.c.nameL),
197-
unsafe.Pointer(&fs.c.nameV), unsafe.Sizeof(fs.c.nameV))
198-
for i, nf := range cfg.Names {
193+
sort.Slice(s.Names, func(i, j int) bool { return len(s.Names[i].Name) > len(s.Names[j].Name) })
194+
prefixes := ndni.NewLNamePrefixFilterBuilder(unsafe.Pointer(&s.c.nameL), unsafe.Sizeof(s.c.nameL),
195+
unsafe.Pointer(&s.c.nameV), unsafe.Sizeof(s.c.nameV))
196+
for i, nf := range s.Names {
199197
if e := prefixes.Append(nf.Name); e != nil {
200-
eal.Free(fs.c)
198+
eal.Free(s.c)
201199
return nil, errors.New("names too long")
202200
}
203-
fs.c.sample[i] = C.uint32_t(math.Ceil(nf.SampleProbability * math.MaxUint32))
201+
s.c.sample[i] = C.uint32_t(math.Ceil(nf.SampleProbability * math.MaxUint32))
204202
}
205203

206-
fs.Writer.defineFace(fs.Face)
207-
fs.Writer.startSource()
208-
fs.setPdumpFaceRef(nil, fs.c)
204+
s.Writer.defineIntf(int(s.Face.ID()), pcapgo.NgInterface{
205+
Name: fmt.Sprintf("face%d", s.Face.ID()),
206+
Description: iface.LocatorString(s.Face.Locator()),
207+
LinkType: layers.LinkTypeLinuxSLL,
208+
})
209+
s.Writer.startSource()
210+
s.setRef(nil, &s.c.base)
209211

210212
faceClosingOnce.Do(func() { iface.OnFaceClosing(handleFaceClosing) })
211-
faceSources[fs.key] = fs
212-
213-
fs.logger.Info("PdumpFace open",
214-
zap.Uintptr("dumper", uintptr(unsafe.Pointer(fs.c))),
215-
zap.Uintptr("queue", uintptr(unsafe.Pointer(fs.c.queue))),
213+
faceSources[s.key] = s
214+
s.logger.Info("FaceSource open",
215+
zap.Uintptr("dumper", uintptr(unsafe.Pointer(s.c))),
216+
zap.Uintptr("queue", uintptr(unsafe.Pointer(s.Writer.c.queue))),
216217
)
217-
return fs, nil
218+
return s, nil
218219
}

app/pdump/gql.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ func init() {
162162
return nil, e
163163
}
164164

165-
faceSourcesLock.Lock()
166-
defer faceSourcesLock.Unlock()
165+
sourcesLock.Lock()
166+
defer sourcesLock.Unlock()
167167
return faceSources[fd], nil
168168
}
169169
GqlFaceSourceNodeType.Delete = func(source interface{}) error {
@@ -256,8 +256,8 @@ func init() {
256256
Resolve: func(p graphql.ResolveParams) (interface{}, error) {
257257
sources := []interface{}{}
258258

259-
faceSourcesLock.Lock()
260-
defer faceSourcesLock.Unlock()
259+
sourcesLock.Lock()
260+
defer sourcesLock.Unlock()
261261
for _, fs := range faceSources {
262262
sources = append(sources, fs)
263263
}

0 commit comments

Comments
 (0)