-
Notifications
You must be signed in to change notification settings - Fork 0
Update Chainnotifier Service to leverage batch-request support #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ import ( | |
"github.com/btcsuite/btcd/btcutil" | ||
"github.com/btcsuite/btcd/chaincfg" | ||
"github.com/btcsuite/btcd/chaincfg/chainhash" | ||
"github.com/btcsuite/btcd/rpcclient" | ||
"github.com/btcsuite/btcd/txscript" | ||
"github.com/btcsuite/btcd/wire" | ||
"github.com/btcsuite/btcwallet/chain" | ||
|
@@ -525,8 +526,24 @@ func (b *BitcoindNotifier) confDetailsManually(confRequest chainntnfs.ConfReques | |
heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation, | ||
chainntnfs.TxConfStatus, error) { | ||
|
||
// Begin scanning blocks at every height to determine where the | ||
// transaction was included in. | ||
// batchRequest will store the scan requests at every height to determine | ||
// where the transaction was included in. | ||
batchRequest := make( | ||
map[uint32]rpcclient.FutureGetBlockHashResult, currentHeight-heightHint, | ||
) | ||
|
||
for height := currentHeight; height >= heightHint && height > 0; height-- { | ||
batchRequest[height] = b.chainConn.GetBlockHashAsync(int64(height)) | ||
} | ||
|
||
// Sending the bulk request using the updated batchClient. | ||
err := b.chainConn.SendAsyncQueue() | ||
if err != nil { | ||
return nil, chainntnfs.TxNotFoundManually, err | ||
} | ||
|
||
blockHashes := make([]*chainhash.Hash, 0, len(batchRequest)) | ||
|
||
for height := currentHeight; height >= heightHint && height > 0; height-- { | ||
// Ensure we haven't been requested to shut down before | ||
// processing the next height. | ||
|
@@ -537,35 +554,67 @@ func (b *BitcoindNotifier) confDetailsManually(confRequest chainntnfs.ConfReques | |
default: | ||
} | ||
|
||
blockHash, err := b.chainConn.GetBlockHash(int64(height)) | ||
// Receive the next block hash from the async queue. | ||
blockHash, err := batchRequest[height].Receive() | ||
if err != nil { | ||
return nil, chainntnfs.TxNotFoundManually, | ||
fmt.Errorf("unable to get hash from block "+ | ||
"with height %d", height) | ||
fmt.Errorf("unable to retrieve hash for block "+ | ||
"with height %d: %v", height, err) | ||
} | ||
|
||
block, err := b.GetBlock(blockHash) | ||
if err != nil { | ||
blockHashes = append(blockHashes, blockHash) | ||
} | ||
|
||
// Now we fetch blocks in interval of 15 to avoid out of memory | ||
// errors in case of fetching too many blocks with GetBlocksBatch(). | ||
const batchSize = 15 | ||
total := len(blockHashes) | ||
|
||
for i := 0; i < total; i += batchSize { | ||
// Ensure we haven't been requested to shut down before | ||
// processing next set of blocks. | ||
select { | ||
case <-b.quit: | ||
return nil, chainntnfs.TxNotFoundManually, | ||
fmt.Errorf("unable to get block with hash "+ | ||
"%v: %v", blockHash, err) | ||
chainntnfs.ErrChainNotifierShuttingDown | ||
default: | ||
} | ||
|
||
// For every transaction in the block, check which one matches | ||
// our request. If we find one that does, we can dispatch its | ||
// confirmation details. | ||
for txIndex, tx := range block.Transactions { | ||
if !confRequest.MatchesTx(tx) { | ||
continue | ||
} | ||
start := i | ||
end := i + batchSize | ||
|
||
if end > total { | ||
end = total | ||
} | ||
|
||
blocks, err := b.chainConn.GetBlocksBatch( | ||
blockHashes[start:end], | ||
) | ||
|
||
Vib-UX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return &chainntnfs.TxConfirmation{ | ||
Tx: tx, | ||
BlockHash: blockHash, | ||
BlockHeight: height, | ||
TxIndex: uint32(txIndex), | ||
Block: block, | ||
}, chainntnfs.TxFoundManually, nil | ||
if err != nil { | ||
return nil, chainntnfs.TxNotFoundManually, err | ||
} | ||
|
||
// Note:- blockHashes are stored in reverse order | ||
// currentHeight --> heightHint, so we maintain the same refs | ||
// of currentHeight to return the correct BlockHeight. | ||
height := int(currentHeight) - start | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps worth adding a comment here to explain what we track with |
||
|
||
for j := range blocks { | ||
// For every transaction in the block, check which one | ||
// matches our request. If we find one that does, we can | ||
// dispatch its confirmation details. | ||
for txIndex, tx := range blocks[j].Transactions { | ||
if confRequest.MatchesTx(tx) { | ||
return &chainntnfs.TxConfirmation{ | ||
Tx: tx, | ||
BlockHash: blockHashes[start+j], | ||
BlockHeight: uint32(height - j), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is also a + ? Maybe I'm wrong, but since we iterate from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be -ve only, as we traverse Lets take an example,
So here if we found the relevantTxMatch at |
||
TxIndex: uint32(txIndex), | ||
Vib-UX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Block: blocks[j], | ||
}, chainntnfs.TxFoundManually, nil | ||
} | ||
} | ||
} | ||
} | ||
|
||
|
@@ -786,8 +835,24 @@ func (b *BitcoindNotifier) historicalSpendDetails( | |
spendRequest chainntnfs.SpendRequest, startHeight, endHeight uint32) ( | ||
*chainntnfs.SpendDetail, error) { | ||
|
||
// Begin scanning blocks at every height to determine if the outpoint | ||
// was spent. | ||
// batchRequest will store the scan requests at every height to determine | ||
// if the output was spent. | ||
batchRequest := make( | ||
map[uint32]rpcclient.FutureGetBlockHashResult, endHeight-startHeight, | ||
) | ||
|
||
Vib-UX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for height := endHeight; height >= startHeight && height > 0; height-- { | ||
batchRequest[height] = b.chainConn.GetBlockHashAsync(int64(height)) | ||
} | ||
|
||
// Sending the bulk request using updated batchClient. | ||
err := b.chainConn.SendAsyncQueue() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
blockHashes := make([]*chainhash.Hash, 0, len(batchRequest)) | ||
|
||
for height := endHeight; height >= startHeight && height > 0; height-- { | ||
// Ensure we haven't been requested to shut down before | ||
// processing the next height. | ||
|
@@ -797,38 +862,72 @@ func (b *BitcoindNotifier) historicalSpendDetails( | |
default: | ||
} | ||
|
||
// First, we'll fetch the block for the current height. | ||
blockHash, err := b.chainConn.GetBlockHash(int64(height)) | ||
// Receive the next block hash from the async queue. | ||
blockHash, err := batchRequest[height].Receive() | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to retrieve hash for "+ | ||
"block with height %d: %v", height, err) | ||
} | ||
block, err := b.GetBlock(blockHash) | ||
|
||
blockHashes = append(blockHashes, blockHash) | ||
} | ||
|
||
// Now we fetch blocks in interval of 15 to avoid out of memory errors | ||
// in case of fetching too many blocks with GetBlocksBatch(). | ||
const batchSize = 15 | ||
total := len(blockHashes) | ||
|
||
for i := 0; i < total; i += batchSize { | ||
// Ensure we haven't been requested to shut down before | ||
// processing next set of blocks. | ||
select { | ||
case <-b.quit: | ||
return nil, chainntnfs.ErrChainNotifierShuttingDown | ||
default: | ||
} | ||
|
||
start := i | ||
end := i + batchSize | ||
|
||
if end > total { | ||
end = total | ||
} | ||
|
||
blocks, err := b.chainConn.GetBlocksBatch( | ||
blockHashes[start:end], | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to retrieve block "+ | ||
"with hash %v: %v", blockHash, err) | ||
return nil, err | ||
} | ||
|
||
// Then, we'll manually go over every input in every transaction | ||
// in it and determine whether it spends the request in | ||
// question. If we find one, we'll dispatch the spend details. | ||
for _, tx := range block.Transactions { | ||
matches, inputIdx, err := spendRequest.MatchesTx(tx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if !matches { | ||
continue | ||
} | ||
// Note:- blockHashes are stored in reverse order | ||
// endHeight --> startHeight, so we maintain the same refs | ||
// of endHeight to return the correct SpendHeight. | ||
height := int(endHeight) - start | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add some comment there to help readers understand the stepping. |
||
|
||
for j := range blocks { | ||
// Now we'll manually go over every input in every | ||
// transaction in it and determine whether it spends the | ||
// request in question. If we find one, we'll dispatch | ||
// the spend details. | ||
for _, tx := range blocks[j].Transactions { | ||
matches, inputIdx, err := spendRequest.MatchesTx(tx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if !matches { | ||
continue | ||
} | ||
|
||
txHash := tx.TxHash() | ||
return &chainntnfs.SpendDetail{ | ||
SpentOutPoint: &tx.TxIn[inputIdx].PreviousOutPoint, | ||
SpenderTxHash: &txHash, | ||
SpendingTx: tx, | ||
SpenderInputIndex: inputIdx, | ||
SpendingHeight: int32(height), | ||
}, nil | ||
txHash := tx.TxHash() | ||
return &chainntnfs.SpendDetail{ | ||
SpentOutPoint: &tx.TxIn[inputIdx].PreviousOutPoint, | ||
SpenderTxHash: &txHash, | ||
SpendingTx: tx, | ||
SpenderInputIndex: inputIdx, | ||
SpendingHeight: int32(height - j), | ||
}, nil | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,14 @@ package bitcoindnotify | |
|
||
import ( | ||
"bytes" | ||
|
||
"io/ioutil" | ||
"testing" | ||
"time" | ||
|
||
"github.com/btcsuite/btcd/chaincfg/chainhash" | ||
"github.com/btcsuite/btcd/integration/rpctest" | ||
"github.com/btcsuite/btcd/wire" | ||
"github.com/btcsuite/btcwallet/chain" | ||
"github.com/lightningnetwork/lnd/blockcache" | ||
"github.com/lightningnetwork/lnd/chainntnfs" | ||
|
@@ -243,32 +245,124 @@ func testHistoricalConfDetailsNoTxIndex(t *testing.T, rpcpolling bool) { | |
// ensured above. | ||
outpoint, output, privKey := chainntnfs.CreateSpendableOutput(t, miner) | ||
spendTx := chainntnfs.CreateSpendTx(t, outpoint, output, privKey) | ||
const noOfBlocks = 100 | ||
spendTxHash, err := miner.Client.SendRawTransaction(spendTx, true) | ||
require.NoError(t, err, "unable to broadcast tx") | ||
broadcastHeight = syncNotifierWithMiner(t, notifier, miner) | ||
if err := chainntnfs.WaitForMempoolTx(miner, spendTxHash); err != nil { | ||
t.Fatalf("tx not relayed to miner: %v", err) | ||
} | ||
if _, err := miner.Client.Generate(1); err != nil { | ||
t.Fatalf("unable to generate block: %v", err) | ||
if _, err := miner.Client.Generate(noOfBlocks); err != nil { | ||
t.Fatalf("unable to generate blocks: %v", err) | ||
} | ||
|
||
// Ensure the notifier and miner are synced to the same height to ensure | ||
// we can find the transaction when manually scanning the chain. | ||
confReq, err := chainntnfs.NewConfRequest(&outpoint.Hash, output.PkScript) | ||
require.NoError(t, err, "unable to create conf request") | ||
currentHeight := syncNotifierWithMiner(t, notifier, miner) | ||
_, txStatus, err = notifier.historicalConfDetails( | ||
txConfirmation, txStatus, err := notifier.historicalConfDetails( | ||
confReq, uint32(broadcastHeight), uint32(currentHeight), | ||
) | ||
require.NoError(t, err, "unable to retrieve historical conf details") | ||
blockHash, err := notifier.chainConn.GetBlockHash(int64(broadcastHeight)) | ||
require.NoError(t, err, "unable to get blockHash") | ||
|
||
// Since the backend node's txindex is disabled and the transaction has | ||
// confirmed, we should be able to find it by falling back to scanning | ||
// the chain manually. | ||
switch txStatus { | ||
case chainntnfs.TxFoundManually: | ||
require.Equal( | ||
t, txConfirmation.BlockHash, blockHash, "blockhash mismatch", | ||
) | ||
require.Equal( | ||
t, txConfirmation.BlockHeight, broadcastHeight, "height mismatch", | ||
) | ||
default: | ||
t.Fatal("should have found the transaction by manually " + | ||
"scanning the chain, but did not") | ||
} | ||
} | ||
|
||
// TestHistoricalSpendDetailsNoTxIndex ensures that we correctly retrieve | ||
// historical spend details using the set of fallback methods when the | ||
// backend node's txindex is disabled. | ||
func TestHistoricalSpendDetailsNoTxIndex(t *testing.T) { | ||
testHistoricalSpendDetailsNoTxIndex(t, true) | ||
testHistoricalSpendDetailsNoTxIndex(t, false) | ||
} | ||
|
||
func testHistoricalSpendDetailsNoTxIndex(t *testing.T, rpcPolling bool) { | ||
miner, tearDown := chainntnfs.NewMiner(t, nil, true, 25) | ||
defer tearDown() | ||
|
||
bitcoindConn, cleanUp := chainntnfs.NewBitcoindBackend( | ||
t, miner.P2PAddress(), false, rpcPolling, | ||
) | ||
defer cleanUp() | ||
|
||
hintCache := initHintCache(t) | ||
blockCache := blockcache.NewBlockCache(10000) | ||
|
||
notifier := setUpNotifier( | ||
t, bitcoindConn, hintCache, hintCache, blockCache, | ||
) | ||
defer func() { | ||
err := notifier.Stop() | ||
require.NoError(t, err) | ||
}() | ||
|
||
// Since the node has its txindex disabled, we fall back to scanning the | ||
// chain manually. A outpoint unknown to the network should not be | ||
// notified. | ||
var unknownHash chainhash.Hash | ||
copy(unknownHash[:], bytes.Repeat([]byte{0x10}, 32)) | ||
invalidOutpoint := wire.NewOutPoint(&unknownHash, 0) | ||
unknownSpendReq, err := chainntnfs.NewSpendRequest(invalidOutpoint, testScript) | ||
require.NoError(t, err, "unable to create spend request") | ||
broadcastHeight := syncNotifierWithMiner(t, notifier, miner) | ||
spendDetails, errSpend := notifier.historicalSpendDetails( | ||
unknownSpendReq, broadcastHeight, broadcastHeight, | ||
) | ||
require.NoError(t, errSpend, "unable to retrieve historical spend details") | ||
require.Equal(t, spendDetails, (*chainntnfs.SpendDetail)(nil)) | ||
|
||
// Now, we'll create a test transaction and attempt to retrieve its | ||
// spending details. In order to fall back to manually scanning the | ||
// chain, the transaction must be in the chain and not contain any | ||
// unspent outputs. To ensure this, we'll create a transaction with only | ||
// one output, which we will manually spend. The backend node's | ||
// transaction index should also be disabled, which we've already | ||
// ensured above. | ||
outpoint, output, privKey := chainntnfs.CreateSpendableOutput(t, miner) | ||
spendTx := chainntnfs.CreateSpendTx(t, outpoint, output, privKey) | ||
const noOfBlocks = 100 | ||
spendTxHash, err := miner.Client.SendRawTransaction(spendTx, true) | ||
require.NoError(t, err, "unable to broadcast tx") | ||
broadcastHeight = syncNotifierWithMiner(t, notifier, miner) | ||
err = chainntnfs.WaitForMempoolTx(miner, spendTxHash) | ||
require.NoError(t, err, "tx not relayed to miner") | ||
_, err = miner.Client.Generate(noOfBlocks) | ||
require.NoError(t, err, "unable to generate blocks") | ||
|
||
// Ensure the notifier and miner are synced to the same height to ensure | ||
// we can find the transaction spend details when manually scanning the | ||
// chain. | ||
spendReq, err := chainntnfs.NewSpendRequest(outpoint, output.PkScript) | ||
require.NoError(t, err, "unable to create conf request") | ||
currentHeight := syncNotifierWithMiner(t, notifier, miner) | ||
validSpendDetails, err := notifier.historicalSpendDetails( | ||
spendReq, broadcastHeight, currentHeight, | ||
) | ||
require.NoError(t, err, "unable to retrieve historical spend details") | ||
|
||
// Since the backend node's txindex is disabled and the transaction has | ||
// confirmed, we should be able to find it by falling back to scanning | ||
// the chain manually. | ||
require.NotNil(t, validSpendDetails) | ||
require.Equal(t, validSpendDetails.SpentOutPoint, outpoint) | ||
require.Equal(t, validSpendDetails.SpenderTxHash, spendTxHash) | ||
require.Equal(t, validSpendDetails.SpendingTx, spendTx) | ||
require.Equal(t, validSpendDetails.SpendingHeight, int32(broadcastHeight+1)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if indexing would be off this would have thrown error. I compared |
||
} |
Uh oh!
There was an error while loading. Please reload this page.