Skip to content

Commit

Permalink
Delay considering a channel closed when seeing an on-chain spend
Browse files Browse the repository at this point in the history
Issue #2437

When external channel is spent, add it to the spentChannels list instead of immediately removing it from the graph.

Remove spent channels after 12 blocks. When a newly added channel is validated, if it spends the shared output of a recently spent channel then it is a splice.

A splice updates the graph edges to preserve balance estimate information in the graph.
  • Loading branch information
remyers committed Nov 4, 2024
1 parent 96d0c9a commit 5fbb4a9
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -194,6 +194,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
)
}

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
copy(
// a capacity decrease will decrease the low bound, but not below 0
low = (low + capacityDelta.min(0 msat)).max(0 msat),
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
capacities = newCapacities
)
}

/**
* Estimate the probability that we can successfully send `amount` through the channel
*
Expand Down Expand Up @@ -256,6 +268,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
defaultHalfLife
)

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
balances.updatedWith((desc.a, desc.b)) {
case None => None
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
},
defaultHalfLife
)

def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi): BalancesEstimates = {
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
val estimatedProbability = balance.canSend(amount)
Expand Down Expand Up @@ -298,6 +318,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
)

def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
GraphWithBalanceEstimates(
graph.updateChannel(desc, newShortChannelId, newCapacity),
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
)
}

def routeCouldRelay(route: Route): GraphWithBalanceEstimates = {
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
case (hop, (balances, amount)) =>
Expand Down
20 changes: 20 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,26 @@ object Graph {
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
}

/**
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
* both edges (corresponding to both directions) are updated.
*
* @param desc the channel description for the channel to update
* @param newShortChannelId the new shortChannelId for this channel
* @param newCapacity the new capacity of the channel
* @return a new graph with updated vertexes
*/
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
val newDesc = desc.copy(shortChannelId = newShortChannelId)
val updatedVertices =
vertices
.updatedWith(desc.b)(_.map(vertexB => vertexB.copy(incomingEdges = vertexB.incomingEdges - desc +
(newDesc -> vertexB.incomingEdges(desc).copy(desc = newDesc, capacity = newCapacity)))))
.updatedWith(desc.a)(_.map(vertexA => vertexA.copy(incomingEdges = vertexA.incomingEdges - desc.reversed +
(newDesc.reversed -> vertexA.incomingEdges(desc.reversed).copy(desc = newDesc.reversed, capacity = newCapacity)))))
DirectedGraph(updatedVertices)
}

/**
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
*/
Expand Down
24 changes: 22 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
import fr.acinq.eclair.channel._
Expand Down Expand Up @@ -64,6 +65,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
context.system.eventStream.publish(SubscriptionsComplete(this.getClass))

startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval)
Expand Down Expand Up @@ -113,7 +115,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
scid2PrivateChannels = Map.empty,
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty)
sync = Map.empty,
spentChannels = Map.empty)
startWith(NORMAL, data)
}

Expand Down Expand Up @@ -260,6 +263,18 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
log.info("funding tx of channelId={} has been spent - delay removing it from the graph for 12 blocks", shortChannelId)
stay() using d.copy(spentChannels = d.spentChannels + (shortChannelId -> nodeParams.currentBlockHeight))

case Event(c: CurrentBlockHeight, d) =>
val spentChannels1 = d.spentChannels.filter {
// spent channels may be confirmed as a splice; wait 12 blocks before removing them from the graph
case (_, blockHeight) if blockHeight >= c.blockHeight + 12 => true
case (shortChannelId, _) => self ! HandleChannelSpent(shortChannelId); false
}
stay() using d.copy(spentChannels = spentChannels1)

case Event(HandleChannelSpent(shortChannelId), d: Data) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)

