forked from usnistgov/ndn-dpdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathforwarder.go
185 lines (162 loc) · 4.54 KB
/
forwarder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package l3
import (
"math/rand"
"sync"
"github.com/usnistgov/ndn-dpdk/ndn"
"github.com/zyedidia/generic/multimap"
"github.com/zyedidia/generic/set"
)
// Forwarder is a logical forwarding plane.
// Its main purpose is to demultiplex incoming packets among faces, where a 'face' is defined as a duplex stream of packets.
//
// This is a simplified forwarder with several limitations.
// - There is no loop prevention: no Nonce list and no decrementing HopLimit.
// If multiple uplinks have "/" route, Interests will be forwarded among them and might cause persistent loops.
// Thus, it is not recommended to connect to multiple uplinks with overlapping routes.
// - There is no pending Interest table. Instead, downstream 'face' ID is inserted as part of the PIT token.
// Since PIT token cannot exceed 32 octets, this takes away some space.
// Thus, consumers are allowed to use a PIT token up to 28 octets; Interests with longer PIT tokens may be dropped.
type Forwarder interface {
// AddFace adds a Face to the forwarder.
// face.Rx() and face.Tx() should not be used after this operation.
AddFace(face Face) (FwFace, error)
// AddReadvertiseDestination adds a destination for prefix announcement.
//
// Limitations of current implementation:
// - Existing announcements are not advertised on dest.
// Thus, it is recommended to add all readvertise destinations before announcing a prefix.
// - There is no error handling.
AddReadvertiseDestination(dest ReadvertiseDestination)
// RemoveReadvertiseDestination removes a destination for prefix announcement.
//
// Limitations of current implementation:
// - Announcements are not withdrawn before removing dest.
// - There is no error handling.
RemoveReadvertiseDestination(dest ReadvertiseDestination)
}
// NewForwarder creates a Forwarder.
func NewForwarder() Forwarder {
fw := &forwarder{
faces: map[uint32]*fwFace{},
announcements: multimap.NewMapSlice[string, *fwFace](),
readvertise: set.NewMapset[ReadvertiseDestination](),
cmd: make(chan func()),
rx: make(chan fwRxPkt),
}
go fw.loop()
return fw
}
type fwRxPkt struct {
*ndn.Packet
rxFace *fwFace
}
type forwarder struct {
faces map[uint32]*fwFace
announcements multimap.MultiMap[string, *fwFace]
readvertise set.Set[ReadvertiseDestination]
cmd chan func()
rx chan fwRxPkt
}
func (fw *forwarder) AddFace(face Face) (ff FwFace, e error) {
f := &fwFace{
Face: face,
fw: fw,
tx: face.Tx(),
routes: map[string]ndn.Name{},
announcements: map[string]ndn.Name{},
}
fw.do(func() {
if len(fw.faces) >= MaxFwFaces {
e = ErrMaxFwFaces
f = nil
return
}
for f.id == 0 || fw.faces[f.id] != nil {
f.id = rand.Uint32()
}
fw.faces[f.id] = f
})
if e != nil {
return nil, e
}
go f.rxLoop()
return f, nil
}
func (fw *forwarder) AddReadvertiseDestination(dest ReadvertiseDestination) {
fw.do(func() {
fw.readvertise.Put(dest)
})
}
func (fw *forwarder) RemoveReadvertiseDestination(dest ReadvertiseDestination) {
fw.do(func() {
fw.readvertise.Remove(dest)
})
}
func (fw *forwarder) do(fn func()) {
done := make(chan struct{})
fw.cmd <- func() {
defer close(done)
fn()
}
<-done
}
func (fw *forwarder) loop() {
for {
select {
case fn := <-fw.cmd:
fn()
case pkt := <-fw.rx:
switch {
case pkt.Interest != nil:
fw.forwardInterest(pkt)
case pkt.Data != nil, pkt.Nack != nil:
fw.forwardDataNack(pkt)
}
}
}
}
func (fw *forwarder) forwardInterest(pkt fwRxPkt) {
lpmLen := 0
var nexthops []*fwFace
for _, f := range fw.faces {
if pkt.rxFace == f {
continue
}
matchLen := f.lpmRoute(pkt.Interest.Name)
switch {
case matchLen > lpmLen:
lpmLen = matchLen
nexthops = nil
fallthrough
case matchLen == lpmLen:
nexthops = append(nexthops, f)
}
}
for _, f := range nexthops {
f.tx <- pkt
}
}
func (fw *forwarder) forwardDataNack(pkt fwRxPkt) {
var id uint32
id, pkt.Lp.PitToken = tokenStripID(pkt.Lp.PitToken)
if f := fw.faces[id]; f != nil {
f.tx <- pkt.Packet
}
}
var (
defaultForwarder Forwarder
defaultForwarderOnce sync.Once
)
// GetDefaultForwarder returns the default Forwarder.
func GetDefaultForwarder() Forwarder {
defaultForwarderOnce.Do(func() {
defaultForwarder = NewForwarder()
})
return defaultForwarder
}
// DeleteDefaultForwarder deletes the default Forwarder.
// This is non-thread-safe and should only be used in test cases.
func DeleteDefaultForwarder() {
defaultForwarder = nil
defaultForwarderOnce = sync.Once{}
}