diff --git a/.github/workflows/smoke.yml b/.github/workflows/smoke.yml
index 54833bdc7..f7a73d314 100644
--- a/.github/workflows/smoke.yml
+++ b/.github/workflows/smoke.yml
@@ -26,7 +26,7 @@ jobs:
         check-latest: true
 
     - name: build
-      run: make bin-docker CGO_ENABLED=1 BUILD_ARGS=-race
+      run: make bin-docker CGO_ENABLED=1 BUILD_ARGS="-race -tags=mutex_debug"
 
     - name: setup docker image
       working-directory: ./.github/workflows/smoke
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 844eaf2fc..d452a49c0 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -35,7 +35,7 @@ jobs:
       run: make test
 
     - name: End 2 end
-      run: make e2evv
+      run: make e2e-mutex-debug TEST_LOGS=1 TEST_FLAGS=-v
 
     - name: Build test mobile
       run: make build-test-mobile
diff --git a/Makefile b/Makefile
index 0d0943f0a..add686d35 100644
--- a/Makefile
+++ b/Makefile
@@ -63,6 +63,9 @@ ALL = $(ALL_LINUX) \
 e2e:
 	$(TEST_ENV) go test -tags=e2e_testing -count=1 $(TEST_FLAGS) ./e2e
 
+e2e-mutex-debug:
+	$(TEST_ENV) go test -tags=mutex_debug,e2e_testing -count=1 $(TEST_FLAGS) ./e2e
+
 e2ev: TEST_FLAGS = -v
 e2ev: e2e
 
@@ -206,6 +209,7 @@ ifeq ($(words $(MAKECMDGOALS)),1)
 	@$(MAKE) service ${.DEFAULT_GOAL} --no-print-directory
 endif
 
+bin-docker: BUILD_ARGS = -tags=mutex_debug
 bin-docker: bin build/linux-amd64/nebula build/linux-amd64/nebula-cert
 
 smoke-docker: bin-docker
diff --git a/connection_manager.go b/connection_manager.go
index 0b277b5c1..f9e1b7173 100644
--- a/connection_manager.go
+++ b/connection_manager.go
@@ -3,7 +3,6 @@ package nebula
 import (
 	"bytes"
 	"context"
-	"sync"
 	"time"
 
 	"github.com/rcrowley/go-metrics"
@@ -28,14 +27,14 @@ const (
 type connectionManager struct {
 	in     map[uint32]struct{}
-	inLock *sync.RWMutex
+	inLock syncRWMutex
 
 	out     map[uint32]struct{}
-	outLock *sync.RWMutex
+	outLock syncRWMutex
 
 	// relayUsed holds which relay localIndexs are in use
 	relayUsed     map[uint32]struct{}
-	relayUsedLock *sync.RWMutex
+	relayUsedLock syncRWMutex
 
 	hostMap      *HostMap
 	trafficTimer *LockingTimerWheel[uint32]
@@ -60,12 +59,12 @@ func newConnectionManager(ctx context.Context, l *logrus.Logger, intf *Interface
 	nc := &connectionManager{
 		hostMap:         intf.hostMap,
 		in:              make(map[uint32]struct{}),
-		inLock:          &sync.RWMutex{},
+		inLock:          newSyncRWMutex("connection-manager-in"),
 		out:             make(map[uint32]struct{}),
-		outLock:         &sync.RWMutex{},
+		outLock:         newSyncRWMutex("connection-manager-out"),
 		relayUsed:       make(map[uint32]struct{}),
-		relayUsedLock:   &sync.RWMutex{},
-		trafficTimer:    NewLockingTimerWheel[uint32](time.Millisecond*500, max),
+		relayUsedLock:   newSyncRWMutex("connection-manager-relay-used"),
+		trafficTimer:    NewLockingTimerWheel[uint32]("connection-manager-timer", time.Millisecond*500, max),
 		intf:            intf,
 		pendingDeletion: make(map[uint32]struct{}),
 		checkInterval:   checkInterval,
diff --git a/connection_state.go b/connection_state.go
index 8ef8b3a24..5373f967c 100644
--- a/connection_state.go
+++ b/connection_state.go
@@ -3,7 +3,6 @@ package nebula
 import (
 	"crypto/rand"
 	"encoding/json"
-	"sync"
 	"sync/atomic"
 
 	"github.com/flynn/noise"
@@ -23,7 +22,7 @@ type ConnectionState struct {
 	initiator      bool
 	messageCounter atomic.Uint64
 	window         *Bits
-	writeLock      sync.Mutex
+	writeLock      syncMutex
 }
 
 func NewConnectionState(l *logrus.Logger, cipher string, certState *CertState, initiator bool, pattern noise.HandshakePattern, psk []byte, pskStage int) *ConnectionState {
@@ -71,6 +70,7 @@ func NewConnectionState(l *logrus.Logger, cipher string, certState *CertState, i
 		initiator:      initiator,
 		window:         b,
 		myCert:         certState.Certificate,
+		writeLock:      newSyncMutex("connection-state-write"),
 	}
 
 	return ci
diff --git a/dns_server.go b/dns_server.go
index 4e7bb83af..bc25adc2c 100644
--- a/dns_server.go
+++ b/dns_server.go
@@ -5,7 +5,6 @@ import (
 	"net"
 	"strconv"
 	"strings"
-	"sync"
 
 	"github.com/miekg/dns"
 	"github.com/sirupsen/logrus"
@@ -20,15 +19,16 @@ var dnsServer *dns.Server
 var dnsAddr string
 
 type dnsRecords struct {
-	sync.RWMutex
+	syncRWMutex
 	dnsMap  map[string]string
 	hostMap *HostMap
 }
 
 func newDnsRecords(hostMap *HostMap) *dnsRecords {
 	return &dnsRecords{
-		dnsMap:  make(map[string]string),
-		hostMap: hostMap,
+		syncRWMutex: newSyncRWMutex("dns-records"),
+		dnsMap:      make(map[string]string),
+		hostMap:     hostMap,
 	}
 }
diff --git a/firewall.go b/firewall.go
index 3e760feb3..d172bd933 100644
--- a/firewall.go
+++ b/firewall.go
@@ -10,7 +10,6 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/rcrowley/go-metrics"
@@ -73,7 +72,7 @@ type firewallMetrics struct {
 }
 
 type FirewallConntrack struct {
-	sync.Mutex
+	syncMutex
 
 	Conns      map[firewall.Packet]*conn
 	TimerWheel *TimerWheel[firewall.Packet]
@@ -162,6 +161,7 @@ func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.D
 	return &Firewall{
 		Conntrack: &FirewallConntrack{
+			syncMutex:  newSyncMutex("firewall-conntrack"),
 			Conns:      make(map[firewall.Packet]*conn),
 			TimerWheel: NewTimerWheel[firewall.Packet](min, max),
 		},
diff --git a/go.mod b/go.mod
index b1f7215ea..c5e7dddc2 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
 	dario.cat/mergo v1.0.0
 	github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be
 	github.com/armon/go-radix v1.0.0
+	github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc
 	github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432
 	github.com/flynn/noise v1.1.0
 	github.com/gogo/protobuf v1.3.2
@@ -21,6 +22,7 @@ require (
 	github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
 	github.com/songgao/water v0.0.0-20200317203138-2b4b6d7c09d8
 	github.com/stretchr/testify v1.9.0
+	github.com/timandy/routine v1.1.1
 	github.com/vishvananda/netlink v1.2.1-beta.2
 	golang.org/x/crypto v0.23.0
 	golang.org/x/exp v0.0.0-20230725093048-515e97ebf090
diff --git a/go.sum b/go.sum
index 0e671861f..44309d006 100644
--- a/go.sum
+++ b/go.sum
@@ -17,6 +17,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
 github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc h1:6e91sWiDE69Jl0WUsY/LvTCBPRBe6b2j8H7W96JGJ4s=
+github.com/clarkmcc/go-dag v0.0.0-20220908000337-9c3ba5b365fc/go.mod h1:RGIcF96ORCYAsdz60Ou9mPBNa4+DjoQFS8nelPniFoY=
 github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432 h1:M5QgkYacWj0Xs8MhpIK/5uwU02icXpEoSo9sM2aRCps=
 github.com/cyberdelia/go-metrics-graphite v0.0.0-20161219230853-39f87cc3b432/go.mod h1:xwIwAxMvYnVrGJPe2FKx5prTrnAjGOD8zvDOnxnrrkM=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -133,6 +135,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/timandy/routine v1.1.1 h1:6/Z7qLFZj3GrzuRksBFzIG8YGUh8CLhjnnMePBQTrEI=
+github.com/timandy/routine v1.1.1/go.mod h1:OZHPOKSvqL/ZvqXFkNZyit0xIVelERptYXdAHH00adQ=
 github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs=
diff --git a/handshake_ix.go b/handshake_ix.go
index 8727b16f1..d53a6a8f4 100644
--- a/handshake_ix.go
+++ b/handshake_ix.go
@@ -132,6 +132,7 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via *ViaSender, packet []by
 	}
 
 	hostinfo := &HostInfo{
+		syncRWMutex:       newSyncRWMutex("hostinfo"),
 		ConnectionState:   ci,
 		localIndexId:      myIndex,
 		remoteIndexId:     hs.Details.InitiatorIndex,
@@ -139,6 +140,7 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via *ViaSender, packet []by
 		HandshakePacket:   make(map[uint8][]byte, 0),
 		lastHandshakeTime: hs.Details.Time,
 		relayState: RelayState{
+			syncRWMutex:   newSyncRWMutex("relay-state"),
 			relays:        map[iputil.VpnIp]struct{}{},
 			relayForByIp:  map[iputil.VpnIp]*Relay{},
 			relayForByIdx: map[uint32]*Relay{},
diff --git a/handshake_manager.go b/handshake_manager.go
index 640227a7e..7222919f7 100644
--- a/handshake_manager.go
+++ b/handshake_manager.go
@@ -7,7 +7,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"net"
-	"sync"
 	"time"
 
 	"github.com/rcrowley/go-metrics"
@@ -44,7 +43,7 @@ type HandshakeConfig struct {
 
 type HandshakeManager struct {
 	// Mutex for interacting with the vpnIps and indexes maps
-	sync.RWMutex
+	syncRWMutex
 
 	vpnIps  map[iputil.VpnIp]*HandshakeHostInfo
 	indexes map[uint32]*HandshakeHostInfo
@@ -65,7 +64,7 @@ type HandshakeManager struct {
 }
 
 type HandshakeHostInfo struct {
-	sync.Mutex
+	syncMutex
 
 	startTime time.Time // Time that we first started trying with this handshake
 	ready     bool      // Is the handshake ready
@@ -103,6 +102,7 @@ func (hh *HandshakeHostInfo) cachePacket(l *logrus.Logger, t header.MessageType,
 
 func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *LightHouse, outside udp.Conn, config HandshakeConfig) *HandshakeManager {
 	return &HandshakeManager{
+		syncRWMutex: newSyncRWMutex("handshake-manager"),
 		vpnIps:      map[iputil.VpnIp]*HandshakeHostInfo{},
 		indexes:     map[uint32]*HandshakeHostInfo{},
 		mainHostMap: mainHostMap,
@@ -110,7 +110,7 @@ func NewHandshakeManager(l *logrus.Logger, mainHostMap *HostMap, lightHouse *Lig
 		outside:     outside,
 		config:      config,
 		trigger:     make(chan iputil.VpnIp, config.triggerBuffer),
-		OutboundHandshakeTimer: NewLockingTimerWheel[iputil.VpnIp](config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
+		OutboundHandshakeTimer: NewLockingTimerWheel[iputil.VpnIp]("handshake-manager-timer", config.tryInterval, hsTimeout(config.retries, config.tryInterval)),
 		messageMetrics:  config.messageMetrics,
 		metricInitiated: metrics.GetOrRegisterCounter("handshake_manager.initiated", nil),
 		metricTimedOut:  metrics.GetOrRegisterCounter("handshake_manager.timed_out", nil),
@@ -385,9 +385,11 @@ func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
 	}
 
 	hostinfo := &HostInfo{
+		syncRWMutex:     newSyncRWMutex("hostinfo"),
 		vpnIp:           vpnIp,
 		HandshakePacket: make(map[uint8][]byte, 0),
 		relayState: RelayState{
+			syncRWMutex:   newSyncRWMutex("relay-state"),
 			relays:        map[iputil.VpnIp]struct{}{},
 			relayForByIp:  map[iputil.VpnIp]*Relay{},
 			relayForByIdx: map[uint32]*Relay{},
@@ -395,6 +397,7 @@ func (hm *HandshakeManager) StartHandshake(vpnIp iputil.VpnIp, cacheCb func(*Han
 	}
 
 	hh := &HandshakeHostInfo{
+		syncMutex: newSyncMutex("handshake-hostinfo"),
 		hostinfo:  hostinfo,
 		startTime: time.Now(),
 	}
diff --git a/hostmap.go b/hostmap.go
index 589a12463..362a76af6 100644
--- a/hostmap.go
+++ b/hostmap.go
@@ -3,7 +3,6 @@ package nebula
 import (
 	"errors"
 	"net"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -53,7 +52,7 @@ type Relay struct {
 }
 
 type HostMap struct {
-	sync.RWMutex //Because we concurrently read and write to our maps
+	syncRWMutex  //Because we concurrently read and write to our maps
 	Indexes       map[uint32]*HostInfo
 	Relays        map[uint32]*HostInfo // Maps a Relay IDX to a Relay HostInfo object
 	RemoteIndexes map[uint32]*HostInfo
@@ -67,7 +66,7 @@ type HostMap struct {
 // struct, make a copy of an existing value, edit the fileds in the copy, and
 // then store a pointer to the new copy in both realyForBy* maps.
 type RelayState struct {
-	sync.RWMutex
+	syncRWMutex
 
 	relays        map[iputil.VpnIp]struct{} // Set of VpnIp's of Hosts to use as relays to access this peer
 	relayForByIp  map[iputil.VpnIp]*Relay   // Maps VpnIps of peers for which this HostInfo is a relay to some Relay info
@@ -197,6 +196,7 @@ func (rs *RelayState) InsertRelay(ip iputil.VpnIp, idx uint32, r *Relay) {
 }
 
 type HostInfo struct {
+	syncRWMutex
 	remote         *udp.Addr
 	remotes        *RemoteList
 	promoteCounter atomic.Uint32
@@ -271,6 +271,7 @@ func NewHostMapFromConfig(l *logrus.Logger, vpnCIDR *net.IPNet, c *config.C) *Ho
 func newHostMap(l *logrus.Logger, vpnCIDR *net.IPNet) *HostMap {
 	return &HostMap{
+		syncRWMutex:   newSyncRWMutex("hostmap"),
 		Indexes:       map[uint32]*HostInfo{},
 		Relays:        map[uint32]*HostInfo{},
 		RemoteIndexes: map[uint32]*HostInfo{},
diff --git a/lighthouse.go b/lighthouse.go
index aa54c4bc5..bdabbd718 100644
--- a/lighthouse.go
+++ b/lighthouse.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"net"
 	"net/netip"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -33,7 +32,7 @@ type netIpAndPort struct {
 
 type LightHouse struct {
 	//TODO: We need a timer wheel to kick out vpnIps that haven't reported in a long time
-	sync.RWMutex //Because we concurrently read and write to our maps
+	syncRWMutex  //Because we concurrently read and write to our maps
 	ctx          context.Context
 	amLighthouse bool
 	myVpnIp      iputil.VpnIp
@@ -103,6 +102,7 @@ func NewLightHouseFromConfig(ctx context.Context, l *logrus.Logger, c *config.C,
 	ones, _ := myVpnNet.Mask.Size()
 
 	h := LightHouse{
+		syncRWMutex:  newSyncRWMutex("lighthouse"),
 		ctx:          ctx,
 		amLighthouse: amLighthouse,
 		myVpnIp:      iputil.Ip2VpnIp(myVpnNet.IP),
@@ -468,6 +468,7 @@ func (lh *LightHouse) QueryServer(ip iputil.VpnIp) {
 		return
 	}
 
+	chanDebugSend("lighthouse-query-chan")
 	lh.queryChan <- ip
 }
 
@@ -750,6 +751,8 @@ func (lh *LightHouse) startQueryWorker() {
 		nb := make([]byte, 12, 12)
 		out := make([]byte, mtu)
 
+		chanDebugRecv("lighthouse-query-chan")
+
 		for {
 			select {
 			case <-lh.ctx.Done():
diff --git a/mutex_debug.go b/mutex_debug.go
new file mode 100644
index 000000000..ce5050a57
--- /dev/null
+++ b/mutex_debug.go
@@ -0,0 +1,202 @@
+//go:build mutex_debug
+// +build mutex_debug
+
+package nebula
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+
+	"github.com/clarkmcc/go-dag"
+	"github.com/timandy/routine"
+)
+
+type mutexKey = string
+
+// For each key in this map, the value lists the lock keys you are allowed to
+// already hold when you grab that key. This ensures that locks are always
+// acquired in the same global order, which prevents deadlocks.
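+//
+// For example, "hostmap" lists "handshake-hostinfo", so a goroutine may grab
+// the hostmap lock while already holding a handshake-hostinfo lock, but
+// grabbing them in the opposite order will panic in a mutex_debug build.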
+var allowedConcurrentLocks = map[mutexKey][]mutexKey{
+	"connection-manager-in":         {"hostmap"},
+	"connection-manager-out":        {"connection-state-write", "connection-manager-in"},
+	"connection-manager-relay-used": {"handshake-hostinfo"},
+	"connection-manager-timer":      {"connection-manager-out"},
+	"connection-state-write":        {"hostmap"},
+	"firewall-conntrack":            {"handshake-hostinfo"},
+	"handshake-manager":             {"hostmap"},
+	"handshake-manager-timer":       {"handshake-manager"},
+	"hostmap":                       {"handshake-hostinfo", "lighthouse-query-chan"},
+	"lighthouse":                    {"handshake-manager"},
+	"lighthouse-query-chan":         {"handshake-hostinfo"},
+	"relay-state":                   {"hostmap", "connection-manager-relay-used"},
+	"remote-list":                   {"lighthouse"},
+}
+
+type mutexValue struct {
+	file string
+	line int
+}
+
+func (m mutexValue) String() string {
+	return fmt.Sprintf("%s:%d", m.file, m.line)
+}
+
+var threadLocal routine.ThreadLocal = routine.NewThreadLocalWithInitial(func() any { return map[mutexKey]mutexValue{} })
+
+var allowedDAG dag.AcyclicGraph
+
+// We build a directed acyclic graph to assert that the locks can only be
+// acquired in a determined order. If there are cycles in the DAG, then we
+// know that the locking order is not guaranteed.
+func init() {
+	for k, v := range allowedConcurrentLocks {
+		allowedDAG.Add(k)
+		for _, t := range v {
+			allowedDAG.Add(t)
+		}
+	}
+	for k, v := range allowedConcurrentLocks {
+		for _, t := range v {
+			allowedDAG.Connect(dag.BasicEdge(k, t))
+		}
+	}
+
+	if cycles := allowedDAG.Cycles(); len(cycles) > 0 {
+		panic(fmt.Errorf("cycles found in allowedConcurrentLocks: %v", cycles))
+	}
+
+	// Rebuild allowedConcurrentLocks as a flattened list of all possibilities
+	for k := range allowedConcurrentLocks {
+		ancestors, err := allowedDAG.Ancestors(k)
+		if err != nil {
+			panic(err)
+		}
+
+		var allowed []mutexKey
+		for t := range ancestors {
+			allowed = append(allowed, t.(mutexKey))
+		}
+		allowedConcurrentLocks[k] = allowed
+	}
+}
+
+type syncRWMutex struct {
+	sync.RWMutex
+	mutexKey
+}
+
+type syncMutex struct {
+	sync.Mutex
+	mutexKey
+}
+
+func newSyncRWMutex(key mutexKey) syncRWMutex {
+	return syncRWMutex{
+		mutexKey: key,
+	}
+}
+
+func newSyncMutex(key mutexKey) syncMutex {
+	return syncMutex{
+		mutexKey: key,
+	}
+}
+
+func alertMutex(err error) {
+	panic(err)
+	// NOTE: you could switch to this log line and remove the panic if you want
+	// to log all failures instead of panicking on the first one
+	//log.Print(err, string(debug.Stack()))
+}
+
+func checkMutex(state map[mutexKey]mutexValue, add mutexKey) {
+	if add == "" {
+		alertMutex(fmt.Errorf("mutex not initialized with mutexKey"))
+	}
+
+	allowedConcurrent := allowedConcurrentLocks[add]
+
+	for k, v := range state {
+		if add == k {
+			alertMutex(fmt.Errorf("re-entrant lock: %s. previous allocation: %s", add, v))
+		}
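+
+		// Any other lock we already hold must appear in the allow-list that
+		// init() flattened for this key; anything else is an order violation.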
+		// TODO use slices.Contains, but requires go1.21
+		var found bool
+		for _, a := range allowedConcurrent {
+			if a == k {
+				found = true
+				break
+			}
+		}
+		if !found {
+			alertMutex(fmt.Errorf("grabbing %s lock and already have these locks: %s", add, state))
+		}
+	}
+}
+
+func chanDebugRecv(key mutexKey) {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	checkMutex(m, key)
+	v := mutexValue{}
+	_, v.file, v.line, _ = runtime.Caller(1)
+	m[key] = v
+}
+
+func chanDebugSend(key mutexKey) {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	checkMutex(m, key)
+}
+
+func (s *syncRWMutex) Lock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	checkMutex(m, s.mutexKey)
+	v := mutexValue{}
+	_, v.file, v.line, _ = runtime.Caller(1)
+	m[s.mutexKey] = v
+	s.RWMutex.Lock()
+}
+
+func (s *syncRWMutex) Unlock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	delete(m, s.mutexKey)
+	s.RWMutex.Unlock()
+}
+
+func (s *syncRWMutex) RLock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	checkMutex(m, s.mutexKey)
+	v := mutexValue{}
+	_, v.file, v.line, _ = runtime.Caller(1)
+	m[s.mutexKey] = v
+	s.RWMutex.RLock()
+}
+
+func (s *syncRWMutex) RUnlock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	delete(m, s.mutexKey)
+	s.RWMutex.RUnlock()
+}
+
+func (s *syncMutex) Lock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	checkMutex(m, s.mutexKey)
+	v := mutexValue{}
+	_, v.file, v.line, _ = runtime.Caller(1)
+	m[s.mutexKey] = v
+	s.Mutex.Lock()
+}
+
+func (s *syncMutex) Unlock() {
+	m := threadLocal.Get().(map[mutexKey]mutexValue)
+	delete(m, s.mutexKey)
+	s.Mutex.Unlock()
+}
diff --git a/mutex_nodebug.go b/mutex_nodebug.go
new file mode 100644
index 000000000..87e0c5a95
--- /dev/null
+++ b/mutex_nodebug.go
@@ -0,0 +1,27 @@
+//go:build !mutex_debug
+// +build !mutex_debug
+
+package nebula
+
+import (
+	"sync"
+)
+
+type mutexKey = string
+type syncRWMutex = sync.RWMutex
+type syncMutex = sync.Mutex
+
+func newSyncRWMutex(mutexKey) syncRWMutex {
+	return sync.RWMutex{}
+}
+
+func newSyncMutex(mutexKey) syncMutex {
+	return sync.Mutex{}
+}
+
+func chanDebugRecv(key mutexKey) {}
+func chanDebugSend(key mutexKey) {}
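+
+// Note: in non-debug builds syncRWMutex and syncMutex are plain sync types
+// and these helpers are no-ops, so the lock-order checker adds no overhead
+// unless nebula is built with -tags=mutex_debug.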
diff --git a/remote_list.go b/remote_list.go
index 60a1afdaf..b07d15cc1 100644
--- a/remote_list.go
+++ b/remote_list.go
@@ -7,7 +7,6 @@ import (
 	"net/netip"
 	"sort"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -190,7 +189,7 @@ func (hr *hostnamesResults) GetIPs() []netip.AddrPort {
 // It serves as a local cache of query replies, host update notifications, and locally learned addresses
 type RemoteList struct {
 	// Every interaction with internals requires a lock!
-	sync.RWMutex
+	syncRWMutex
 
 	// A deduplicated set of addresses. Any accessor should lock beforehand.
 	addrs []*udp.Addr
@@ -217,10 +216,11 @@ type RemoteList struct {
 
 // NewRemoteList creates a new empty RemoteList
 func NewRemoteList(shouldAdd func(netip.Addr) bool) *RemoteList {
 	return &RemoteList{
-		addrs:     make([]*udp.Addr, 0),
-		relays:    make([]*iputil.VpnIp, 0),
-		cache:     make(map[iputil.VpnIp]*cache),
-		shouldAdd: shouldAdd,
+		syncRWMutex: newSyncRWMutex("remote-list"),
+		addrs:       make([]*udp.Addr, 0),
+		relays:      make([]*iputil.VpnIp, 0),
+		cache:       make(map[iputil.VpnIp]*cache),
+		shouldAdd:   shouldAdd,
 	}
 }
diff --git a/timeout.go b/timeout.go
index c1b4c398b..705e58f56 100644
--- a/timeout.go
+++ b/timeout.go
@@ -1,7 +1,6 @@
 package nebula
 
 import (
-	"sync"
 	"time"
 )
 
@@ -34,7 +33,7 @@ type TimerWheel[T any] struct {
 }
 
 type LockingTimerWheel[T any] struct {
-	m sync.Mutex
+	m syncMutex
 	t *TimerWheel[T]
 }
 
@@ -81,8 +80,9 @@ func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
 }
 
 // NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
-func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
+func NewLockingTimerWheel[T any](name string, min, max time.Duration) *LockingTimerWheel[T] {
 	return &LockingTimerWheel[T]{
+		m: newSyncMutex(name),
 		t: NewTimerWheel[T](min, max),
 	}
 }
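
A minimal sketch of how the checker trips at runtime; this test is hypothetical and not part of the diff, and assumes a build with -tags=mutex_debug. Per allowedConcurrentLocks, "hostmap" may be grabbed while "handshake-hostinfo" is held, but "handshake-hostinfo" has no allow-list entry of its own, so the inverted order panics inside checkMutex:

//go:build mutex_debug

package nebula

import "testing"

func TestLockOrderIllustration(t *testing.T) {
	hm := newSyncRWMutex("hostmap")
	hh := newSyncMutex("handshake-hostinfo")

	// Allowed order: "handshake-hostinfo" is in hostmap's allow-list.
	hh.Lock()
	hm.Lock()
	hm.Unlock()
	hh.Unlock()

	defer func() {
		if recover() == nil {
			t.Fatal("expected lock-order panic")
		}
	}()

	// Inverted order: checkMutex panics with "grabbing handshake-hostinfo
	// lock and already have these locks: ..." before the lock is taken.
	hm.Lock()
	hh.Lock()
}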