case Event(n: NodeAnnouncement, d: Data) =>
Expand Down Expand Up @@ -757,7 +772,8 @@ object Router {
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[RealShortChannelId, BlockHeight],
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down Expand Up @@ -797,4 +813,8 @@ object Router {

/** We have tried to relay this amount from this channel and it failed. */
case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop)

/** Funding Tx of the channel id has been spent and not updated with a splice within 12 blocks. */
private case class HandleChannelSpent(shortChannelId: RealShortChannelId)

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,18 @@ object Validation {
None
} else {
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
val sharedInputTxId_opt = tx.txIn.find(_.signatureScript == fundingOutputScript).map(_.outPoint.txid)
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
// if a channel spends the shared output of a recently spent channel, then it is a splice
sharedInputTxId_opt match {
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
case Some(sharedInputTxId) =>
d0.spentChannels.find(spent => d0.channels.get(spent._1).exists(_.fundingTxId == sharedInputTxId)) match {
case Some((parentScid, _)) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
case None => log.error("channel shortChannelId={} is a splice, but not matching channel found!", c.shortChannelId); None
}
}
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
Expand Down Expand Up @@ -156,6 +165,37 @@ object Validation {
}
}

private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
watcher ! WatchExternalChannelSpent(ctx.self, fundingTxId, fundingOutputIndex, ann.shortChannelId)
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
nodeParams.db.network.addChannel(ann, fundingTxId, capacity)
nodeParams.db.network.removeChannel(parentChannel.shortChannelId)
val pubChan = PublicChannel(
ann = ann,
fundingTxId = fundingTxId,
capacity = capacity,
update_1_opt = parentChannel.update_1_opt,
update_2_opt = parentChannel.update_2_opt,
meta_opt = parentChannel.meta_opt
)
log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, pubChan)
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
log.debug("updating the graph for shortChannelId={}", pubChan.shortChannelId)
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
d.copy(
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
channels = d.channels + (pubChan.shortChannelId -> pubChan) - parentChannel.shortChannelId,
// we also add the newly validated channels to the rebroadcast queue
rebroadcast = d.rebroadcast.copy(
// we rebroadcast the splice channel to our peers
channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
),
graphWithBalances = graph1
)
}

private def addPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, privChan_opt: Option[PrivateChannel])(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I
shortIds4.real.toOption.get.toLong -> channelId4,
)
val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour)
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty)
val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty)

eclair.findRoute(c, 250_000 msat, None)
val routeRequest1 = router.expectMsgType[RouteRequest]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
import fr.acinq.eclair.payment.Invoice
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Router.{ChannelDesc, HopRelayParams}
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, ShortChannelId, TimestampSecond, randomKey}
import fr.acinq.eclair.{CltvExpiryDelta, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, randomKey}
import org.scalactic.Tolerance.convertNumericToPlusOrMinusWrapper
import org.scalatest.funsuite.AnyFunSuite

Expand Down Expand Up @@ -84,6 +84,68 @@ class BalanceEstimateSpec extends AnyFunSuite {
assert(balance2.capacities.isEmpty)
}

