Skip to content

Commit

Permalink
Merge pull request #8520 from lightningnetwork/peer-msg-router
Browse files Browse the repository at this point in the history
[2/4] - peer: add new abstract message router
  • Loading branch information
Roasbeef authored Aug 16, 2024
2 parents 7c24e33 + f09c517 commit 8ac184a
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 5 deletions.
14 changes: 14 additions & 0 deletions config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/lnwallet/rpcwallet"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/msgmux"
"github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sqldb"
Expand Down Expand Up @@ -118,6 +120,14 @@ type ChainControlBuilder interface {
*btcwallet.Config) (*chainreg.ChainControl, func(), error)
}

// AuxComponents is a set of auxiliary components that can be used by lnd for
// certain custom channel types.
type AuxComponents struct {
// MsgRouter is an optional message router that if set will be used in
// place of a new blank default message router.
MsgRouter fn.Option[msgmux.Router]
}

// ImplementationCfg is a struct that holds all configuration items for
// components that can be implemented outside lnd itself.
type ImplementationCfg struct {
Expand All @@ -144,6 +154,10 @@ type ImplementationCfg struct {
// ChainControlBuilder is a type that can provide a custom wallet
// implementation.
ChainControlBuilder

// AuxComponents is a set of auxiliary components that can be used by
// lnd for certain custom channel types.
AuxComponents
}

// DefaultWalletImpl is the default implementation of our normal, btcwallet
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn v1.2.0
github.com/lightningnetwork/lnd/fn v1.2.1
github.com/lightningnetwork/lnd/healthcheck v1.2.5
github.com/lightningnetwork/lnd/kvdb v1.4.10
github.com/lightningnetwork/lnd/queue v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
github.com/lightningnetwork/lnd/fn v1.2.0 h1:YTb2m8NN5ZiJAskHeBZAmR1AiPY8SXziIYPAX1VI/ZM=
github.com/lightningnetwork/lnd/fn v1.2.0/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
github.com/lightningnetwork/lnd/fn v1.2.1 h1:pPsVGrwi9QBwdLJzaEGK33wmiVKOxs/zc8H7+MamFf0=
github.com/lightningnetwork/lnd/fn v1.2.1/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
github.com/lightningnetwork/lnd/healthcheck v1.2.5 h1:aTJy5xeBpcWgRtW/PGBDe+LMQEmNm/HQewlQx2jt7OA=
github.com/lightningnetwork/lnd/healthcheck v1.2.5/go.mod h1:G7Tst2tVvWo7cx6mSBEToQC5L1XOGxzZTPB29g9Rv2I=
github.com/lightningnetwork/lnd/kvdb v1.4.10 h1:vK89IVv1oVH9ubQWU+EmoCQFeVRaC8kfmOrqHbY5zoY=
Expand Down
1 change: 1 addition & 0 deletions lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
multiAcceptor, torController, tlsManager, leaderElector,
implCfg,
)
if err != nil {
return mkErr("unable to create server: %v", err)
Expand Down
32 changes: 32 additions & 0 deletions msgmux/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package msgmux

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the logging code for this subsystem.
const Subsystem = "MSGX"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
274 changes: 274 additions & 0 deletions msgmux/msg_router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package msgmux

// For some reason golangci-lint has a false positive on the sort order of the
// imports for the new "maps" package... We need the nolint directive here to
// ignore that.
//
//nolint:gci
import (
"fmt"
"maps"
"sync"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/lnwire"
)

var (
// ErrDuplicateEndpoint is returned when an endpoint is registered with
// a name that already exists.
ErrDuplicateEndpoint = fmt.Errorf("endpoint already registered")

// ErrUnableToRouteMsg is returned when a message is unable to be
// routed to any endpoints.
ErrUnableToRouteMsg = fmt.Errorf("unable to route message")
)

// EndpointName is the name of a given endpoint. This MUST be unique across all
// registered endpoints.
type EndpointName = string

// PeerMsg is a wire message that includes the public key of the peer that sent
// it.
type PeerMsg struct {
lnwire.Message

// PeerPub is the public key of the peer that sent this message.
PeerPub btcec.PublicKey
}

// Endpoint is an interface that represents a message endpoint, or the
// sub-system that will handle processing an incoming wire message.
type Endpoint interface {
// Name returns the name of this endpoint. This MUST be unique across
// all registered endpoints.
Name() EndpointName

// CanHandle returns true if the target message can be routed to this
// endpoint.
CanHandle(msg PeerMsg) bool

// SendMessage handles the target message, and returns true if the
// message was able being processed.
SendMessage(msg PeerMsg) bool
}

// MsgRouter is an interface that represents a message router, which is generic
// sub-system capable of routing any incoming wire message to a set of
// registered endpoints.
type Router interface {
// RegisterEndpoint registers a new endpoint with the router. If a
// duplicate endpoint exists, an error is returned.
RegisterEndpoint(Endpoint) error

// UnregisterEndpoint unregisters the target endpoint from the router.
UnregisterEndpoint(EndpointName) error

// RouteMsg attempts to route the target message to a registered
// endpoint. If ANY endpoint could handle the message, then nil is
// returned. Otherwise, ErrUnableToRouteMsg is returned.
RouteMsg(PeerMsg) error

// Start starts the peer message router.
Start()

// Stop stops the peer message router.
Stop()
}

// sendQuery sends a query to the main event loop, and returns the response.
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
quit chan struct{}) fn.Result[R] {

query, respChan := fn.NewReq[Q, R](queryArg)

if !fn.SendOrQuit(sendChan, query, quit) {
return fn.Errf[R]("router shutting down")
}

return fn.NewResult(fn.RecvResp(respChan, nil, quit))
}

