Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include block header and height in init #2874

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package fr.acinq.eclair

import com.typesafe.config.{Config, ConfigFactory, ConfigValueType}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{Block, BlockHash, Crypto, Satoshi}
import fr.acinq.eclair.Setup.Seeds
Expand Down Expand Up @@ -57,6 +58,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
onChainKeyManager_opt: Option[OnChainKeyManager],
instanceId: UUID, // a unique instance ID regenerated after each restart
private val blockHeight: AtomicLong,
private val blockHeader: AtomicReference[BlockHeader],
private val feerates: AtomicReference[FeeratesPerKw],
alias: String,
color: Color,
Expand Down Expand Up @@ -100,6 +102,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,

def currentBlockHeight: BlockHeight = BlockHeight(blockHeight.get)

def currentBlockHeader: BlockHeader = blockHeader.get()

def currentFeerates: FeeratesPerKw = feerates.get()

/** Only to be used in tests. */
Expand Down Expand Up @@ -215,7 +219,7 @@ object NodeParams extends Logging {

def makeNodeParams(config: Config, instanceId: UUID,
nodeKeyManager: NodeKeyManager, channelKeyManager: ChannelKeyManager, onChainKeyManager_opt: Option[OnChainKeyManager],
torAddress_opt: Option[NodeAddress], database: Databases, blockHeight: AtomicLong, feerates: AtomicReference[FeeratesPerKw],
torAddress_opt: Option[NodeAddress], database: Databases, blockHeight: AtomicLong, blockHeader: AtomicReference[BlockHeader], feerates: AtomicReference[FeeratesPerKw],
pluginParams: Seq[PluginParams] = Nil): NodeParams = {
// check configuration for keys that have been renamed
val deprecatedKeyPaths = Map(
Expand Down Expand Up @@ -482,6 +486,7 @@ object NodeParams extends Logging {
onChainKeyManager_opt = onChainKeyManager_opt,
instanceId = instanceId,
blockHeight = blockHeight,
blockHeader = blockHeader,
feerates = feerates,
alias = nodeAlias,
color = Color(color(0), color(1), color(2)),
Expand Down
17 changes: 13 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicActorSystem
import akka.actor.{ActorRef, ActorSystem, Props, SupervisorStrategy, typed}
import akka.pattern.after
import akka.util.Timeout
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{Block, BlockHash, BlockId, ByteVector32, Satoshi, Script, addressToPublicKeyScript}
import fr.acinq.eclair.Setup.Seeds
Expand Down Expand Up @@ -110,6 +111,11 @@ class Setup(val datadir: File,
*/
val blockHeight = new AtomicLong(0)

/**
* This holds the latest block header we've received.
*/
val blockHeader = new AtomicReference[BlockHeader](null)

/**
* This holds the current feerates, in satoshi-per-kilobytes.
* The value is read by all actors, hence it needs to be thread-safe.
Expand All @@ -134,7 +140,7 @@ class Setup(val datadir: File,
case "password" => BitcoinJsonRPCAuthMethod.UserPassword(config.getString("bitcoind.rpcuser"), config.getString("bitcoind.rpcpassword"))
}

case class BitcoinStatus(version: Int, chainHash: BlockHash, initialBlockDownload: Boolean, verificationProgress: Double, blockCount: Long, headerCount: Long, unspentAddresses: List[String])
case class BitcoinStatus(version: Int, chainHash: BlockHash, initialBlockDownload: Boolean, verificationProgress: Double, blockCount: Long, headerCount: Long, latestHeader: BlockHeader, unspentAddresses: List[String])

def getBitcoinStatus(bitcoinClient: BasicBitcoinJsonRPCClient): Future[BitcoinStatus] = for {
json <- bitcoinClient.invoke("getblockchaininfo").recover { case e => throw BitcoinRPCConnectionException(e) }
Expand All @@ -150,6 +156,8 @@ class Setup(val datadir: File,
// NB: bitcoind confusingly returns the blockId instead of the blockHash.
chainHash <- bitcoinClient.invoke("getblockhash", 0).map(_.extract[String]).map(s => BlockId(ByteVector32.fromValidHex(s))).map(BlockHash(_))
bitcoinVersion <- bitcoinClient.invoke("getnetworkinfo").map(json => json \ "version").map(_.extract[Int])
blockHash <- bitcoinClient.invoke("getblockhash", blocks).map(_.extract[String])
latestHeader <- bitcoinClient.invoke("getblockheader", blockHash, /* verbose */ false).map(_.extract[String]).map(BlockHeader.read)
unspentAddresses <- bitcoinClient.invoke("listunspent").recover { _ => if (wallet.isEmpty && wallets.length > 1) throw BitcoinDefaultWalletException(wallets) else throw BitcoinWalletNotLoadedException(wallet.getOrElse(""), wallets) }
.collect { case JArray(values) =>
values
Expand All @@ -162,7 +170,7 @@ class Setup(val datadir: File,
case "signet" => bitcoinClient.invoke("getrawtransaction", "ff1027486b628b2d160859205a3401fb2ee379b43527153b0b50a92c17ee7955") // coinbase of #5000
case "regtest" => Future.successful(())
}
} yield BitcoinStatus(bitcoinVersion, chainHash, ibd, progress, blocks, headers, unspentAddresses)
} yield BitcoinStatus(bitcoinVersion, chainHash, ibd, progress, blocks, headers, latestHeader, unspentAddresses)

def pollBitcoinStatus(bitcoinClient: BasicBitcoinJsonRPCClient): Future[BitcoinStatus] = {
getBitcoinStatus(bitcoinClient).transformWith {
Expand Down Expand Up @@ -197,16 +205,17 @@ class Setup(val datadir: File,
assert(bitcoinStatus.verificationProgress > 0.999, s"bitcoind should be synchronized (progress=${bitcoinStatus.verificationProgress})")
assert(bitcoinStatus.headerCount - bitcoinStatus.blockCount <= 1, s"bitcoind should be synchronized (headers=${bitcoinStatus.headerCount} blocks=${bitcoinStatus.blockCount})")
}
logger.info(s"current blockchain height=${bitcoinStatus.blockCount}")
logger.info(s"current blockchain height=${bitcoinStatus.blockCount} header=${ByteVector(BlockHeader.write(bitcoinStatus.latestHeader)).toHex}")
blockHeight.set(bitcoinStatus.blockCount)
blockHeader.set(bitcoinStatus.latestHeader)
(bitcoinClient, bitcoinStatus.chainHash)
}

val instanceId = UUID.randomUUID()
logger.info(s"connecting to database with instanceId=$instanceId")
val databases = Databases.init(config.getConfig("db"), instanceId, chaindir, db)

val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, onChainKeyManager_opt, initTor(), databases, blockHeight, feeratesPerKw, pluginParams)
val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, onChainKeyManager_opt, initTor(), databases, blockHeight, blockHeader, feeratesPerKw, pluginParams)

logger.info(s"nodeid=${nodeParams.nodeId} alias=${nodeParams.alias}")
assert(bitcoinChainHash == nodeParams.chainHash, s"chainHash mismatch (conf=${nodeParams.chainHash} != bitcoind=$bitcoinChainHash)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.blockchain

import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat.{BlockId, Transaction}
import fr.acinq.eclair.BlockHeight
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
Expand All @@ -30,6 +31,6 @@ case class NewBlock(blockId: BlockId) extends BlockchainEvent

case class NewTransaction(tx: Transaction) extends BlockchainEvent

case class CurrentBlockHeight(blockHeight: BlockHeight) extends BlockchainEvent
case class CurrentBlockHeight(blockHeight: BlockHeight, blockHeader_opt: Option[BlockHeader]) extends BlockchainEvent

case class CurrentFeerates(feeratesPerKw: FeeratesPerKw) extends BlockchainEvent
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fr.acinq.eclair.blockchain.bitcoind
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat._
import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
Expand Down Expand Up @@ -60,7 +61,7 @@ object ZmqWatcher {
private case object TickBlockTimeout extends Command
private case class GetBlockCountFailed(t: Throwable) extends Command
private case class CheckBlockHeight(current: BlockHeight) extends Command
private case class PublishBlockHeight(current: BlockHeight) extends Command
private case class PublishBlockHeight(current: BlockHeight, header_opt: Option[BlockHeader]) extends Command
private case class ProcessNewBlock(blockId: BlockId) extends Command
private case class ProcessNewTransaction(tx: Transaction) extends Command

Expand Down Expand Up @@ -275,9 +276,12 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
Behaviors.same

case TickNewBlock =>
context.pipeToSelf(client.getBlockHeight()) {
context.pipeToSelf(for {
height <- client.getBlockHeight()
header_opt <- client.getBlockHeader(height)
} yield (height, header_opt)) {
case Failure(t) => GetBlockCountFailed(t)
case Success(currentHeight) => PublishBlockHeight(currentHeight)
case Success((currentHeight, header_opt)) => PublishBlockHeight(currentHeight, header_opt)
}
// TODO: beware of the herd effect
KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) {
Expand All @@ -288,10 +292,10 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
Behaviors.same

case PublishBlockHeight(currentHeight) =>
case PublishBlockHeight(currentHeight, header_opt) =>
log.debug("setting blockHeight={}", currentHeight)
blockHeight.set(currentHeight.toLong)
context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight))
context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight, header_opt))
Behaviors.same

case TriggerEvent(replyTo, watch, event) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package fr.acinq.eclair.blockchain.bitcoind.rpc
import fr.acinq.bitcoin.psbt.Psbt
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat._
import fr.acinq.bitcoin.{Bech32, Block, SigHash}
import fr.acinq.bitcoin.{Bech32, Block, BlockHeader, SigHash}
import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.blockchain.OnChainWallet
import fr.acinq.eclair.blockchain.OnChainWallet.{FundTransactionResponse, MakeFundingTxResponse, OnChainBalance, ProcessPsbtResponse}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.crypto.keymanager.OnChainKeyManager
import fr.acinq.eclair.json.SatoshiSerializer
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{BlockHeight, TimestampSecond, TxCoordinates}
Expand Down Expand Up @@ -655,6 +654,13 @@ class BitcoinCoreClient(val rpcClient: BitcoinJsonRPCClient, val onChainKeyManag
case JInt(count) => BlockHeight(count.toLong)
}

def getBlockHeader(height: BlockHeight)(implicit ec: ExecutionContext): Future[Option[BlockHeader]] =
rpcClient.invoke("getblockhash", height.toLong)
.collect { case JString(blockHash) => blockHash }
.flatMap(blockHash => rpcClient.invoke("getblockheader", blockHash, /* verbose */ false))
.collect { case JString(blockHeader) => Some(BlockHeader.read(blockHeader)) }
.recover { case _ => None }

def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
for {
Expand Down
21 changes: 13 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package fr.acinq.eclair.io

import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, Stash, SupervisorStrategy, Terminated}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32}
import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat.BlockHash
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise.KeyPair
Expand All @@ -28,7 +29,7 @@ import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{FSMDiagnosticActorLogging, FeatureCompatibilityResult, Features, InitFeature, Logs, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.{BlockHeight, FSMDiagnosticActorLogging, Features, InitFeature, Logs, TimestampMilli, TimestampSecond}
import scodec.Attempt
import scodec.bits.ByteVector

Expand Down Expand Up @@ -103,14 +104,16 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}

