Federation envoy re-attempts sync push for issuance proofs #698

Merged 16 commits on Dec 8, 2023
Changes from all commits
8 changes: 8 additions & 0 deletions itest/tapd_harness.go
@@ -100,6 +100,10 @@ type harnessOpts struct {
proofCourier proof.CourierHarness
custodianProofRetrievalDelay *time.Duration
addrAssetSyncerDisable bool

// fedSyncTickerInterval is the interval at which the federation envoy
// sync ticker will fire.
fedSyncTickerInterval *time.Duration
}

type harnessOption func(*harnessOpts)
@@ -242,6 +246,10 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay
}

if opts.fedSyncTickerInterval != nil {
finalCfg.Universe.SyncInterval = *opts.fedSyncTickerInterval
}

return &tapdHarness{
cfg: &cfg,
clientCfg: finalCfg,
5 changes: 5 additions & 0 deletions itest/test_harness.go
Expand Up @@ -366,6 +366,10 @@ type tapdHarnessParams struct {
// synced from the above node.
startupSyncNumAssets int

// fedSyncTickerInterval is the interval at which the federation envoy
// sync ticker will fire.
fedSyncTickerInterval *time.Duration

// noDefaultUniverseSync indicates whether the default universe server
// should be added as a federation server or not.
noDefaultUniverseSync bool
@@ -402,6 +406,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest,
ho.proofCourier = selectedProofCourier
ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay
ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable
ho.fedSyncTickerInterval = params.fedSyncTickerInterval
}

tapdHarness, err := newTapdHarness(t, ht, tapdConfig{
4 changes: 4 additions & 0 deletions itest/test_list_on_test.go
@@ -195,6 +195,10 @@ var testCases = []*testCase{
name: "universe pagination simple",
test: testUniversePaginationSimple,
},
{
name: "mint proof repeat fed sync attempt",
test: testMintProofRepeatFedSyncAttempt,
},
}

var optionalTestCases = []*testCase{
99 changes: 99 additions & 0 deletions itest/universe_federation_test.go
@@ -0,0 +1,99 @@
package itest

import (
"context"
"time"

"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
"github.com/stretchr/testify/require"
)

// testMintProofRepeatFedSyncAttempt tests that the minting node will retry
// pushing the minting proofs to the federation server peer node, if the peer
// node is offline at the time of the initial sync attempt.
func testMintProofRepeatFedSyncAttempt(t *harnessTest) {
// Create a new minting node, without hooking it up to any existing
// Universe server. We will also set the sync ticker to 4 seconds, so
// that we can test that the proof push sync is retried and eventually
// succeeds after the fed server peer node reappears online.
syncTickerInterval := 4 * time.Second
mintingNode := setupTapdHarness(
t.t, t, t.lndHarness.Bob, nil,
func(params *tapdHarnessParams) {
params.fedSyncTickerInterval = &syncTickerInterval
params.noDefaultUniverseSync = true
},
)
defer func() {
require.NoError(t.t, mintingNode.stop(!*noDelete))
}()

// We'll use the main node as our federation universe server
// counterparty.
fedServerNode := t.tapd

// Keep a reference to the fed server node RPC host address, so that we
// can assert that it has not changed after the restart. This is
// important, because the minting node will be retrying the proof push
// to this address.
fedServerNodeRpcHost := fedServerNode.rpcHost()

// Register the fedServerNode as a federation universe server with the
// minting node.
ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
defer cancel()

_, err := mintingNode.AddFederationServer(
ctxt, &unirpc.AddFederationServerRequest{
Servers: []*unirpc.UniverseFederationServer{
{
Host: fedServerNodeRpcHost,
},
},
},
)
require.NoError(t.t, err)

// Assert that the fed server node has not seen any asset proofs.
AssertUniverseStats(t.t, fedServerNode, 0, 0, 0)

// Stop the federation server peer node, so that it does not receive the
// newly minted asset proofs immediately upon minting.
t.Logf("Stopping fed server tapd node")
require.NoError(t.t, fedServerNode.stop(false))

// Now that the federation peer node is inactive, we'll mint some assets.
t.Logf("Minting assets on minting node")
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, mintingNode,
[]*mintrpc.MintAssetRequest{
simpleAssets[0], issuableAssets[0],
},
)
require.Len(t.t, rpcAssets, 2)

t.lndHarness.MineBlocks(7)

// Wait for the minting node to attempt (and fail) to push the minting
// proofs to the fed peer node. We wait some multiple of the sync ticker
// interval to ensure that the minting node has had time to retry the
// proof push sync.
time.Sleep(syncTickerInterval * 2)

// Start the federation server peer node. The federation envoy component
// of our minting node should currently be retrying the proof push sync
// with the federation peer at each tick.
t.Logf("Start (previously stopped) fed server tapd node")
err = fedServerNode.start(false)
require.NoError(t.t, err)

// Ensure that the federation server node RPC host address has not
// changed after the restart. If it has, then the minting node will be
// retrying the proof push to the wrong address.
require.Equal(t.t, fedServerNodeRpcHost, fedServerNode.rpcHost())

t.Logf("Assert that fed peer node has seen the asset minting proofs")
AssertUniverseStats(t.t, fedServerNode, 2, 2, 1)
}
12 changes: 11 additions & 1 deletion rpcserver.go
@@ -3997,7 +3997,17 @@ func (r *rpcServer) DeleteFederationServer(ctx context.Context,

serversToDel := fn.Map(req.Servers, unmarshalUniverseServer)

err := r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
// Remove the servers from the proofs sync log. This is necessary before
// we can remove the servers from the database because of a foreign
// key constraint.
err := r.cfg.FederationDB.DeleteProofsSyncLogEntries(
ctx, serversToDel...,
)
if err != nil {
return nil, err
}

err = r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
if err != nil {
return nil, err
}
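The delete ordering above is forced by the sync-log schema introduced later in this PR: federation_proof_sync_log.servers_id references universe_servers(id), so the child log rows must be removed before the server row. A minimal SQL sketch of the effective order, assuming universe_servers exposes a server_host column (the host value is a placeholder):

-- Hypothetical illustration of the ordering required by the
-- servers_id -> universe_servers(id) foreign key: sync-log rows first,
-- then the parent server row. 'universe.example.com:10029' is a placeholder.
DELETE FROM federation_proof_sync_log
WHERE servers_id IN (
    SELECT id FROM universe_servers
    WHERE server_host = 'universe.example.com:10029'
);

DELETE FROM universe_servers
WHERE server_host = 'universe.example.com:10029';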
108 changes: 11 additions & 97 deletions tapdb/assets_store_test.go
@@ -246,105 +246,17 @@ func assertAssetEqual(t *testing.T, a, b *asset.Asset) {
func TestImportAssetProof(t *testing.T) {
t.Parallel()

// First, we'll create a new instance of the database.
_, assetStore, db := newAssetStore(t)

// Next, we'll make a new random asset that also has a few inputs with
// dummy witness information.
testAsset := randAsset(t)

assetRoot, err := commitment.NewAssetCommitment(testAsset)
require.NoError(t, err)

taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot)
require.NoError(t, err)

// With our asset created, we can now create the AnnotatedProof we use
// to import assets into the database.
var blockHash chainhash.Hash
_, err = rand.Read(blockHash[:])
require.NoError(t, err)
var (
ctxb = context.Background()

anchorTx := wire.NewMsgTx(2)
anchorTx.AddTxIn(&wire.TxIn{})
anchorTx.AddTxOut(&wire.TxOut{
PkScript: bytes.Repeat([]byte{0x01}, 34),
Value: 10,
})
dbHandle = NewDbHandle(t)
assetStore = dbHandle.AssetStore
)

// Add a random asset and corresponding proof into the database.
testAsset, testProof := dbHandle.AddRandomAssetProof(t)
assetID := testAsset.ID()
anchorPoint := wire.OutPoint{
Hash: anchorTx.TxHash(),
Index: 0,
}
initialBlob := bytes.Repeat([]byte{0x0}, 100)
updatedBlob := bytes.Repeat([]byte{0x77}, 100)
testProof := &proof.AnnotatedProof{
Locator: proof.Locator{
AssetID: &assetID,
ScriptKey: *testAsset.ScriptKey.PubKey,
},
Blob: initialBlob,
AssetSnapshot: &proof.AssetSnapshot{
Asset: testAsset,
OutPoint: anchorPoint,
AnchorBlockHash: blockHash,
AnchorBlockHeight: test.RandInt[uint32](),
AnchorTxIndex: test.RandInt[uint32](),
AnchorTx: anchorTx,
OutputIndex: 0,
InternalKey: test.RandPubKey(t),
ScriptRoot: taprootAssetRoot,
},
}
if testAsset.GroupKey != nil {
testProof.GroupKey = &testAsset.GroupKey.GroupPubKey
}

// We'll now insert the internal key information as well as the script
// key ahead of time to reflect the address creation that happens
// elsewhere.
ctxb := context.Background()
_, err = db.UpsertInternalKey(ctxb, InternalKey{
RawKey: testProof.InternalKey.SerializeCompressed(),
KeyFamily: test.RandInt[int32](),
KeyIndex: test.RandInt[int32](),
})
require.NoError(t, err)
rawScriptKeyID, err := db.UpsertInternalKey(ctxb, InternalKey{
RawKey: testAsset.ScriptKey.RawKey.PubKey.SerializeCompressed(),
KeyFamily: int32(testAsset.ScriptKey.RawKey.Family),
KeyIndex: int32(testAsset.ScriptKey.RawKey.Index),
})
require.NoError(t, err)
_, err = db.UpsertScriptKey(ctxb, NewScriptKey{
InternalKeyID: rawScriptKeyID,
TweakedScriptKey: testAsset.ScriptKey.PubKey.SerializeCompressed(),
Tweak: nil,
})
require.NoError(t, err)

// We'll add the chain transaction of the proof now to simulate a
// batched transfer on a higher layer.
var anchorTxBuf bytes.Buffer
err = testProof.AnchorTx.Serialize(&anchorTxBuf)
require.NoError(t, err)
anchorTXID := testProof.AnchorTx.TxHash()
_, err = db.UpsertChainTx(ctxb, ChainTxParams{
Txid: anchorTXID[:],
RawTx: anchorTxBuf.Bytes(),
BlockHeight: sqlInt32(testProof.AnchorBlockHeight),
BlockHash: testProof.AnchorBlockHash[:],
TxIndex: sqlInt32(testProof.AnchorTxIndex),
})
require.NoError(t, err, "unable to insert chain tx: %w", err)

// With all our test data constructed, we'll now attempt to import the
// asset into the database.
require.NoError(t, assetStore.ImportProofs(
ctxb, proof.MockHeaderVerifier, proof.MockGroupVerifier, false,
testProof,
))
initialBlob := testProof.Blob

// We should now be able to retrieve the set of all assets inserted on
// disk.
@@ -371,7 +283,7 @@ func TestImportAssetProof(t *testing.T) {
ScriptKey: *testAsset.ScriptKey.PubKey,
})
require.NoError(t, err)
require.Equal(t, initialBlob, []byte(currentBlob))
require.Equal(t, initialBlob, currentBlob)

// We should also be able to fetch the created asset above based on
// either the asset ID, or key group via the main coin selection
@@ -391,6 +303,8 @@

// We'll now attempt to overwrite the proof with one that has different
// block information (simulating a re-org).
updatedBlob := bytes.Repeat([]byte{0x77}, 100)

testProof.AnchorBlockHash = chainhash.Hash{12, 34, 56}
testProof.AnchorBlockHeight = 1234
testProof.AnchorTxIndex = 5678
2 changes: 2 additions & 0 deletions tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql
@@ -0,0 +1,2 @@
DROP INDEX IF EXISTS federation_proof_sync_log_unique_index_proof_leaf_id_servers_id;
DROP TABLE IF EXISTS federation_proof_sync_log;
36 changes: 36 additions & 0 deletions tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql
@@ -0,0 +1,36 @@
-- This table stores the log of federation universe proof sync attempts. Rows
-- in this table are specific to a given proof leaf, server, and sync direction.
CREATE TABLE IF NOT EXISTS federation_proof_sync_log (
id BIGINT PRIMARY KEY,

-- The status of the proof sync attempt.
status TEXT NOT NULL CHECK(status IN ('pending', 'complete')),

-- The timestamp of when the log entry for the associated proof was last
-- updated.
timestamp TIMESTAMP NOT NULL,

-- The number of attempts that have been made to sync the proof.
attempt_counter BIGINT NOT NULL DEFAULT 0,

-- The direction of the proof sync attempt.
sync_direction TEXT NOT NULL CHECK(sync_direction IN ('push', 'pull')),

-- The ID of the subject proof leaf.
proof_leaf_id BIGINT NOT NULL REFERENCES universe_leaves(id),

-- The ID of the universe that the proof leaf belongs to.
universe_root_id BIGINT NOT NULL REFERENCES universe_roots(id),

-- The ID of the server that the proof will be/was synced to.
servers_id BIGINT NOT NULL REFERENCES universe_servers(id)
);

-- Create a unique index on table federation_proof_sync_log
CREATE UNIQUE INDEX federation_proof_sync_log_unique_index_proof_leaf_id_servers_id
ON federation_proof_sync_log (
sync_direction,
proof_leaf_id,
universe_root_id,
servers_id
);
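
As an illustration only (not part of this diff): the unique index makes repeated sync attempts easy to record as upserts, since a retried push for the same (sync_direction, proof_leaf_id, universe_root_id, servers_id) tuple can bump attempt_counter instead of inserting a duplicate row. A minimal sketch, assuming id generation is handled by the database and using placeholder IDs:

-- Hypothetical upsert recording a proof push attempt; the conflict target is
-- the unique index defined above. All numeric IDs are placeholders.
INSERT INTO federation_proof_sync_log (
    status, timestamp, attempt_counter, sync_direction,
    proof_leaf_id, universe_root_id, servers_id
) VALUES (
    'pending', CURRENT_TIMESTAMP, 0, 'push', 1, 1, 1
)
ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id, servers_id)
DO UPDATE SET
    timestamp = EXCLUDED.timestamp,
    attempt_counter = federation_proof_sync_log.attempt_counter + 1;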
11 changes: 11 additions & 0 deletions tapdb/sqlc/models.go

Some generated files are not rendered by default.