Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Chainnotifier Service to leverage batch-request support #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 148 additions & 49 deletions chainntnfs/bitcoindnotify/bitcoind.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
Expand Down Expand Up @@ -525,8 +526,24 @@ func (b *BitcoindNotifier) confDetailsManually(confRequest chainntnfs.ConfReques
heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation,
chainntnfs.TxConfStatus, error) {

// Begin scanning blocks at every height to determine where the
// transaction was included in.
// batchRequest will store the scan requests at every height to determine
// where the transaction was included in.
batchRequest := make(
map[uint32]rpcclient.FutureGetBlockHashResult, currentHeight-heightHint,
)

Vib-UX marked this conversation as resolved.
Show resolved Hide resolved
for height := currentHeight; height >= heightHint && height > 0; height-- {
batchRequest[height] = b.chainConn.GetBlockHashAsync(int64(height))
}

// Sending the bulk request using the updated batchClient.
err := b.chainConn.SendAsyncQueue()
if err != nil {
return nil, chainntnfs.TxNotFoundManually, err
}

blockHashes := make([]*chainhash.Hash, 0, len(batchRequest))

for height := currentHeight; height >= heightHint && height > 0; height-- {
// Ensure we haven't been requested to shut down before
// processing the next height.
Expand All @@ -537,35 +554,67 @@ func (b *BitcoindNotifier) confDetailsManually(confRequest chainntnfs.ConfReques
default:
}

blockHash, err := b.chainConn.GetBlockHash(int64(height))
// Receive the next block hash from the async queue.
blockHash, err := batchRequest[height].Receive()
if err != nil {
return nil, chainntnfs.TxNotFoundManually,
fmt.Errorf("unable to get hash from block "+
"with height %d", height)
fmt.Errorf("unable to retrieve hash for block "+
"with height %d: %v", height, err)
}

block, err := b.GetBlock(blockHash)
if err != nil {
blockHashes = append(blockHashes, blockHash)
}

// Now we fetch blocks in interval of 15 to avoid out of memory
// errors in case of fetching too many blocks with GetBlocksBatch().
const batchSize = 15
total := len(blockHashes)

for i := 0; i < total; i += batchSize {
// Ensure we haven't been requested to shut down before
// processing next set of blocks.
select {
case <-b.quit:
return nil, chainntnfs.TxNotFoundManually,
fmt.Errorf("unable to get block with hash "+
"%v: %v", blockHash, err)
chainntnfs.ErrChainNotifierShuttingDown
default:
}

// For every transaction in the block, check which one matches
// our request. If we find one that does, we can dispatch its
// confirmation details.
for txIndex, tx := range block.Transactions {
if !confRequest.MatchesTx(tx) {
continue
}
start := i
end := i + batchSize

if end > total {
end = total
}

blocks, err := b.chainConn.GetBlocksBatch(
blockHashes[start:end],
)

Vib-UX marked this conversation as resolved.
Show resolved Hide resolved
return &chainntnfs.TxConfirmation{
Tx: tx,
BlockHash: blockHash,
BlockHeight: height,
TxIndex: uint32(txIndex),
Block: block,
}, chainntnfs.TxFoundManually, nil
if err != nil {
return nil, chainntnfs.TxNotFoundManually, err
}

// Note:- blockHashes are stored in reverse order
// currentHeight --> heightHint, so we maintain the same refs
// of currentHeight to return the correct BlockHeight.
height := int(currentHeight) - start
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps worth adding a comment here to explain what we track with height.


for j := range blocks {
// For every transaction in the block, check which one
// matches our request. If we find one that does, we can
// dispatch its confirmation details.
for txIndex, tx := range blocks[j].Transactions {
if confRequest.MatchesTx(tx) {
return &chainntnfs.TxConfirmation{
Tx: tx,
BlockHash: blockHashes[start+j],
BlockHeight: uint32(height - j),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is also a + ? Maybe I'm wrong, but since we iterate from currentHeight backwards, height is always the start of the current batch iuuc.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be -ve only, as we traverse height from currentHeight to heightHint with height--. Now since we are traversing backwards.

Lets take an example,

  • currentHeight = 100
  • start=0
  • batchSize = 15
  • j is traversing from block=100 to block=86

So here if we found the relevantTxMatch at j=0 we return currentHeight-j --> 100-0 = 100

TxIndex: uint32(txIndex),
Vib-UX marked this conversation as resolved.
Show resolved Hide resolved
Block: blocks[j],
}, chainntnfs.TxFoundManually, nil
}
}
}
}

Expand Down Expand Up @@ -786,8 +835,24 @@ func (b *BitcoindNotifier) historicalSpendDetails(
spendRequest chainntnfs.SpendRequest, startHeight, endHeight uint32) (
*chainntnfs.SpendDetail, error) {

// Begin scanning blocks at every height to determine if the outpoint
// was spent.
// batchRequest will store the scan requests at every height to determine
// if the output was spent.
batchRequest := make(
map[uint32]rpcclient.FutureGetBlockHashResult, endHeight-startHeight,
)

Vib-UX marked this conversation as resolved.
Show resolved Hide resolved
for height := endHeight; height >= startHeight && height > 0; height-- {
batchRequest[height] = b.chainConn.GetBlockHashAsync(int64(height))
}

// Sending the bulk request using updated batchClient.
err := b.chainConn.SendAsyncQueue()
if err != nil {
return nil, err
}

blockHashes := make([]*chainhash.Hash, 0, len(batchRequest))

for height := endHeight; height >= startHeight && height > 0; height-- {
// Ensure we haven't been requested to shut down before
// processing the next height.
Expand All @@ -797,38 +862,72 @@ func (b *BitcoindNotifier) historicalSpendDetails(
default:
}

// First, we'll fetch the block for the current height.
blockHash, err := b.chainConn.GetBlockHash(int64(height))
// Receive the next block hash from the async queue.
blockHash, err := batchRequest[height].Receive()
if err != nil {
return nil, fmt.Errorf("unable to retrieve hash for "+
"block with height %d: %v", height, err)
}
block, err := b.GetBlock(blockHash)

blockHashes = append(blockHashes, blockHash)
}

// Now we fetch blocks in interval of 15 to avoid out of memory errors
// in case of fetching too many blocks with GetBlocksBatch().
const batchSize = 15
total := len(blockHashes)

for i := 0; i < total; i += batchSize {
// Ensure we haven't been requested to shut down before
// processing next set of blocks.
select {
case <-b.quit:
return nil, chainntnfs.ErrChainNotifierShuttingDown
default:
}

start := i
end := i + batchSize

if end > total {
end = total
}

blocks, err := b.chainConn.GetBlocksBatch(
blockHashes[start:end],
)
if err != nil {
return nil, fmt.Errorf("unable to retrieve block "+
"with hash %v: %v", blockHash, err)
return nil, err
}

// Then, we'll manually go over every input in every transaction
// in it and determine whether it spends the request in
// question. If we find one, we'll dispatch the spend details.
for _, tx := range block.Transactions {
matches, inputIdx, err := spendRequest.MatchesTx(tx)
if err != nil {
return nil, err
}
if !matches {
continue
}
// Note:- blockHashes are stored in reverse order
// endHeight --> startHeight, so we maintain the same refs
// of endHeight to return the correct SpendHeight.
height := int(endHeight) - start
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comment there to help readers understand the stepping.


for j := range blocks {
// Now we'll manually go over every input in every
// transaction in it and determine whether it spends the
// request in question. If we find one, we'll dispatch
// the spend details.
for _, tx := range blocks[j].Transactions {
matches, inputIdx, err := spendRequest.MatchesTx(tx)
if err != nil {
return nil, err
}
if !matches {
continue
}

txHash := tx.TxHash()
return &chainntnfs.SpendDetail{
SpentOutPoint: &tx.TxIn[inputIdx].PreviousOutPoint,
SpenderTxHash: &txHash,
SpendingTx: tx,
SpenderInputIndex: inputIdx,
SpendingHeight: int32(height),
}, nil
txHash := tx.TxHash()
return &chainntnfs.SpendDetail{
SpentOutPoint: &tx.TxIn[inputIdx].PreviousOutPoint,
SpenderTxHash: &txHash,
SpendingTx: tx,
SpenderInputIndex: inputIdx,
SpendingHeight: int32(height - j),
}, nil
}
}
}

Expand Down
100 changes: 97 additions & 3 deletions chainntnfs/bitcoindnotify/bitcoind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package bitcoindnotify

import (
"bytes"

"io/ioutil"
"testing"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/integration/rpctest"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightningnetwork/lnd/blockcache"
"github.com/lightningnetwork/lnd/chainntnfs"
Expand Down Expand Up @@ -243,32 +245,124 @@ func testHistoricalConfDetailsNoTxIndex(t *testing.T, rpcpolling bool) {
// ensured above.
outpoint, output, privKey := chainntnfs.CreateSpendableOutput(t, miner)
spendTx := chainntnfs.CreateSpendTx(t, outpoint, output, privKey)
const noOfBlocks = 100
spendTxHash, err := miner.Client.SendRawTransaction(spendTx, true)
require.NoError(t, err, "unable to broadcast tx")
broadcastHeight = syncNotifierWithMiner(t, notifier, miner)
if err := chainntnfs.WaitForMempoolTx(miner, spendTxHash); err != nil {
t.Fatalf("tx not relayed to miner: %v", err)
}
if _, err := miner.Client.Generate(1); err != nil {
t.Fatalf("unable to generate block: %v", err)
if _, err := miner.Client.Generate(noOfBlocks); err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}

// Ensure the notifier and miner are synced to the same height to ensure
// we can find the transaction when manually scanning the chain.
confReq, err := chainntnfs.NewConfRequest(&outpoint.Hash, output.PkScript)
require.NoError(t, err, "unable to create conf request")
currentHeight := syncNotifierWithMiner(t, notifier, miner)
_, txStatus, err = notifier.historicalConfDetails(
txConfirmation, txStatus, err := notifier.historicalConfDetails(
confReq, uint32(broadcastHeight), uint32(currentHeight),
)
require.NoError(t, err, "unable to retrieve historical conf details")
blockHash, err := notifier.chainConn.GetBlockHash(int64(broadcastHeight))
require.NoError(t, err, "unable to get blockHash")

// Since the backend node's txindex is disabled and the transaction has
// confirmed, we should be able to find it by falling back to scanning
// the chain manually.
switch txStatus {
case chainntnfs.TxFoundManually:
require.Equal(
t, txConfirmation.BlockHash, blockHash, "blockhash mismatch",
)
require.Equal(
t, txConfirmation.BlockHeight, broadcastHeight, "height mismatch",
)
default:
t.Fatal("should have found the transaction by manually " +
"scanning the chain, but did not")
}
}

// TestHistoricalSpendDetailsNoTxIndex ensures that we correctly retrieve
// historical spend details using the set of fallback methods when the
// backend node's txindex is disabled.
func TestHistoricalSpendDetailsNoTxIndex(t *testing.T) {
testHistoricalSpendDetailsNoTxIndex(t, true)
testHistoricalSpendDetailsNoTxIndex(t, false)
}

func testHistoricalSpendDetailsNoTxIndex(t *testing.T, rpcPolling bool) {
miner, tearDown := chainntnfs.NewMiner(t, nil, true, 25)
defer tearDown()

bitcoindConn, cleanUp := chainntnfs.NewBitcoindBackend(
t, miner.P2PAddress(), false, rpcPolling,
)
defer cleanUp()

hintCache := initHintCache(t)
blockCache := blockcache.NewBlockCache(10000)

notifier := setUpNotifier(
t, bitcoindConn, hintCache, hintCache, blockCache,
)
defer func() {
err := notifier.Stop()
require.NoError(t, err)
}()

// Since the node has its txindex disabled, we fall back to scanning the
// chain manually. A outpoint unknown to the network should not be
// notified.
var unknownHash chainhash.Hash
copy(unknownHash[:], bytes.Repeat([]byte{0x10}, 32))
invalidOutpoint := wire.NewOutPoint(&unknownHash, 0)
unknownSpendReq, err := chainntnfs.NewSpendRequest(invalidOutpoint, testScript)
require.NoError(t, err, "unable to create spend request")
broadcastHeight := syncNotifierWithMiner(t, notifier, miner)
spendDetails, errSpend := notifier.historicalSpendDetails(
unknownSpendReq, broadcastHeight, broadcastHeight,
)
require.NoError(t, errSpend, "unable to retrieve historical spend details")
require.Equal(t, spendDetails, (*chainntnfs.SpendDetail)(nil))

// Now, we'll create a test transaction and attempt to retrieve its
// spending details. In order to fall back to manually scanning the
// chain, the transaction must be in the chain and not contain any
// unspent outputs. To ensure this, we'll create a transaction with only
// one output, which we will manually spend. The backend node's
// transaction index should also be disabled, which we've already
// ensured above.
outpoint, output, privKey := chainntnfs.CreateSpendableOutput(t, miner)
spendTx := chainntnfs.CreateSpendTx(t, outpoint, output, privKey)
const noOfBlocks = 100
spendTxHash, err := miner.Client.SendRawTransaction(spendTx, true)
require.NoError(t, err, "unable to broadcast tx")
broadcastHeight = syncNotifierWithMiner(t, notifier, miner)
err = chainntnfs.WaitForMempoolTx(miner, spendTxHash)
require.NoError(t, err, "tx not relayed to miner")
_, err = miner.Client.Generate(noOfBlocks)
require.NoError(t, err, "unable to generate blocks")

// Ensure the notifier and miner are synced to the same height to ensure
// we can find the transaction spend details when manually scanning the
// chain.
spendReq, err := chainntnfs.NewSpendRequest(outpoint, output.PkScript)
require.NoError(t, err, "unable to create conf request")
currentHeight := syncNotifierWithMiner(t, notifier, miner)
validSpendDetails, err := notifier.historicalSpendDetails(
spendReq, broadcastHeight, currentHeight,
)
require.NoError(t, err, "unable to retrieve historical spend details")

// Since the backend node's txindex is disabled and the transaction has
// confirmed, we should be able to find it by falling back to scanning
// the chain manually.
require.NotNil(t, validSpendDetails)
require.Equal(t, validSpendDetails.SpentOutPoint, outpoint)
require.Equal(t, validSpendDetails.SpenderTxHash, spendTxHash)
require.Equal(t, validSpendDetails.SpendingTx, spendTx)
require.Equal(t, validSpendDetails.SpendingHeight, int32(broadcastHeight+1))
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if indexing would be off this would have thrown error.

I compared actualSpendHeight with received BlockHeight from validSpendDetails.

}
Loading