when(BEFORE_INIT) {
case Event(InitializeConnection(peer, chainHash, localFeatures, doSync), d: BeforeInitData) =>
case Event(InitializeConnection(peer, chainHash, currentBlockHeight, currentBlockHeader, localFeatures, doSync), d: BeforeInitData) =>
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.debug(s"using features=$localFeatures")
val localInit = d.pendingAuth.address match {
case remoteAddress if !d.pendingAuth.outgoing && conf.sendRemoteAddressInit && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
case _ => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
}
val tlvs = TlvStream(Set(
Some(InitTlv.Networks(chainHash :: Nil)),
if (!d.pendingAuth.outgoing && conf.sendRemoteAddressInit && NodeAddress.isPublicIPAddress(d.pendingAuth.address)) Some(InitTlv.RemoteAddress(d.pendingAuth.address)) else None,
Some(InitTlv.LatestBlockHeader(currentBlockHeight, currentBlockHeader))
).flatten[InitTlv])
val localInit = protocol.Init(localFeatures, tlvs)
d.transport ! localInit
startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout)
unstashAll() // unstash remote init if it already arrived
Expand All @@ -130,6 +133,8 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A

log.info(s"peer is using features=${remoteInit.features}, networks=${remoteInit.networks.mkString(",")}")
remoteInit.remoteAddress_opt.foreach(address => log.info("peer reports that our IP address is {} (public={})", address.toString, NodeAddress.isPublicIPAddress(address)))
remoteInit.currentBlockHeight_opt.foreach(height => log.info("peer reports that the current block height is {}", height.toLong))
remoteInit.currentBlockHeader_opt.foreach(header => log.info("peer reports that the current block header is {}", ByteVector(BlockHeader.write(header)).toHex))

