From 19c4c0f8a6a70c3190c31821c077dcd54485767e Mon Sep 17 00:00:00 2001 From: Krystian Pudlik Date: Fri, 25 Oct 2024 16:57:27 +0200 Subject: [PATCH] feat: getLastSynchronizedGlobalSnapshot support for L0NodeContext --- .../currency/l0/CurrencyL0App.scala | 5 ++- .../currency/l0/StateChannel.scala | 14 +++++-- .../currency/l0/cli/method.scala | 4 +- .../currency/l0/config/types.scala | 4 +- .../currency/l0/modules/Services.scala | 17 ++++---- .../currency/l0/node/L0NodeContext.scala | 21 ++++++++-- .../services/StateChannelBinarySender.scala | 41 +++++++++++++++---- .../StateChannelBinarySenderSuite.scala | 8 +++- .../currency/dataApplication/package.scala | 2 + 9 files changed, 85 insertions(+), 31 deletions(-) diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala index 488fbd185..deb544e12 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/CurrencyL0App.scala @@ -77,7 +77,6 @@ abstract class CurrencyL0App( p2pClient = P2PClient.make[IO](sharedP2PClient, sharedResources.client, sharedServices.session) maybeAllowanceList = StateChannelAllowanceLists.get(cfg.environment) validators = Validators.make[IO](cfg.shared, seedlist, maybeAllowanceList, Hasher.forKryo[IO]) - implicit0(nodeContext: L0NodeContext[IO]) = L0NodeContext.make[IO](storages.snapshot, hasherSelectorAlwaysCurrent) maybeMajorityPeerIds <- getMajorityPeerIds[IO]( nodeShared.prioritySeedlist, sharedConfig.priorityPeerIds, @@ -102,6 +101,8 @@ abstract class CurrencyL0App( hasherSelectorAlwaysCurrent ) .asResource + implicit0(nodeContext: L0NodeContext[IO]) = L0NodeContext + .make[IO](storages.snapshot, hasherSelectorAlwaysCurrent, services.stateChannelBinarySender, storages.lastNGlobalSnapshot) programs = Programs.make[IO, Run]( keyPair, nodeShared.nodeId, @@ -320,7 +321,7 @@ abstract class CurrencyL0App( case _ => IO.unit } _ <- StateChannel - .run[IO](services, storages, programs, dataApplicationService) + .run[IO](services, storages, programs, dataApplicationService, cfg.snapshotConfirmation) .compile .drain } yield innerProgram diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StateChannel.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StateChannel.scala index 51729bb50..a0e98ef96 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StateChannel.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/StateChannel.scala @@ -9,12 +9,14 @@ import scala.concurrent.duration._ import io.constellationnetwork.currency.dataApplication.BaseDataApplicationL0Service import io.constellationnetwork.currency.l0.cli.method.Run +import io.constellationnetwork.currency.l0.config.types.SnapshotConfirmationConfig import io.constellationnetwork.currency.l0.modules.{Programs, Services, Storages} import io.constellationnetwork.node.shared.domain.snapshot.Validator import io.constellationnetwork.schema.peer.PeerId import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo} import io.constellationnetwork.security.{Hashed, HasherSelector} +import eu.timepit.refined.auto._ import fs2.Stream import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -26,7 +28,8 @@ object StateChannel { services: Services[F, Run], storages: Storages[F], programs: Programs[F], - dataApplicationService: Option[BaseDataApplicationL0Service[F]] + dataApplicationService: Option[BaseDataApplicationL0Service[F]], + snapshotConfirmationConfig: SnapshotConfirmationConfig )(implicit S: Supervisor[F]): Stream[F, Unit] = { val logger = Slf4jLogger.getLogger[F] @@ -66,9 +69,12 @@ object StateChannel { .flatMap { context => storages.lastNGlobalSnapshot.set(snapshot, context) >> triggerOnGlobalSnapshotPullHook(snapshot, context) >> - services.stateChannelBinarySender.confirm(snapshot) >> S - .supervise(services.stateChannelBinarySender.processPending) - .void + services.stateChannelBinarySender.confirm(snapshot) >> + services.stateChannelBinarySender.getLastConfirmedWithinFixedWindow.flatMap { + _.traverse_ { confirmed => + storages.lastNGlobalSnapshot.deleteBelow(confirmed.confirmationProof.globalOrdinal) + } + } >> S.supervise(services.stateChannelBinarySender.processPending).void } }, Applicative[F].unit diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/cli/method.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/cli/method.scala index 00fcf9f03..ce433c406 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/cli/method.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/cli/method.scala @@ -22,7 +22,7 @@ import io.constellationnetwork.schema.peer.L0Peer import com.monovore.decline.Opts import eu.timepit.refined.auto._ -import eu.timepit.refined.types.numeric.{NonNegLong, PosLong} +import eu.timepit.refined.types.numeric.{NonNegLong, PosInt} import fs2.io.file.Path object method { @@ -33,7 +33,7 @@ object method { def appConfig(c: AppConfigReader, shared: SharedConfig): AppConfig = AppConfig( snapshot = c.snapshot, snapshotConfirmation = SnapshotConfirmationConfig( - confirmationWindowSize = PosLong(5L) + fixedWindowSize = PosInt(5) ), globalL0Peer = globalL0Peer, peerDiscovery = c.peerDiscovery, diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/config/types.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/config/types.scala index e8d3ee473..1460d7eee 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/config/types.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/config/types.scala @@ -3,7 +3,7 @@ package io.constellationnetwork.currency.l0.config import io.constellationnetwork.node.shared.config.types._ import io.constellationnetwork.schema.peer.L0Peer -import eu.timepit.refined.types.numeric.PosLong +import eu.timepit.refined.types.numeric.PosInt object types { case class AppConfigReader( @@ -30,6 +30,6 @@ object types { ) case class SnapshotConfirmationConfig( - confirmationWindowSize: PosLong + fixedWindowSize: PosInt ) } diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/modules/Services.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/modules/Services.scala index 944e895a2..8b08bf980 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/modules/Services.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/modules/Services.scala @@ -61,20 +61,21 @@ object Services { jsonBrotliBinarySerializer <- JsonBrotliBinarySerializer.forSync[F] implicit0(hasher: Hasher[F]) = hasherSelector.getCurrent - l0NodeContext = L0NodeContext.make[F](storages.snapshot, hasherSelector) - - dataApplicationAcceptanceManager = (maybeDataApplication, storages.calculatedStateStorage).mapN { - case (service, storage) => - DataApplicationSnapshotAcceptanceManager.make[F](service, l0NodeContext, storage) - } - stateChannelBinarySender <- StateChannelBinarySender.make( storages.identifier, storages.globalL0Cluster, storages.lastNGlobalSnapshot, - p2PClient.stateChannelSnapshot + p2PClient.stateChannelSnapshot, + cfg.snapshotConfirmation ) + l0NodeContext = L0NodeContext.make[F](storages.snapshot, hasherSelector, stateChannelBinarySender, storages.lastNGlobalSnapshot) + + dataApplicationAcceptanceManager = (maybeDataApplication, storages.calculatedStateStorage).mapN { + case (service, storage) => + DataApplicationSnapshotAcceptanceManager.make[F](service, l0NodeContext, storage) + } + feeCalculator = FeeCalculator.make(cfg.shared.feeConfigs) stateChannelSnapshotService <- StateChannelSnapshotService diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/node/L0NodeContext.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/node/L0NodeContext.scala index 22a3fcadf..138a393e7 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/node/L0NodeContext.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/node/L0NodeContext.scala @@ -2,18 +2,22 @@ package io.constellationnetwork.currency.l0.node import cats.data.OptionT import cats.effect.Async -import cats.syntax.functor._ +import cats.syntax.all._ import io.constellationnetwork.currency.dataApplication.L0NodeContext +import io.constellationnetwork.currency.l0.snapshot.services.StateChannelBinarySender +import io.constellationnetwork.currency.l0.snapshot.storage.LastNGlobalSnapshotStorage import io.constellationnetwork.currency.schema.currency.{CurrencyIncrementalSnapshot, CurrencySnapshotInfo} -import io.constellationnetwork.node.shared.domain.snapshot.storage.SnapshotStorage -import io.constellationnetwork.schema.SnapshotOrdinal +import io.constellationnetwork.node.shared.domain.snapshot.storage.{LastSnapshotStorage, SnapshotStorage} +import io.constellationnetwork.schema.{GlobalIncrementalSnapshot, GlobalSnapshotInfo, SnapshotOrdinal} import io.constellationnetwork.security.{Hashed, HasherSelector, SecurityProvider} object L0NodeContext { def make[F[_]: SecurityProvider: Async]( snapshotStorage: SnapshotStorage[F, CurrencyIncrementalSnapshot, CurrencySnapshotInfo], - hasherSelector: HasherSelector[F] + hasherSelector: HasherSelector[F], + stateChannelBinarySender: StateChannelBinarySender[F], + lastNGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo] with LastNGlobalSnapshotStorage[F] ): L0NodeContext[F] = new L0NodeContext[F] { def securityProvider: SecurityProvider[F] = SecurityProvider[F] @@ -32,5 +36,14 @@ object L0NodeContext { case (snapshot, info) => hasherSelector.forOrdinal(snapshot.ordinal)(implicit hasher => snapshot.toHashed).map((_, info)) }.value + def getLastSynchronizedGlobalSnapshot: F[Option[Hashed[GlobalIncrementalSnapshot]]] = + getLastSynchronizedGlobalSnapshotCombined.map(_.map { case (snapshot, _) => snapshot }) + + def getLastSynchronizedGlobalSnapshotCombined: F[Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] = + stateChannelBinarySender.getLastConfirmedWithinFixedWindow.flatMap { lastConfirmed => + lastConfirmed + .map(_.confirmationProof.globalOrdinal) + .flatTraverse(lastNGlobalSnapshotStorage.getCombined) + } } } diff --git a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySender.scala b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySender.scala index eeef325a4..9e0527dfb 100644 --- a/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySender.scala +++ b/modules/currency-l0/src/main/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySender.scala @@ -7,6 +7,7 @@ import cats.syntax.all._ import scala.collection.immutable.Queue +import io.constellationnetwork.currency.l0.config.types.SnapshotConfirmationConfig import io.constellationnetwork.currency.l0.node.IdentifierStorage import io.constellationnetwork.node.shared.domain.cluster.storage.L0ClusterStorage import io.constellationnetwork.node.shared.domain.snapshot.storage.LastSnapshotStorage @@ -37,12 +38,14 @@ object GlobalSnapshotConfirmationProof { @derive(eqv) sealed trait TrackedBinary +@derive(eqv) case class PendingBinary( binary: Hashed[StateChannelSnapshotBinary], enqueuedAtOrdinal: SnapshotOrdinal, sendsSoFar: NonNegLong ) extends TrackedBinary +@derive(eqv) case class ConfirmedBinary( pendingBinary: PendingBinary, confirmationProof: GlobalSnapshotConfirmationProof @@ -69,9 +72,9 @@ object State { trait StateChannelBinarySender[F[_]] { def processPending: F[Unit] def clearPending: F[Unit] - def confirm(globalSnapshot: Hashed[GlobalIncrementalSnapshot]): F[Unit] def process(binaryHashed: Hashed[StateChannelSnapshotBinary]): F[Unit] + def getLastConfirmedWithinFixedWindow: F[Option[ConfirmedBinary]] } object StateChannelBinarySender { @@ -79,18 +82,29 @@ object StateChannelBinarySender { identifierStorage: IdentifierStorage[F], globalL0ClusterStorage: L0ClusterStorage[F], lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], - stateChannelSnapshotClient: StateChannelSnapshotClient[F] + stateChannelSnapshotClient: StateChannelSnapshotClient[F], + snapshotConfirmationConfig: SnapshotConfirmationConfig ): F[StateChannelBinarySender[F]] = Ref .of[F, State](State.empty) - .map(make[F](_, identifierStorage, globalL0ClusterStorage, lastGlobalSnapshotStorage, stateChannelSnapshotClient)) + .map( + make[F]( + _, + identifierStorage, + globalL0ClusterStorage, + lastGlobalSnapshotStorage, + stateChannelSnapshotClient, + snapshotConfirmationConfig + ) + ) def make[F[_]: Async: Hasher]( stateR: Ref[F, State], identifierStorage: IdentifierStorage[F], globalL0ClusterStorage: L0ClusterStorage[F], lastGlobalSnapshotStorage: LastSnapshotStorage[F, GlobalIncrementalSnapshot, GlobalSnapshotInfo], - stateChannelSnapshotClient: StateChannelSnapshotClient[F] + stateChannelSnapshotClient: StateChannelSnapshotClient[F], + snapshotConfirmationConfig: SnapshotConfirmationConfig ): StateChannelBinarySender[F] = new StateChannelBinarySender[F] { @@ -125,13 +139,21 @@ object StateChannelBinarySender { case (PendingBinary(tracked, _, _), index) if confirmedHashesInGlobalSnapshot.contains(tracked.hash) => index }.maxOption - val updatedTracked = indexedTracked.map { - case (pendingBinary @ PendingBinary(tracked, enqueuedAtOrdinal, sendsSoFar), index) - if index <= maybeHighestConfirmationIndex.getOrElse(-1) => + val afterConfirmation = indexedTracked.map { + case (pendingBinary @ PendingBinary(_, _, _), index) if maybeHighestConfirmationIndex.exists(_ >= index) => ConfirmedBinary(pendingBinary, GlobalSnapshotConfirmationProof.fromGlobalSnapshot(globalSnapshot)) case (other, _) => other } + val maybeLowestConfirmationIndexWithinFixedWindow = maybeHighestConfirmationIndex + .map(_ - snapshotConfirmationConfig.fixedWindowSize) + .filter(_ > 0) + + val updatedTracked = maybeLowestConfirmationIndexWithinFixedWindow match { + case Some(outsideWindow) => afterConfirmation.drop(outsideWindow) + case None => afterConfirmation + } + val updatedRetryMode = { val hasStalled = updatedTracked.exists { @@ -273,5 +295,10 @@ object StateChannelBinarySender { } ).void } + + def getLastConfirmedWithinFixedWindow: F[Option[ConfirmedBinary]] = stateR.get.map { state => + val confirmed = state.tracked.toList.collect { case confirmed @ ConfirmedBinary(_, _) => confirmed } + confirmed.headOption.filter(_ => confirmed.length > snapshotConfirmationConfig.fixedWindowSize) + } } } diff --git a/modules/currency-l0/src/test/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySenderSuite.scala b/modules/currency-l0/src/test/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySenderSuite.scala index 4754e47aa..3cd70d8a8 100644 --- a/modules/currency-l0/src/test/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySenderSuite.scala +++ b/modules/currency-l0/src/test/scala/io/constellationnetwork/currency/l0/snapshot/services/StateChannelBinarySenderSuite.scala @@ -8,6 +8,7 @@ import cats.syntax.all._ import scala.collection.immutable.SortedMap +import io.constellationnetwork.currency.l0.config.types.SnapshotConfirmationConfig import io.constellationnetwork.currency.l0.node.IdentifierStorage import io.constellationnetwork.currency.schema.currency.SnapshotFee import io.constellationnetwork.ext.cats.effect.ResourceIO @@ -34,7 +35,7 @@ import io.constellationnetwork.statechannel.StateChannelSnapshotBinary import com.comcast.ip4s.{Host, Port} import eu.timepit.refined.auto._ -import eu.timepit.refined.types.numeric.NonNegLong +import eu.timepit.refined.types.numeric.{NonNegLong, PosInt} import org.scalacheck.Gen import weaver.MutableIOSuite import weaver.scalacheck.Checkers @@ -99,6 +100,8 @@ object StateChannelBinarySenderSuite extends MutableIOSuite with Checkers { def getHeight: StateChannelBinarySenderSuite.F[Option[height.Height]] = ??? } + snapshotConfirmationConfig = SnapshotConfirmationConfig(PosInt.MaxValue) + postedRef <- Ref.of[IO, List[Hashed[StateChannelSnapshotBinary]]](List.empty) stateChannelSnapshotClient = new StateChannelSnapshotClient[F] { def send( @@ -117,7 +120,8 @@ object StateChannelBinarySenderSuite extends MutableIOSuite with Checkers { identifierStorage, globalL0ClusterStorage, lastSnapshotStorage, - stateChannelSnapshotClient + stateChannelSnapshotClient, + snapshotConfirmationConfig ) } yield (sender, stateRef, postedRef) diff --git a/modules/shared/src/main/scala/io/constellationnetwork/currency/dataApplication/package.scala b/modules/shared/src/main/scala/io/constellationnetwork/currency/dataApplication/package.scala index b14aecfc0..a5c0c7dbd 100644 --- a/modules/shared/src/main/scala/io/constellationnetwork/currency/dataApplication/package.scala +++ b/modules/shared/src/main/scala/io/constellationnetwork/currency/dataApplication/package.scala @@ -539,6 +539,8 @@ trait L1NodeContext[F[_]] { } trait L0NodeContext[F[_]] { + def getLastSynchronizedGlobalSnapshot: F[Option[Hashed[GlobalIncrementalSnapshot]]] + def getLastSynchronizedGlobalSnapshotCombined: F[Option[(Hashed[GlobalIncrementalSnapshot], GlobalSnapshotInfo)]] def getLastCurrencySnapshot: F[Option[Hashed[CurrencyIncrementalSnapshot]]] def getCurrencySnapshot(ordinal: SnapshotOrdinal): F[Option[Hashed[CurrencyIncrementalSnapshot]]] def getLastCurrencySnapshotCombined: F[Option[(Hashed[CurrencyIncrementalSnapshot], CurrencySnapshotInfo)]]