diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCache.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCache.scala
index f8588dae8..15132eed7 100644
--- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCache.scala
+++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/HeadCache.scala
@@ -2,19 +2,20 @@ package com.evolutiongaming.kafka.journal
 
 import cats._
 import cats.effect._
-import cats.effect.syntax.all._
 import cats.effect.kernel.Async
+import cats.effect.syntax.all._
 import cats.syntax.all._
+import com.evolution.scache.{Cache, ExpiringCache}
 import com.evolutiongaming.catshelper._
 import com.evolutiongaming.kafka.journal.PartitionCache.Result
 import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionHeader
 import com.evolutiongaming.kafka.journal.eventual.{EventualJournal, TopicPointers}
-import com.evolution.scache.{Cache, ExpiringCache}
-import com.evolutiongaming.skafka.consumer.ConsumerConfig
+import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
 import com.evolutiongaming.skafka.{Offset, Partition, Topic}
 import com.evolutiongaming.smetrics.MetricsHelper._
 import com.evolutiongaming.smetrics._
+import java.nio.ByteBuffer
 
 import scala.concurrent.duration._
 
 /** Metainfo of events written to Kafka, but not yet replicated to Cassandra.
@@ -107,13 +108,38 @@ object HeadCache {
     eventualJournal: EventualJournal[F],
     metrics: Option[HeadCacheMetrics[F]]
   ): Resource[F, HeadCache[F]] = {
+
+    import com.evolutiongaming.skafka.FromBytes
+
+    implicit val partitionFromBytes: FromBytes[F, Partition] = (bytes, _) =>
+      for {
+        int       <- Sync[F].delay { ByteBuffer.wrap(bytes).getInt() }
+        partition <- Partition.of(int)
+      } yield partition
+
+    implicit val offsetFromBytes: FromBytes[F, Offset] = (bytes, _) =>
+      for {
+        long   <- Sync[F].delay { ByteBuffer.wrap(bytes).getLong() }
+        offset <- Offset.of(long)
+      } yield offset
+
     for {
       log    <- LogOf[F].apply(HeadCache.getClass).toResource
       result <- HeadCache.of(
         Eventual(eventualJournal),
         log,
         TopicCache.Consumer.of[F](consumerConfig),
-        metrics)
+        metrics,
+        pointer = KafkaConsumerOf[F].apply[Partition, Offset](
+          consumerConfig.copy(
+            groupId = none,
+            autoCommit = true,
+            autoCommitInterval = 5.seconds.some,
+            autoOffsetReset = AutoOffsetReset.Latest,
+            fetchMinBytes = 1,
+          )
+        )
+      )
       result <- result.withFence
     } yield {
       result.withLog(log)
@@ -132,7 +158,7 @@ object HeadCache {
    * debug logging will be affected by this. One needs to call
    * [[HeadCache#withLog]] if debug logging for [[HeadCache]] is required.
    * @param consumer
-   *   Kakfa data source factory. The reason why it is factory (i.e.
+   *   Kafka data source factory. The reason why it is factory (i.e.
    *   `Resource`) is that [[HeadCache]] will try to recreate consumer in case
    *   of the failure.
    * @param metrics
@@ -153,7 +179,8 @@ object HeadCache {
     log: Log[F],
     consumer: Resource[F, TopicCache.Consumer[F]],
     metrics: Option[HeadCacheMetrics[F]],
-    config: HeadCacheConfig = HeadCacheConfig.default
+    config: HeadCacheConfig = HeadCacheConfig.default,
+    pointer: Resource[F, KafkaConsumer[F, Partition, Offset]]
   ): Resource[F, HeadCache[F]] = {
 
     val consRecordToActionHeader = ConsRecordToActionHeader[F]
@@ -179,7 +206,9 @@ object HeadCache {
           consumer,
           config,
           consRecordToActionHeader,
-          metrics.map { _.headCache })
+          metrics.map { _.headCache },
+          pointer
+        )
         .map { cache =>
           metrics
             .fold(cache) { metrics => cache.withMetrics(topic, metrics.headCache) }
diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/TopicCache.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/TopicCache.scala
index 9fee85df9..38f1b7c25 100644
--- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/TopicCache.scala
+++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/TopicCache.scala
@@ -1,21 +1,21 @@
 package com.evolutiongaming.kafka.journal
 
 import cats._
-import cats.data.{NonEmptyMap => Nem, NonEmptySet => Nes}
+import cats.data.{NonEmptyList => Nel, NonEmptyMap => Nem, NonEmptySet => Nes}
 import cats.effect._
 import cats.effect.syntax.all._
 import cats.syntax.all._
-import com.evolutiongaming.catshelper._
+import com.evolution.scache.Cache
 import com.evolutiongaming.catshelper.ParallelHelper._
+import com.evolutiongaming.catshelper._
+import com.evolutiongaming.kafka.journal.HeadCache.Eventual
 import com.evolutiongaming.kafka.journal.conversions.ConsRecordToActionHeader
 import com.evolutiongaming.kafka.journal.util.SkafkaHelper._
-import com.evolutiongaming.kafka.journal.HeadCache.Eventual
 import com.evolutiongaming.random.Random
 import com.evolutiongaming.retry.Retry.implicits._
 import com.evolutiongaming.retry.{Sleep, Strategy}
 import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerRecords}
 import com.evolutiongaming.skafka.{Offset, Partition, Topic, TopicPartition}
-import com.evolution.scache.Cache
 
 import scala.concurrent.duration._
 
@@ -84,9 +84,9 @@ object TopicCache {
     consumer: Resource[F, Consumer[F]],
     config: HeadCacheConfig,
     consRecordToActionHeader: ConsRecordToActionHeader[F],
-    metrics: Option[HeadCache.Metrics[F]]
+    metrics: Option[HeadCache.Metrics[F]],
+    pointer: Resource[F, KafkaConsumer[F, Partition, Offset]],
   ): Resource[F, TopicCache[F]] = {
-
     for {
       consumer <- consumer
        .map { _.withLog(log) }
@@ -125,6 +125,50 @@ object TopicCache {
         }
       }
       _ <- remove.toResource
+
+      pointer <- pointer
+      subscribePointer = for {
+        partitions <- pointer.partitions(pointerTopic)
+        partitions <- Nel
+          .fromList(partitions.toList.map { partition => TopicPartition(pointerTopic, partition) })
+          .map(_.toNes)
+          .liftTo[F](new RuntimeException(s"no partitions in topic $pointerTopic"))
+        _ <- pointer.assign(partitions)
+      } yield {}
+      _ <- subscribePointer.toResource.whenA(topic == rbowTopic)
+      pollPointers = pointer
+        .poll(1.second)
+        .flatMap { records =>
+          val offsets = for {
+            case (_, records) <- records.values.toList
+            record <- records.toList
+            partition <- record.key
+            offset <- record.value
+          } yield partition.value -> offset.value
+          offsets
+            .groupBy { case (partition, _) => partition }
+            .map { case (partition, offsets) => partition -> offsets.map(_._2).maxBy(_.value) }
+            .toList
+            .foldMapM { case (partition, offset) =>
+              for {
+                cache  <- partitionCacheOf(partition)
+                result <- cache
+                  .remove(offset)
+                  .map { diff =>
+                    diff.foldMap { a => Sample(a.value) }
+                  }
+              } yield result
+            }
+        }
+        .flatMap { sample =>
+          sample
+            .avg
+            .foldMapM { diff =>
+              metrics.foldMapM { _.storage(topic, diff) }
+            }
+            .as(sample.count)
+        }
+
       pointers = {
         cache
           .values1
@@ -225,6 +269,16 @@ object TopicCache {
           .exponential(10.millis)
           .cap(3.seconds)
           .jitter(random)
+      _ <- pollPointers
+        .flatMap {
+          case 0 => Sleep[F].sleep(config.removeInterval)
+          case _ => Applicative[F].unit
+        }
+        .retry(strategy) // probably not needed here
+        .handleErrorWith { a => log.error(s"remove poll failed, error: $a", a) }
+        .foreverM[Unit]
+        .background
+        .whenA(topic == rbowTopic)
       _ <- Sleep[F]
         .sleep(config.removeInterval)
         .productR { remove }
@@ -232,6 +286,7 @@ object TopicCache {
         .handleErrorWith { a => log.error(s"remove failed, error: $a", a) }
         .foreverM[Unit]
         .background
+        .whenA(topic != rbowTopic)
       _ <- metrics.foldMapM { metrics =>
         val result = for {
           _ <- Temporal[F].sleep(1.minute)
diff --git a/journal/src/main/scala/com/evolutiongaming/kafka/journal/package.scala b/journal/src/main/scala/com/evolutiongaming/kafka/journal/package.scala
index ad503716a..a509f7111 100644
--- a/journal/src/main/scala/com/evolutiongaming/kafka/journal/package.scala
+++ b/journal/src/main/scala/com/evolutiongaming/kafka/journal/package.scala
@@ -16,4 +16,7 @@ package object journal {
   type ConsRecord = ConsumerRecord[String, ByteVector]
 
   type ConsRecords = ConsumerRecords[String, ByteVector]
+
+  val rbowTopic = "rbow-journal-4.PlayerBetting"
+  val pointerTopic = "kafka-journal-offset-temp"
 }
diff --git a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala
index cc3c23d8e..ec227647f 100644
--- a/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala
+++ b/journal/src/test/scala/com/evolutiongaming/kafka/journal/HeadCacheSpec.scala
@@ -293,7 +293,9 @@ object HeadCacheSpec {
         config = config,
         eventual = eventual,
         consumer = consumer,
-        metrics = metrics.some)
+        metrics = metrics.some,
+        pointer = ???
+      )
     } yield headCache
 
 }
diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala
index 0bcf95e71..873e0e11d 100644
--- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala
+++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Replicator.scala
@@ -22,6 +22,7 @@ import com.evolutiongaming.skafka.consumer.{ConsumerConfig, ConsumerMetrics}
 import com.evolutiongaming.skafka.{ClientId, Topic, Bytes => _}
 import com.evolutiongaming.smetrics.CollectorRegistry
 import com.evolution.scache.CacheMetrics
+import com.evolutiongaming.skafka.producer.{Acks, ProducerConfig}
 import scodec.bits.ByteVector
 
 import scala.concurrent.duration._
@@ -59,7 +60,7 @@ object Replicator {
   def of[
     F[_]
-    : Temporal : Parallel
+    : Async : Parallel
     : Runtime
     : FromTry : ToTry : Fail
     : LogOf
     : KafkaConsumerOf : MeasureDuration : JsonCodec
@@ -72,11 +73,27 @@ object Replicator {
 
     val topicReplicator: Topic => Resource[F, F[Outcome[F, Throwable, Unit]]] =
       (topic: Topic) => {
+        val pointerConfig = topic match {
+          case theTopic if theTopic == rbowTopic =>
+            TopicReplicator.ConsumerOf.PointerConfig(
+              topic = pointerTopic,
+              config = ProducerConfig(
+                common = config.kafka.consumer.common,
+                batchSize = 0,
+                acks = Acks.One,
+                retries = 0,
+              )
+            ).some
+          case _ => none
+        }
+
         val consumer = TopicReplicator.ConsumerOf.of[F](
           topic,
           config.kafka.consumer,
           config.pollTimeout,
-          hostName)
+          hostName,
+          pointerConfig
+        )
 
         val metrics1 = metrics
           .flatMap { _.replicator }
diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicCommit.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicCommit.scala
index b7541f0e2..2aa643c2f 100644
--- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicCommit.scala
+++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicCommit.scala
@@ -4,14 +4,17 @@ import java.time.Instant
 
 import cats.Applicative
 import cats.data.{NonEmptyMap => Nem}
 import cats.effect.kernel.Concurrent
-import cats.effect.{Clock, Ref}
+import cats.effect.{Clock, Ref, Sync}
 import cats.syntax.all._
 import com.evolutiongaming.catshelper.ClockHelper._
 import com.evolutiongaming.catshelper.DataHelper._
 import com.evolutiongaming.kafka.journal.util.TemporalHelper._
-import com.evolutiongaming.kafka.journal.KafkaConsumer
+import com.evolutiongaming.kafka.journal.{KafkaConsumer, KafkaProducer}
 import com.evolutiongaming.skafka._
+import com.evolutiongaming.skafka.producer.ProducerRecord
+import com.evolutiongaming.skafka.ToBytes
+import java.nio.ByteBuffer
 
 import scala.collection.immutable.SortedMap
 import scala.concurrent.duration._
@@ -77,4 +80,39 @@ object TopicCommit {
       }
     }
   }
-}
\ No newline at end of file
+
+  def pointer[F[_]: Sync](
+    topic: Topic,
+    producer: KafkaProducer[F],
+    commit: TopicCommit[F]
+  ): TopicCommit[F] =
+    new TopicCommit[F] {
+
+      implicit val partitionToBytes: ToBytes[F, Partition] =
+        (partition: Partition, _) => Sync[F].delay { ByteBuffer.allocate(4).putInt(partition.value).array() }
+
+      implicit val offsetToBytes: ToBytes[F, Offset] =
+        (offset: Offset, _) => Sync[F].delay { ByteBuffer.allocate(8).putLong(offset.value).array() }
+
+      override def apply(offsets: Nem[Partition, Offset]): F[Unit] = {
+
+        val commitPointers = offsets.toNel.toList.traverse {
+          case (partition, offset) =>
+            val record = new ProducerRecord[Partition, Offset](
+              topic = topic,
+              partition = partition.some, // set the partition explicitly to mirror the journal topic's partitioning
+              key = partition.some,
+              value = offset.some,
+            )
+            producer.send(record)
+        }
+
+        val commitOffsets = commit(offsets)
+
+        for {
+          _ <- commitPointers
+          _ <- commitOffsets
+        } yield {}
+      }
+    }
+}
diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala
index 220cad5a1..b67ccda0d 100644
--- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala
+++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/TopicReplicator.scala
@@ -15,6 +15,7 @@ import com.evolutiongaming.kafka.journal.util.SkafkaHelper._
 import com.evolutiongaming.retry.Sleep
 import com.evolutiongaming.skafka.{Metadata, Offset, Partition, Topic}
 import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
+import com.evolutiongaming.skafka.producer.ProducerConfig
 import scodec.bits.ByteVector
 
 import java.time.Instant
@@ -231,11 +232,17 @@ object TopicReplicator {
 
   object ConsumerOf {
 
-    def of[F[_]: Concurrent : KafkaConsumerOf : FromTry : Clock](
+    final case class PointerConfig(
+      topic: Topic,
+      config: ProducerConfig
+    )
+
+    def of[F[_]: Async : KafkaConsumerOf : FromTry : ToTry : Clock](
       topic: Topic,
       config: ConsumerConfig,
       pollTimeout: FiniteDuration,
-      hostName: Option[HostName]
+      hostName: Option[HostName],
+      pointerConfig: Option[PointerConfig] = None,
     ): Resource[F, TopicConsumer[F]] = {
 
       val groupId = {
@@ -261,6 +268,13 @@ object TopicReplicator {
       metadata = hostName.fold { Metadata.empty } { _.value }
       commit = TopicCommit(topic, metadata, consumer)
       commit <- TopicCommit.delayed(5.seconds, commit).toResource
+      commit <- pointerConfig match {
+        case None => commit.pure[Resource[F, *]]
+        case Some(PointerConfig(topic, config)) =>
+          for {
+            producer <- KafkaProducerOf[F](none).apply(config)
+          } yield TopicCommit.pointer[F](topic, producer, commit)
+      }
     } yield {
       TopicConsumer(topic, pollTimeout, commit, consumer)
     }
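
Note on the pointer record format: the ToBytes instances in TopicCommit.pointer and the FromBytes instances in HeadCache.of agree on a fixed-width binary layout, with the record key holding the partition as a 4-byte Int and the record value holding the offset as an 8-byte Long. A minimal, dependency-free sketch of that round trip (the object and method names below are illustrative, not part of the patch):

    import java.nio.ByteBuffer

    object PointerCodecSketch {

      // mirrors the ToBytes instances in TopicCommit.pointer
      def encodePartition(partition: Int): Array[Byte] =
        ByteBuffer.allocate(4).putInt(partition).array()

      def encodeOffset(offset: Long): Array[Byte] =
        ByteBuffer.allocate(8).putLong(offset).array()

      // mirrors the FromBytes instances in HeadCache.of
      def decodePartition(bytes: Array[Byte]): Int =
        ByteBuffer.wrap(bytes).getInt()

      def decodeOffset(bytes: Array[Byte]): Long =
        ByteBuffer.wrap(bytes).getLong()

      def main(args: Array[String]): Unit = {
        assert(decodePartition(encodePartition(42)) == 42)
        assert(decodeOffset(encodeOffset(123456789L)) == 123456789L)
      }
    }

ByteBuffer defaults to big-endian, so both sides agree without setting a byte order explicitly.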
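
The pollPointers loop in TopicCache reduces each polled batch to the highest offset seen per partition before calling PartitionCache.remove. The reduction itself is a plain groupBy/max; a standalone sketch over bare (partition, offset) pairs (names are hypothetical):

    object LatestOffsetSketch {

      // keep the highest offset seen for each partition, as pollPointers
      // does before trimming the per-partition caches
      def latestOffsets(pointers: List[(Int, Long)]): Map[Int, Long] =
        pointers
          .groupBy { case (partition, _) => partition }
          .map { case (partition, offsets) => partition -> offsets.map(_._2).max }

      def main(args: Array[String]): Unit = {
        val pointers = List(0 -> 10L, 1 -> 5L, 0 -> 12L, 1 -> 3L)
        assert(latestOffsets(pointers) == Map(0 -> 12L, 1 -> 5L))
      }
    }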
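
Both background loops in TopicCache are gated with whenA on Resource: whenA(false) is a pure Resource that allocates nothing, so per topic exactly one of the two loops (pointer-driven removal for rbowTopic, interval-driven removal otherwise) is started. A small cats-effect sketch of that allocation behavior, with print statements standing in for `.background` (all names illustrative):

    import cats.effect.{IO, IOApp, Resource}
    import cats.syntax.all._

    object WhenASketch extends IOApp.Simple {

      // a resource that logs allocation and release, standing in for `.background`
      def loop(name: String): Resource[IO, Unit] =
        Resource.make(IO.println(s"$name started"))(_ => IO.println(s"$name stopped"))

      def run: IO[Unit] = {
        val topic = "some-other-topic"
        val loops = for {
          _ <- loop("pointer loop").whenA(topic == "rbow-journal-4.PlayerBetting")  // skipped
          _ <- loop("interval loop").whenA(topic != "rbow-journal-4.PlayerBetting") // allocated
        } yield ()
        loops.use { _ => IO.unit } // prints only the interval loop lines
      }
    }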
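
TopicCommit.pointer is a decorator: it first publishes every committed (partition, offset) pair to the pointer topic, then delegates to the wrapped commit, so the pointer topic is written slightly ahead of the consumer group offsets. Stripped of the Kafka types, the shape reduces to this sketch (all names illustrative):

    object CommitDecoratorSketch {

      trait Commit {
        def apply(offsets: Map[Int, Long]): Unit
      }

      // publish each pointer first, then delegate to the underlying commit,
      // matching the ordering in TopicCommit.pointer
      def withPointers(publish: (Int, Long) => Unit, underlying: Commit): Commit =
        offsets => {
          offsets.foreach { case (partition, offset) => publish(partition, offset) }
          underlying(offsets)
        }

      def main(args: Array[String]): Unit = {
        var published = List.empty[(Int, Long)]
        val base: Commit = offsets => println(s"committed $offsets")
        val commit = withPointers((p, o) => published = (p, o) :: published, base)
        commit(Map(0 -> 12L, 1 -> 5L))
        assert(published.toMap == Map(0 -> 12L, 1 -> 5L))
      }
    }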