From e95f560b826ec4d6cd606c8220779d30ac1e4c05 Mon Sep 17 00:00:00 2001 From: amannocci Date: Wed, 19 Aug 2020 11:41:58 +0200 Subject: [PATCH] [Improved] Better kafka at least once test Signed-off-by: amannocci --- .../streamy/graphite/util/parser/GraphiteParser.scala | 6 ------ .../streamy/kafka/component/KafkaSourceSpec.scala | 9 +++++++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/plugin-graphite/src/main/scala/io/techcode/streamy/graphite/util/parser/GraphiteParser.scala b/plugin-graphite/src/main/scala/io/techcode/streamy/graphite/util/parser/GraphiteParser.scala index 0d9e2604..0ced00f2 100644 --- a/plugin-graphite/src/main/scala/io/techcode/streamy/graphite/util/parser/GraphiteParser.scala +++ b/plugin-graphite/src/main/scala/io/techcode/streamy/graphite/util/parser/GraphiteParser.scala @@ -101,12 +101,6 @@ private class GraphiteParser(config: GraphiteTransformer.Config) extends ByteStr true } - @inline private def parseUntilDelimiter(field: Option[String]): Boolean = - capture(oneOrMore(GraphiteParser.DelimiterMatcher)) { value => - field.foreach(bind => builder += bind -> JsString.fromByteStringUnsafe(value)) - true - } - override def cleanup(): Unit = { super.cleanup() builder = Json.objectBuilder() diff --git a/plugin-kafka/src/test/scala/io/techcode/streamy/kafka/component/KafkaSourceSpec.scala b/plugin-kafka/src/test/scala/io/techcode/streamy/kafka/component/KafkaSourceSpec.scala index 1149565d..0015301b 100644 --- a/plugin-kafka/src/test/scala/io/techcode/streamy/kafka/component/KafkaSourceSpec.scala +++ b/plugin-kafka/src/test/scala/io/techcode/streamy/kafka/component/KafkaSourceSpec.scala @@ -23,10 +23,12 @@ */ package io.techcode.streamy.kafka.component -import akka.kafka.ConsumerMessage.CommittableOffset +import akka.Done import akka.stream.scaladsl.Flow import io.techcode.streamy.event.StreamEvent import io.techcode.streamy.kafka.util.KafkaSpec +import scala.concurrent.duration._ +import scala.language.postfixOps /** * Kafka source spec. @@ -39,12 +41,15 @@ class KafkaSourceSpec extends KafkaSpec { val groupId = createGroupId() awaitProduce(produceString(topic, Seq("foobar"))) - KafkaSource.atLeastOnce(KafkaSource.Config( + val control = KafkaSource.atLeastOnce(KafkaSource.Config( handler = Flow[StreamEvent], bootstrapServers = bootstrapServers, groupId = groupId, topics = KafkaSource.StaticTopicConfig(Set(topic)) )) + whenReady(control.drainAndShutdown(), timeout(60 seconds), interval(100 millis)) { x => + x should equal(Done) + } } }