Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/3] Graph RIP: multi: Graph Source Abstraction #9243

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
488fa3e
graph: remove unused ForEachNode method
ellemouton Nov 8, 2024
9d389ad
graph: let FetchNodeFeatures take an optional read tx
ellemouton Nov 8, 2024
755065b
graph: rename directory from graphsession to session
ellemouton Nov 13, 2024
6c008ff
lnd+graph: add GraphSource interface and implementation
ellemouton Nov 11, 2024
aa24804
graph: add ReadOnlyGraph interface to GraphSource interface
ellemouton Nov 11, 2024
9854bad
graph: add contexts to the ReadOnlyGraph interface
ellemouton Nov 11, 2024
6f3d45f
invoicesrpc: remove invoicerpc server's access to ChannelGraph pointer
ellemouton Nov 11, 2024
237151d
netann+lnd: add netann.ChannelGraph to the GraphSource interface
ellemouton Nov 11, 2024
bfe6262
graph+channeldb: add AddrSource interface to GraphSource
ellemouton Nov 12, 2024
28415f5
graph+lnd: add various calls to GraphSource
ellemouton Nov 12, 2024
0f33d41
discovery: pass contexts to NetworkPeerBootstrapper methods
ellemouton Nov 12, 2024
372883a
lnd+graph: add GraphBootstrapper to the GraphSource interface
ellemouton Nov 12, 2024
8007061
graph+lnd: add NetworkStats to GraphSource interface
ellemouton Nov 12, 2024
f36fbd0
graph+lnd: add BetweennessCentrality to GraphSource interface
ellemouton Nov 12, 2024
2192bf4
lnd+chanbackup: thread contexts through
ellemouton Nov 13, 2024
dcfffd6
invoicesrpc: remove a context.TODO
ellemouton Nov 13, 2024
75b1069
blindedpath: remove a context.TODO
ellemouton Nov 13, 2024
a28102e
netann: remove context.TODO
ellemouton Nov 11, 2024
c5cc6f1
routing: remove context.TODOs
ellemouton Nov 11, 2024
791ac91
remove context.TODOs from tests
ellemouton Nov 11, 2024
15c2161
docs: update release notes
ellemouton Nov 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions chanbackup/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"