val featureGraphErr_opt = Features.validateFeatureGraph(remoteInit.features)
val featuresCompatibilityResult = Features.testCompatible(d.localInit.features, remoteInit.features)
Expand Down Expand Up @@ -574,7 +579,7 @@ object PeerConnection {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey, outgoing: Boolean) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], doSync: Boolean) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, currentBlockHeight: BlockHeight, currentBlockHeader: BlockHeader, features: Features[InitFeature], doSync: Boolean) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes

sealed trait ConnectionResult extends RemoteTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId)
// if the peer is whitelisted, we sync with them, otherwise we only sync with peers with whom we have at least one channel
val doSync = nodeParams.syncWhitelist.contains(authenticated.remoteNodeId) || (nodeParams.syncWhitelist.isEmpty && hasChannels)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, nodeParams.currentBlockHeight, nodeParams.currentBlockHeader, features, doSync)
if (!hasChannels && !authenticated.outgoing) {
incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private class AsyncPaymentTriggerer(context: ActorContext[Command]) {
case (remoteNodeId, peer) => peer.cancel(paymentHash).map(peer1 => remoteNodeId -> peer1)
}
watching(peers1)
case WrappedCurrentBlockHeight(CurrentBlockHeight(currentBlockHeight)) =>
case WrappedCurrentBlockHeight(CurrentBlockHeight(currentBlockHeight, _)) =>
val peers1 = peers.flatMap {
case (remoteNodeId, peer) => peer.update(currentBlockHeight).map(peer1 => remoteNodeId -> peer1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ object EclairInternalsSerializer {
val messageRouteParamsCodec: Codec[MessageRouteParams] = (
("maxRouteLength" | int32) ::
(("baseFactor" | double) ::
("ageFactor" | double) ::
("capacityFactor" | double)).as[Graph.MessagePath.WeightRatios]).as[MessageRouteParams]
("ageFactor" | double) ::
("capacityFactor" | double)).as[Graph.MessagePath.WeightRatios]).as[MessageRouteParams]

val routerConfCodec: Codec[RouterConf] = (
("watchSpentWindow" | finiteDurationCodec) ::
Expand Down Expand Up @@ -150,6 +150,8 @@ object EclairInternalsSerializer {
def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = (
("peer" | actorRefCodec(system)) ::
("chainHash" | blockHash) ::
("currentBlockHeight" | blockHeight) ::
("currentBlockHeader" | blockHeader) ::
("features" | variableSizeBytes(uint16, initFeaturesCodec)) ::
("doSync" | bool(8))).as[PeerConnection.InitializeConnection]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.wire.protocol

import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, ByteVector64, Satoshi, Transaction, TxHash, TxId}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
Expand Down Expand Up @@ -170,6 +171,8 @@ object CommonCodecs {

val txCodec: Codec[Transaction] = bytes.xmap(d => Transaction.read(d.toArray), d => Transaction.write(d))

val blockHeader: Codec[BlockHeader] = bytes(80).xmap(b => BlockHeader.read(b.toArray), h => ByteVector(BlockHeader.write(h)))

def zeropaddedstring(size: Int): Codec[String] = fixedSizeBytes(size, utf8).xmap(s => s.takeWhile(_ != '\u0000'), s => s)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ sealed trait HtlcFailureMessage extends HtlcSettlementMessage // <- not in the s
case class Init(features: Features[InitFeature], tlvStream: TlvStream[InitTlv] = TlvStream.empty) extends SetupMessage {
val networks = tlvStream.get[InitTlv.Networks].map(_.chainHashes).getOrElse(Nil)
val remoteAddress_opt = tlvStream.get[InitTlv.RemoteAddress].map(_.address)
val currentBlockHeight_opt = tlvStream.get[InitTlv.LatestBlockHeader].map(_.blockHeight)
val currentBlockHeader_opt = tlvStream.get[InitTlv.LatestBlockHeader].map(_.blockHeader)
}

case class Warning(channelId: ByteVector32, data: ByteVector, tlvStream: TlvStream[WarningTlv] = TlvStream.empty) extends SetupMessage with HasChannelId {
Expand Down
Loading
Loading