Skip to content

Commit 733b853

Browse files
committed
fetch: TaskContext API
1 parent 0f66433 commit 733b853

29 files changed

+693
-348
lines changed

.gitignore

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
/*.out
1+
*.out
2+
.pnpm-debug.log
23
/*ndn-dpdk-*.tgz
3-
/.pnpm-debug.log
44
/.vscode
55
/build*
66
/csrc/*/*enum.h

app/fetch/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
This package is used in the [traffic generator](../tg).
44
It implements a consumer that follows the TCP CUBIC congestion control algorithm, simulating traffic patterns similar to bulk file transfer.
5-
It requires one thread, running the `FetchThread_Run` function.
5+
It requires at least one thread, running the `FetchThread_Run` function.

app/fetch/counters.go

-43
This file was deleted.

app/fetch/fetcher.go

+122-152
Original file line numberDiff line numberDiff line change
@@ -8,209 +8,179 @@ import "C"
88
import (
99
"errors"
1010
"math"
11-
"unsafe"
1211

1312
"github.com/usnistgov/ndn-dpdk/app/tg/tgdef"
14-
"github.com/usnistgov/ndn-dpdk/core/cptr"
15-
"github.com/usnistgov/ndn-dpdk/core/pcg32"
16-
"github.com/usnistgov/ndn-dpdk/core/urcu"
13+
"github.com/usnistgov/ndn-dpdk/core/logging"
1714
"github.com/usnistgov/ndn-dpdk/dpdk/eal"
1815
"github.com/usnistgov/ndn-dpdk/dpdk/ealthread"
19-
"github.com/usnistgov/ndn-dpdk/dpdk/ringbuffer"
2016
"github.com/usnistgov/ndn-dpdk/iface"
21-
"github.com/usnistgov/ndn-dpdk/ndn/an"
22-
"github.com/usnistgov/ndn-dpdk/ndni"
2317
"github.com/zyedidia/generic"
2418
"go.uber.org/multierr"
19+
"golang.org/x/exp/maps"
2520
)
2621

27-
// FetcherConfig contains Fetcher configuration.
28-
type FetcherConfig struct {
29-
NThreads int `json:"nThreads,omitempty"`
30-
NProcs int `json:"nProcs,omitempty"`
31-
RxQueue iface.PktQueueConfig `json:"rxQueue,omitempty"`
32-
WindowCapacity int `json:"windowCapacity,omitempty"`
33-
}
34-
35-
// Validate applies defaults and validates the configuration.
36-
func (cfg *FetcherConfig) Validate() error {
37-
cfg.NThreads = generic.Max(1, cfg.NThreads)
38-
cfg.NProcs = generic.Max(1, cfg.NProcs)
39-
cfg.RxQueue.DisableCoDel = true
40-
cfg.WindowCapacity = ringbuffer.AlignCapacity(cfg.WindowCapacity, 16, 65536)
41-
return nil
42-
}
22+
var logger = logging.New("fetch")
4323

