From f76d0e9d7feda9c85e65bf60a58f7b9501ab2726 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Mon, 14 Feb 2022 18:28:25 +0300 Subject: [PATCH 1/9] WIP: ErgoDEX backlog impl. --- .../ergoplatform/dex/executor/amm/App.scala | 8 +-- .../executor/amm/modules/CFMMBacklog.scala | 50 ++++++++++++++++++- .../executor/amm/processes/Registerer.scala | 47 +++++++++++++++++ .../dex/executor/amm/streaming.scala | 6 +-- .../common/streaming/StreamingCircuit.scala | 2 - 5 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index ada1e081..debe708d 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -15,7 +15,7 @@ import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMI import org.ergoplatform.dex.executor.amm.processes.Executor import org.ergoplatform.dex.executor.amm.repositories.CFMMPools import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumer, CFMMConsumerRetries, CFMMProducerRetries} import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM} import org.ergoplatform.ergo.modules.ErgoNetwork import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming} @@ -47,13 +47,13 @@ object App extends EnvApp[AppContext] { ctx = AppContext.init(configs) implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) - implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) = + implicit0(confirmedOrders: CFMMConsumer[StreamF, RunF, Confirmed]) = makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) - implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) = + implicit0(unconfirmedOrders: CFMMConsumer[StreamF, RunF, Unconfirmed]) = makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders) implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) = makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry) - implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) = + implicit0(orders: CFMMConsumer[StreamF, RunF, Id]) = Consumer.combine2(confirmedOrders, unconfirmedOrders)(_.entity, _.entity) implicit0(producerRetries: CFMMProducerRetries[StreamF]) <- Producer.make[InitF, StreamF, RunF, OrderId, Delayed[CFMMOrder]](configs.producers.ordersRetry) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index 41a413ce..c5e6ad84 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -1,6 +1,13 @@ package org.ergoplatform.dex.executor.amm.modules +import cats.Monad +import cats.effect.Timer +import cats.effect.concurrent.Ref import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import tofu.concurrent.MakeRef +import tofu.syntax.monadic._ + +import scala.concurrent.duration.DurationInt trait CFMMBacklog[F[_]] { @@ -8,11 +15,50 @@ trait CFMMBacklog[F[_]] { */ def put(order: CFMMOrder): F[Unit] - /** Get candidate order for execution. Blocks until an order is available. + /** Pop a candidate order for execution. Blocks until an order is available. */ - def get: F[CFMMOrder] + def pop: F[CFMMOrder] /** Put an order from the backlog. */ def drop(id: OrderId): F[Unit] } + +object CFMMBacklog { + + private val PollInterval = 1.second + + def make[I[_]: Monad, F[_]: Monad: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = + for { + candidatesR <- makeRef.refOf(Set.empty[CFMMOrder]) + survivorsR <- makeRef.refOf(Set.empty[OrderId]) + } yield new EphemeralCFMMBacklog(candidatesR, survivorsR) + + // In-memory orders backlog. + // Note: Not thread safe. + final class EphemeralCFMMBacklog[F[_]: Monad]( + candidatesR: Ref[F, Set[CFMMOrder]], + survivorsR: Ref[F, Set[OrderId]] + )(implicit T: Timer[F]) + extends CFMMBacklog[F] { + + def put(order: CFMMOrder): F[Unit] = + candidatesR.update(_ + order) >> survivorsR.update(_ + order.id) + + def pop: F[CFMMOrder] = { + def tryPop: F[CFMMOrder] = + candidatesR.get.map(_.headOption).flatMap { + case Some(order) => candidatesR.update(_ - order) as order + case None => T.sleep(PollInterval) >> tryPop + } + for { + c <- tryPop + res <- survivorsR.get + .map(_.contains(c.id)) + .ifM(survivorsR.update(_ - c.id) as c, pop) + } yield res + } + + def drop(id: OrderId): F[Unit] = survivorsR.update(_ - id) + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala new file mode 100644 index 00000000..bd73b3c3 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala @@ -0,0 +1,47 @@ +package org.ergoplatform.dex.executor.amm.processes + +import cats.{Functor, Id, Monad} +import derevo.derive +import org.ergoplatform.common.TraceId +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.streaming.CFMMConsumer +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.streams.Evals +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.streams.all._ + +@derive(representableK) +trait Registerer[F[_]] { + + def run: F[Unit] +} + +object Registerer { + + def make[ + I[_]: Functor, + F[_]: Monad: Evals[*[_], G], + G[_]: Monad: TraceId.Local + ](implicit + orders: CFMMConsumer[F, G, Id], + backlog: CFMMBacklog[G], + logs: Logs[I, G] + ): I[Registerer[F]] = + logs.forService[Registerer[F]].map(implicit l => new Live[F, G]) + + final private class Live[ + F[_]: Monad: Evals[*[_], G], + G[_]: Monad: Logging: TraceId.Local + ](implicit + orders: CFMMConsumer[F, G, Id], + backlog: CFMMBacklog[G] + ) extends Registerer[F] { + + def run: F[Unit] = + orders.stream + .evalTap(rec => debug"Registered ${rec.message}" >> backlog.put(rec.message)) + .evalMap(_.commit) + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala index 8ee7e122..d7b5cc8d 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala @@ -6,9 +6,9 @@ import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} object streaming { - type CFMMConsumerIn[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] - type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] - type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] + type CFMMConsumer[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] + type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] + type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] type CFMMCircuit[F[_], G[_]] = StreamingCircuit[OrderId, CFMMOrder, F, G] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala index ebccdfd3..78334e78 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/StreamingCircuit.scala @@ -3,10 +3,8 @@ package org.ergoplatform.common.streaming import cats.effect.Timer import cats.{FlatMap, Monad} import fs2.kafka.types.KafkaOffset -import org.ergoplatform.ergo.state.{Confirmed, Unconfirmed} import tofu.higherKind.Embed import tofu.streams.{Evals, ParFlatten} -import tofu.syntax.embed._ import tofu.syntax.monadic._ import tofu.syntax.streams.all._ import tofu.syntax.time.now From 254e5aa420263e34fcb3567ad24040caf2a24c39 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Mon, 14 Feb 2022 19:52:25 +0300 Subject: [PATCH 2/9] ErgoDEX Backlog impl. --- build.sbt | 2 +- .../ergoplatform/dex/executor/amm/App.scala | 33 +++++++------ .../dex/executor/amm/config/Consumers.scala | 2 +- .../executor/amm/modules/CFMMBacklog.scala | 9 ++-- .../dex/executor/amm/processes/Cleaner.scala | 46 +++++++++++++++++++ .../dex/executor/amm/processes/Executor.scala | 33 +++++++------ .../executor/amm/processes/Registerer.scala | 8 ++-- .../dex/executor/amm/streaming.scala | 9 ++-- .../dex/domain/amm/CFMMOrder.scala | 12 +++++ 9 files changed, 106 insertions(+), 48 deletions(-) create mode 100644 modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala diff --git a/build.sbt b/build.sbt index fd590121..e5405b13 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.0.0-M18", + version := "1.1.0-M1", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index debe708d..663b28b3 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -1,6 +1,5 @@ package org.ergoplatform.dex.executor.amm -import cats.Id import cats.effect.{Blocker, Resource} import fs2.kafka.RecordDeserializer import fs2.kafka.serde._ @@ -8,14 +7,15 @@ import org.ergoplatform.ErgoAddressEncoder import org.ergoplatform.common.EnvApp import org.ergoplatform.common.streaming._ import org.ergoplatform.dex.configs.ConsumerConfig -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} import org.ergoplatform.dex.executor.amm.config.ConfigBundle import org.ergoplatform.dex.executor.amm.context.AppContext import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter} -import org.ergoplatform.dex.executor.amm.processes.Executor +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.processes.{Cleaner, Executor, Registerer} import org.ergoplatform.dex.executor.amm.repositories.CFMMPools import org.ergoplatform.dex.executor.amm.services.Execution -import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumer, CFMMConsumerRetries, CFMMProducerRetries} +import org.ergoplatform.dex.executor.amm.streaming.{CFMMOrders, CFMMOrdersGen, EvaluatedCFMMOrders} import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM} import org.ergoplatform.ergo.modules.ErgoNetwork import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming} @@ -23,7 +23,6 @@ import org.ergoplatform.ergo.services.node.ErgoNode import org.ergoplatform.ergo.state.{Confirmed, Unconfirmed} import sttp.capabilities.fs2.Fs2Streams import sttp.client3.SttpBackend -import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend import tofu.WithRun import tofu.fs2Instances._ @@ -35,29 +34,27 @@ import zio.{ExitCode, URIO, ZEnv} object App extends EnvApp[AppContext] { def run(args: List[String]): URIO[ZEnv, ExitCode] = - init(args.headOption).use { case (executor, ctx) => - val appF = executor.run.compile.drain + init(args.headOption).use { case (executor, registerer, cleaner, ctx) => + val appF = fs2.Stream(executor.run, registerer.run, cleaner.run).parJoinUnbounded.compile.drain appF.run(ctx) as ExitCode.success }.orDie - private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] = + private def init(configPathOpt: Option[String]) = for { blocker <- Blocker[InitF] configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker)) ctx = AppContext.init(configs) implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) - implicit0(confirmedOrders: CFMMConsumer[StreamF, RunF, Confirmed]) = + implicit0(confirmedOrders: CFMMOrdersGen[StreamF, RunF, Confirmed]) = makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) - implicit0(unconfirmedOrders: CFMMConsumer[StreamF, RunF, Unconfirmed]) = + implicit0(unconfirmedOrders: CFMMOrdersGen[StreamF, RunF, Unconfirmed]) = makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders) - implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) = - makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry) - implicit0(orders: CFMMConsumer[StreamF, RunF, Id]) = + implicit0(orders: CFMMOrders[StreamF, RunF]) = Consumer.combine2(confirmedOrders, unconfirmedOrders)(_.entity, _.entity) - implicit0(producerRetries: CFMMProducerRetries[StreamF]) <- - Producer.make[InitF, StreamF, RunF, OrderId, Delayed[CFMMOrder]](configs.producers.ordersRetry) - implicit0(consumer: CFMMCircuit[StreamF, RunF]) = StreamingCircuit.make[StreamF, RunF, OrderId, CFMMOrder] + implicit0(evaluatedOrders: EvaluatedCFMMOrders[StreamF, RunF]) = + makeConsumer[OrderId, EvaluatedCFMMOrder.Any](configs.consumers.evaluatedOrders) + implicit0(backlog: CFMMBacklog[RunF]) <- Resource.eval(CFMMBacklog.make[InitF, RunF]) implicit0(backend: SttpBackend[RunF, Fs2Streams[RunF]]) <- makeBackend(ctx, blocker) implicit0(explorer: ErgoExplorer[RunF]) = ErgoExplorerStreaming.make[StreamF, RunF] implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF]) @@ -68,7 +65,9 @@ object App extends EnvApp[AppContext] { implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF] implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) - } yield executor -> ctx + registerer <- Resource.eval(Registerer.make[InitF, StreamF, RunF]) + cleaner <- Resource.eval(Cleaner.make[InitF, StreamF, RunF]) + } yield (executor, registerer, cleaner, ctx) private def makeBackend( ctx: AppContext, diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala index a28ff859..e81e93c2 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala @@ -9,5 +9,5 @@ import tofu.logging.derivation.loggable final case class Consumers( confirmedOrders: ConsumerConfig, unconfirmedOrders: ConsumerConfig, - ordersRetry: ConsumerConfig + evaluatedOrders: ConsumerConfig ) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index c5e6ad84..c73a7bf5 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -7,6 +7,7 @@ import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} import tofu.concurrent.MakeRef import tofu.syntax.monadic._ +import scala.collection.immutable.{HashSet, TreeSet} import scala.concurrent.duration.DurationInt trait CFMMBacklog[F[_]] { @@ -30,15 +31,15 @@ object CFMMBacklog { def make[I[_]: Monad, F[_]: Monad: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = for { - candidatesR <- makeRef.refOf(Set.empty[CFMMOrder]) - survivorsR <- makeRef.refOf(Set.empty[OrderId]) + candidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) + survivorsR <- makeRef.refOf(HashSet.empty[OrderId]) } yield new EphemeralCFMMBacklog(candidatesR, survivorsR) // In-memory orders backlog. // Note: Not thread safe. final class EphemeralCFMMBacklog[F[_]: Monad]( - candidatesR: Ref[F, Set[CFMMOrder]], - survivorsR: Ref[F, Set[OrderId]] + candidatesR: Ref[F, TreeSet[CFMMOrder]], + survivorsR: Ref[F, HashSet[OrderId]] )(implicit T: Timer[F]) extends CFMMBacklog[F] { diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala new file mode 100644 index 00000000..be2a6c47 --- /dev/null +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala @@ -0,0 +1,46 @@ +package org.ergoplatform.dex.executor.amm.processes + +import cats.{Functor, Monad} +import derevo.derive +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog +import org.ergoplatform.dex.executor.amm.streaming.EvaluatedCFMMOrders +import tofu.higherKind.derived.representableK +import tofu.logging.{Logging, Logs} +import tofu.streams.Evals +import tofu.syntax.logging._ +import tofu.syntax.monadic._ +import tofu.syntax.streams.all._ + +@derive(representableK) +trait Cleaner[F[_]] { + + def run: F[Unit] +} + +object Cleaner { + + def make[ + I[_]: Functor, + F[_]: Monad: Evals[*[_], G], + G[_]: Monad + ](implicit + orders: EvaluatedCFMMOrders[F, G], + backlog: CFMMBacklog[G], + logs: Logs[I, G] + ): I[Cleaner[F]] = + logs.forService[Cleaner[F]].map(implicit l => new Live[F, G]) + + final private class Live[ + F[_]: Monad: Evals[*[_], G], + G[_]: Monad: Logging + ](implicit + orders: EvaluatedCFMMOrders[F, G], + backlog: CFMMBacklog[G] + ) extends Cleaner[F] { + + def run: F[Unit] = + orders.stream + .evalTap(rec => debug"Order ${rec.message} is evaluated" >> backlog.drop(rec.message.order.id)) + .evalMap(_.commit) + } +} diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala index 747ee2cb..bf69d61b 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Executor.scala @@ -2,13 +2,14 @@ package org.ergoplatform.dex.executor.amm.processes import cats.effect.Clock import cats.syntax.option._ -import cats.{Functor, Monad} +import cats.{Defer, Functor, Monad, SemigroupK} import derevo.derive import mouse.any._ import org.ergoplatform.common.TraceId import org.ergoplatform.common.streaming.syntax._ import org.ergoplatform.dex.domain.amm.CFMMOrder import org.ergoplatform.dex.executor.amm.config.ExecutionConfig +import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog import org.ergoplatform.dex.executor.amm.services.Execution import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser @@ -34,10 +35,10 @@ object Executor { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has, + F[_]: Monad: SemigroupK: Defer: Evals[*[_], G]: ExecutionConfig.Has, G[_]: Monad: TraceId.Local: Clock: Catches ](implicit - orders: CFMMCircuit[F, G], + backlog: CFMMBacklog[G], service: Execution[G], logs: Logs[I, G] ): I[Executor[F]] = @@ -48,34 +49,32 @@ object Executor { } final private class Live[ - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: SemigroupK: Defer: Evals[*[_], G], G[_]: Monad: Logging: TraceId.Local: Clock: Catches ](conf: ExecutionConfig)(implicit - orders: CFMMCircuit[F, G], + backlog: CFMMBacklog[G], service: Execution[G], errParser: TxSubmissionErrorParser ) extends Executor[F] { def run: F[Unit] = - orders.stream - .evalMap { rec => + eval(backlog.pop).repeat + .evalMap { order => service - .executeAttempt(rec.message) + .executeAttempt(order) .handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder]) - .local(_ => TraceId.fromString(rec.message.id.value)) - .tupleLeft(rec) + .local(_ => TraceId.fromString(order.id.value)) + .tupleLeft(order) } - .flatTap { - case (_, None) => unit[F] + .evalMap { + case (_, None) => unit[G] case (_, Some(order)) => - eval(now.millis) >>= { + now.millis >>= { case ts if ts - order.timestamp < conf.orderLifetime.toMillis => - eval(warn"Failed to execute $order. Going to retry.") >> - orders.retry((order.id -> order).pure[F]) + warn"Failed to execute $order. Going to retry." >> backlog.put(order) case _ => - eval(warn"Failed to execute $order. Order expired.") + warn"Failed to execute $order. Order expired." } } - .evalMap { case (rec, _) => rec.commit } } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala index bd73b3c3..8ab0d734 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala @@ -1,10 +1,10 @@ package org.ergoplatform.dex.executor.amm.processes -import cats.{Functor, Id, Monad} +import cats.{Functor, Monad} import derevo.derive import org.ergoplatform.common.TraceId import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog -import org.ergoplatform.dex.executor.amm.streaming.CFMMConsumer +import org.ergoplatform.dex.executor.amm.streaming.CFMMOrders import tofu.higherKind.derived.representableK import tofu.logging.{Logging, Logs} import tofu.streams.Evals @@ -25,7 +25,7 @@ object Registerer { F[_]: Monad: Evals[*[_], G], G[_]: Monad: TraceId.Local ](implicit - orders: CFMMConsumer[F, G, Id], + orders: CFMMOrders[F, G], backlog: CFMMBacklog[G], logs: Logs[I, G] ): I[Registerer[F]] = @@ -35,7 +35,7 @@ object Registerer { F[_]: Monad: Evals[*[_], G], G[_]: Monad: Logging: TraceId.Local ](implicit - orders: CFMMConsumer[F, G, Id], + orders: CFMMOrders[F, G], backlog: CFMMBacklog[G] ) extends Registerer[F] { diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala index d7b5cc8d..8fa3c154 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/streaming.scala @@ -1,14 +1,15 @@ package org.ergoplatform.dex.executor.amm +import cats.Id import fs2.kafka.types.KafkaOffset import org.ergoplatform.common.streaming._ -import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} +import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId} object streaming { - type CFMMConsumer[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] - type CFMMConsumerRetries[F[_], G[_]] = Consumer.Aux[OrderId, Delayed[CFMMOrder], KafkaOffset, F, G] - type CFMMProducerRetries[F[_]] = Producer[OrderId, Delayed[CFMMOrder], F] + type CFMMOrdersGen[F[_], G[_], Status[_]] = Consumer.Aux[OrderId, Status[CFMMOrder], KafkaOffset, F, G] + type CFMMOrders[F[_], G[_]] = CFMMOrdersGen[F, G, Id] + type EvaluatedCFMMOrders[F[_], G[_]] = Consumer.Aux[OrderId, EvaluatedCFMMOrder.Any, KafkaOffset, F, G] type CFMMCircuit[F[_], G[_]] = StreamingCircuit[OrderId, CFMMOrder, F, G] } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala index d85ef88c..c1c1bc7b 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala @@ -15,6 +15,18 @@ sealed trait CFMMOrder { def id: OrderId = OrderId.fromBoxId(box.boxId) } +object CFMMOrder { + + implicit val orderingByFee: Ordering[CFMMOrder] = Ordering[Long] + .on[CFMMOrder] { + case Deposit(_, _, _, params, _) => params.dexFee + case Redeem(_, _, _, params, _) => params.dexFee + case Swap(_, _, _, params, _) => + params.dexFeePerTokenNum * params.minOutput.value / params.dexFeePerTokenDenom + } + .reverse +} + @derive(encoder, decoder, loggable) final case class Deposit(poolId: PoolId, maxMinerFee: Long, timestamp: Long, params: DepositParams, box: Output) extends CFMMOrder From cfd37806a5d486ce69d191b4a01f3e089a1c795c Mon Sep 17 00:00:00 2001 From: oskin1 Date: Mon, 14 Feb 2022 19:56:05 +0300 Subject: [PATCH 3/9] Config updated. --- build.sbt | 2 +- modules/amm-executor/src/main/resources/application.conf | 9 +++------ .../dex/executor/amm/config/ConfigBundle.scala | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index e5405b13..44272f94 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.1.0-M1", + version := "1.1.0-M2", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/resources/application.conf b/modules/amm-executor/src/main/resources/application.conf index e4dbe859..62310189 100644 --- a/modules/amm-executor/src/main/resources/application.conf +++ b/modules/amm-executor/src/main/resources/application.conf @@ -18,12 +18,9 @@ consumers.unconfirmed-orders.group-id = "ergo" consumers.unconfirmed-orders.client-id = "ergo" consumers.unconfirmed-orders.topic-id = "dex.amm.cfmm.unconfirmed.orders" -consumers.orders-retry.group-id = "ergo" -consumers.orders-retry.client-id = "ergo-retry" -consumers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry" - -producers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry" -producers.orders-retry.parallelism = 3 +consumers.evaluated-orders.group-id = "ergo" +consumers.evaluated-orders.client-id = "ergo" +consumers.evaluated-orders.topic-id = "dex.cfmm.history.orders" kafka.bootstrap-servers = ["kafka1:9092"] diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala index 6a534573..087c2672 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala @@ -17,7 +17,6 @@ final case class ConfigBundle( @promote monetary: MonetaryConfig, @promote protocol: ProtocolConfig, consumers: Consumers, - producers: Producers, @promote kafka: KafkaConfig, @promote network: NetworkConfig, @promote resolver: ResolverConfig From 2b8d26474df52bd74f77f5cdf43d13a274823cc6 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Mon, 14 Feb 2022 20:22:58 +0300 Subject: [PATCH 4/9] Config updated. --- modules/amm-executor/src/main/resources/application.conf | 2 -- .../org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala | 1 - 2 files changed, 3 deletions(-) diff --git a/modules/amm-executor/src/main/resources/application.conf b/modules/amm-executor/src/main/resources/application.conf index 62310189..429d7e64 100644 --- a/modules/amm-executor/src/main/resources/application.conf +++ b/modules/amm-executor/src/main/resources/application.conf @@ -1,5 +1,3 @@ -rotation.retry-delay = 120s - exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy" execution.order-lifetime = 300s diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala index 087c2672..7c187de6 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala @@ -11,7 +11,6 @@ import tofu.optics.macros.{promote, ClassyOptics} @derive(pureconfigReader, loggable) @ClassyOptics final case class ConfigBundle( - @promote rotation: RotationConfig, @promote exchange: ExchangeConfig, @promote execution: ExecutionConfig, @promote monetary: MonetaryConfig, From 96b896f85bd8033cff2106b2a725859d0111360b Mon Sep 17 00:00:00 2001 From: oskin1 Date: Mon, 14 Feb 2022 23:25:20 +0300 Subject: [PATCH 5/9] Batch offset commits. --- build.sbt | 2 +- .../org/ergoplatform/dex/executor/amm/App.scala | 2 -- .../dex/executor/amm/modules/CFMMBacklog.scala | 5 +++-- .../dex/executor/amm/processes/Cleaner.scala | 2 +- .../org/ergoplatform/common/streaming/syntax.scala | 14 ++++++++++++++ .../dex/index/processes/HistoryIndexing.scala | 14 +++++++------- .../dex/index/processes/LocksIndexing.scala | 5 +++-- .../dex/index/processes/PoolsIndexing.scala | 5 +++-- 8 files changed, 32 insertions(+), 17 deletions(-) diff --git a/build.sbt b/build.sbt index 44272f94..78e7a443 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.1.0-M2", + version := "1.1.0-M3", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index 663b28b3..500d6e57 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -26,7 +26,6 @@ import sttp.client3.SttpBackend import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend import tofu.WithRun import tofu.fs2Instances._ -import tofu.lift.IsoK import tofu.syntax.unlift._ import zio.interop.catz._ import zio.{ExitCode, URIO, ZEnv} @@ -44,7 +43,6 @@ object App extends EnvApp[AppContext] { blocker <- Blocker[InitF] configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker)) ctx = AppContext.init(configs) - implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx) implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix) implicit0(confirmedOrders: CFMMOrdersGen[StreamF, RunF, Confirmed]) = makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders) diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index c73a7bf5..b7d79933 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -22,7 +22,7 @@ trait CFMMBacklog[F[_]] { /** Put an order from the backlog. */ - def drop(id: OrderId): F[Unit] + def drop(id: OrderId): F[Boolean] } object CFMMBacklog { @@ -60,6 +60,7 @@ object CFMMBacklog { } yield res } - def drop(id: OrderId): F[Unit] = survivorsR.update(_ - id) + def drop(id: OrderId): F[Boolean] = + survivorsR.get.map(_.contains(id)).ifM(survivorsR.update(_ - id) as true, false.pure) } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala index be2a6c47..9e0eb6b2 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala @@ -40,7 +40,7 @@ object Cleaner { def run: F[Unit] = orders.stream - .evalTap(rec => debug"Order ${rec.message} is evaluated" >> backlog.drop(rec.message.order.id)) + .evalTap(rec => backlog.drop(rec.message.order.id).ifM(debug"Order ${rec.message} is evicted", unit[G])) .evalMap(_.commit) } } diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala index e1755256..de16c839 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/streaming/syntax.scala @@ -24,4 +24,18 @@ object syntax { commit.traverse_(_(batch.foldLeft(Chain.empty[O])(_ append _.offset))) } } + + implicit final class CommittableBatchOps[S[_], C[_], F[_], K, V, O](private val fa: S[C[Committable[K, V, O, F]]]) + extends AnyVal { + + def commitBatch(implicit + S: Evals[S, F], + F: Applicative[F], + C: Foldable[C] + ): S[Unit] = + fa.evalMap { batch => + val commit = batch.get(0).map(_.commitBatch) + commit.traverse_(_(batch.foldLeft(Chain.empty[O])(_ append _.offset))) + } + } } diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala index d3815c83..67e704a8 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/HistoryIndexing.scala @@ -10,6 +10,7 @@ import org.ergoplatform.dex.index.db.Extract.syntax.ExtractOps import org.ergoplatform.dex.index.db.models.{DBDeposit, DBRedeem, DBSwap} import org.ergoplatform.dex.index.repositories.RepoBundle import org.ergoplatform.dex.index.streaming.CFMMHistConsumer +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -52,11 +53,9 @@ object HistoryIndexing { ) extends HistoryIndexing[S] { def run: S[Unit] = - orders.stream.chunks - .map(_.map(r => r.message).toList) - .evalTap(xs => warn"[${xs.count(_.isEmpty)}] records discarded.") - .evalMap { rs => - val orders = rs.flatten + orders.stream.chunks.evalTap { rs => + val ordersIn = rs.map(_.message).toList + val orders = ordersIn.flatten val (swaps, others) = orders.partitionEither { case EvaluatedCFMMOrder(o: Swap, Some(ev: SwapEvaluation), p) => Left(EvaluatedCFMMOrder(o, Some(ev), p).extract[DBSwap]) @@ -80,7 +79,8 @@ object HistoryIndexing { ds <- insertNel(deposits)(repos.deposits.insert) rs <- insertNel(redeems)(repos.redeems.insert) } yield ss + ds + rs - txr.trans(insert) >>= (n => info"[$n] orders indexed") - } + txr.trans(insert) >>= + (n => warn"[${ordersIn.count(_.isEmpty)}] records discarded." >> info"[$n] orders indexed") + }.commitBatch } } diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala index 5a0c3883..84e1b9b6 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/LocksIndexing.scala @@ -7,6 +7,7 @@ import org.ergoplatform.dex.index.db.Extract.syntax.ExtractOps import org.ergoplatform.dex.index.db.models.DBLiquidityLock import org.ergoplatform.dex.index.repositories.RepoBundle import org.ergoplatform.dex.index.streaming.LqLocksConsumer +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -49,7 +50,7 @@ object LocksIndexing { ) extends LocksIndexing[S] { def run: S[Unit] = - locks.stream.chunks.evalMap { rs => + locks.stream.chunks.evalTap { rs => val locks = rs.map(r => r.message.entity).toList def insertNel[A](xs: List[A])(insert: NonEmptyList[A] => D[Int]) = NonEmptyList.fromList(xs).fold(0.pure[D])(insert) @@ -58,6 +59,6 @@ object LocksIndexing { txr.trans(insert) >>= { ls => info"[$ls] locks indexed" } - } + }.commitBatch } } diff --git a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala index 9512f008..db97a5b0 100644 --- a/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala +++ b/modules/markets-index/src/main/scala/org/ergoplatform/dex/index/processes/PoolsIndexing.scala @@ -12,6 +12,7 @@ import org.ergoplatform.dex.protocol.constants.ErgoAssetId import org.ergoplatform.ergo.TokenId import org.ergoplatform.ergo.services.explorer.ErgoExplorer import org.ergoplatform.ergo.services.explorer.models.TokenInfo.ErgoTokenInfo +import org.ergoplatform.common.streaming.syntax._ import tofu.doobie.transactor.Txr import tofu.logging.{Logging, Logs} import tofu.streams.{Chunks, Evals} @@ -56,7 +57,7 @@ object PoolsIndexing { ) extends PoolsIndexing[S] { def run: S[Unit] = - pools.stream.chunks.evalMap { rs => + pools.stream.chunks.evalTap { rs => val poolSnapshots = rs.map(r => r.message).toList val assets = poolSnapshots.flatMap(p => List(p.entity.lp.id, p.entity.x.id, p.entity.y.id)).distinct @@ -82,6 +83,6 @@ object PoolsIndexing { (pn, an) <- txr.trans(insert) _ <- info"[$pn] pool snapshots indexed" >> info"[$an] assets indexed" } yield () - } + }.commitBatch } } From 87384a75ba882a3692c36f8ddc52697ac270ed1e Mon Sep 17 00:00:00 2001 From: oskin1 Date: Tue, 15 Feb 2022 11:39:03 +0300 Subject: [PATCH 6/9] CFMMBacklog improved. --- build.sbt | 2 +- .../executor/amm/modules/CFMMBacklog.scala | 42 ++++++++++++++----- .../org/ergoplatform/common/EnvApp.scala | 1 + .../dex/domain/amm/CFMMOrder.scala | 12 +++--- 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/build.sbt b/build.sbt index 78e7a443..8a228706 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.1.0-M3", + version := "1.1.0-M4", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index b7d79933..944744e3 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -1,10 +1,11 @@ package org.ergoplatform.dex.executor.amm.modules import cats.Monad -import cats.effect.Timer +import cats.effect.{Sync, Timer} import cats.effect.concurrent.Ref import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId} import tofu.concurrent.MakeRef +import tofu.generate.GenRandom import tofu.syntax.monadic._ import scala.collection.immutable.{HashSet, TreeSet} @@ -16,6 +17,8 @@ trait CFMMBacklog[F[_]] { */ def put(order: CFMMOrder): F[Unit] + def putLowPriority(order: CFMMOrder): F[Unit] + /** Pop a candidate order for execution. Blocks until an order is available. */ def pop: F[CFMMOrder] @@ -27,18 +30,23 @@ trait CFMMBacklog[F[_]] { object CFMMBacklog { - private val PollInterval = 1.second + private val PollInterval = 100.millis + private val PriorityTreshold = 19 + private val PrioritySpace = 99 - def make[I[_]: Monad, F[_]: Monad: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = + def make[I[_]: Sync, F[_]: Sync: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = for { - candidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) - survivorsR <- makeRef.refOf(HashSet.empty[OrderId]) - } yield new EphemeralCFMMBacklog(candidatesR, survivorsR) + implicit0(rnd: GenRandom[F]) <- GenRandom.instance[I, F]() + candidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) + lpCandidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder]) + survivorsR <- makeRef.refOf(HashSet.empty[OrderId]) + } yield new EphemeralCFMMBacklog(candidatesR, lpCandidatesR, survivorsR) // In-memory orders backlog. // Note: Not thread safe. - final class EphemeralCFMMBacklog[F[_]: Monad]( + final class EphemeralCFMMBacklog[F[_]: Monad: GenRandom]( candidatesR: Ref[F, TreeSet[CFMMOrder]], + lowPriorityCandidatesR: Ref[F, TreeSet[CFMMOrder]], survivorsR: Ref[F, HashSet[OrderId]] )(implicit T: Timer[F]) extends CFMMBacklog[F] { @@ -46,12 +54,24 @@ object CFMMBacklog { def put(order: CFMMOrder): F[Unit] = candidatesR.update(_ + order) >> survivorsR.update(_ + order.id) + def putLowPriority(order: CFMMOrder): F[Unit] = + lowPriorityCandidatesR.update(_ + order) >> survivorsR.update(_ + order.id) + def pop: F[CFMMOrder] = { def tryPop: F[CFMMOrder] = - candidatesR.get.map(_.headOption).flatMap { - case Some(order) => candidatesR.update(_ - order) as order - case None => T.sleep(PollInterval) >> tryPop - } + for { + rnd <- GenRandom.nextInt(PrioritySpace) + lpc <- lowPriorityCandidatesR.get.map(_.headOption) + maybeWinner <- lpc match { + case Some(c) if rnd <= PriorityTreshold => Left(c).pure + case _ => candidatesR.get.map(xs => Right(xs.headOption)) + } + winner <- maybeWinner match { + case Right(Some(order)) => candidatesR.update(_ - order) as order + case Left(order) => lowPriorityCandidatesR.update(_ - order) as order + case _ => T.sleep(PollInterval) >> tryPop + } + } yield winner for { c <- tryPop res <- survivorsR.get diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala index 9dc6302a..c901a575 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala @@ -3,6 +3,7 @@ package org.ergoplatform.common import cats.data.ReaderT import fs2.Stream import tofu.WithRun +import tofu.generate.GenRandom import tofu.lift.{IsoK, Unlift} import tofu.logging.{Loggable, LoggableContext, Logs} import zio.interop.catz._ diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala index c1c1bc7b..2a8fe583 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/dex/domain/amm/CFMMOrder.scala @@ -17,14 +17,14 @@ sealed trait CFMMOrder { object CFMMOrder { - implicit val orderingByFee: Ordering[CFMMOrder] = Ordering[Long] + implicit val orderingByFee: Ordering[CFMMOrder] = Ordering[(Long, Long)] .on[CFMMOrder] { - case Deposit(_, _, _, params, _) => params.dexFee - case Redeem(_, _, _, params, _) => params.dexFee - case Swap(_, _, _, params, _) => - params.dexFeePerTokenNum * params.minOutput.value / params.dexFeePerTokenDenom + case Deposit(_, _, timestamp, params, _) => (-params.dexFee, timestamp) + case Redeem(_, _, timestamp, params, _) => (-params.dexFee, timestamp) + case Swap(_, _, timestamp, params, _) => + val minFee = params.dexFeePerTokenNum * params.minOutput.value / params.dexFeePerTokenDenom + (-minFee, timestamp) } - .reverse } @derive(encoder, decoder, loggable) From 28b8fc4bad2c934ab88611759a3a5a3f2e0dd9b7 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Tue, 15 Feb 2022 11:39:28 +0300 Subject: [PATCH 7/9] Remove junk. --- .../dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala b/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala index c901a575..9dc6302a 100644 --- a/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala +++ b/modules/dex-core/src/main/scala/org/ergoplatform/common/EnvApp.scala @@ -3,7 +3,6 @@ package org.ergoplatform.common import cats.data.ReaderT import fs2.Stream import tofu.WithRun -import tofu.generate.GenRandom import tofu.lift.{IsoK, Unlift} import tofu.logging.{Loggable, LoggableContext, Logs} import zio.interop.catz._ From 77a3ce63bfc299d89fe6caf4d5b69c12df40b2d7 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Tue, 15 Feb 2022 15:12:32 +0300 Subject: [PATCH 8/9] Backlog parameters fixed. --- build.sbt | 2 +- .../dex/executor/amm/modules/CFMMBacklog.scala | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/build.sbt b/build.sbt index 8a228706..5d36cfa3 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.1.0-M4", + version := "1.1.0-M5", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala index 944744e3..4c33a57f 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala @@ -30,9 +30,9 @@ trait CFMMBacklog[F[_]] { object CFMMBacklog { - private val PollInterval = 100.millis - private val PriorityTreshold = 19 - private val PrioritySpace = 99 + private val PollInterval = 200.millis + private val PriorityThreshold = 9 + private val PrioritySpace = 99 def make[I[_]: Sync, F[_]: Sync: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] = for { @@ -63,8 +63,8 @@ object CFMMBacklog { rnd <- GenRandom.nextInt(PrioritySpace) lpc <- lowPriorityCandidatesR.get.map(_.headOption) maybeWinner <- lpc match { - case Some(c) if rnd <= PriorityTreshold => Left(c).pure - case _ => candidatesR.get.map(xs => Right(xs.headOption)) + case Some(c) if rnd <= PriorityThreshold => Left(c).pure + case _ => candidatesR.get.map(xs => Right(xs.headOption)) } winner <- maybeWinner match { case Right(Some(order)) => candidatesR.update(_ - order) as order From 97371c7f7f550ef614d58b4726cfb0bbb67ad371 Mon Sep 17 00:00:00 2001 From: oskin1 Date: Tue, 15 Feb 2022 15:19:04 +0300 Subject: [PATCH 9/9] Backlog processes failure handling. --- build.sbt | 2 +- .../org/ergoplatform/dex/executor/amm/App.scala | 4 ++-- .../{Cleaner.scala => BacklogCleaner.scala} | 17 ++++++++++------- .../dex/executor/amm/processes/Registerer.scala | 7 +++++-- 4 files changed, 18 insertions(+), 12 deletions(-) rename modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/{Cleaner.scala => BacklogCleaner.scala} (69%) diff --git a/build.sbt b/build.sbt index 5d36cfa3..3f9f522a 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ lazy val commonSettings = List( scalacOptions ++= commonScalacOptions, scalaVersion := "2.12.15", organization := "org.ergoplatform", - version := "1.1.0-M5", + version := "1.1.0-M6", resolvers ++= Seq( Resolver.sonatypeRepo("public"), Resolver.sonatypeRepo("snapshots"), diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala index 500d6e57..170016ef 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala @@ -12,7 +12,7 @@ import org.ergoplatform.dex.executor.amm.config.ConfigBundle import org.ergoplatform.dex.executor.amm.context.AppContext import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter} import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog -import org.ergoplatform.dex.executor.amm.processes.{Cleaner, Executor, Registerer} +import org.ergoplatform.dex.executor.amm.processes.{BacklogCleaner, Executor, Registerer} import org.ergoplatform.dex.executor.amm.repositories.CFMMPools import org.ergoplatform.dex.executor.amm.services.Execution import org.ergoplatform.dex.executor.amm.streaming.{CFMMOrders, CFMMOrdersGen, EvaluatedCFMMOrders} @@ -64,7 +64,7 @@ object App extends EnvApp[AppContext] { implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF]) executor <- Resource.eval(Executor.make[InitF, StreamF, RunF]) registerer <- Resource.eval(Registerer.make[InitF, StreamF, RunF]) - cleaner <- Resource.eval(Cleaner.make[InitF, StreamF, RunF]) + cleaner <- Resource.eval(BacklogCleaner.make[InitF, StreamF, RunF]) } yield (executor, registerer, cleaner, ctx) private def makeBackend( diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala similarity index 69% rename from modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala rename to modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala index 9e0eb6b2..227dffb0 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Cleaner.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/BacklogCleaner.scala @@ -4,43 +4,46 @@ import cats.{Functor, Monad} import derevo.derive import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog import org.ergoplatform.dex.executor.amm.streaming.EvaluatedCFMMOrders +import tofu.Catches import tofu.higherKind.derived.representableK import tofu.logging.{Logging, Logs} import tofu.streams.Evals import tofu.syntax.logging._ import tofu.syntax.monadic._ +import tofu.syntax.handle._ import tofu.syntax.streams.all._ @derive(representableK) -trait Cleaner[F[_]] { +trait BacklogCleaner[F[_]] { def run: F[Unit] } -object Cleaner { +object BacklogCleaner { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: Evals[*[_], G]: Catches, G[_]: Monad ](implicit orders: EvaluatedCFMMOrders[F, G], backlog: CFMMBacklog[G], logs: Logs[I, G] - ): I[Cleaner[F]] = - logs.forService[Cleaner[F]].map(implicit l => new Live[F, G]) + ): I[BacklogCleaner[F]] = + logs.forService[BacklogCleaner[F]].map(implicit l => new Live[F, G]) final private class Live[ - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: Evals[*[_], G]: Catches, G[_]: Monad: Logging ](implicit orders: EvaluatedCFMMOrders[F, G], backlog: CFMMBacklog[G] - ) extends Cleaner[F] { + ) extends BacklogCleaner[F] { def run: F[Unit] = orders.stream .evalTap(rec => backlog.drop(rec.message.order.id).ifM(debug"Order ${rec.message} is evicted", unit[G])) .evalMap(_.commit) + .handleWith[Throwable](e => eval(warnCause"BacklogCleaner failed. Restarting .." (e)) >> run) } } diff --git a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala index 8ab0d734..11ec3a98 100644 --- a/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala +++ b/modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/processes/Registerer.scala @@ -5,11 +5,13 @@ import derevo.derive import org.ergoplatform.common.TraceId import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog import org.ergoplatform.dex.executor.amm.streaming.CFMMOrders +import tofu.Catches import tofu.higherKind.derived.representableK import tofu.logging.{Logging, Logs} import tofu.streams.Evals import tofu.syntax.logging._ import tofu.syntax.monadic._ +import tofu.syntax.handle._ import tofu.syntax.streams.all._ @derive(representableK) @@ -22,7 +24,7 @@ object Registerer { def make[ I[_]: Functor, - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: Evals[*[_], G]: Catches, G[_]: Monad: TraceId.Local ](implicit orders: CFMMOrders[F, G], @@ -32,7 +34,7 @@ object Registerer { logs.forService[Registerer[F]].map(implicit l => new Live[F, G]) final private class Live[ - F[_]: Monad: Evals[*[_], G], + F[_]: Monad: Evals[*[_], G]: Catches, G[_]: Monad: Logging: TraceId.Local ](implicit orders: CFMMOrders[F, G], @@ -43,5 +45,6 @@ object Registerer { orders.stream .evalTap(rec => debug"Registered ${rec.message}" >> backlog.put(rec.message)) .evalMap(_.commit) + .handleWith[Throwable](e => eval(warnCause"Registerer failed. Restarting .." (e)) >> run) } }