diff --git a/.bsp/sbt.json b/.bsp/sbt.json deleted file mode 100644 index 952c87ae..00000000 --- a/.bsp/sbt.json +++ /dev/null @@ -1 +0,0 @@ -{"name":"sbt","version":"1.5.5","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Users/oskin/.sdkman/candidates/java/17.0.1-ms/bin/java","-Xms100m","-Xmx100m","-classpath","/Users/oskin/Library/Application Support/JetBrains/IntelliJIdea2021.2/plugins/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/Users/oskin/Library/Application%20Support/JetBrains/IntelliJIdea2021.2/plugins/Scala/launcher/sbt-launch.jar"]} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 049c42d9..e1abcac4 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ project/metals.sbt config.env -/.bsp/ \ No newline at end of file +.bsp/ \ No newline at end of file diff --git a/build.sbt b/build.sbt index 1b574dc9..7f12328c 100644 --- a/build.sbt +++ b/build.sbt @@ -158,7 +158,7 @@ lazy val ammExecutor = utils ) .settings(nativePackagerSettings("amm-executor")) .enablePlugins(JavaAppPackaging, UniversalPlugin, DockerPlugin) - .dependsOn(Seq(core, http).map(_ % allConfigDependency): _*) + .dependsOn(Seq(core, http, cache).map(_ % allConfigDependency): _*) lazy val poolResolver = utils .mkModule("pool-resolver", "PoolResolver") diff --git a/modules/amm-executor/src/main/resources/application.conf b/modules/amm-executor/src/main/resources/application.conf index e4dbe859..f72ea6b7 100644 --- a/modules/amm-executor/src/main/resources/application.conf +++ b/modules/amm-executor/src/main/resources/application.conf @@ -2,7 +2,9 @@ rotation.retry-delay = 120s exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy" -execution.order-lifetime = 300s +backlogConfig.order-lifetime = 300s +backlogConfig.order-execution-time = 180s +backlogConfig.suspended-probability = 10 monetary.miner-fee = 2000000 monetary.min-dex-fee = 1000000 diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index ada1e081..e8784514 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -6,16 +6,17 @@ import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp +import org.ergoplatform.common.cache.{Cache, CacheStreaming, MakeRedisTransaction, Redis} import org.ergoplatform.common.streaming._ import org.ergoplatform.dex.configs.ConsumerConfig -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} import org.ergoplatform.dex.executor.amm.config.ConfigBundle import org.ergoplatform.dex.executor.amm.context.AppContext import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter} import org.ergoplatform.dex.executor.amm.processes.Executor -import org.ergoplatform.dex.executor.amm.repositories.CFMMPools -import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries} +import org.ergoplatform.dex.executor.amm.repositories.{CFMMOrders, CFMMPools} +import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMHistConsumer, CFMMProducerRetries} import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM} import org.ergoplatform.ergo.modules.ErgoNetwork import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming} @@ -27,6 +28,7 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend import tofu.WithRun import tofu.fs2Instances._ +import tofu.generate.GenRandom import tofu.lift.IsoK import tofu.syntax.unlift._ import zio.interop.catz._ @@ -40,10 +42,13 @@ object App extends EnvApp[AppContext] { appF.run(ctx) as ExitCode.success }.orDie + implicit val mtx: MakeRedisTransaction[RunF] = MakeRedisTransaction.make[RunF] + private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] = for { blocker <- Blocker[InitF] configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker)) + implicit0(genRand: GenRandom[RunF]) <- Resource.eval(GenRandom.instance[InitF, RunF]()) ctx = AppContext.init(configs) implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) @@ -51,6 +56,8 @@ object App extends EnvApp[AppContext] { makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) = makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders) + implicit0(ammHistCons: CFMMHistConsumer[StreamF, RunF]) = + makeConsumer[OrderId, Option[EvaluatedCFMMOrder.Any]](configs.consumers.cfmmHistory) implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) = makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry) implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) = @@ -66,8 +73,14 @@ object App extends EnvApp[AppContext] { implicit0(t2tInt: CFMMInterpreter[T2T_CFMM, RunF]) <- Resource.eval(T2TCFMMInterpreter.make[InitF, RunF]) implicit0(n2tInt: CFMMInterpreter[N2T_CFMM, RunF]) <- Resource.eval(N2TCFMMInterpreter.make[InitF, RunF]) implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF] - implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) - executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) + implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) + implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis) + implicit0(cache: Cache[RunF]) <- Resource.eval(Cache.make[InitF, RunF]) + implicit0(cfmmOrders: CFMMOrders[RunF]) <- Resource.eval[InitF, CFMMOrders[RunF]](CFMMOrders.make[InitF, RunF]) + implicit0(cacheStreaming: CacheStreaming[StreamF]) <- Resource.eval(CacheStreaming.make[InitF, StreamF, RunF]) + implicit0(cfmmBacklog: CFMMBacklog[RunF]) <- + Resource.eval[InitF, CFMMBacklog[RunF]](CFMMBacklog.make[InitF, StreamF, RunF]) + executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) } yield executor -> ctx private def makeBackend( diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala new file mode 100644 index 00000000..ffedb3ad --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/BacklogConfig.scala @@ -0,0 +1,17 @@ +package org.ergoplatform.dex.executor.amm.config + +import derevo.derive +import derevo.pureconfig.pureconfigReader +import tofu.Context +import tofu.logging.derivation.loggable + +import scala.concurrent.duration.FiniteDuration + +@derive(pureconfigReader, loggable) +final case class BacklogConfig( + orderLifetime: FiniteDuration, + orderExecutionTime: FiniteDuration, + suspendedOrdersExecutionProbabilityPercent: Int +) + +object BacklogConfig extends Context.Companion[BacklogConfig] diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala index 6a534573..6e98bb8c 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala @@ -2,11 +2,12 @@ package org.ergoplatform.dex.executor.amm.config import derevo.derive import derevo.pureconfig.pureconfigReader +import org.ergoplatform.common.cache.RedisConfig import org.ergoplatform.common.streaming.RotationConfig import org.ergoplatform.dex.configs._ import tofu.Context import tofu.logging.derivation.loggable -import tofu.optics.macros.{promote, ClassyOptics} +import tofu.optics.macros.{ClassyOptics, promote} @derive(pureconfigReader, loggable) @ClassyOptics @@ -16,7 +17,9 @@ final case class ConfigBundle( @promote execution: ExecutionConfig, @promote monetary: MonetaryConfig, @promote protocol: ProtocolConfig, + @promote backlogConfig: BacklogConfig, consumers: Consumers, + redis: RedisConfig, producers: Producers, @promote kafka: KafkaConfig, @promote network: NetworkConfig, diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala index a28ff859..89218ca5 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala @@ -8,6 +8,7 @@ import tofu.logging.derivation.loggable @derive(pureconfigReader, loggable) final case class Consumers( confirmedOrders: ConsumerConfig, + cfmmHistory: ConsumerConfig, unconfirmedOrders: ConsumerConfig, ordersRetry: ConsumerConfig ) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala index 1ae0489b..3f8124be 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala @@ -8,6 +8,6 @@ import tofu.logging.derivation.loggable import scala.concurrent.duration.FiniteDuration @derive(pureconfigReader, loggable) -final case class ExecutionConfig(orderLifetime: FiniteDuration) +final case class ExecutionConfig(order: FiniteDuration) object ExecutionConfig extends Context.Companion[ExecutionConfig] \ No newline at end of file diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala deleted file mode 100644 index 41a413ce..00000000 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ /dev/null @@ -1,18 +0,0 @@ -package org.ergoplatform.dex.executor.amm.modules - -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} - -trait CFMMBacklog[F[_]] { - - /** Put an order to the backlog. - */ - def put(order: CFMMOrder): F[Unit] - - /** Get candidate order for execution. Blocks until an order is available. - */ - def get: F[CFMMOrder] - - /** Put an order from the backlog. - */ - def drop(id: OrderId): F[Unit] -} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala index 747ee2cb..1462e98b 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala @@ -1,21 +1,22 @@ package org.ergoplatform.dex.executor.amm.processes -import cats.effect.Clock +import cats.effect.{Clock, Timer} import cats.syntax.option._ -import cats.{Functor, Monad} +import cats.syntax.traverse._ +import cats.{Defer, Functor, Monad, SemigroupK} import derevo.derive import mouse.any._ import org.ergoplatform.common.TraceId import org.ergoplatform.common.streaming.syntax._ import org.ergoplatform.dex.domain.amm.CFMMOrder import org.ergoplatform.dex.executor.amm.config.ExecutionConfig -import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit +import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMHistConsumer} import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser import tofu.Catches import tofu.higherKind.derived.representableK import tofu.logging.{Logging, Logs} -import tofu.streams.Evals +import tofu.streams.{Evals, ParFlatten} import tofu.syntax.context._ import tofu.syntax.embed._ import tofu.syntax.handle._ @@ -34,10 +35,12 @@ object Executor { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has, - G[_]: Monad: TraceId.Local: Clock: Catches + F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has: Defer: SemigroupK: ParFlatten, + G[_]: Monad: TraceId.Local: Clock: Catches: Timer ](implicit orders: CFMMCircuit[F, G], + executedOrders: CFMMHistConsumer[F, G], + cfmmBacklog: CFMMBacklog[G], service: Execution[G], logs: Logs[I, G] ): I[Executor[F]] = @@ -48,34 +51,56 @@ object Executor { } final private class Live[ - F[_]: Monad: Evals[*[_], G], - G[_]: Monad: Logging: TraceId.Local: Clock: Catches + F[_]: Monad: Evals[*[_], G]: Defer: SemigroupK: ParFlatten, + G[_]: Monad: Logging: Catches: Timer ](conf: ExecutionConfig)(implicit orders: CFMMCircuit[F, G], + executedOrders: CFMMHistConsumer[F, G], + backlog: CFMMBacklog[G], service: Execution[G], errParser: TxSubmissionErrorParser ) extends Executor[F] { def run: F[Unit] = + emits( + List( + addToBacklog, + executeOrders, + dropExecuted + ) + ).parFlattenUnbounded + + def addToBacklog: F[Unit] = orders.stream - .evalMap { rec => - service - .executeAttempt(rec.message) - .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) - .local(_ => TraceId.fromString(rec.message.id.value)) - .tupleLeft(rec) + .evalTap(orderRec => + backlog.put(orderRec.message) + .handleWith[Throwable](e => warnCause"Attempt to add order to backlog failed." (e)) + ) + .evalMap(_.commit) + + def dropExecuted: F[Unit] = + executedOrders.stream + .evalTap { rec => + rec.message.traverse(order => + backlog.drop(order.order.id) + .handleWith[Throwable](e => warnCause"Attempt to drop order from backlog failed." (e)) + ) } - .flatTap { - case (_, None) => unit[F] - case (_, Some(order)) => - eval(now.millis) >>= { - case ts if ts - order.timestamp < conf.orderLifetime.toMillis => - eval(warn"Failed to execute $order. Going to retry.") >> - orders.retry((order.id -> order).pure[F]) - case _ => - eval(warn"Failed to execute $order. Order expired.") - } + .evalMap(_.commit) + + def executeOrders: F[Unit] = + eval(backlog.get).evalMap { + case Some(order) => executeOrder(order) + case None => trace"No orders to execute. Going to wait for" >> Timer[G].sleep(conf.order) + }.repeat + + private def executeOrder(order: CFMMOrder): G[Unit] = + service + .executeAttempt(order) + .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) + .flatMap { + case Some(order) => backlog.suspend(order) + case None => backlog.checkLater(order) } - .evalMap { case (rec, _) => rec.commit } } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala new file mode 100644 index 00000000..be7461c1 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/repositories/CFMMOrders.scala @@ -0,0 +1,80 @@ +package org.ergoplatform.dex.executor.amm.repositories + +import cats.{FlatMap, Functor} +import derevo.derive +import org.ergoplatform.common.cache.Cache +import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.executor.amm.repositories.CFMMPools.{CFMMPoolsTracing, Live} +import tofu.higherKind.Mid +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.syntax.logging._ +import tofu.syntax.monadic._ + +@derive(representableK) +trait CFMMOrders[F[_]] { + + def put(order: CFMMOrder): F[Unit] + + def exists(orderId: OrderId): F[Boolean] + + def drop(orderId: OrderId): F[Unit] + + def get(orderId: OrderId): F[Option[CFMMOrder]] + + def getAll: F[List[CFMMOrder]] +} + +object CFMMOrders { + + def make[I[_]: Functor, F[_]: FlatMap](implicit logs: Logs[I, F], cache: Cache[F]): I[CFMMOrders[F]] = + logs.forService[CFMMOrders[F]].map { implicit logging => + new CFMMOrdersTracingMid[F] attach new Live[F](cache) + } + + final private class Live[F[_]](cache: Cache[F]) extends CFMMOrders[F] { + + def put(order: CFMMOrder): F[Unit] = cache.set(order.id, order) + + def exists(orderId: OrderId): F[Boolean] = cache.exists(orderId) + + def drop(orderId: OrderId): F[Unit] = cache.del(orderId) + + def get(orderId: OrderId): F[Option[CFMMOrder]] = cache.get[OrderId, CFMMOrder](orderId) + + def getAll: F[List[CFMMOrder]] = cache.getAll + } + + final private class CFMMOrdersTracingMid[F[_]: FlatMap: Logging] extends CFMMOrders[Mid[F, *]] { + + def put(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"put(order=$order)" + r <- _ + _ <- trace"put(order=$order) -> $r" + } yield r + + def exists(orderId: OrderId): Mid[F, Boolean] = for { + _ <- trace"exists(orderId=$orderId)" + r <- _ + _ <- trace"exists(orderId=$orderId) -> $r" + } yield r + + def drop(orderId: OrderId): Mid[F, Unit] = for { + _ <- trace"drop(orderId=$orderId)" + r <- _ + _ <- trace"drop(orderId=$orderId) -> $r" + } yield r + + def get(orderId: OrderId): Mid[F, Option[CFMMOrder]] = for { + _ <- trace"checkLater(order=$orderId)" + r <- _ + _ <- trace"checkLater(order=$orderId) -> $r" + } yield r + + def getAll: Mid[F, List[CFMMOrder]] = for { + _ <- trace"getAll()" + r <- _ + _ <- trace"getAll() -> length: ${r.length}" + } yield r + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala new file mode 100644 index 00000000..00ce932a --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklog.scala @@ -0,0 +1,211 @@ +package org.ergoplatform.dex.executor.amm.services + +import cats.effect.{Clock, Sync} +import cats.{FlatMap, Monad} +import derevo.derive +import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId, WeightedOrder} +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import tofu.concurrent.{Atom, MakeAtom} +import cats.syntax.traverse._ +import cats.syntax.option._ +import org.ergoplatform.common.cache.CacheStreaming +import org.ergoplatform.dex.executor.amm.config.BacklogConfig +import tofu.concurrent.MakeAtom._ +import tofu.generate.GenRandom +import tofu.higherKind.Mid +import tofu.higherKind.derived.representableK +import tofu.lift.IsoK +import tofu.logging.{Logging, Logs} +import tofu.streams.{Compile, Evals} +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.streams.compile._ +import tofu.syntax.embed._ +import tofu.syntax.time.now +import tofu.syntax.streams.evals._ + +import scala.collection.mutable + +@derive(representableK) +trait CFMMBacklog[F[_]] { + + /** Put an order to the backlog. + */ + def put(order: CFMMOrder): F[Unit] + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + def suspend(order: CFMMOrder): F[Unit] + + /** Put possibly executed order to the backlog + */ + def checkLater(order: CFMMOrder): F[Unit] + + /** Get candidate order for execution. Blocks until an order is available. + */ + def get: F[Option[CFMMOrder]] + + /** Put an order from the backlog. + */ + def drop(id: OrderId): F[Unit] +} + +object CFMMBacklog { + + def make[ + I[_]: Sync, + F[_]: Monad: Evals[*[_], G]: Compile[*[_], G], + G[_]: Sync: BacklogConfig.Has: Clock: GenRandom + ](implicit + logs: Logs[I, G], + cfmmOrders: CFMMOrders[G], + isoKGI: IsoK[G, I], + streamingCache: CacheStreaming[F] + ): I[CFMMBacklog[G]] = for { + implicit0(logging: Logging[G]) <- logs.forService[CFMMBacklog[G]] + pendingQueueRef <- MakeAtom[I, G].of(mutable.PriorityQueue.empty[WeightedOrder]) + suspendedQueueRef <- MakeAtom[I, G].of(mutable.PriorityQueue.empty[WeightedOrder]) + revisitOrders <- MakeAtom[I, G].of(List.empty[WeightedOrder]) + _ <- isoKGI.to(recoverPendingQueue[F, G](pendingQueueRef, streamingCache).drain) + } yield BacklogConfig.access + .map(cfg => + new CFMMBacklogTracingMid[G] attach new Live[G]( + pendingQueueRef, + suspendedQueueRef, + revisitOrders, + cfmmOrders, + cfg + ) + ) + .embed + + final private class Live[F[_]: Monad: Clock: GenRandom]( + pendingQueueRef: Atom[F, mutable.PriorityQueue[WeightedOrder]], + suspendedQueueRef: Atom[F, mutable.PriorityQueue[WeightedOrder]], + revisitOrders: Atom[F, List[WeightedOrder]], + cfmmOrders: CFMMOrders[F], + backlogConfig: BacklogConfig + ) extends CFMMBacklog[F] { + + /** Put an order to the backlog. + */ + def put(order: CFMMOrder): F[Unit] = + cfmmOrders.put(order) >> pendingQueueRef.update(_ += WeightedOrder.fromOrder(order)) + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + def suspend(order: CFMMOrder): F[Unit] = + suspendedQueueRef.update(_ += WeightedOrder.fromOrder(order)) + + /** Put possibly executed order to the backlog + */ + def checkLater(order: CFMMOrder): F[Unit] = + revisitOrders.update(_ :+ WeightedOrder.fromOrder(order)) + + /** Get candidate order for execution. Blocks until an order is available. + */ + def get: F[Option[CFMMOrder]] = for { + _ <- filterRevisit + random <- GenRandom.nextInt[F](100) + order <- + if (random > backlogConfig.suspendedOrdersExecutionProbabilityPercent) + getMaxOrderFromQueue(pendingQueueRef) + else + getMaxOrderFromQueue(suspendedQueueRef) + } yield order + + /** Put an order from the backlog. + */ + def drop(id: OrderId): F[Unit] = + cfmmOrders.drop(id) + + def getMaxOrderFromQueue(queue: Atom[F, mutable.PriorityQueue[WeightedOrder]]): F[Option[CFMMOrder]] = + for { + maxId <- queue.modify { queue => + val elem = if (queue.isEmpty) none else queue.dequeue().some + (queue, elem) + } + time <- now.millis + suspendedElem <- maxId match { + case Some(value) if (time - value.timestamp) < backlogConfig.orderLifetime.toMillis => + cfmmOrders.get(value.orderId) >>= { + case Some(value) => value.some.pure[F] + case None => getMaxOrderFromQueue(queue) + } + case Some(value) => + cfmmOrders.drop(value.orderId) >> getMaxOrderFromQueue(queue) + case None => none.pure[F] + } + } yield suspendedElem + + def filterRevisit: F[Unit] = for { + curTime <- now.millis + possible2pending <- + revisitOrders.modify(_.span(wOrd => (curTime - wOrd.timestamp < backlogConfig.orderExecutionTime.toMillis))) + _ <- possible2pending.traverse { + case ord if (curTime - ord.timestamp) > backlogConfig.orderLifetime.toMillis => + drop(ord.orderId) + case ord => + pendingQueueRef.update(_ += ord) + } + } yield () + } + + private def recoverPendingQueue[F[_]: Evals[*[_], G]: Monad, G[_]: Monad: Clock: BacklogConfig.Has]( + pendingQueue: Atom[G, mutable.PriorityQueue[WeightedOrder]], + cfmmOrders: CacheStreaming[F] + ): F[Unit] = for { + cfg <- eval(BacklogConfig.access) + order <- cfmmOrders.getAll[CFMMOrder] + curTime <- eval(now.millis) + _ <- eval( + if (curTime - order.timestamp < cfg.orderLifetime.toMillis) + pendingQueue.update(_ += WeightedOrder.fromOrder(order)) + else + ().pure[G] + ) + } yield () + + final private class CFMMBacklogTracingMid[F[_]: FlatMap: Logging] extends CFMMBacklog[Mid[F, *]] { + + /** Put an order to the backlog. + */ + override def put(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"put(order=$order)" + r <- _ + _ <- trace"put(order=$order) -> $r" + } yield r + + /** Put an order with priceTooHigh or priceTooLow execution err + */ + override def suspend(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"suspend(order=$order)" + r <- _ + _ <- trace"suspend(order=$order) -> $r" + } yield r + + /** Put possibly executed order to the backlog + */ + override def checkLater(order: CFMMOrder): Mid[F, Unit] = for { + _ <- trace"checkLater(order=$order)" + r <- _ + _ <- trace"checkLater(order=$order) -> $r" + } yield r + + /** Get candidate order for execution. Blocks until an order is available. + */ + override def get: Mid[F, Option[CFMMOrder]] = for { + _ <- trace"get()" + r <- _ + _ <- trace"get() -> $r" + } yield r + + /** Put an order from the backlog. + */ + override def drop(id: OrderId): Mid[F, Unit] = for { + _ <- trace"drop(id=$id)" + r <- _ + _ <- trace"drop(id=$id) -> $r" + } yield r + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala index 8ee7e122..b29fbe3f 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala @@ -2,13 +2,16 @@ package org.ergoplatform.dex.executor.amm import fs2.kafka.types.KafkaOffset import org.ergoplatform.common.streaming._ -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} +import org.ergoplatform.ergo.BoxId object streaming { - type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] - type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] - type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] + type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] + type CFMMConsumerInId[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[BoxId], KafkaOffset, F, G] + type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] + type CFMMHistConsumer[F[_], G[_]] = Consumer.Aux[OrderId, Option[EvaluatedCFMMOrder.Any], KafkaOffset, F, G] + type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] type CFMMCircuit[F[_], G[_]] = StreamingCircuit[OrderId, CFMMOrder, F, G] } diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala new file mode 100644 index 00000000..d2875912 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/configs/package.scala @@ -0,0 +1,40 @@ +package org.ergoplatform.dex.executor.amm + +import cats.Functor +import cats.effect.IO +import org.ergoplatform.dex.executor.amm.config.BacklogConfig +import tofu.WithContext + +import scala.concurrent.duration._ + +package object configs { + + val backlogCfgWithNoSuspended = BacklogConfig( + orderLifetime = 120.seconds, + orderExecutionTime = 10.seconds, + suspendedOrdersExecutionProbabilityPercent = -1 + ) + + val backlogCfgWithOnlySuspended = BacklogConfig( + orderLifetime = 120.seconds, + orderExecutionTime = 10.seconds, + suspendedOrdersExecutionProbabilityPercent = 100 + ) + + object has { + + val cfgWithNoSuspended: BacklogConfig.Has[IO] = new WithContext[IO, BacklogConfig] { + + override def functor: Functor[IO] = Functor[IO] + + override def context: IO[BacklogConfig] = IO.pure(backlogCfgWithNoSuspended) + } + + val cfgWithOnlySuspended: BacklogConfig.Has[IO] = new WithContext[IO, BacklogConfig] { + + override def functor: Functor[IO] = Functor[IO] + + override def context: IO[BacklogConfig] = IO.pure(backlogCfgWithOnlySuspended) + } + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala new file mode 100644 index 00000000..2aa3a6c7 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CFMMOrdersGenerator.scala @@ -0,0 +1,31 @@ +package org.ergoplatform.dex.executor.amm.generators + +import cats.effect.Sync +import org.ergoplatform.dex.domain.amm +import org.ergoplatform.dex.domain.amm.CFMMOrder +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import tofu.concurrent.MakeAtom +import tofu.syntax.monadic._ + +object CFMMOrdersGenerator { + + def genMapBased[I[_]: Sync, F[_]: Sync]: I[CFMMOrders[F]] = for { + map <- MakeAtom[I, F].of(Map.empty[amm.OrderId, CFMMOrder]) + } yield (new CFMMOrders[F] { + + override def put(order: CFMMOrder): F[Unit] = + map.update(_ + (order.id -> order)) + + override def exists(orderId: amm.OrderId): F[Boolean] = + map.get.map(_.exists(_._1 == orderId)) + + override def drop(orderId: amm.OrderId): F[Unit] = + map.update(_.filter(_._1 != orderId)) + + override def get(orderId: amm.OrderId): F[Option[CFMMOrder]] = + map.get.map(_.get(orderId)) + + override def getAll: F[List[CFMMOrder]] = + map.get.map(_.values.toList) + }) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CacheStreamingGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CacheStreamingGenerator.scala new file mode 100644 index 00000000..71171e46 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/CacheStreamingGenerator.scala @@ -0,0 +1,38 @@ +package org.ergoplatform.dex.executor.amm.generators + +import cats.effect.IO +import org.ergoplatform.common.cache.CacheStreaming +import org.ergoplatform.dex.domain.amm.CFMMOrder +import org.ergoplatform.dex.executor.amm.services.StreamF +import scodec.Codec +import tofu.logging.Loggable +import fs2.Stream +import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, BinaryEncodingFailed} +import cats.syntax.either._ +import scodec.bits.BitVector +import tofu.syntax.raise._ + +object CacheStreamingGenerator { + + def cacheStreamingFor(orders: List[CFMMOrder]): CacheStreaming[StreamF] = new CacheStreaming[StreamF] { + + override def getAll[V: Codec: Loggable]: StreamF[V] = + Stream + .emits[IO, CFMMOrder](orders) + .evalMap(elem => + implicitly(Codec[CFMMOrder] + .encode(elem) + .toEither + .leftMap(err => BinaryEncodingFailed(elem.toString, err.messageWithContext)) + .toRaise[IO]) + ) + .evalMap(bytes => + Codec[V] + .decode(bytes) + .toEither + .map(_.value) + .leftMap(err => BinaryDecodingFailed("testKey", err.messageWithContext)) + .toRaise[IO] + ) + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala new file mode 100644 index 00000000..7f1f610f --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/ErgoTreeGenerator.scala @@ -0,0 +1,38 @@ +package org.ergoplatform.dex.executor.amm.generators + +import org.ergoplatform.ErgoAddressEncoder +import org.ergoplatform.dex.protocol.{sigmaUtils, ErgoTreeSerializer} +import org.ergoplatform.dex.sources.n2tContracts +import scorex.util.encode.Base16 +import sigmastate.Values.ErgoTree +import sigmastate.lang.Terms.ValueOps +import sigmastate.basics.DLogProtocol.DLogProverInput +import sigmastate.eval.{CompiletimeIRContext, IRContext} +import sigmastate.lang.SigmaCompiler + +object ErgoTreeGenerator { + + implicit private val IR: IRContext = new CompiletimeIRContext() + val sigma = SigmaCompiler(ErgoAddressEncoder.MainnetNetworkPrefix) + + val env = Map( + "Pk" -> DLogProverInput(BigInt(Long.MaxValue).bigInteger).publicImage, + "PoolNFT" -> Array.fill(32)(0: Byte), + "QuoteId" -> Array.fill(32)(1.toByte), + "DexFee" -> 999999L, + "SelfX" -> 888888L, + "MaxMinerFee" -> 777777L, + "MinerPropBytes" -> Base16 + .decode( + "1005040004000e36100204a00b08cd0279be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798ea02d192a39a8cc7a701730073011001020402d19683030193a38cc7b2a57300000193c2b2a57301007473027303830108cdeeac93b1a57304" + ) + .get + ) + + val source = n2tContracts.swapSell + + val tree = + sigmaUtils.updateVersionHeader(ErgoTree.fromProposition(sigma.compile(env, source).asSigmaProp)) + + val serializedSwapTree = ErgoTreeSerializer.default.serialize(tree) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala new file mode 100644 index 00000000..15cf7cbb --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/generators/OrdersGenerator.scala @@ -0,0 +1,77 @@ +package org.ergoplatform.dex.executor.amm.generators + +import cats.effect.{Clock, Sync} +import org.ergoplatform.dex.domain.amm.{PoolId, Swap, SwapParams} +import cats.syntax.traverse._ +import org.ergoplatform.common.HexString +import org.ergoplatform.dex.domain.AssetAmount +import org.ergoplatform.dex.executor.amm.generators.ErgoTreeGenerator.serializedSwapTree +import org.ergoplatform.ergo.{BoxId, PubKey, TokenId, TxId} +import org.ergoplatform.ergo.domain.{BoxAsset, Output} +import tofu.generate.GenRandom +import tofu.syntax.time.now +import tofu.syntax.monadic._ + +import scala.util.Random + +object OrdersGenerator { + + def genSwapOrders[F[+_]: Sync: Clock: GenRandom](qty: Int): F[List[Swap]] = + (0 until qty).toList.traverse(_ => genSwapOrder[F]) + + def genSwapOrder[F[_]: Sync: GenRandom: Clock]: F[Swap] = for { + timestamp <- now.millis + maxMinerFee <- GenRandom.nextLong + tokenHex <- randomHexString + inputAsset <- genAssetAmount + params <- genDummySwapParams(inputAsset) + box <- genDummyOutput(inputAsset) + } yield Swap( + PoolId(TokenId(tokenHex)), + maxMinerFee, + timestamp, + params, + box + ) + + def genDummySwapParams[F[_]: GenRandom: Sync](input: AssetAmount): F[SwapParams] = for { + minOutput <- genAssetAmount + dexFeePerTokenNum <- GenRandom.nextLong + dexFeePerTokenDenom <- GenRandom.nextLong + pubKeyHex <- randomHexString + } yield SwapParams( + input, + minOutput, + dexFeePerTokenNum, + dexFeePerTokenDenom, + PubKey(pubKeyHex) + ) + + def genDummyOutput[F[_]: Sync: GenRandom](input: AssetAmount): F[Output] = for { + boxIdRaw <- randomString(32) + txIdRaw <- randomString(32) + value <- GenRandom.nextLong + index <- GenRandom.nextInt(32) + creationHeight <- GenRandom.nextInt(32) + } yield Output( + BoxId(boxIdRaw), + TxId(txIdRaw), + value, + index, + creationHeight, + serializedSwapTree, + List(BoxAsset(input.id, input.value)), + Map.empty + ) + + def genAssetAmount[F[_]: GenRandom: Sync]: F[AssetAmount] = for { + assetId <- randomHexString + value <- GenRandom.nextLong + } yield AssetAmount(TokenId(assetId), value) + + def randomHexString[F[_]: Sync] = + randomString(32).map(str => HexString.fromBytes(str.getBytes)) + + def randomString[F[_]: Sync](length: Int): F[String] = + Sync[F].delay(Random.alphanumeric.take(length).mkString) +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala new file mode 100644 index 00000000..972b3a4a --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/CFMMBacklogTests.scala @@ -0,0 +1,144 @@ +package org.ergoplatform.dex.executor.amm.services + +import cats.effect.IO +import cats.syntax.traverse._ +import org.ergoplatform.dex.domain.amm.{CFMMOrder, WeightedOrder} +import org.scalatest.flatspec.AnyFlatSpec +import org.ergoplatform.dex.executor.amm._ +import org.scalatest.matchers.should +import org.ergoplatform.dex.executor.amm.generators._ +import org.ergoplatform.dex.executor.amm.utils.isoK._ +import org.ergoplatform.dex.executor.amm.repositories.CFMMOrders +import org.ergoplatform.dex.executor.amm.utils.Ordering.checkDescSort +import org.ergoplatform.dex.executor.amm.utils.genRandoms.genRandom +import tofu.logging.Logs +import org.ergoplatform.dex.executor.amm.services.StreamF +import tofu.fs2Instances._ +import fs2.Stream +import org.ergoplatform.common.cache.CacheStreaming + +import scala.concurrent.ExecutionContext + +class CFMMBacklogTests extends AnyFlatSpec with should.Matchers { + + implicit val timer = IO.timer(ExecutionContext.global) + + implicit val logs = Logs.empty[IO, IO] + + "CFMMBacklog" should "correctly retry non executed orders" in { + + val ordersQty = 100 + + implicit val cfgHas = configs.has.cfgWithOnlySuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](ordersQty) + suspendedOrders = + orders.map(order => + order.copy(timestamp = order.timestamp - configs.backlogCfgWithOnlySuspended.orderExecutionTime.toMillis * 2) + ) + implicit0(streamCaching: CacheStreaming[StreamF]) = CacheStreamingGenerator.cacheStreamingFor(List.empty) + backlog <- CFMMBacklog.make[IO, StreamF, IO] + _ <- suspendedOrders.traverse(cfmmOrders.put) + _ <- suspendedOrders.traverse(backlog.suspend) + fromBacklog <- suspendedOrders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe ordersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } + + "CFMMBacklog" should "correctly drop outdated orders" in { + + val normalOrdersQty = 50 + + val outdatedOrdersQty = 50 + + implicit val cfgHas = configs.has.cfgWithNoSuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](normalOrdersQty + outdatedOrdersQty) + outdatedOrders = + orders + .take(outdatedOrdersQty) + .map(order => + order.copy(timestamp = order.timestamp - configs.backlogCfgWithNoSuspended.orderLifetime.toMillis * 4) + ) + implicit0(streamCaching: CacheStreaming[StreamF]) = CacheStreamingGenerator.cacheStreamingFor(List.empty) + normalOrders = orders.drop(outdatedOrdersQty) + backlog <- CFMMBacklog.make[IO, StreamF, IO] + _ <- (normalOrders ++ outdatedOrders).traverse(backlog.put) + fromBacklog <- normalOrders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe normalOrdersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } + + "CFMMBacklog" should "correctly process orders by weight during pipeline" in { + val ordersQty = 100 + + implicit val cfgHas = configs.has.cfgWithNoSuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](ordersQty) + implicit0(streamCaching: CacheStreaming[StreamF]) = CacheStreamingGenerator.cacheStreamingFor(List.empty) + backlog <- CFMMBacklog.make[IO, StreamF, IO] + _ <- orders.traverse(backlog.put) + fromBacklog <- orders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe ordersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } + + "CFMMBacklog" should "correctly restore" in { + + val ordersQty = 100 + + implicit val cfgHas = configs.has.cfgWithNoSuspended + + val ordersFromBacklog = for { + implicit0(cfmmOrders: CFMMOrders[IO]) <- CFMMOrdersGenerator.genMapBased[IO, IO] + orders <- OrdersGenerator.genSwapOrders[IO](ordersQty) + implicit0(streamCaching: CacheStreaming[StreamF]) = CacheStreamingGenerator.cacheStreamingFor(orders) + _ <- orders.traverse(cfmmOrders.put) + backlog <- CFMMBacklog.make[IO, StreamF, IO] + fromBacklog <- orders.traverse(_ => backlog.get) + } yield fromBacklog + + val ordersOpts = ordersFromBacklog.unsafeRunSync() + + val orders = ordersOpts.collect[CFMMOrder, List[CFMMOrder]] { case Some(order) => + order + } + + orders.length shouldBe ordersQty + + checkDescSort(orders.map(WeightedOrder.fromOrder)) shouldBe true + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/package.scala new file mode 100644 index 00000000..ac4bb095 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/services/package.scala @@ -0,0 +1,9 @@ +package org.ergoplatform.dex.executor.amm + +import cats.effect.IO +import fs2.Stream + +package object services { + + type StreamF[+A] = Stream[IO, A] +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala new file mode 100644 index 00000000..68e9b7d8 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/Ordering.scala @@ -0,0 +1,14 @@ +package org.ergoplatform.dex.executor.amm.utils + +import scala.math.Ordering.Implicits.infixOrderingOps + +object Ordering { + + def checkDescSort[A: Ordering](in: List[A]): Boolean = + (in.headOption, in.tail.headOption) match { + case (Some(head), Some(nextAfterHead)) if head >= nextAfterHead => + checkDescSort(in.tail) + case (Some(_), None) => true + case _ => false + } +} diff --git a/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala new file mode 100644 index 00000000..3589cc32 --- /dev/null +++ b/modules/amm-executor/src/test/scala/org/ergoplatform/dex/executor/amm/utils/package.scala @@ -0,0 +1,31 @@ +package org.ergoplatform.dex.executor.amm + +import cats.effect.IO +import tofu.generate.GenRandom +import tofu.lift.IsoK + +import scala.util.Random + +package object utils { + + object genRandoms { + + implicit val genRandom: GenRandom[IO] = new GenRandom[IO] { + + override def nextLong: IO[Long] = + IO.delay(Random.nextLong()) + + override def nextInt(n: Int): IO[Int] = + IO.delay(Random.nextInt(n)) + } + } + + object isoK { + + implicit val isokIO2IO: IsoK[IO, IO] = new IsoK[IO, IO] { + override def to[A](fa: IO[A]): IO[A] = fa + + override def from[A](ga: IO[A]): IO[A] = ga + } + } +} diff --git a/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala b/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala index 69ec5c9f..af5f4937 100644 --- a/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala +++ b/modules/cache/src/main/scala/org/ergoplatform/common/cache/Cache.scala @@ -2,20 +2,24 @@ package org.ergoplatform.common.cache import cats.data.OptionT import cats.syntax.either._ +import cats.syntax.traverse._ import cats.syntax.show._ -import cats.{Functor, Monad, Show} +import cats.{Functor, Monad, MonadError, Show} import derevo.derive import derevo.tagless.applyK +import dev.profunktor.redis4cats.data.KeyScanCursor import dev.profunktor.redis4cats.hlist.{HList, Witness} -import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, BinaryEncodingFailed} +import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, BinaryEncodingFailed, ValueNotFound} import scodec.Codec import scodec.bits.BitVector import tofu.BracketThrow import tofu.higherKind.Mid import tofu.logging.{Loggable, Logging, Logs} import tofu.syntax.logging._ +import tofu.syntax.loggable._ import tofu.syntax.monadic._ import tofu.syntax.raise._ +import fs2.Stream @derive(applyK) trait Cache[F[_]] { @@ -26,6 +30,10 @@ trait Cache[F[_]] { def del[K: Codec: Loggable](key: K): F[Unit] + def exists[K: Codec: Loggable](key: K): F[Boolean] + + def getAll[V: Codec: Loggable]: F[List[V]] + def flushAll: F[Unit] def transaction[T <: HList](commands: T)(implicit w: Witness[T]): F[Unit] @@ -33,6 +41,8 @@ trait Cache[F[_]] { object Cache { + implicit val loggable: Loggable[Array[Byte]] = Loggable.empty + def make[I[_]: Functor, F[_]: Monad: BracketThrow](implicit redis: Redis.Plain[F], makeTx: MakeRedisTransaction[F], @@ -43,7 +53,7 @@ object Cache { } final class Redis[ - F[_]: Monad: BinaryEncodingFailed.Raise: BinaryDecodingFailed.Raise: BracketThrow + F[_]: Monad: BinaryEncodingFailed.Raise: BinaryDecodingFailed.Raise: ValueNotFound.Raise: BracketThrow ](implicit redis: Redis.Plain[F], makeTx: MakeRedisTransaction[F]) extends Cache[F] { @@ -73,15 +83,7 @@ object Cache { .leftMap(err => BinaryEncodingFailed(key.show, err.messageWithContext)) .toRaise ) - raw <- OptionT(redis.get(k.toByteArray)) - value <- OptionT.liftF( - Codec[V] - .decode(BitVector(raw)) - .toEither - .map(_.value) - .leftMap(err => BinaryDecodingFailed(key.show, err.messageWithContext)) - .toRaise - ) + value <- getValue[V](k.toByteArray) } yield value).value def del[K: Codec: Loggable](key: K): F[Unit] = @@ -95,8 +97,66 @@ object Cache { def flushAll: F[Unit] = redis.flushAll + def exists[K: Codec: Loggable](key: K): F[Boolean] = + Codec[K] + .encode(key) + .toEither + .leftMap(err => BinaryEncodingFailed(key.show, err.messageWithContext)) + .toRaise + .flatMap(k => redis.exists(k.toByteArray)) + def transaction[T <: HList](commands: T)(implicit w: Witness[T]): F[Unit] = makeTx.make.use(_.exec(commands).void) + + def getAll[V: Codec: Loggable]: F[List[V]] = { + def iterate(acc: List[V], scanner: KeyScanCursor[Array[Byte]]): F[List[V]] = + for { + elems <- scanner.keys + .traverse(key => + getValue[V](key).value >>= { + case Some(elem) => elem.pure + case None => ValueNotFound(key.logShow).raise[F, V] + } + ) + newAcc = acc ++ elems + toReturn <- + if (scanner.isFinished) newAcc.pure + else redis.scan(scanner) >>= (iterate(newAcc, _)) + } yield toReturn + + redis.scan >>= (iterate(List.empty, _)) + } + + private def getValue[V: Codec: Loggable](key: Array[Byte]) = for { + raw <- OptionT(redis.get(key)) + value <- OptionT.liftF( + Codec[V] + .decode(BitVector(raw)) + .toEither + .map(_.value) + .leftMap(err => BinaryDecodingFailed(key.show, err.messageWithContext)) + .toRaise + ) + } yield value + + def getAllStream[V: Codec : Loggable]: Stream[F, V] = { + def iterate(scanner: KeyScanCursor[Array[Byte]]): Stream[F, V] = + Stream.evals( + scanner.keys + .traverse(key => + getValue[V](key).value >>= { + case Some(elem) => elem.pure + case None => ValueNotFound(key.logShow).raise[F, V] + } + ) + ) ++ ( + if (scanner.isFinished) Stream.empty + else Stream.eval(redis.scan(scanner)) >>= iterate + ) + + Stream.eval(redis.scan) >>= iterate + } + } final class CacheTracing[F[_]: Monad: Logging] extends Cache[Mid[F, *]] { @@ -115,5 +175,16 @@ object Cache { def transaction[T <: HList](commands: T)(implicit w: Witness[T]): Mid[F, Unit] = fa => trace"transaction begin" >> fa.flatTap(_ => trace"transaction end") + + def exists[K: Codec: Loggable](key: K): Mid[F, Boolean] = + _ >>= (r => trace"exists(key=$key) -> $r" as r) + + def getAll[V: Codec: Loggable]: Mid[F, List[V]] = + _ >>= (r => trace"getAll() -> length: ${r.length}" as r) + + def getAllStream[V: Codec : Loggable]: Stream[Mid[F, *], V] = + Stream.eval( + (fa: F[V]) => trace"getAllStream()" >> fa + ) } } diff --git a/modules/cache/src/main/scala/org/ergoplatform/common/cache/CacheStreaming.scala b/modules/cache/src/main/scala/org/ergoplatform/common/cache/CacheStreaming.scala new file mode 100644 index 00000000..b9e6e984 --- /dev/null +++ b/modules/cache/src/main/scala/org/ergoplatform/common/cache/CacheStreaming.scala @@ -0,0 +1,92 @@ +package org.ergoplatform.common.cache + +import cats.{FlatMap, Monad, Show} +import cats.data.OptionT +import dev.profunktor.redis4cats.data.KeyScanCursor +import org.ergoplatform.common.cache.errors.{BinaryDecodingFailed, ValueNotFound} +import scodec.Codec +import derevo.tagless.applyK +import tofu.logging.{Loggable, Logging, Logs} +import cats.syntax.show._ +import cats.syntax.traverse._ +import cats.syntax.either._ +import derevo.derive +import tofu.syntax.monadic._ +import scodec.bits.BitVector +import tofu.BracketThrow +import tofu.higherKind.Mid +import tofu.higherKind.derived.representableK +import tofu.streams.{Evals, Merge} +import tofu.syntax.raise._ +import tofu.syntax.loggable._ +import tofu.syntax.logging._ +import tofu.syntax.streams.evals._ +import tofu.syntax.streams.merge._ + +@derive(representableK) +trait CacheStreaming[S[_]] { + + def getAll[V: Codec: Loggable]: S[V] +} + +object CacheStreaming { + + implicit val loggable: Loggable[Array[Byte]] = Loggable[String].contramap(new String(_)) + + def make[ + I[_]: FlatMap, + F[_]: Evals[*[_], G]: Merge: Monad, + G[_]: ValueNotFound.Raise: BracketThrow + ](implicit + redis: Redis.Plain[G], + makeTx: MakeRedisTransaction[G], + logs: Logs[I, G] + ): I[CacheStreaming[F]] = + logs.forService[CacheStreaming[F]].map { implicit logging => + new Live[F, G] + } + + final private class Live[ + F[_]: Evals[*[_], G]: Merge: FlatMap, + G[_]: ValueNotFound.Raise: BracketThrow + ](implicit + redis: Redis.Plain[G], + makeTx: MakeRedisTransaction[G], + logging: Logging[G] + ) extends CacheStreaming[F] { + + implicit def showFromLoggable[T](implicit l: Loggable[T]): Show[T] = l.showInstance + + def getAll[V: Codec: Loggable]: F[V] = { + + def iterate(scanner: KeyScanCursor[Array[Byte]]): F[V] = { + val elems = evals( + scanner.keys + .traverse(key => + getValue[V](key).value >>= { + case Some(elem) => + debug"Restore elem $key -> $elem" >> elem.pure + case None => ValueNotFound(key.logShow).raise[G, V] + } + ) + ) + if (scanner.isFinished) elems + else elems.merge(eval(redis.scan) >>= iterate) + } + + eval(redis.scan) >>= iterate + } + + private def getValue[V: Codec: Loggable](key: Array[Byte]): OptionT[G, V] = for { + raw <- OptionT(redis.get(key)) + value <- OptionT.liftF( + Codec[V] + .decode(BitVector(raw)) + .toEither + .map(_.value) + .leftMap(err => BinaryDecodingFailed(key.show, err.messageWithContext)) + .toRaise + ) + } yield value + } +} diff --git a/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala b/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala index 3e265d4e..c8a25a74 100644 --- a/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala +++ b/modules/cache/src/main/scala/org/ergoplatform/common/cache/errors.scala @@ -13,4 +13,9 @@ object errors { extends Exception(s"Failed to decode value {$showValue}. $reason") object BinaryDecodingFailed extends Errors.Companion[BinaryDecodingFailed] + + final case class ValueNotFound(key: String) + extends Exception(s"Failed to get value by key {$key}.") + + object ValueNotFound extends Errors.Companion[ValueNotFound] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala index d85ef88c..77108dad 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala @@ -2,7 +2,12 @@ package org.ergoplatform.dex.domain.amm import derevo.circe.{decoder, encoder} import derevo.derive +import io.circe.{Decoder, Encoder} +import org.ergoplatform.dex.domain.AssetAmount +import org.ergoplatform.dex.protocol.codecs.{ergoLikeTransactionDecoder, ergoLikeTransactionEncoder} import org.ergoplatform.ergo.domain.Output +import scodec.Codec +import scodec.codecs.{int64, int8} import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) @@ -19,10 +24,46 @@ sealed trait CFMMOrder { final case class Deposit(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: DepositParams, box: Output) extends CFMMOrder +object Deposit { + + implicit val codec: Codec[Deposit] = + (implicitly[Codec[PoolId]] :: + int64 :: + int64 :: + implicitly[Codec[DepositParams]] :: + implicitly[Codec[Output]]).as[Deposit] +} + @derive(encoder, decoder, loggable) final case class Redeem(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: RedeemParams, box: Output) extends CFMMOrder +object Redeem { + + implicit val codec: Codec[Redeem] = + (implicitly[Codec[PoolId]] :: + int64 :: + int64 :: + implicitly[Codec[RedeemParams]] :: + implicitly[Codec[Output]]).as[Redeem] +} + @derive(encoder, decoder, loggable) final case class Swap(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: SwapParams, box: Output) extends CFMMOrder + +object Swap { + + implicit val codec: Codec[Swap] = + (implicitly[Codec[PoolId]] :: + int64 :: + int64 :: + implicitly[Codec[SwapParams]] :: + implicitly[Codec[Output]]).as[Swap] +} + +object CFMMOrder { + + implicit val codec: Codec[CFMMOrder] = + Codec.coproduct[CFMMOrder].discriminatedByIndex(int8) +} diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/DepositParams.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/DepositParams.scala index de63771e..8f2f82a1 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/DepositParams.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/DepositParams.scala @@ -4,6 +4,8 @@ import derevo.circe.{decoder, encoder} import derevo.derive import org.ergoplatform.ergo.{Address, PubKey} import org.ergoplatform.dex.domain.AssetAmount +import scodec.Codec +import scodec.codecs.int64 import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) @@ -13,3 +15,14 @@ final case class DepositParams( dexFee: Long, redeemer: PubKey ) + +object DepositParams { + + implicit val codec: Codec[DepositParams] = + ( + implicitly[Codec[AssetAmount]] :: + implicitly[Codec[AssetAmount]] :: + int64 :: + implicitly[Codec[PubKey]] + ).as[DepositParams] +} diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/RedeemParams.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/RedeemParams.scala index c3fe9b92..ff03e9ec 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/RedeemParams.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/RedeemParams.scala @@ -4,7 +4,17 @@ import derevo.circe.{decoder, encoder} import derevo.derive import org.ergoplatform.ergo.{Address, PubKey} import org.ergoplatform.dex.domain.AssetAmount +import scodec.Codec +import scodec.codecs.int64 import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) final case class RedeemParams(lp: AssetAmount, dexFee: Long, redeemer: PubKey) + +object RedeemParams { + + implicit val codec: Codec[RedeemParams] = + (implicitly[Codec[AssetAmount]] :: + int64 :: + implicitly[Codec[PubKey]]).as[RedeemParams] +} diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/SwapParams.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/SwapParams.scala index f792a1e9..1061cd2b 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/SwapParams.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/SwapParams.scala @@ -4,6 +4,8 @@ import derevo.circe.{decoder, encoder} import derevo.derive import org.ergoplatform.dex.domain.AssetAmount import org.ergoplatform.ergo.{Address, PubKey} +import scodec.Codec +import scodec.codecs.int64 import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) @@ -14,3 +16,13 @@ final case class SwapParams( dexFeePerTokenDenom: Long, redeemer: PubKey ) + +object SwapParams { + + implicit val codec: Codec[SwapParams] = + (implicitly[Codec[AssetAmount]] :: + implicitly[Codec[AssetAmount]] :: + int64 :: + int64 :: + implicitly[Codec[PubKey]]).as[SwapParams] +} diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala index b6a6c9b0..447e0340 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/package.scala @@ -12,6 +12,7 @@ import org.ergoplatform.common.HexString import org.ergoplatform.dex.domain.amm.PoolId import org.ergoplatform.ergo.{BoxId, TokenId} import scodec.bits.ByteVector +import scodec.codecs.{uint16, utf8} import sttp.tapir.{Codec, Schema, Validator} import tofu.logging.derivation.loggable @@ -83,6 +84,24 @@ package object amm { implicit val get: Get[OrderId] = deriving implicit val put: Put[OrderId] = deriving + implicit def codec: scodec.Codec[OrderId] = + scodec.codecs.variableSizeBits(uint16, utf8).xmap(OrderId(_), _.value) + + implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, OrderId] = serializerViaCirceEncoder + implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, OrderId] = deserializerViaKafkaDecoder + } + + @derive(show, loggable, encoder, decoder) + case class WeightedOrder(weight: Long, orderId: OrderId, timestamp: Long) + + object WeightedOrder { + + def fromOrder(order: CFMMOrder): WeightedOrder = + WeightedOrder(order.maxMinerFee, order.id, order.timestamp) + + implicit val ord: Ordering[WeightedOrder] = + (x: WeightedOrder, y: WeightedOrder) => x.weight compare y.weight + implicit def recordSerializer[F[_]: Sync]: RecordSerializer[F, OrderId] = serializerViaCirceEncoder implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, OrderId] = deserializerViaKafkaDecoder } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala index 5440e18a..a49a1b60 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/BoxAsset.scala @@ -8,6 +8,8 @@ import shapeless.Lazy import tofu.logging.derivation.loggable import org.ergoplatform.ergo.services.explorer.models.{BoxAsset => ExplorerBoxAsset} import org.ergoplatform.ergo.services.node.models.{BoxAsset => NodeAsset} +import scodec.Codec +import scodec.codecs.int64 @derive(encoder, decoder, loggable) final case class BoxAsset( @@ -23,4 +25,7 @@ object BoxAsset { def fromNode(a: NodeAsset): BoxAsset = BoxAsset(a.tokenId, a.amount) + + implicit val codec: Codec[BoxAsset] = + (implicitly[Codec[TokenId]] :: int64).as[BoxAsset] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala index 423d20c2..aea50f5c 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/Output.scala @@ -5,6 +5,8 @@ import derevo.derive import org.ergoplatform.ergo.services.explorer.models.{Output => ExplorerOutput} import org.ergoplatform.ergo.services.node.models.{Output => NodeOutput} import org.ergoplatform.ergo.{BoxId, SErgoTree, TxId} +import scodec.Codec +import scodec.codecs.{int32, int64, list} import tofu.logging.derivation.loggable @derive(encoder, decoder, loggable) @@ -44,4 +46,22 @@ object Output { o.assets.map(BoxAsset.fromNode), Map.empty // todo ) + + private val tupleCodec: Codec[(RegisterId, SConstant)] = + implicitly[Codec[RegisterId]].pairedWith(implicitly[Codec[SConstant]]) + + private val mapCodec: Codec[Map[RegisterId, SConstant]] = + list(tupleCodec).xmap(_.toMap, _.toList) + + implicit val codec: Codec[Output] = + ( + implicitly[Codec[BoxId]] :: + implicitly[Codec[TxId]] :: + int64 :: + int32 :: + int32 :: + implicitly[Codec[SErgoTree]] :: + list(implicitly[Codec[BoxAsset]]) :: + mapCodec + ).as[Output] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/RegisterId.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/RegisterId.scala index 652e61d6..8e0b192f 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/RegisterId.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/RegisterId.scala @@ -4,6 +4,9 @@ import cats.syntax.either._ import doobie.util.{Get, Put} import enumeratum.{CirceEnum, Enum, EnumEntry} import io.circe.{KeyDecoder, KeyEncoder} +import org.ergoplatform.ergo.TxId +import scodec.{Attempt, Codec, Err} +import scodec.codecs.{uint16, utf8} import tofu.logging.Loggable sealed abstract class RegisterId extends EnumEntry @@ -33,4 +36,17 @@ object RegisterId extends Enum[RegisterId] with CirceEnum[RegisterId] { implicit val put: Put[RegisterId] = Put[String].contramap[RegisterId](_.entryName) + + implicit val codec: scodec.Codec[RegisterId] = + scodec.codecs + .variableSizeBits(uint16, utf8) + .exmap( + str => + Attempt.fromEither( + Either + .catchNonFatal(RegisterId.withName(str)) + .leftMap(err => Err(err.getMessage)) + ), + registerId => Attempt.successful(registerId.entryName) + ) } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala index 1d00eca2..cf95571a 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SConstant.scala @@ -2,11 +2,15 @@ package org.ergoplatform.ergo.domain import derevo.circe.encoder import derevo.derive +import cats.syntax.either._ import io.circe.Decoder import org.ergoplatform.common.HexString import org.ergoplatform.ergo.PubKey import org.ergoplatform.ergo.domain.SigmaType.SimpleKindSigmaType._ import org.ergoplatform.ergo.domain.SigmaType._ +import scodec.{Attempt, Codec, Err} +import scodec.codecs._ +import scodec.codecs.uint8 import tofu.logging.derivation.loggable @derive(encoder, loggable) @@ -17,18 +21,55 @@ object SConstant { @derive(encoder, loggable) final case class IntConstant(value: Int) extends SConstant + object IntConstant { + implicit val codec: Codec[IntConstant] = int32.as[IntConstant] + } + @derive(encoder, loggable) final case class LongConstant(value: Long) extends SConstant + object LongConstant { + implicit val codec: Codec[LongConstant] = int64.as[LongConstant] + } + @derive(encoder, loggable) final case class ByteaConstant(value: HexString) extends SConstant + object ByteaConstant { + + implicit val codec: Codec[ByteaConstant] = + scodec.codecs + .variableSizeBits(uint16, utf8) + .exmap( + str => + Attempt.fromEither( + Either + .catchNonFatal(HexString.unsafeFromString(str)) + .map(ByteaConstant(_)) + .leftMap(err => Err(err.getMessage)) + ), + const => Attempt.successful(const.value.unwrapped) + ) + } + @derive(encoder, loggable) final case class SigmaPropConstant(value: PubKey) extends SConstant + object SigmaPropConstant { + implicit val codec: Codec[SigmaPropConstant] = implicitly[Codec[PubKey]].as[SigmaPropConstant] + } + @derive(encoder, loggable) final case class UnresolvedConstant(raw: String) extends SConstant + object UnresolvedConstant { + + implicit val codec: Codec[UnresolvedConstant] = + scodec.codecs + .variableSizeBits(uint16, utf8) + .xmap(UnresolvedConstant(_), _.raw) + } + implicit val decoder: Decoder[SConstant] = { c => c.downField("renderedValue").as[String].flatMap { value => c.downField("sigmaType").as[SigmaType].map { @@ -40,4 +81,7 @@ object SConstant { } } } + + implicit val codec: Codec[SConstant] = + Codec.coproduct[SConstant].discriminatedByIndex(uint8) } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/package.scala b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/package.scala index 863e8005..54049bfb 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/ergo/package.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/ergo/package.scala @@ -23,9 +23,11 @@ import io.estatico.newtype.ops._ import org.ergoplatform.common.HexString import org.ergoplatform.common.errors.RefinementFailed import org.ergoplatform.ergo.CurrencyId +import org.ergoplatform.ergo.domain.SConstant.ByteaConstant import org.ergoplatform.ergo.syntax.PubKeyOps import pureconfig.ConfigReader import pureconfig.error.CannotConvert +import scodec.{Attempt, Err} import scodec.bits.ByteVector import scorex.crypto.hash.Sha256 import scorex.util.encode.Base16 @@ -69,6 +71,9 @@ package object ergo { implicit val put: Put[TxId] = deriving implicit def recordDeserializer[F[_]: Sync]: RecordDeserializer[F, TxId] = deserializerViaKafkaDecoder + + implicit def codec: scodec.Codec[TxId] = + scodec.codecs.variableSizeBits(uint16, utf8).xmap(TxId(_), _.value) } @newtype case class BlockId(value: String) @@ -237,6 +242,20 @@ package object ergo { ): F[SErgoTree] = HexString.fromString(s).map(SErgoTree.apply) def unsafeFromString(s: String): SErgoTree = SErgoTree(HexString.unsafeFromString(s)) + + implicit def codec: scodec.Codec[SErgoTree] = + scodec.codecs + .variableSizeBits(uint16, utf8) + .exmap( + str => + Attempt.fromEither( + Either + .catchNonFatal(HexString.unsafeFromString(str)) + .map(SErgoTree(_)) + .leftMap(err => Err(err.getMessage)) + ), + tree => Attempt.successful(tree.value.unwrapped) + ) } @newtype case class ErgoTreeTemplate(value: HexString) { @@ -290,5 +309,19 @@ package object ergo { ): F[PubKey] = HexString.fromString(s).map(PubKey.apply) def unsafeFromString(s: String): PubKey = PubKey(HexString.unsafeFromString(s)) + + implicit val codec: scodec.Codec[PubKey] = + scodec.codecs + .variableSizeBits(uint16, utf8) + .exmap( + str => + Attempt.fromEither( + Either + .catchNonFatal(HexString.unsafeFromString(str)) + .map(PubKey(_)) + .leftMap(err => Err(err.getMessage)) + ), + tree => Attempt.successful(tree.value.unwrapped) + ) } }