Skip to content

Commit

Permalink
fix concurrent map access
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorinPeter authored and guvenc committed Jun 28, 2023
1 parent d4278c8 commit fe59f15
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func main() {
}

for _, server := range CLI.Client.Server {
if err := m.AddPeer(server); err != nil {
if err := m.AddPeer(server, ""); err != nil {
panic(fmt.Errorf("failed to add server: %v", err))
}
}
Expand Down
14 changes: 12 additions & 2 deletions metalbond.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/onmetal/metalbond/pb"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (m *MetalBond) StartHTTPServer(listen string) error {
return nil
}

func (m *MetalBond) AddPeer(addr string) error {
func (m *MetalBond) AddPeer(addr, localIP string) error {
m.mtxPeers.Lock()
defer m.mtxPeers.Unlock()

Expand All @@ -86,6 +87,7 @@ func (m *MetalBond) AddPeer(addr string) error {
m.peers[addr] = newMetalBondPeer(
nil,
addr,
localIP,
m.keepaliveInterval,
OUTGOING,
m)
Expand All @@ -101,7 +103,9 @@ func (m *MetalBond) RemovePeer(addr string) error {

func (m *MetalBond) unsafeRemovePeer(addr string) {
m.log().Infof("Removing peer %s", addr)
m.mtxPeers.RLock()
p, exists := m.peers[addr]
m.mtxPeers.RUnlock()
if !exists {
m.log().Errorf("Peer %s does not exist", addr)
} else {
Expand All @@ -128,7 +132,10 @@ func (m *MetalBond) PeerState(addr string) (ConnectionState, error) {

func (m *MetalBond) Subscribe(vni VNI) error {
m.mtxMySubscriptions.Lock()
defer m.mtxMySubscriptions.Unlock()
defer func() {
m.mtxMySubscriptions.Unlock()
time.Sleep(1 * time.Second)
}()

if _, exists := m.mySubscriptions[vni]; exists {
return fmt.Errorf("Already subscribed to VNI %d", vni)
Expand Down Expand Up @@ -384,10 +391,12 @@ func (m *MetalBond) removeSubscriber(peer *metalBondPeer, vni VNI) error {

m.mtxSubscribers.RLock()
if _, exists := m.subscribers[vni]; !exists {
m.mtxSubscribers.RUnlock()
return fmt.Errorf("Peer is not subscribed!")
}

if _, exists := m.subscribers[vni][peer]; !exists {
m.mtxSubscribers.RUnlock()
return fmt.Errorf("Peer is not subscribed!")
}
m.mtxSubscribers.RUnlock()
Expand Down Expand Up @@ -440,6 +449,7 @@ func (m *MetalBond) StartServer(listenAddress string) error {
p := newMetalBondPeer(
&conn,
conn.RemoteAddr().String(),
"",
m.keepaliveInterval,
INCOMING,
m,
Expand Down
81 changes: 67 additions & 14 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,32 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"sync"
"time"

"github.com/sirupsen/logrus"
)

var RETRY_INTERVAL = time.Duration(5 * time.Second)
var RetryIntervalMin = 5
var RetryIntervalMax = 5

type metalBondPeer struct {
conn *net.Conn
remoteAddr string
localIP string
localAddr string
direction ConnectionDirection
isServer bool

mtxReset sync.RWMutex
mtxState sync.RWMutex
state ConnectionState

receivedRoutes routeTable
subscribedVNIs map[VNI]bool
receivedRoutes routeTable
subscribedVNIs map[VNI]bool
mtxSubscribedVNIs sync.RWMutex

metalbond *MetalBond

Expand All @@ -60,13 +65,15 @@ type metalBondPeer struct {
func newMetalBondPeer(
pconn *net.Conn,
remoteAddr string,
localIP string,
keepaliveInterval uint32,
direction ConnectionDirection,
metalbond *MetalBond) *metalBondPeer {

peer := metalBondPeer{
conn: pconn,
remoteAddr: remoteAddr,
localIP: localIP,
direction: direction,
state: CONNECTING,
receivedRoutes: newRouteTable(),
Expand Down Expand Up @@ -185,7 +192,21 @@ func (p *metalBondPeer) setState(newState ConnectionState) {

// Connection lost
if oldState != newState && newState != ESTABLISHED {
subscribers := make(map[VNI]map[*metalBondPeer]bool)
p.metalbond.mtxSubscribers.RLock()
for vni, peers := range p.metalbond.subscribers {
for peer := range peers {
if p == peer {
if _, ok := subscribers[vni]; !ok {
subscribers[vni] = make(map[*metalBondPeer]bool)
}
subscribers[vni][p] = true
}
}
}
p.metalbond.mtxSubscribers.RUnlock()

for vni, peers := range subscribers {
for peer := range peers {
if p == peer {
if err := p.metalbond.removeSubscriber(p, vni); err != nil {
Expand All @@ -198,13 +219,16 @@ func (p *metalBondPeer) setState(newState ConnectionState) {
}

func (p *metalBondPeer) log() *logrus.Entry {
return logrus.WithField("peer", p.remoteAddr).WithField("state", p.GetState().String())
return logrus.WithField("peer", p.remoteAddr).WithField("state", p.GetState().String()).WithField("localAddr", p.localAddr)
}

func (p *metalBondPeer) cleanup() {
p.log().Debugf("cleanup")

// unsubscribe from VNIs
p.mtxSubscribedVNIs.RLock()
defer p.mtxSubscribedVNIs.RUnlock()

for vni := range p.subscribedVNIs {
err := p.metalbond.Unsubscribe(vni)
if err != nil {
Expand Down Expand Up @@ -239,11 +263,11 @@ func (p *metalBondPeer) handle() {
p.shutdown = make(chan bool, 5)
p.keepaliveStop = make(chan bool, 5)
p.txChanClose = make(chan bool, 5)
p.rxHello = make(chan msgHello, 50)
p.rxKeepalive = make(chan msgKeepalive, 50)
p.rxSubscribe = make(chan msgSubscribe, 1000)
p.rxUnsubscribe = make(chan msgUnsubscribe, 1000)
p.rxUpdate = make(chan msgUpdate, 1000)
p.rxHello = make(chan msgHello, 5)
p.rxKeepalive = make(chan msgKeepalive, 5)
p.rxSubscribe = make(chan msgSubscribe, 100)
p.rxUnsubscribe = make(chan msgUnsubscribe, 100)
p.rxUpdate = make(chan msgUpdate, 100)

// outgoing connections still need to be established. pconn is nil.
for p.conn == nil {
Expand All @@ -258,14 +282,34 @@ func (p *metalBondPeer) handle() {
// proceed
}

conn, err := net.Dial("tcp", p.remoteAddr)
var err error
localAddr := &net.TCPAddr{}
if p.localIP != "" {
localAddr, err = net.ResolveTCPAddr("tcp", p.localIP+":0")
if err != nil {
p.log().Errorf("Error resolving local address: %s", err)
return
}
}

remoteAddr, err := net.ResolveTCPAddr("tcp", p.remoteAddr)
if err != nil {
logrus.Infof("Cannot connect to server - %v - retry in %v", err, RETRY_INTERVAL)
time.Sleep(RETRY_INTERVAL)
p.log().Errorf("Error resolving remove address: %s", err)
return
}

tcpConn, err := net.DialTCP("tcp", localAddr, remoteAddr)
if err != nil {
retry := time.Duration(rand.Intn(RetryIntervalMax)+RetryIntervalMin) * time.Second
logrus.Infof("Cannot connect to server - %v - retry in %v", err, retry)
time.Sleep(retry)
continue
}

conn := net.Conn(tcpConn)
p.localAddr = conn.LocalAddr().String()
p.conn = &conn

}

go p.rxLoop()
Expand Down Expand Up @@ -471,6 +515,8 @@ func (p *metalBondPeer) processRxKeepalive(msg msgKeepalive) {

func (p *metalBondPeer) processRxSubscribe(msg msgSubscribe) {
p.log().Debugf("processRxSubscribe %#v", msg)
p.mtxSubscribedVNIs.Lock()
defer p.mtxSubscribedVNIs.Unlock()
p.subscribedVNIs[msg.VNI] = true
if err := p.metalbond.addSubscriber(p, msg.VNI); err != nil {
p.log().Errorf("Failed to add subscriber: %v", err)
Expand All @@ -479,10 +525,15 @@ func (p *metalBondPeer) processRxSubscribe(msg msgSubscribe) {

func (p *metalBondPeer) processRxUnsubscribe(msg msgUnsubscribe) {
p.log().Debugf("processRxUnsubscribe %#v", msg)
p.metalbond.mtxSubscribers.Lock()
defer p.metalbond.mtxSubscribers.Unlock()

if err := p.metalbond.removeSubscriber(p, msg.VNI); err != nil {
p.log().Errorf("Failed to remove subscriber: %v", err)
}

p.mtxSubscribedVNIs.RLock()
defer p.mtxSubscribedVNIs.RUnlock()
delete(p.subscribedVNIs, msg.VNI)
}

Expand Down Expand Up @@ -554,9 +605,11 @@ func (p *metalBondPeer) Reset() {
p.wg.Wait()

p.conn = nil
p.log().Infof("Closed. Waiting %s...", RETRY_INTERVAL)
p.localAddr = ""
retry := time.Duration(rand.Intn(RetryIntervalMax)+RetryIntervalMin) * time.Second
p.log().Infof("Closed. Waiting %s...", retry)

time.Sleep(RETRY_INTERVAL)
time.Sleep(retry)
p.setState(CONNECTING)
p.log().Infof("Reconnecting...")

Expand Down
Loading

0 comments on commit fe59f15

Please sign in to comment.