test("update channels after a splice") {
val a = makeEdge(0, 200 sat)
val b = makeEdge(1, 100 sat)
val unknownDesc = ChannelDesc(ShortChannelId(3), randomKey().publicKey, randomKey().publicKey)
val balance = BalanceEstimate.empty(1 day)
.addEdge(a)
.addEdge(b)
.couldNotSend(140_000 msat, TimestampSecond.now())
.couldSend(60_000 msat, TimestampSecond.now())

// a splice-in that increases channel capacity increases high but not low bounds
val balance1 = balance
.updateEdge(a.desc, RealShortChannelId(5), 250 sat)
assert(balance1.maxCapacity == 250.sat)
assert(balance1.low == 60_000.msat)
assert(balance1.high == 190_000.msat)

// a splice-in that increases channel capacity of smaller channel does not increase high more than max capacity
val balance2 = balance
.updateEdge(b.desc, RealShortChannelId(5), 300 sat)
assert(balance2.maxCapacity == 300.sat)
assert(balance2.low == 60_000.msat)
assert(balance2.high == 300_000.msat)

// a splice-out that decreases channel capacity decreases low bounds but not high bounds
val balance3 = balance
.updateEdge(a.desc, RealShortChannelId(5), 150 sat)
assert(balance3.maxCapacity == 150.sat)
assert(balance3.low == 10_000.msat)
assert(balance3.high == 140_000.msat)

// a splice-out that decreases channel capacity of largest channel does not decrease low bounds below zero
val balance4 = balance
.updateEdge(a.desc, RealShortChannelId(5), 50 sat)
assert(balance4.maxCapacity == 100.sat)
assert(balance4.low == 0.msat)
assert(balance4.high == 100_000.msat)

// a splice-out that does not decrease the largest channel only decreases low bounds
val balance5 = balance
.updateEdge(b.desc, RealShortChannelId(5), 50 sat)
assert(balance5.maxCapacity == 200.sat)
assert(balance5.low == 10_000.msat)
assert(balance5.high == 140_000.msat)

// a splice of an unknown channel that increases max capacity does not change the low/high bounds
val balance6 = balance
.updateEdge(unknownDesc, RealShortChannelId(5), 900 sat)
assert(isValid(balance6))
assert(balance6.maxCapacity == 900.sat)
assert(balance6.low == 60_000.msat)
assert(balance6.high == 140_000.msat)

// a splice of an unknown channel below max capacity does not change max capacity or low/high bounds
val balance7 = balance
.updateEdge(unknownDesc, RealShortChannelId(5), 150 sat)
assert(isValid(balance7))
assert(balance7.maxCapacity == 200.sat)
assert(balance7.low == 60_000.msat)
assert(balance7.high == 140_000.msat)
}

test("update bounds based on what could then could not be sent (increasing amounts)") {
val now = TimestampSecond.now()
val balance = BalanceEstimate.empty(1 day)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fr.acinq.eclair.router

import akka.actor.ActorSystem
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered}
import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags}
import fr.acinq.eclair.channel.{CMD_CLOSE, DATA_NORMAL}
Expand All @@ -21,7 +22,7 @@ import scala.concurrent.duration.DurationInt
*/
class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ChannelStateTestsBase {

case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], rebroadcastListener: TestProbe, channels: SetupFixture, testTags: Set[String]) {
case class FixtureParam(router: TestFSMRef[Router.State, Router.Data, Router], channels: SetupFixture, testTags: Set[String]) {
//@formatter:off
/** there is only one channel here */
def privateChannel: PrivateChannel = router.stateData.privateChannels.values.head
Expand All @@ -33,14 +34,12 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu

override def withFixture(test: OneArgTest): Outcome = {
val channels = init(tags = test.tags)
val rebroadcastListener = TestProbe()
val router: TestFSMRef[Router.State, Router.Data, Router] = {
// we use alice's actor system so we share the same event stream
implicit val system: ActorSystem = channels.alice.underlying.system
system.eventStream.subscribe(rebroadcastListener.ref, classOf[Router.Rebroadcast])
TestFSMRef(new Router(channels.alice.underlyingActor.nodeParams, channels.alice.underlyingActor.blockchain, initialized = None))
}
withFixture(test.toNoArgTest(FixtureParam(router, rebroadcastListener, channels, test.tags)))
withFixture(test.toNoArgTest(FixtureParam(router, channels, test.tags)))
}

private def internalTest(f: FixtureParam): Unit = {
Expand Down Expand Up @@ -178,6 +177,7 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu
// if the channel was public, the router asked the watcher to watch the funding tx and will be notified
val watchSpentBasic = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent]
watchSpentBasic.replyTo ! WatchExternalChannelSpentTriggered(watchSpentBasic.shortChannelId)
channels.alice.underlying.system.eventStream.publish(CurrentBlockHeight(BlockHeight(400012)))
}
// router cleans up the channel
awaitAssert {
Expand Down
Loading

0 comments on commit 5fbb4a9

Please sign in to comment.