From fe59f152300dfb852eae500a9f04897a379cd7eb Mon Sep 17 00:00:00 2001 From: Florin Peter Date: Thu, 11 May 2023 17:37:57 +0200 Subject: [PATCH] fix concurrent map access --- cmd/cmd.go | 2 +- metalbond.go | 14 +++- peer.go | 81 ++++++++++++++++---- peer_test.go | 208 ++++++++++++++++++++++++++++++++++++++++++++++---- suite_test.go | 3 +- 5 files changed, 276 insertions(+), 32 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index 7af8a45..d99af51 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -150,7 +150,7 @@ func main() { } for _, server := range CLI.Client.Server { - if err := m.AddPeer(server); err != nil { + if err := m.AddPeer(server, ""); err != nil { panic(fmt.Errorf("failed to add server: %v", err)) } } diff --git a/metalbond.go b/metalbond.go index 3e72a09..df12ddd 100644 --- a/metalbond.go +++ b/metalbond.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/onmetal/metalbond/pb" "github.com/sirupsen/logrus" @@ -74,7 +75,7 @@ func (m *MetalBond) StartHTTPServer(listen string) error { return nil } -func (m *MetalBond) AddPeer(addr string) error { +func (m *MetalBond) AddPeer(addr, localIP string) error { m.mtxPeers.Lock() defer m.mtxPeers.Unlock() @@ -86,6 +87,7 @@ func (m *MetalBond) AddPeer(addr string) error { m.peers[addr] = newMetalBondPeer( nil, addr, + localIP, m.keepaliveInterval, OUTGOING, m) @@ -101,7 +103,9 @@ func (m *MetalBond) RemovePeer(addr string) error { func (m *MetalBond) unsafeRemovePeer(addr string) { m.log().Infof("Removing peer %s", addr) + m.mtxPeers.RLock() p, exists := m.peers[addr] + m.mtxPeers.RUnlock() if !exists { m.log().Errorf("Peer %s does not exist", addr) } else { @@ -128,7 +132,10 @@ func (m *MetalBond) PeerState(addr string) (ConnectionState, error) { func (m *MetalBond) Subscribe(vni VNI) error { m.mtxMySubscriptions.Lock() - defer m.mtxMySubscriptions.Unlock() + defer func() { + m.mtxMySubscriptions.Unlock() + time.Sleep(1 * time.Second) + }() if _, exists := m.mySubscriptions[vni]; exists { return fmt.Errorf("Already subscribed to VNI %d", vni) @@ -384,10 +391,12 @@ func (m *MetalBond) removeSubscriber(peer *metalBondPeer, vni VNI) error { m.mtxSubscribers.RLock() if _, exists := m.subscribers[vni]; !exists { + m.mtxSubscribers.RUnlock() return fmt.Errorf("Peer is not subscribed!") } if _, exists := m.subscribers[vni][peer]; !exists { + m.mtxSubscribers.RUnlock() return fmt.Errorf("Peer is not subscribed!") } m.mtxSubscribers.RUnlock() @@ -440,6 +449,7 @@ func (m *MetalBond) StartServer(listenAddress string) error { p := newMetalBondPeer( &conn, conn.RemoteAddr().String(), + "", m.keepaliveInterval, INCOMING, m, diff --git a/peer.go b/peer.go index 6dbe732..a369632 100644 --- a/peer.go +++ b/peer.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net" "sync" "time" @@ -25,11 +26,14 @@ import ( "github.com/sirupsen/logrus" ) -var RETRY_INTERVAL = time.Duration(5 * time.Second) +var RetryIntervalMin = 5 +var RetryIntervalMax = 5 type metalBondPeer struct { conn *net.Conn remoteAddr string + localIP string + localAddr string direction ConnectionDirection isServer bool @@ -37,8 +41,9 @@ type metalBondPeer struct { mtxState sync.RWMutex state ConnectionState - receivedRoutes routeTable - subscribedVNIs map[VNI]bool + receivedRoutes routeTable + subscribedVNIs map[VNI]bool + mtxSubscribedVNIs sync.RWMutex metalbond *MetalBond @@ -60,6 +65,7 @@ type metalBondPeer struct { func newMetalBondPeer( pconn *net.Conn, remoteAddr string, + localIP string, keepaliveInterval uint32, direction ConnectionDirection, metalbond *MetalBond) *metalBondPeer { @@ -67,6 +73,7 @@ func newMetalBondPeer( peer := metalBondPeer{ conn: pconn, remoteAddr: remoteAddr, + localIP: localIP, direction: direction, state: CONNECTING, receivedRoutes: newRouteTable(), @@ -185,7 +192,21 @@ func (p *metalBondPeer) setState(newState ConnectionState) { // Connection lost if oldState != newState && newState != ESTABLISHED { + subscribers := make(map[VNI]map[*metalBondPeer]bool) + p.metalbond.mtxSubscribers.RLock() for vni, peers := range p.metalbond.subscribers { + for peer := range peers { + if p == peer { + if _, ok := subscribers[vni]; !ok { + subscribers[vni] = make(map[*metalBondPeer]bool) + } + subscribers[vni][p] = true + } + } + } + p.metalbond.mtxSubscribers.RUnlock() + + for vni, peers := range subscribers { for peer := range peers { if p == peer { if err := p.metalbond.removeSubscriber(p, vni); err != nil { @@ -198,13 +219,16 @@ func (p *metalBondPeer) setState(newState ConnectionState) { } func (p *metalBondPeer) log() *logrus.Entry { - return logrus.WithField("peer", p.remoteAddr).WithField("state", p.GetState().String()) + return logrus.WithField("peer", p.remoteAddr).WithField("state", p.GetState().String()).WithField("localAddr", p.localAddr) } func (p *metalBondPeer) cleanup() { p.log().Debugf("cleanup") // unsubscribe from VNIs + p.mtxSubscribedVNIs.RLock() + defer p.mtxSubscribedVNIs.RUnlock() + for vni := range p.subscribedVNIs { err := p.metalbond.Unsubscribe(vni) if err != nil { @@ -239,11 +263,11 @@ func (p *metalBondPeer) handle() { p.shutdown = make(chan bool, 5) p.keepaliveStop = make(chan bool, 5) p.txChanClose = make(chan bool, 5) - p.rxHello = make(chan msgHello, 50) - p.rxKeepalive = make(chan msgKeepalive, 50) - p.rxSubscribe = make(chan msgSubscribe, 1000) - p.rxUnsubscribe = make(chan msgUnsubscribe, 1000) - p.rxUpdate = make(chan msgUpdate, 1000) + p.rxHello = make(chan msgHello, 5) + p.rxKeepalive = make(chan msgKeepalive, 5) + p.rxSubscribe = make(chan msgSubscribe, 100) + p.rxUnsubscribe = make(chan msgUnsubscribe, 100) + p.rxUpdate = make(chan msgUpdate, 100) // outgoing connections still need to be established. pconn is nil. for p.conn == nil { @@ -258,14 +282,34 @@ func (p *metalBondPeer) handle() { // proceed } - conn, err := net.Dial("tcp", p.remoteAddr) + var err error + localAddr := &net.TCPAddr{} + if p.localIP != "" { + localAddr, err = net.ResolveTCPAddr("tcp", p.localIP+":0") + if err != nil { + p.log().Errorf("Error resolving local address: %s", err) + return + } + } + + remoteAddr, err := net.ResolveTCPAddr("tcp", p.remoteAddr) if err != nil { - logrus.Infof("Cannot connect to server - %v - retry in %v", err, RETRY_INTERVAL) - time.Sleep(RETRY_INTERVAL) + p.log().Errorf("Error resolving remove address: %s", err) + return + } + + tcpConn, err := net.DialTCP("tcp", localAddr, remoteAddr) + if err != nil { + retry := time.Duration(rand.Intn(RetryIntervalMax)+RetryIntervalMin) * time.Second + logrus.Infof("Cannot connect to server - %v - retry in %v", err, retry) + time.Sleep(retry) continue } + conn := net.Conn(tcpConn) + p.localAddr = conn.LocalAddr().String() p.conn = &conn + } go p.rxLoop() @@ -471,6 +515,8 @@ func (p *metalBondPeer) processRxKeepalive(msg msgKeepalive) { func (p *metalBondPeer) processRxSubscribe(msg msgSubscribe) { p.log().Debugf("processRxSubscribe %#v", msg) + p.mtxSubscribedVNIs.Lock() + defer p.mtxSubscribedVNIs.Unlock() p.subscribedVNIs[msg.VNI] = true if err := p.metalbond.addSubscriber(p, msg.VNI); err != nil { p.log().Errorf("Failed to add subscriber: %v", err) @@ -479,10 +525,15 @@ func (p *metalBondPeer) processRxSubscribe(msg msgSubscribe) { func (p *metalBondPeer) processRxUnsubscribe(msg msgUnsubscribe) { p.log().Debugf("processRxUnsubscribe %#v", msg) + p.metalbond.mtxSubscribers.Lock() + defer p.metalbond.mtxSubscribers.Unlock() + if err := p.metalbond.removeSubscriber(p, msg.VNI); err != nil { p.log().Errorf("Failed to remove subscriber: %v", err) } + p.mtxSubscribedVNIs.RLock() + defer p.mtxSubscribedVNIs.RUnlock() delete(p.subscribedVNIs, msg.VNI) } @@ -554,9 +605,11 @@ func (p *metalBondPeer) Reset() { p.wg.Wait() p.conn = nil - p.log().Infof("Closed. Waiting %s...", RETRY_INTERVAL) + p.localAddr = "" + retry := time.Duration(rand.Intn(RetryIntervalMax)+RetryIntervalMin) * time.Second + p.log().Infof("Closed. Waiting %s...", retry) - time.Sleep(RETRY_INTERVAL) + time.Sleep(retry) p.setState(CONNECTING) p.log().Infof("Reconnecting...") diff --git a/peer_test.go b/peer_test.go index 236858d..894cca8 100644 --- a/peer_test.go +++ b/peer_test.go @@ -16,6 +16,10 @@ package metalbond import ( "fmt" + "math/rand" + "net" + "net/netip" + "sync" "time" . "github.com/onsi/ginkgo/v2" @@ -47,10 +51,13 @@ var _ = Describe("Peer", func() { It("should reset", func() { mbClient := NewMetalBond(Config{}, client) - err := mbClient.AddPeer(serverAddress) + err := mbClient.AddPeer(serverAddress, "127.0.0.2") Expect(err).NotTo(HaveOccurred()) - Expect(waitForPeerState(mbServer, ESTABLISHED)).NotTo(BeFalse()) + clientAddr := getLocalAddr(mbClient, "") + Expect(clientAddr).NotTo(Equal("")) + + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) var p *metalBondPeer for _, peer := range mbServer.peers { @@ -66,16 +73,22 @@ var _ = Describe("Peer", func() { // expect the peer state to be closed Expect(p.GetState()).To(Equal(CLOSED)) + clientAddr = getLocalAddr(mbClient, clientAddr) + Expect(clientAddr).NotTo(Equal("")) + // wait for the peer to be established again - Expect(waitForPeerState(mbServer, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) }) It("should reconnect", func() { mbClient := NewMetalBond(Config{}, client) - err := mbClient.AddPeer(serverAddress) + err := mbClient.AddPeer(serverAddress, "127.0.0.2") Expect(err).NotTo(HaveOccurred()) - Expect(waitForPeerState(mbServer, ESTABLISHED)).NotTo(BeFalse()) + clientAddr := getLocalAddr(mbClient, "") + Expect(clientAddr).NotTo(Equal("")) + + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) var p *metalBondPeer for _, peer := range mbServer.peers { @@ -86,21 +99,188 @@ var _ = Describe("Peer", func() { // Close the peer p.Close() - // + // expect the peer state to be closed Expect(p.GetState()).To(Equal(CLOSED)) + clientAddr = getLocalAddr(mbClient, clientAddr) + Expect(clientAddr).NotTo(Equal("")) + // wait for the peer to be established again - Expect(waitForPeerState(mbServer, ESTABLISHED)).NotTo(BeFalse()) + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + }) + + It("should announce", func() { + totalClients := 1000 + var wg sync.WaitGroup + + for i := 1; i < totalClients+1; i++ { + wg.Add(1) + + go func(index int) { + defer wg.Done() + mbClient := NewMetalBond(Config{}, client) + localIP := net.ParseIP("127.0.0.1") + localIP = incrementIPv4(localIP, index) + err := mbClient.AddPeer(serverAddress, localIP.String()) + Expect(err).NotTo(HaveOccurred()) + + // wait for the peer loop to start + time.Sleep(1 * time.Second) + clientAddr := getLocalAddr(mbClient, "") + Expect(clientAddr).NotTo(Equal("")) + + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + + mbServer.mtxPeers.RLock() + p := mbServer.peers[clientAddr] + mbServer.mtxPeers.RUnlock() + + vni := VNI(index % 10) + err = mbClient.Subscribe(vni) + if err != nil { + log.Errorf("subscribe failed: %v", err) + } + Expect(err).NotTo(HaveOccurred()) + + // prepare the route + startIP := net.ParseIP("100.64.0.0") + ip := incrementIPv4(startIP, index) + addr, err := netip.ParseAddr(ip.String()) + Expect(err).NotTo(HaveOccurred()) + underlayRoute, err := netip.ParseAddr(fmt.Sprintf("b198:5b10:3880:fd32:fb80:80dd:46f7:%d", index)) + Expect(err).NotTo(HaveOccurred()) + dest := Destination{ + Prefix: netip.PrefixFrom(addr, 32), + IPVersion: IPV4, + } + nextHop := NextHop{ + TargetVNI: uint32(vni), + TargetAddress: underlayRoute, + } + + err = mbClient.AnnounceRoute(vni, dest, nextHop) + Expect(err).NotTo(HaveOccurred()) + + // wait for the route to be received + time.Sleep(3 * time.Second) + + // check if the route was received + _, exists := p.receivedRoutes.routes[vni][dest][nextHop][p] + Expect(exists).To(BeTrue()) + Expect(err).NotTo(HaveOccurred()) + + // Close the peer + err = p.metalbond.RemovePeer(p.remoteAddr) + + // expect the peer state to be closed + Expect(p.GetState()).To(Equal(CLOSED)) + + // wait for the peer to be established again + wait := rand.Intn(20) + 1 + time.Sleep(time.Duration(wait) * time.Second) + + notExcept := clientAddr + clientAddr = getLocalAddr(mbClient, notExcept) + if clientAddr == "" { + log.Errorf("clientAddr is empty '%s'", clientAddr) + } + Expect(clientAddr).ShouldNot(BeEmpty()) + + // check if the peer is established again + Expect(waitForPeerState(mbServer, clientAddr, ESTABLISHED)).NotTo(BeFalse()) + + mbServer.mtxPeers.RLock() + p = mbServer.peers[clientAddr] + mbServer.mtxPeers.RUnlock() + + // wait for the route to be received + time.Sleep(3 * time.Second) + + // check if the route was received + _, exists = p.receivedRoutes.routes[vni][dest][nextHop][p] + if !exists { + log.Errorf("route not received vni %v, dest %v, nextHop %v, clientAddr %s", vni, dest, nextHop, clientAddr) + for vni, dest := range p.receivedRoutes.routes { + log.Errorf("vni %v", vni) + for dest, nextHop := range dest { + log.Errorf("dest %v", dest) + for nextHop, peers := range nextHop { + log.Errorf("nextHop %v", nextHop) + for peer := range peers { + log.Errorf("peer %v", peer) + } + } + } + } + + } + Expect(exists).To(BeTrue()) + }(i) + } + + wg.Wait() }) }) -func waitForPeerState(mbServer *MetalBond, expectedState ConnectionState) bool { - return Eventually(func() bool { - for _, peer := range mbServer.peers { - if peer.GetState() == expectedState { - return true +func waitForPeerState(mbServer *MetalBond, clientAddr string, expectedState ConnectionState) bool { + + // Call the checkPeerState function repeatedly until it returns true or a timeout is reached + timeout := 30 * time.Second + start := time.Now() + for { + mbServer.mtxPeers.RLock() + peer := mbServer.peers[clientAddr] + mbServer.mtxPeers.RUnlock() + + if peer != nil && peer.GetState() == expectedState { + return true + } + + if time.Since(start) >= timeout { + state := "NONE" + if peer != nil { + state = peer.GetState().String() } + log.Errorf("Timeout reached while waiting for peer (%s) to reach expected state %s, but state is %s", clientAddr, expectedState, state) + return false + } + + // Wait a short time before checking again + time.Sleep(500 * time.Millisecond) + } +} + +func getLocalAddr(mbClient *MetalBond, notExcept string) string { + timeout := 30 * time.Second + start := time.Now() + for { + for _, peer := range mbClient.peers { + if peer.localAddr != "" && peer.localAddr != notExcept { + return peer.localAddr + } + } + + if time.Since(start) >= timeout { + return "" + } + + // Wait a short time before checking again + time.Sleep(500 * time.Millisecond) + } +} + +func incrementIPv4(ip net.IP, count int) net.IP { + // Increment the IP address by the count + for i := len(ip) - 1; i >= 0; i-- { + octet := int(ip[i]) + (count % 256) + count /= 256 + if octet > 255 { + octet = 255 + } + ip[i] = byte(octet) + if count == 0 { + break } - return false - }, 10*time.Second).Should(BeTrue(), fmt.Sprintf("expected peer state to be %s", expectedState)) + } + return ip } diff --git a/suite_test.go b/suite_test.go index c5d8a97..37e9f9c 100644 --- a/suite_test.go +++ b/suite_test.go @@ -30,7 +30,8 @@ func TestMetalbond(t *testing.T) { } var _ = BeforeSuite(func() { - log.SetLevel(log.TraceLevel) + //log.SetLevel(log.TraceLevel) + log.SetLevel(log.InfoLevel) }) func getRandomTCPPort() int {