Skip to content

Commit

Permalink
Merge pull request #364 from lightninglabs/universe-server-info
Browse files Browse the repository at this point in the history
universe: add server info RPC, check we're not connecting to ourselves
  • Loading branch information
jharveyb authored Jun 22, 2023
2 parents 14ec806 + aca9702 commit 2891eaf
Show file tree
Hide file tree
Showing 18 changed files with 856 additions and 396 deletions.
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ type DatabaseConfig struct {
type Config struct {
DebugLevel string

// RuntimeID is a pseudo-random ID that is generated when the server
// starts. It is used to identify the server to itself, to avoid
// connecting to itself as a federation member.
RuntimeID int64

AcceptRemoteUniverseProofs bool

// TODO(roasbeef): use the Taproot Asset chain param wrapper here?
Expand Down
15 changes: 15 additions & 0 deletions fn/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ func Map[I, O any, S []I](s S, f func(I) O) []O {
return output
}

// Filter applies the given predicate function to each element of the given
// slice and generates a new slice containing only the elements for which the
// predicate returned true.
func Filter[T any](s []T, f func(T) bool) []T {
output := make([]T, len(s))

for _, x := range s {
if f(x) {
output = append(output, x)
}
}

return output
}

// MapErr applies the given fallible mapping function to each element of the
// given slice and generates a new slice. This is identical to Map, but
// returns early if any single mapping fails.
Expand Down
41 changes: 34 additions & 7 deletions itest/universe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,17 +314,39 @@ func testUniverseFederation(t *harnessTest) {
require.NoError(t.t, bob.stop(true))
}()

ctx := context.Background()
ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
defer cancel()

// Now that Bob is active, we'll make a set of assets with the main node.
firstAsset := mintAssetsConfirmBatch(t, t.tapd, simpleAssets[:1])
require.Len(t.t, firstAsset, 1)

// Make sure we can't add ourselves to the universe.
_, err := t.tapd.AddFederationServer(
ctxt, &unirpc.AddFederationServerRequest{
Servers: []*unirpc.UniverseFederationServer{{
Host: t.tapd.rpcHost(),
}},
},
)
require.ErrorContains(t.t, err, "cannot add ourselves")

// Make sure we can't add an invalid server to the universe.
_, err = t.tapd.AddFederationServer(
ctxt, &unirpc.AddFederationServerRequest{
Servers: []*unirpc.UniverseFederationServer{{
Host: "foobar this is not even a valid address",
}},
},
)
require.ErrorContains(t.t, err, "no such host")

// We'll now add the main node, as a member of Bob's Universe
// federation. We expect that their state is synchronized shortly after
// the call returns.
_, err := bob.AddFederationServer(
ctx, &unirpc.AddFederationServerRequest{
_, err = bob.AddFederationServer(
ctxt, &unirpc.AddFederationServerRequest{
Servers: []*unirpc.UniverseFederationServer{
{
Host: t.tapd.rpcHost(),
Expand All @@ -337,7 +359,7 @@ func testUniverseFederation(t *harnessTest) {
// If we fetch the set of federation nodes, then the main node should
// be shown as being a part of that set.
fedNodes, err := bob.ListFederationServers(
ctx, &unirpc.ListFederationServersRequest{},
ctxt, &unirpc.ListFederationServersRequest{},
)
require.NoError(t.t, err)
require.Equal(t.t, 1, len(fedNodes.Servers))
Expand All @@ -352,14 +374,19 @@ func testUniverseFederation(t *harnessTest) {
// should also be able to query for stats specifically for the asset.
assertUniverseStats(t.t, bob, 1, 0, 1)

// Test the content of the universe info call.
info, err := bob.Info(ctxt, &unirpc.InfoRequest{})
require.NoError(t.t, err)
require.EqualValues(t.t, 1, info.NumAssets)

// We'll now make a new asset with Bob, and ensure that the state is
// properly pushed to the main node which is a part of the federation.
newAsset := mintAssetsConfirmBatch(t, bob, simpleAssets[1:])

// Bob should have a new asset in its local Universe tree.
assetID := newAsset[0].AssetGenesis.AssetId
waitErr := wait.NoError(func() error {
_, err := bob.QueryAssetRoots(ctx, &unirpc.AssetRootQuery{
_, err := bob.QueryAssetRoots(ctxt, &unirpc.AssetRootQuery{
Id: &unirpc.ID{
Id: &unirpc.ID_AssetId{
AssetId: assetID,
Expand Down Expand Up @@ -390,7 +417,7 @@ func testUniverseFederation(t *harnessTest) {

// Next, we'll try to delete the main node from the federation.
_, err = bob.DeleteFederationServer(
ctx, &unirpc.DeleteFederationServerRequest{
ctxt, &unirpc.DeleteFederationServerRequest{
Servers: []*unirpc.UniverseFederationServer{
{
Host: t.tapd.rpcHost(),
Expand All @@ -403,7 +430,7 @@ func testUniverseFederation(t *harnessTest) {
// If we fetch the set of federation nodes, then the main node should
// no longer be present.
fedNodes, err = bob.ListFederationServers(
ctx, &unirpc.ListFederationServersRequest{},
ctxt, &unirpc.ListFederationServersRequest{},
)
require.NoError(t.t, err)
require.Equal(t.t, 0, len(fedNodes.Servers))
Expand Down
1 change: 1 addition & 0 deletions perms/perms.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,6 @@ var (
"/universerpc.Universe/AssetLeaves": {},
"/universerpc.Universe/QueryProof": {},
"/universerpc.Universe/InsertProof": {},
"/universerpc.Universe/Info": {},
}
)
35 changes: 32 additions & 3 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2734,6 +2734,21 @@ func (r *rpcServer) InsertProof(ctx context.Context,
return r.marshalIssuanceProof(ctx, req.Key, newUniverseState)
}

// Info returns a set of information about the current state of the Universe.
func (r *rpcServer) Info(ctx context.Context,
req *unirpc.InfoRequest) (*unirpc.InfoResponse, error) {

universeStats, err := r.cfg.UniverseStats.AggregateSyncStats(ctx)
if err != nil {
return nil, err
}

return &unirpc.InfoResponse{
RuntimeId: r.cfg.RuntimeID,
NumAssets: universeStats.NumTotalAssets,
}, nil
}

// unmarshalUniverseSyncType maps an RPC universe sync type into a concrete
// type.
func unmarshalUniverseSyncType(req unirpc.UniverseSyncMode) (
Expand Down Expand Up @@ -2884,6 +2899,20 @@ func (r *rpcServer) AddFederationServer(ctx context.Context,

serversToAdd := fn.Map(in.Servers, unmarshalUniverseServer)

for idx := range serversToAdd {
server := serversToAdd[idx]

// Before we add the server as a federation member, we check
// that we can actually connect to it and that it isn't
// ourselves.
err := CheckFederationServer(
r.cfg.RuntimeID, universe.DefaultTimeout, server,
)
if err != nil {
return nil, err
}
}

err := r.cfg.UniverseFederation.AddServer(serversToAdd...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3010,7 +3039,7 @@ func (r *rpcServer) VerifyAssetOwnership(ctx context.Context,
}, nil
}

// UniverseStats returns a set of aggregrate statistics for the current state
// UniverseStats returns a set of aggregate statistics for the current state
// of the Universe.
func (r *rpcServer) UniverseStats(ctx context.Context,
req *unirpc.StatsRequest) (*unirpc.StatsResponse, error) {
Expand All @@ -3029,8 +3058,8 @@ func (r *rpcServer) UniverseStats(ctx context.Context,

// marshalAssetSyncSnapshot maps a universe asset sync stat snapshot to the RPC
// counterpart.
func marshalAssetSyncSnapshot(a universe.AssetSyncSnapshot,
) *unirpc.AssetStatsSnapshot {
func marshalAssetSyncSnapshot(
a universe.AssetSyncSnapshot) *unirpc.AssetStatsSnapshot {

return &unirpc.AssetStatsSnapshot{
AssetId: a.AssetID[:],
Expand Down
2 changes: 1 addition & 1 deletion tapcfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const (
// use for waiting for a receiver to acknowledge a proof transfer.
defaultProofTransferReceiverAckTimeout = time.Hour * 6

// defaultuniverseSyncInterval is the default interval that we'll use
// defaultUniverseSyncInterval is the default interval that we'll use
// to sync Universe state with the federation.
defaultUniverseSyncInterval = time.Minute * 10
)
Expand Down
17 changes: 13 additions & 4 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
prand "math/rand"

"github.com/btcsuite/btclog"
"github.com/lightninglabs/lndclient"
Expand Down Expand Up @@ -188,15 +189,22 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
)
}

runtimeID := prand.Int63() // nolint:gosec
universeFederation := universe.NewFederationEnvoy(
universe.FederationConfig{
FederationDB: federationDB,
UniverseSyncer: universeSyncer,
LocalRegistrar: baseUni,
SyncInterval: cfg.Universe.SyncInterval,
NewRemoteRegistrar: tap.NewRpcUniverseRegistar,
NewRemoteRegistrar: tap.NewRpcUniverseRegistrar,
StaticFederationMembers: federationMembers,
ErrChan: mainErrChan,
ServerChecker: func(addr universe.ServerAddr) error {
return tap.CheckFederationServer(
runtimeID, universe.DefaultTimeout,
addr,
)
},
ErrChan: mainErrChan,
},
)

Expand All @@ -215,6 +223,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,

return &tap.Config{
DebugLevel: cfg.DebugLevel,
RuntimeID: runtimeID,
AcceptRemoteUniverseProofs: cfg.Universe.AcceptRemoteProofs,
Lnd: lndServices,
ChainParams: cfg.ActiveNetParams,
Expand Down Expand Up @@ -339,8 +348,8 @@ func CreateServerFromConfig(cfg *Config, cfgLogger btclog.Logger,
return tap.NewServer(serverCfg), nil
}

// CreateServerFromConfig creates a new Taproot Asset server from the given CLI
// config.
// CreateSubServerFromConfig creates a new Taproot Asset server from the given
// CLI config.
func CreateSubServerFromConfig(cfg *Config, cfgLogger btclog.Logger,
lndServices *lndclient.LndServices,
mainErrChan chan<- error) (*tap.Server, error) {
Expand Down
6 changes: 3 additions & 3 deletions tapdb/universe_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (u *UniverseStats) LogNewProofEvent(ctx context.Context,
})
}

// AggreagateSyncStats returns stats aggregated over all assets within the
// AggregateSyncStats returns stats aggregated over all assets within the
// Universe.
func (u *UniverseStats) AggregateSyncStats(ctx context.Context,
) (universe.AggregateStats, error) {
func (u *UniverseStats) AggregateSyncStats(
ctx context.Context) (universe.AggregateStats, error) {

var stats universe.AggregateStats

Expand Down
Loading

0 comments on commit 2891eaf

Please sign in to comment.