// sendQueryErr is a helper function based on sendQuery that can be used when
// the query only needs an error response.
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
quitChan chan struct{}) error {

return fn.ElimEither(
fn.Iden, fn.Iden,
sendQuery(sendChan, queryArg, quitChan).Either,
)
}

// EndpointsMap is a map of all registered endpoints.
type EndpointsMap map[EndpointName]Endpoint

// MultiMsgRouter is a type of message router that is capable of routing new
// incoming messages, permitting a message to be routed to multiple registered
// endpoints.
type MultiMsgRouter struct {
startOnce sync.Once
stopOnce sync.Once

// registerChan is the channel that all new endpoints will be sent to.
registerChan chan fn.Req[Endpoint, error]

// unregisterChan is the channel that all endpoints that are to be
// removed are sent to.
unregisterChan chan fn.Req[EndpointName, error]

// msgChan is the channel that all messages will be sent to for
// processing.
msgChan chan fn.Req[PeerMsg, error]

// endpointsQueries is a channel that all queries to the endpoints map
// will be sent to.
endpointQueries chan fn.Req[Endpoint, EndpointsMap]

wg sync.WaitGroup
quit chan struct{}
}

// NewMultiMsgRouter creates a new instance of a peer message router.
func NewMultiMsgRouter() *MultiMsgRouter {
return &MultiMsgRouter{
registerChan: make(chan fn.Req[Endpoint, error]),
unregisterChan: make(chan fn.Req[EndpointName, error]),
msgChan: make(chan fn.Req[PeerMsg, error]),
endpointQueries: make(chan fn.Req[Endpoint, EndpointsMap]),
quit: make(chan struct{}),
}
}

// Start starts the peer message router.
func (p *MultiMsgRouter) Start() {
log.Infof("Starting Router")

p.startOnce.Do(func() {
p.wg.Add(1)
go p.msgRouter()
})
}

// Stop stops the peer message router.
func (p *MultiMsgRouter) Stop() {
log.Infof("Stopping Router")

p.stopOnce.Do(func() {
close(p.quit)
p.wg.Wait()
})
}

// RegisterEndpoint registers a new endpoint with the router. If a duplicate
// endpoint exists, an error is returned.
func (p *MultiMsgRouter) RegisterEndpoint(endpoint Endpoint) error {
return sendQueryErr(p.registerChan, endpoint, p.quit)
}

// UnregisterEndpoint unregisters the target endpoint from the router.
func (p *MultiMsgRouter) UnregisterEndpoint(name EndpointName) error {
return sendQueryErr(p.unregisterChan, name, p.quit)
}

// RouteMsg attempts to route the target message to a registered endpoint. If
// ANY endpoint could handle the message, then nil is returned.
func (p *MultiMsgRouter) RouteMsg(msg PeerMsg) error {
return sendQueryErr(p.msgChan, msg, p.quit)
}

// Endpoints returns a list of all registered endpoints.
func (p *MultiMsgRouter) endpoints() fn.Result[EndpointsMap] {
return sendQuery(p.endpointQueries, nil, p.quit)
}

// msgRouter is the main goroutine that handles all incoming messages.
func (p *MultiMsgRouter) msgRouter() {
defer p.wg.Done()

// endpoints is a map of all registered endpoints.
endpoints := make(map[EndpointName]Endpoint)

for {
select {
// A new endpoint was just sent in, so we'll add it to our set
// of registered endpoints.
case newEndpointMsg := <-p.registerChan:
endpoint := newEndpointMsg.Request

log.Infof("MsgRouter: registering new "+
"Endpoint(%s)", endpoint.Name())

// If this endpoint already exists, then we'll return
// an error as we require unique names.
if _, ok := endpoints[endpoint.Name()]; ok {
log.Errorf("MsgRouter: rejecting "+
"duplicate endpoint: %v",
endpoint.Name())

newEndpointMsg.Resolve(ErrDuplicateEndpoint)

continue
}

endpoints[endpoint.Name()] = endpoint

newEndpointMsg.Resolve(nil)

// A request to unregister an endpoint was just sent in, so
// we'll attempt to remove it.
case endpointName := <-p.unregisterChan:
delete(endpoints, endpointName.Request)

log.Infof("MsgRouter: unregistering "+
"Endpoint(%s)", endpointName.Request)

endpointName.Resolve(nil)

// A new message was just sent in. We'll attempt to route it to
// all the endpoints that can handle it.
case msgQuery := <-p.msgChan:
msg := msgQuery.Request

// Loop through all the endpoints and send the message
// to those that can handle it the message.
var couldSend bool
for _, endpoint := range endpoints {
if endpoint.CanHandle(msg) {
log.Tracef("MsgRouter: sending "+
"msg %T to endpoint %s", msg,
endpoint.Name())

sent := endpoint.SendMessage(msg)
couldSend = couldSend || sent
}
}

var err error
if !couldSend {
log.Tracef("MsgRouter: unable to route "+
"msg %T", msg)

err = ErrUnableToRouteMsg
}

msgQuery.Resolve(err)

// A query for the endpoint state just came in, we'll send back
// a copy of our current state.
case endpointQuery := <-p.endpointQueries:
endpointsCopy := make(EndpointsMap, len(endpoints))
maps.Copy(endpointsCopy, endpoints)

endpointQuery.Resolve(endpointsCopy)

case <-p.quit:
return
}
}
}

// A compile time check to ensure MultiMsgRouter implements the MsgRouter
// interface.
var _ Router = (*MultiMsgRouter)(nil)
Loading

0 comments on commit 8ac184a

Please sign in to comment.