Skip to content

Commit

Permalink
performance4: use buffer.view based channels instead of pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
cre4ture committed Jul 30, 2024
1 parent 02eaeb5 commit 77bf9f1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
53 changes: 35 additions & 18 deletions overlay/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/slackhq/nebula/config"
"github.com/slackhq/nebula/iputil"
"gvisor.dev/gvisor/pkg/buffer"
)

func NewUserDeviceFromConfig(c *config.C, l *logrus.Logger, tunCidr *net.IPNet, routines int) (Device, error) {
Expand All @@ -15,25 +16,18 @@ func NewUserDeviceFromConfig(c *config.C, l *logrus.Logger, tunCidr *net.IPNet,

func NewUserDevice(tunCidr *net.IPNet) (Device, error) {
// these pipes guarantee each write/read will match 1:1
or, ow := io.Pipe()
ir, iw := io.Pipe()
return &UserDevice{
tunCidr: tunCidr,
outboundReader: or,
outboundWriter: ow,
inboundReader: ir,
inboundWriter: iw,
tunCidr: tunCidr,
outboundChannel: make(chan *buffer.View),
inboundChannel: make(chan *buffer.View),
}, nil
}

type UserDevice struct {
tunCidr *net.IPNet

outboundReader *io.PipeReader
outboundWriter *io.PipeWriter

inboundReader *io.PipeReader
inboundWriter *io.PipeWriter
outboundChannel chan *buffer.View
inboundChannel chan *buffer.View
}

func (d *UserDevice) Activate() error {
Expand All @@ -46,18 +40,41 @@ func (d *UserDevice) NewMultiQueueReader() (io.ReadWriteCloser, error) {
return d, nil
}

func (d *UserDevice) Pipe() (*io.PipeReader, *io.PipeWriter) {
return d.inboundReader, d.outboundWriter
func (d *UserDevice) Pipe() (<-chan *buffer.View, chan<- *buffer.View) {
return d.inboundChannel, d.outboundChannel
}

func (d *UserDevice) Read(p []byte) (n int, err error) {
return d.outboundReader.Read(p)
view, ok := <-d.outboundChannel
if !ok {
return 0, io.EOF
}
return view.Read(p)
}
func (d *UserDevice) WriteTo(w io.Writer) (n int64, err error) {
view, ok := <-d.outboundChannel
if !ok {
return 0, io.EOF
}
return view.WriteTo(w)
}

func (d *UserDevice) Write(p []byte) (n int, err error) {
return d.inboundWriter.Write(p)
view := buffer.NewViewWithData(p)
d.inboundChannel <- view
return view.Size(), nil
}
func (d *UserDevice) ReadFrom(r io.Reader) (n int64, err error) {
view := buffer.NewViewSize(2048)
n, err = view.ReadFrom(r)
if n > 0 {
d.inboundChannel <- view
}
return
}

func (d *UserDevice) Close() error {
d.inboundWriter.Close()
d.outboundWriter.Close()
close(d.inboundChannel)
close(d.outboundChannel)
return nil
}
19 changes: 4 additions & 15 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,17 @@ func New(config *config.C, logger *logrus.Logger) (*Service, error) {
tcpFwd := tcp.NewForwarder(s.ipstack, tcpReceiveBufferSize, maxInFlightConnectionAttempts, s.tcpHandler)
s.ipstack.SetTransportProtocolHandler(tcp.ProtocolNumber, tcpFwd.HandlePacket)

reader, writer := device.Pipe()
nebula_tun_reader, nebula_tun_writer := device.Pipe()

go func() {
<-ctx.Done()
reader.Close()
writer.Close()
close(nebula_tun_writer)
}()

// create Goroutines to forward packets between Nebula and Gvisor
eg.Go(func() error {
buf := make([]byte, header.IPv4MaximumHeaderSize+header.IPv4MaximumPayloadSize)
for {
// this will read exactly one packet
n, err := reader.Read(buf)
if err != nil {
return err
}
view := buffer.NewViewWithData(buf[:n])
view := <-nebula_tun_reader
packetBuf := stack.NewPacketBuffer(stack.PacketBufferOptions{
Payload: buffer.MakeWithView(view),
})
Expand All @@ -142,11 +135,7 @@ func New(config *config.C, logger *logrus.Logger) (*Service, error) {
}
continue
}
bufView := packet.ToView()
if _, err := bufView.WriteTo(writer); err != nil {
return err
}
bufView.Release()
nebula_tun_writer <- packet.ToView()
}
})

Expand Down

0 comments on commit 77bf9f1

Please sign in to comment.