44-
type worker struct {
45-
ealthread.ThreadWithCtrl
46-
c *C.FetchThread
47-
}
24+
// Config contains Fetcher configuration.
25+
type Config struct {
26+
TaskSlotConfig
4827

49-
var (
50-
_ ealthread.ThreadWithRole = (*worker)(nil)
51-
_ ealthread.ThreadWithLoadStat = (*worker)(nil)
52-
)
28+
// NThreads is the number of worker threads.
29+
// Each worker thread can serve multiple fetch tasks.
30+
NThreads int `json:"nThreads,omitempty"`
5331

54-
// ThreadRole implements ealthread.ThreadWithRole interface.
55-
func (worker) ThreadRole() string {
56-
return tgdef.RoleConsumer
32+
// NTasks is the number of task slots.
33+
// Each task retrieves one segmented object and has independent congestion control.
34+
NTasks int `json:"nTasks,omitempty"`
5735
}
5836

59-
// NumaSocket implements eal.WithNumaSocket interface.
60-
func (w worker) NumaSocket() eal.NumaSocket {
61-
return w.face().NumaSocket()
62-
}
63-
64-
func (w worker) face() iface.Face {
65-
return iface.Get(iface.ID(w.c.face))
37+
// Validate applies defaults and validates the configuration.
38+
func (cfg *Config) Validate() error {
39+
cfg.TaskSlotConfig.applyDefaults()
40+
cfg.NThreads = generic.Clamp(cfg.NThreads, 1, math.MaxInt8)
41+
cfg.NTasks = generic.Clamp(cfg.NTasks, 1, iface.MaxInputDemuxDest)
42+
return nil
6643
}
6744

68-
// Fetcher controls fetch threads and fetch procedures on a face.
69-
// A fetch procedure retrieves Data under a single name prefix, and has independent congestion control.
70-
// A fetch thread runs on an lcore, and can serve multiple fetch procedures.
45+
// Fetcher controls worker threads and task slots on a face.
7146
type Fetcher struct {
72-
workers []*worker
73-
fp []*C.FetchProc
74-
nActiveProcs int
47+
workers []*worker
48+
taskSlots []*taskSlot
7549
}
7650

7751
var _ tgdef.Consumer = &Fetcher{}
7852

79-
// New creates a Fetcher.
80-
func New(face iface.Face, cfg FetcherConfig) (*Fetcher, error) {
81-
cfg.Validate()
82-
if cfg.NProcs >= math.MaxUint8 { // InputDemux dispatches on 1-octet of PIT token
83-
return nil, errors.New("too many procs")
84-
}
85-
86-
faceID := face.ID()
87-
socket := face.NumaSocket()
88-
interestMp := (*C.struct_rte_mempool)(ndni.InterestMempool.Get(socket).Ptr())
89-
90-
fetcher := &Fetcher{
91-
workers: make([]*worker, cfg.NThreads),
92-
fp: make([]*C.FetchProc, cfg.NProcs),
93-
}
94-
for i := range fetcher.workers {
95-
w := &worker{
96-
c: eal.Zmalloc[C.FetchThread]("FetchThread", C.sizeof_FetchThread, socket),
97-
}
98-
w.c.face = (C.FaceID)(faceID)
99-
w.c.interestMp = interestMp
100-
pcg32.Init(unsafe.Pointer(&w.c.nonceRng))
101-
w.ThreadWithCtrl = ealthread.NewThreadWithCtrl(
102-
cptr.Func0.C(C.FetchThread_Run, w.c),
103-
unsafe.Pointer(&w.c.ctrl),
104-
)
105-
fetcher.workers[i] = w
106-
}
107-
108-
for i := range fetcher.fp {
109-
fp := eal.Zmalloc[C.FetchProc]("FetchProc", C.sizeof_FetchProc, socket)
110-
if e := iface.PktQueueFromPtr(unsafe.Pointer(&fp.rxQueue)).Init(cfg.RxQueue, socket); e != nil {
111-
return nil, e
112-
}
113-
fp.pitToken = C.uint8_t(i)
114-
fetcher.fp[i] = fp
115-
fetcher.Logic(i).Init(cfg.WindowCapacity, socket)
116-
}
117-
118-
return fetcher, nil
119-
}
120-
121-
// Logic returns the Logic of i-th fetch procedure.
122-
func (fetcher *Fetcher) Logic(i int) *Logic {
123-
return (*Logic)(&fetcher.fp[i].logic)
53+
// Face returns the face.
54+
func (fetcher *Fetcher) Face() iface.Face {
55+
return fetcher.workers[0].Face()
12456
}
12557

126-
// Reset resets all Logics.
127-
// If the fetcher is running, it is automatically stopped.
128-
func (fetcher *Fetcher) Reset() {
129-
fetcher.Stop()
130-
for _, fth := range fetcher.workers {
131-
fth.c.head.next = nil
132-
}
133-
for i := range fetcher.fp {
134-
fetcher.Logic(i).Reset()
58+
// ConnectRxQueues connects Data InputDemux to RxQueues.
59+
// Nack InputDemux is set to drop packets because fetcher does not support Nacks.
60+
func (fetcher *Fetcher) ConnectRxQueues(demuxD, demuxN *iface.InputDemux) {
61+
demuxD.InitToken(0)
62+
for i, ts := range fetcher.taskSlots {
63+
demuxD.SetDest(i, ts.RxQueueD())
13564
}
136-
fetcher.nActiveProcs = 0
65+
demuxN.InitDrop()
13766
}
13867

139-
// AddTemplate sets name prefix and other InterestTemplate arguments.
140-
// Return index of fetch procedure.
141-
func (fetcher *Fetcher) AddTemplate(tplCfg ndni.InterestTemplateConfig) (i int, e error) {
142-
i = fetcher.nActiveProcs
143-
if i >= len(fetcher.fp) {
144-
return -1, errors.New("too many templates")
145-
}
146-
147-
fp := fetcher.fp[i]
148-
tpl := ndni.InterestTemplateFromPtr(unsafe.Pointer(&fp.tpl))
149-
tplCfg.Apply(tpl)
68+
// Workers returns worker threads.
69+
func (fetcher *Fetcher) Workers() []ealthread.ThreadWithRole {
70+
return tgdef.GatherWorkers(fetcher.workers)
71+
}
15072

151-
if uintptr(fp.tpl.prefixL+1) >= unsafe.Sizeof(fp.tpl.prefixV) {
152-
return -1, errors.New("name too long")
73+
// Tasks returns running fetch tasks.
74+
func (fetcher *Fetcher) Tasks() (list []*TaskContext) {
75+
taskContextLock.RLock()
76+
defer taskContextLock.RUnlock()
77+
list = []*TaskContext{}
78+
for _, task := range taskContextByID {
79+
if task.fetcher == fetcher {
80+
list = append(list, task)
81+
}
15382
}
154-
fp.tpl.prefixV[fp.tpl.prefixL] = an.TtSegmentNameComponent
155-
// put SegmentNameComponent TLV-TYPE in the buffer so that it's checked in same memcmp
156-
157-
rs := urcu.NewReadSide()
158-
defer rs.Close()
159-
fth := fetcher.workers[i%len(fetcher.workers)]
160-
C.cds_hlist_add_head_rcu(&fp.fthNode, &fth.c.head)
161-
fetcher.nActiveProcs++
162-
return i, nil
83+
return
16384
}
16485

165-
// Face returns the face.
166-
func (fetcher *Fetcher) Face() iface.Face {
167-
return fetcher.workers[0].face()
168-
}
86+
// Fetch starts a fetch task.
87+
func (fetcher *Fetcher) Fetch(d TaskDef) (task *TaskContext, e error) {
88+
eal.CallMain(func() {
89+
task = &TaskContext{
90+
d: d,
91+
fetcher: fetcher,
92+
w: fetcher.workers[0],
93+
stopping: make(chan struct{}),
94+
}
16995

170-
func (fetcher *Fetcher) rxQueue(i int) *iface.PktQueue {
171-
return iface.PktQueueFromPtr(unsafe.Pointer(&fetcher.fp[i].rxQueue))
172-
}
96+
for _, ts := range fetcher.taskSlots {
97+
if ts.worker == -1 {
98+
task.ts = ts
99+
break
100+
}
101+
}
102+
if task.ts == nil {
103+
task, e = nil, errors.New("too many running tasks")
104+
return
105+
}
106+
if e = task.ts.Init(d); e != nil {
107+
task = nil
108+
return
109+
}
173110

174-
// ConnectRxQueues connects Data+Nack InputDemux to RxQueues.
175-
func (fetcher *Fetcher) ConnectRxQueues(demuxD, demuxN *iface.InputDemux) {
176-
demuxD.InitToken(0)
177-
demuxN.InitToken(0)
178-
for i := range fetcher.fp {
179-
q := fetcher.rxQueue(i)
180-
demuxD.SetDest(i, q)
181-
demuxN.SetDest(i, q)
182-
}
183-
}
111+
for _, w := range fetcher.workers {
112+
if task.w.nTasks > w.nTasks {
113+
task.w = w
114+
}
115+
}
184116

185-
// Workers returns worker threads.
186-
func (fetcher Fetcher) Workers() []ealthread.ThreadWithRole {
187-
return tgdef.GatherWorkers(fetcher.workers)
117+
task.w.AddTask(eal.MainReadSide, task.ts)
118+
e = nil
119+
taskContextLock.Lock()
120+
defer taskContextLock.Unlock()
121+
lastTaskContextID++
122+
task.id = lastTaskContextID
123+
taskContextByID[task.id] = task
124+
})
125+
return
188126
}
189127

190-
// Launch launches all fetch threads.
128+
// Launch launches all worker threads.
191129
func (fetcher *Fetcher) Launch() {
192130
tgdef.LaunchWorkers(fetcher.workers)
193131
}
194132

195-
// Stop stops all fetch threads.
133+
// Stop stops all worker threads.
196134
func (fetcher *Fetcher) Stop() error {
197135
return tgdef.StopWorkers(fetcher.workers)
198136
}
199137

138+
// Reset aborts all tasks and stops all worker threads.
139+
func (fetcher *Fetcher) Reset() {
140+
fetcher.Stop()
141+
for _, w := range fetcher.workers {
142+
w.ClearTasks()
143+
}
144+
for _, ts := range fetcher.taskSlots {
145+
ts.worker = -1
146+
}
147+
maps.DeleteFunc(taskContextByID, func(id int, task *TaskContext) bool { return task.fetcher == fetcher })
148+
}
149+
200150
// Close deallocates data structures.
201151
func (fetcher *Fetcher) Close() error {
202152
errs := []error{
203153
fetcher.Stop(),
204154
}
205-
for i, fp := range fetcher.fp {
155+
for _, ts := range fetcher.taskSlots {
206156
errs = append(errs,
207-
fetcher.rxQueue(i).Close(),
208-
fetcher.Logic(i).Close(),
157+
ts.RxQueueD().Close(),
158+
ts.Logic().Close(),
209159
)
210-
eal.Free(fp)
160+
eal.Free(ts)
211161
}
212-
for _, fth := range fetcher.workers {
213-
eal.Free(fth.c)
162+
for _, w := range fetcher.workers {
163+
eal.Free(w.c)
214164
}
215165
return multierr.Combine(errs...)
216166
}
167+
168+
// New creates a Fetcher.
169+
func New(face iface.Face, cfg Config) (*Fetcher, error) {
170+
cfg.applyDefaults()
171+
172+
fetcher := &Fetcher{
173+
workers: make([]*worker, cfg.NThreads),
174+
taskSlots: make([]*taskSlot, cfg.NTasks),
175+
}
176+
for i := range fetcher.workers {
177+
fetcher.workers[i] = newWorker(face, i)
178+
}
179+
180+
socket := face.NumaSocket()
181+
for i := range fetcher.taskSlots {
182+
fetcher.taskSlots[i] = newTaskSlot(i, cfg.TaskSlotConfig, socket)
183+
}
184+
185+
return fetcher, nil
186+
}

0 commit comments

Comments
 (0)