Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send the lh update worker into its own routine instead of taking over the reload routine #935

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ type controlHostLister interface {
}

type Control struct {
f *Interface
l *logrus.Logger
cancel context.CancelFunc
sshStart func()
statsStart func()
dnsStart func()
f *Interface
l *logrus.Logger
cancel context.CancelFunc
sshStart func()
statsStart func()
dnsStart func()
lighthouseStart func()
}

type ControlHostInfo struct {
Expand Down Expand Up @@ -63,12 +64,15 @@ func (c *Control) Start() {
if c.dnsStart != nil {
go c.dnsStart()
}
if c.lighthouseStart != nil {
c.lighthouseStart()
}

// Start reading packets.
c.f.run()
}

// Stop signals nebula to shutdown, returns after the shutdown is complete
// Stop signals nebula to shutdown and close all tunnels, returns after the shutdown is complete
func (c *Control) Stop() {
// Stop the handshakeManager (and other services), to prevent new tunnels from
// being created while we're shutting them all down.
Expand Down Expand Up @@ -98,7 +102,7 @@ func (c *Control) RebindUDPServer() {
_ = c.f.outside.Rebind()

// Trigger a lighthouse update, useful for mobile clients that should have an update interval of 0
c.f.lightHouse.SendUpdate(c.f)
c.f.lightHouse.SendUpdate()

// Let the main interface know that we rebound so that underlying tunnels know to trigger punches from their remotes
c.f.rebindCount++
Expand Down
44 changes: 22 additions & 22 deletions lighthouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@ type LightHouse struct {
staticList atomic.Pointer[map[iputil.VpnIp]struct{}]
lighthouses atomic.Pointer[map[iputil.VpnIp]struct{}]

interval atomic.Int64
updateCancel context.CancelFunc
updateParentCtx context.Context
updateUdp EncWriter
nebulaPort uint32 // 32 bits because protobuf does not have a uint16
interval atomic.Int64
updateCancel context.CancelFunc
ifce EncWriter
nebulaPort uint32 // 32 bits because protobuf does not have a uint16

advertiseAddrs atomic.Pointer[[]netIpAndPort]

Expand Down Expand Up @@ -217,7 +216,7 @@ func (lh *LightHouse) reload(c *config.C, initial bool) error {
lh.updateCancel()
}

lh.LhUpdateWorker(lh.updateParentCtx, lh.updateUdp)
lh.StartUpdateWorker()
}
}

Expand Down Expand Up @@ -754,33 +753,34 @@ func NewUDPAddrFromLH6(ipp *Ip6AndPort) *udp.Addr {
return udp.NewAddr(lhIp6ToIp(ipp), uint16(ipp.Port))
}

func (lh *LightHouse) LhUpdateWorker(ctx context.Context, f EncWriter) {
lh.updateParentCtx = ctx
lh.updateUdp = f

func (lh *LightHouse) StartUpdateWorker() {
interval := lh.GetUpdateInterval()
if lh.amLighthouse || interval == 0 {
return
}

clockSource := time.NewTicker(time.Second * time.Duration(interval))
updateCtx, cancel := context.WithCancel(ctx)
updateCtx, cancel := context.WithCancel(lh.ctx)
// We seem to have a race right here, the first caller never gets canceled because the updateCancel is set by the 2nd?
nbrownus marked this conversation as resolved.
Show resolved Hide resolved
lh.updateCancel = cancel
nbrownus marked this conversation as resolved.
Show resolved Hide resolved
defer clockSource.Stop()

for {
lh.SendUpdate(f)
go func() {
defer clockSource.Stop()

select {
case <-updateCtx.Done():
return
case <-clockSource.C:
continue
for {
lh.SendUpdate()

select {
case <-updateCtx.Done():
return
case <-clockSource.C:
continue
}
}
}
}()
}

func (lh *LightHouse) SendUpdate(f EncWriter) {
func (lh *LightHouse) SendUpdate() {
var v4 []*Ip4AndPort
var v6 []*Ip6AndPort

Expand Down Expand Up @@ -833,7 +833,7 @@ func (lh *LightHouse) SendUpdate(f EncWriter) {
}

for vpnIp := range lighthouses {
f.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out)
lh.ifce.SendMessageToVpnIp(header.LightHouse, 0, vpnIp, mm, nb, out)
}
}

Expand Down
29 changes: 29 additions & 0 deletions lighthouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ func Test_lhStaticMapping(t *testing.T) {
assert.EqualError(t, err, "lighthouse 10.128.0.3 does not have a static_host_map entry")
}

func TestReloadLighthouseInterval(t *testing.T) {
l := test.NewLogger()
_, myVpnNet, _ := net.ParseCIDR("10.128.0.1/16")
lh1 := "10.128.0.2"

c := config.NewC(l)
c.Settings["lighthouse"] = map[interface{}]interface{}{
"hosts": []interface{}{lh1},
"interval": "1s",
}

c.Settings["static_host_map"] = map[interface{}]interface{}{lh1: []interface{}{"1.1.1.1:4242"}}
lh, err := NewLightHouseFromConfig(context.Background(), l, c, myVpnNet, nil, nil)
assert.NoError(t, err)
lh.ifce = &mockEncWriter{}

// The first one routine is kicked off by main.go currently, lets make sure that one dies
c.ReloadConfigString("lighthouse:\n interval: 5")
assert.Equal(t, int64(5), lh.interval.Load())

// Subsequent calls are killed off by the LightHouse.Reload function
c.ReloadConfigString("lighthouse:\n interval: 10")
assert.Equal(t, int64(10), lh.interval.Load())

// If this completes then nothing is stealing our reload routine
c.ReloadConfigString("lighthouse:\n interval: 11")
assert.Equal(t, int64(11), lh.interval.Load())
}

func BenchmarkLighthouseHandleRequest(b *testing.B) {
l := test.NewLogger()
_, myVpnNet, _ := net.ParseCIDR("10.128.0.1/0")
Expand Down
13 changes: 10 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,12 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
// TODO: Better way to attach these, probably want a new interface in InterfaceConfig
// I don't want to make this initial commit too far-reaching though
ifce.writers = udpConns
lightHouse.ifce = ifce

ifce.RegisterConfigChangeCallbacks(c)

ifce.reloadSendRecvError(c)

go handshakeManager.Run(ctx, ifce)
go lightHouse.LhUpdateWorker(ctx, ifce)
}

// TODO - stats third-party modules start uncancellable goroutines. Update those libs to accept
Expand All @@ -348,5 +347,13 @@ func Main(c *config.C, configTest bool, buildVersion string, logger *logrus.Logg
dnsStart = dnsMain(l, hostMap, c)
}

return &Control{ifce, l, cancel, sshStart, statsStart, dnsStart}, nil
return &Control{
ifce,
l,
cancel,
sshStart,
statsStart,
dnsStart,
lightHouse.StartUpdateWorker,
}, nil
}