Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/4] - peer: add new abstract message router #8520

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion peer/brontide.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/msgmux"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
Expand Down Expand Up @@ -522,6 +523,10 @@ type Brontide struct {
// potentially holding lots of un-consumed events.
channelEventClient *subscribe.Client

// msgRouter is an instance of the msgmux.Router which is used to send
// off new wire messages for handing.
msgRouter fn.Option[msgmux.Router]

startReady chan struct{}
quit chan struct{}
wg sync.WaitGroup
Expand Down Expand Up @@ -559,6 +564,9 @@ func NewBrontide(cfg Config) *Brontide {
startReady: make(chan struct{}),
quit: make(chan struct{}),
log: build.NewPrefixLog(logPrefix, peerLog),
msgRouter: fn.Some[msgmux.Router](
msgmux.NewMultiMsgRouter(),
),
}

if cfg.Conn != nil && cfg.Conn.RemoteAddr() != nil {
Expand Down Expand Up @@ -738,6 +746,12 @@ func (p *Brontide) Start() error {
return err
}

// Register the message router now as we may need to register some
// endpoints while loading the channels below.
p.msgRouter.WhenSome(func(router msgmux.Router) {
router.Start()
})
Comment on lines +769 to +773
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wish that go would let you just do p.msgRouter.WhenSome(Router.Start) but alas...


msgs, err := p.loadActiveChannels(activeChans)
if err != nil {
return fmt.Errorf("unable to load channels: %w", err)
Expand Down Expand Up @@ -913,7 +927,8 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
p.cfg.Signer, dbChan, p.cfg.SigPool,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to create channel "+
"state machine: %w", err)
}

chanPoint := dbChan.FundingOutpoint
Expand Down Expand Up @@ -1368,6 +1383,10 @@ func (p *Brontide) Disconnect(reason error) {
p.cfg.Conn.Close()

close(p.quit)

p.msgRouter.WhenSome(func(router msgmux.Router) {
router.Stop()
})
}

// String returns the string representation of this peer.
Expand Down Expand Up @@ -1809,6 +1828,22 @@ out:
}
}

// If a message router is active, then we'll try to have it
// handle this message. If it can, then we're able to skip the
// rest of the message handling logic.
err = fn.MapOptionZ(p.msgRouter, func(r msgmux.Router) error {
return r.RouteMsg(msgmux.PeerMsg{
PeerPub: *p.IdentityKey(),
Message: nextMsg,
})
})

// No error occurred, and the message was handled by the
// router.
if err == nil {
continue
}

var (
targetChan lnwire.ChannelID
isLinkUpdate bool
Expand Down