"github.com/btcsuite/btcd/wire"
Expand All @@ -24,15 +25,17 @@ type LiveChannelSource interface {
// passed open channel. The backup includes all information required to restore
// the channel, as well as addressing information so we can find the peer and
// reconnect to them to initiate the protocol.
func assembleChanBackup(addrSource channeldb.AddrSource,
func assembleChanBackup(ctx context.Context, addrSource channeldb.AddrSource,
openChan *channeldb.OpenChannel) (*Single, error) {

log.Debugf("Crafting backup for ChannelPoint(%v)",
openChan.FundingOutpoint)

// First, we'll query the channel source to obtain all the addresses
// that are associated with the peer for this channel.
known, nodeAddrs, err := addrSource.AddrsForNode(openChan.IdentityPub)
known, nodeAddrs, err := addrSource.AddrsForNode(
ctx, openChan.IdentityPub,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -90,7 +93,8 @@ func buildCloseTxInputs(
// FetchBackupForChan attempts to create a plaintext static channel backup for
// the target channel identified by its channel point. If we're unable to find
// the target channel, then an error will be returned.
func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,
func FetchBackupForChan(ctx context.Context, chanPoint wire.OutPoint,
chanSource LiveChannelSource,
addrSource channeldb.AddrSource) (*Single, error) {

// First, we'll query the channel source to see if the channel is known
Expand All @@ -104,7 +108,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,

// Once we have the target channel, we can assemble the backup using
// the source to obtain any extra information that we may need.
staticChanBackup, err := assembleChanBackup(addrSource, targetChan)
staticChanBackup, err := assembleChanBackup(ctx, addrSource, targetChan)
if err != nil {
return nil, fmt.Errorf("unable to create chan backup: %w", err)
}
Expand All @@ -114,7 +118,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, chanSource LiveChannelSource,

// FetchStaticChanBackups will return a plaintext static channel back up for
// all known active/open channels within the passed channel source.
func FetchStaticChanBackups(chanSource LiveChannelSource,
func FetchStaticChanBackups(ctx context.Context, chanSource LiveChannelSource,
addrSource channeldb.AddrSource) ([]Single, error) {

// First, we'll query the backup source for information concerning all
Expand All @@ -129,7 +133,7 @@ func FetchStaticChanBackups(chanSource LiveChannelSource,
// channel.
staticChanBackups := make([]Single, 0, len(openChans))
for _, openChan := range openChans {
chanBackup, err := assembleChanBackup(addrSource, openChan)
chanBackup, err := assembleChanBackup(ctx, addrSource, openChan)
if err != nil {
return nil, err
}
Expand Down
15 changes: 9 additions & 6 deletions chanbackup/backup_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -61,8 +62,8 @@ func (m *mockChannelSource) addAddrsForNode(nodePub *btcec.PublicKey, addrs []ne
m.addrs[nodeKey] = addrs
}

func (m *mockChannelSource) AddrsForNode(nodePub *btcec.PublicKey) (bool,
[]net.Addr, error) {
func (m *mockChannelSource) AddrsForNode(_ context.Context,
nodePub *btcec.PublicKey) (bool, []net.Addr, error) {

if m.failQuery {
return false, nil, fmt.Errorf("fail")
Expand All @@ -81,6 +82,7 @@ func (m *mockChannelSource) AddrsForNode(nodePub *btcec.PublicKey) (bool,
// can find addresses for and otherwise.
func TestFetchBackupForChan(t *testing.T) {
t.Parallel()
ctx := context.Background()

// First, we'll make two channels, only one of them will have all the
// information we need to construct set of backups for them.
Expand Down Expand Up @@ -120,7 +122,7 @@ func TestFetchBackupForChan(t *testing.T) {
}
for i, testCase := range testCases {
_, err := FetchBackupForChan(
testCase.chanPoint, chanSource, chanSource,
ctx, testCase.chanPoint, chanSource, chanSource,
)
switch {
// If this is a valid test case, and we failed, then we'll
Expand All @@ -141,6 +143,7 @@ func TestFetchBackupForChan(t *testing.T) {
// channel source for all channels and construct a Single for each channel.
func TestFetchStaticChanBackups(t *testing.T) {
t.Parallel()
ctx := context.Background()

// First, we'll make the set of channels that we want to seed the
// channel source with. Both channels will be fully populated in the
Expand All @@ -160,7 +163,7 @@ func TestFetchStaticChanBackups(t *testing.T) {
// With the channel source populated, we'll now attempt to create a set
// of backups for all the channels. This should succeed, as all items
// are populated within the channel source.
backups, err := FetchStaticChanBackups(chanSource, chanSource)
backups, err := FetchStaticChanBackups(ctx, chanSource, chanSource)
require.NoError(t, err, "unable to create chan back ups")

if len(backups) != numChans {
Expand All @@ -175,7 +178,7 @@ func TestFetchStaticChanBackups(t *testing.T) {
copy(n[:], randomChan2.IdentityPub.SerializeCompressed())
delete(chanSource.addrs, n)

_, err = FetchStaticChanBackups(chanSource, chanSource)
_, err = FetchStaticChanBackups(ctx, chanSource, chanSource)
if err == nil {
t.Fatalf("query with incomplete information should fail")
}
Expand All @@ -184,7 +187,7 @@ func TestFetchStaticChanBackups(t *testing.T) {
// source at all, then we'll fail as well.
chanSource = newMockChannelSource()
chanSource.failQuery = true
_, err = FetchStaticChanBackups(chanSource, chanSource)
_, err = FetchStaticChanBackups(ctx, chanSource, chanSource)
if err == nil {
t.Fatalf("query should fail")
}
Expand Down
11 changes: 7 additions & 4 deletions chanbackup/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chanbackup

import (
"bytes"
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -81,7 +82,8 @@ type ChannelNotifier interface {
// synchronization point to ensure that the chanbackup.SubSwapper does
// not miss any channel open or close events in the period between when
// it's created, and when it requests the channel subscription.
SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
SubscribeChans(context.Context, map[wire.OutPoint]struct{}) (
*ChannelSubscription, error)
}

// SubSwapper subscribes to new updates to the open channel state, and then
Expand Down Expand Up @@ -119,16 +121,17 @@ type SubSwapper struct {
// set of channels, and the required interfaces to be notified of new channel
// updates, pack a multi backup, and swap the current best backup from its
// storage location.
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
func NewSubSwapper(ctx context.Context, startingChans []Single,
chanNotifier ChannelNotifier, keyRing keychain.KeyRing,
backupSwapper Swapper) (*SubSwapper, error) {

// First, we'll subscribe to the latest set of channel updates given
// the set of channels we already know of.
knownChans := make(map[wire.OutPoint]struct{})
for _, chanBackup := range startingChans {
knownChans[chanBackup.FundingOutpoint] = struct{}{}
}
chanEvents, err := chanNotifier.SubscribeChans(knownChans)
chanEvents, err := chanNotifier.SubscribeChans(ctx, knownChans)
if err != nil {
return nil, err
}
Expand Down
16 changes: 11 additions & 5 deletions chanbackup/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -62,8 +63,8 @@ func newMockChannelNotifier() *mockChannelNotifier {
}
}

func (m *mockChannelNotifier) SubscribeChans(chans map[wire.OutPoint]struct{}) (
*ChannelSubscription, error) {
func (m *mockChannelNotifier) SubscribeChans(_ context.Context,
_ map[wire.OutPoint]struct{}) (*ChannelSubscription, error) {

if m.fail {
return nil, fmt.Errorf("fail")
Expand All @@ -80,6 +81,7 @@ func (m *mockChannelNotifier) SubscribeChans(chans map[wire.OutPoint]struct{}) (
// channel subscription, then the entire sub-swapper will fail to start.
func TestNewSubSwapperSubscribeFail(t *testing.T) {
t.Parallel()
ctx := context.Background()

keyRing := &lnencrypt.MockKeyRing{}

Expand All @@ -88,7 +90,7 @@ func TestNewSubSwapperSubscribeFail(t *testing.T) {
fail: true,
}

_, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper)
_, err := NewSubSwapper(ctx, nil, &chanNotifier, keyRing, &swapper)
if err == nil {
t.Fatalf("expected fail due to lack of subscription")
}
Expand Down Expand Up @@ -152,13 +154,16 @@ func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper,
// multiple time is permitted.
func TestSubSwapperIdempotentStartStop(t *testing.T) {
t.Parallel()
ctx := context.Background()

keyRing := &lnencrypt.MockKeyRing{}

var chanNotifier mockChannelNotifier

swapper := newMockSwapper(keyRing)
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper)
subSwapper, err := NewSubSwapper(
ctx, nil, &chanNotifier, keyRing, swapper,
)
require.NoError(t, err, "unable to init subSwapper")

if err := subSwapper.Start(); err != nil {
Expand All @@ -181,6 +186,7 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) {
// the master multi file backup.
func TestSubSwapperUpdater(t *testing.T) {
t.Parallel()
ctx := context.Background()

keyRing := &lnencrypt.MockKeyRing{}
chanNotifier := newMockChannelNotifier()
Expand Down Expand Up @@ -224,7 +230,7 @@ func TestSubSwapperUpdater(t *testing.T) {
// With our channel set created, we'll make a fresh sub swapper
// instance to begin our test.
subSwapper, err := NewSubSwapper(
initialChanSet, chanNotifier, keyRing, swapper,
ctx, initialChanSet, chanNotifier, keyRing, swapper,
)
require.NoError(t, err, "unable to make swapper")
if err := subSwapper.Start(); err != nil {
Expand Down
16 changes: 9 additions & 7 deletions chanbackup/recover.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"net"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -29,7 +30,8 @@ type PeerConnector interface {
// available addresses. Once this method returns with a non-nil error,
// the connector should attempt to persistently connect to the target
// peer in the background as a persistent attempt.
ConnectPeer(node *btcec.PublicKey, addrs []net.Addr) error
ConnectPeer(ctx context.Context, node *btcec.PublicKey,
addrs []net.Addr) error
}

// Recover attempts to recover the static channel state from a set of static
Expand All @@ -41,7 +43,7 @@ type PeerConnector interface {
// well, in order to expose the addressing information required to locate to
// and connect to each peer in order to initiate the recovery protocol.
// The number of channels that were successfully restored is returned.
func Recover(backups []Single, restorer ChannelRestorer,
func Recover(ctx context.Context, backups []Single, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

var numRestored int
Expand Down Expand Up @@ -70,7 +72,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
backup.FundingOutpoint)

err = peerConnector.ConnectPeer(
backup.RemoteNodePub, backup.Addresses,
ctx, backup.RemoteNodePub, backup.Addresses,
)
if err != nil {
return numRestored, err
Expand All @@ -95,7 +97,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverSingles(singles PackedSingles,
func UnpackAndRecoverSingles(ctx context.Context, singles PackedSingles,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -104,7 +106,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
return 0, err
}

return Recover(chanBackups, restorer, peerConnector)
return Recover(ctx, chanBackups, restorer, peerConnector)
}

// UnpackAndRecoverMulti is a one-shot method, that given a set of packed
Expand All @@ -114,7 +116,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverMulti(packedMulti PackedMulti,
func UnpackAndRecoverMulti(ctx context.Context, packedMulti PackedMulti,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -123,5 +125,5 @@ func UnpackAndRecoverMulti(packedMulti PackedMulti,
return 0, err
}

return Recover(chanBackups.StaticBackups, restorer, peerConnector)
return Recover(ctx, chanBackups.StaticBackups, restorer, peerConnector)
}
Loading
Loading