Skip to content

Commit

Permalink
Rely on ActorLogging for logging inside bulkinserteractor
Browse files Browse the repository at this point in the history
  • Loading branch information
thyandrecardoso committed Oct 5, 2023
1 parent c38234c commit 6005faa
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 63 deletions.
4 changes: 2 additions & 2 deletions elasticsearch/build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import Dependencies._

libraryDependencies ++= Seq(
ScalaLogging,
AkkaActor % Provided,
ApacheHttpAsyncClient,
ApacheHttpClient,
Expand All @@ -10,12 +9,13 @@ libraryDependencies ++= Seq(
Elastic4sClientEsJava,
Elastic4sCore,
ElasticsearchRestClient,
ScalaLogging,
AkkaActorTestkitTyped % Test,
AkkaHttpTestkit % Test,
AkkaTestkitSpecs2Classic % Test,
Elastic4sTestkit % Test,
ElasticsearchClusterRunner % Test,
Log4JCore % Test,
Log4JSlf4j % Test,
Specs2Core % Test,
Specs2ScalaCheck % Test
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.requests.bulk.{BulkResponse, BulkResponseItem}
import com.sksamuel.elastic4s.requests.indexes.IndexRequest
import com.sksamuel.elastic4s.{ElasticClient, Indexable}
import com.typesafe.scalalogging.LazyLogging
import io.circe.Json

/** An actor responsible for inserting tracking events into Elasticsearch. This actor buffers requests until either the
Expand All @@ -22,7 +21,7 @@ class ElasticsearchBulkInserter(
logErrorsAsWarnings: Boolean,
timeoutOnStop: FiniteDuration = 3.seconds
) extends Actor
with LazyLogging {
with ActorLogging {
import ElasticsearchBulkInserter._

implicit private[this] val ec: ExecutionContext = context.system.dispatcher
Expand All @@ -47,10 +46,10 @@ class ElasticsearchBulkInserter(

private[this] def logErrorOrWarning(msg: => String, throwable: Option[Throwable] = None): Unit = {
(logErrorsAsWarnings, throwable) match {
case (true, Some(t)) => logger.warn(msg, t)
case (true, None) => logger.warn(msg)
case (false, Some(t)) => logger.error(msg, t)
case (false, None) => logger.error(msg)
case (true, Some(t)) => log.warning(s"$msg\n$t")
case (true, None) => log.warning(msg)
case (false, Some(t)) => log.error(s"$msg\n$t")
case (false, None) => log.error(msg)
}
}

Expand Down Expand Up @@ -83,7 +82,7 @@ class ElasticsearchBulkInserter(
Nil
} else {
tryCountMap(msg) = tryCount
logger.info(
log.info(
"Error inserting document in Elasticsearch: {}. Will retry {} more times",
item.error,
maxTryCount - tryCount
Expand Down Expand Up @@ -162,7 +161,7 @@ class ElasticsearchBulkInserter(
case msg: Message => buffer = msg :: buffer

case ElasticsearchUp =>
logger.info("Elasticsearch is up. Bulk inserter will start sending requests")
log.info("Elasticsearch is up. Bulk inserter will start sending requests")
becomeElasticsearchUp()

case ElasticsearchDown =>
Expand Down Expand Up @@ -209,7 +208,7 @@ class ElasticsearchBulkInserter(
checkElasticsearch().collect { case true => self ! ElasticsearchUp }

case ElasticsearchUp =>
logger.info("Elasticsearch is up. Bulk inserting started")
log.info("Elasticsearch is up. Bulk inserting started")
periodicCheck.cancel()
becomeElasticsearchUp()
self ! Flush
Expand All @@ -218,7 +217,7 @@ class ElasticsearchBulkInserter(
override def postStop() = {
super.postStop()

logger.info("Stopping Bulk Inserter...")
log.info("Stopping Bulk Inserter...")
val stop = if (buffer.nonEmpty) flush().andThen { case _ => client.close() }
else Future(client.close())

Expand Down
106 changes: 55 additions & 51 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ object Dependencies {
val Elastic4s = "7.16.3"
val Elasticsearch = "7.16.3"
val FastMd5 = "2.7.1"
val JodaTime = "2.12.5"
val JUnit = "4.13.2"
val JodaTime = "2.12.5"
val Log4J = "2.20.0"
val NscalaTime = "2.32.0"
val ScalaCheck = "1.17.0"
val ScalaLogging = "3.9.5"
Expand All @@ -30,54 +31,57 @@ object Dependencies {
val UnirestJava = "1.4.9"
}

val AkkaActor = "com.typesafe.akka" %% "akka-actor" % Versions.Akka
val AkkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % Versions.Akka
val AkkaActorTestkitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % Versions.Akka
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % Versions.AkkaHttp
val AkkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % Versions.AkkaHttp
val AkkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % Versions.AkkaHttp
val AkkaStream = "com.typesafe.akka" %% "akka-stream" % Versions.Akka
val AkkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % Versions.Akka
val AkkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka
val AkkaTestkitSpecs2Classic = "net.ruippeixotog" %% "akka-testkit-specs2-classic" % Versions.AkkaTestkitSpecs2
val ApacheHttpAsyncClient = "org.apache.httpcomponents" % "httpasyncclient" % "4.1.5"
val ApacheHttpClient = "org.apache.httpcomponents" % "httpclient" % "4.5.14"
val ApacheHttpCore = "org.apache.httpcomponents" % "httpcore" % "4.4.16"
val AwsJavaSdkCore = "com.amazonaws" % "aws-java-sdk-core" % Versions.Aws
val AwsJavaSdkS3 = "com.amazonaws" % "aws-java-sdk-s3" % Versions.Aws
val BouncyCastlePkix = "org.bouncycastle" % "bcpkix-jdk18on" % Versions.BouncyCastle
val BouncyCastleProvider = "org.bouncycastle" % "bcprov-jdk18on" % Versions.BouncyCastle
val CatsCore = "org.typelevel" %% "cats-core" % Versions.Cats
val CirceCore = "io.circe" %% "circe-core" % Versions.Circe
val CirceGeneric = "io.circe" %% "circe-generic" % Versions.Circe
val CirceLiteral = "io.circe" %% "circe-literal" % Versions.Circe
val CirceParser = "io.circe" %% "circe-parser" % Versions.Circe
val CommonsCodec = "commons-codec" % "commons-codec" % Versions.CommonsCodec
val ConcurrentLinkedHashMapLru =
"com.googlecode.concurrentlinkedhashmap" % "concurrentlinkedhashmap-lru" % Versions.ConcurrentLinkedHashMap
val Elastic4sClientEsJava = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % Versions.Elastic4s
val Elastic4sCore = "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.Elastic4s
val Elastic4sTestkit = "com.sksamuel.elastic4s" %% "elastic4s-testkit" % Versions.Elastic4s
val ElasticsearchClusterRunner = "org.codelibs" % "elasticsearch-cluster-runner" % "7.16.3.0"
val ElasticsearchRestClient = "org.elasticsearch.client" % "elasticsearch-rest-client" % Versions.Elasticsearch
val FastMd5 = "com.joyent.util" % "fast-md5" % Versions.FastMd5
val JodaTime = "joda-time" % "joda-time" % Versions.JodaTime
val JUnit = "junit" % "junit" % Versions.JUnit
val NscalaTime = "com.github.nscala-time" %% "nscala-time" % Versions.NscalaTime
val ScalaCheck = "org.scalacheck" %% "scalacheck" % Versions.ScalaCheck
val ScalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
val ScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % Versions.ScalaLogging
val ScalaPool = "io.github.andrebeat" %% "scala-pool" % Versions.ScalaPool
val ScalaTestCore = "org.scalatest" %% "scalatest-core" % Versions.ScalaTest
val Shapeless = "com.chuusai" %% "shapeless" % Versions.Shapeless
val SimpleJmx = "com.j256.simplejmx" % "simplejmx" % Versions.SimpleJmx
val Specs2Common = "org.specs2" %% "specs2-common" % Versions.Specs2
val Specs2Core = "org.specs2" %% "specs2-core" % Versions.Specs2
val Specs2JUnit = "org.specs2" %% "specs2-junit" % Versions.Specs2
val Specs2Matcher = "org.specs2" %% "specs2-matcher" % Versions.Specs2
val Specs2ScalaCheck = "org.specs2" %% "specs2-scalacheck" % Versions.Specs2
val Squants = "org.typelevel" %% "squants" % Versions.Squants
val SshJ = "com.hierynomus" % "sshj" % Versions.SshJ
val TypesafeConfig = "com.typesafe" % "config" % Versions.TypesafeConfig
val UnirestJava = "com.mashape.unirest" % "unirest-java" % Versions.UnirestJava
// scalafmt: { maxColumn = 200 }
val AkkaActor = "com.typesafe.akka" %% "akka-actor" % Versions.Akka
val AkkaActorTestkitTyped = "com.typesafe.akka" %% "akka-actor-testkit-typed" % Versions.Akka
val AkkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % Versions.Akka
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % Versions.AkkaHttp
val AkkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % Versions.AkkaHttp
val AkkaHttpTestkit = "com.typesafe.akka" %% "akka-http-testkit" % Versions.AkkaHttp
val AkkaStream = "com.typesafe.akka" %% "akka-stream" % Versions.Akka
val AkkaStreamTestkit = "com.typesafe.akka" %% "akka-stream-testkit" % Versions.Akka
val AkkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Versions.Akka
val AkkaTestkitSpecs2Classic = "net.ruippeixotog" %% "akka-testkit-specs2-classic" % Versions.AkkaTestkitSpecs2
val ApacheHttpAsyncClient = "org.apache.httpcomponents" % "httpasyncclient" % "4.1.5"
val ApacheHttpClient = "org.apache.httpcomponents" % "httpclient" % "4.5.14"
val ApacheHttpCore = "org.apache.httpcomponents" % "httpcore" % "4.4.16"
val AwsJavaSdkCore = "com.amazonaws" % "aws-java-sdk-core" % Versions.Aws
val AwsJavaSdkS3 = "com.amazonaws" % "aws-java-sdk-s3" % Versions.Aws
val BouncyCastlePkix = "org.bouncycastle" % "bcpkix-jdk18on" % Versions.BouncyCastle
val BouncyCastleProvider = "org.bouncycastle" % "bcprov-jdk18on" % Versions.BouncyCastle
val CatsCore = "org.typelevel" %% "cats-core" % Versions.Cats
val CirceCore = "io.circe" %% "circe-core" % Versions.Circe
val CirceGeneric = "io.circe" %% "circe-generic" % Versions.Circe
val CirceLiteral = "io.circe" %% "circe-literal" % Versions.Circe
val CirceParser = "io.circe" %% "circe-parser" % Versions.Circe
val CommonsCodec = "commons-codec" % "commons-codec" % Versions.CommonsCodec
val ConcurrentLinkedHashMapLru = "com.googlecode.concurrentlinkedhashmap" % "concurrentlinkedhashmap-lru" % Versions.ConcurrentLinkedHashMap
val Elastic4sClientEsJava = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % Versions.Elastic4s
val Elastic4sCore = "com.sksamuel.elastic4s" %% "elastic4s-core" % Versions.Elastic4s
val Elastic4sTestkit = "com.sksamuel.elastic4s" %% "elastic4s-testkit" % Versions.Elastic4s
val ElasticsearchClusterRunner = "org.codelibs" % "elasticsearch-cluster-runner" % "7.16.3.0"
val ElasticsearchRestClient = "org.elasticsearch.client" % "elasticsearch-rest-client" % Versions.Elasticsearch
val FastMd5 = "com.joyent.util" % "fast-md5" % Versions.FastMd5
val JUnit = "junit" % "junit" % Versions.JUnit
val JodaTime = "joda-time" % "joda-time" % Versions.JodaTime
val Log4JCore = "org.apache.logging.log4j" % "log4j-core" % Versions.Log4J
val Log4JSlf4j = "org.apache.logging.log4j" % "log4j-slf4j-impl" % Versions.Log4J
val NscalaTime = "com.github.nscala-time" %% "nscala-time" % Versions.NscalaTime
val ScalaCheck = "org.scalacheck" %% "scalacheck" % Versions.ScalaCheck
val ScalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
val ScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % Versions.ScalaLogging
val ScalaPool = "io.github.andrebeat" %% "scala-pool" % Versions.ScalaPool
val ScalaTestCore = "org.scalatest" %% "scalatest-core" % Versions.ScalaTest
val Shapeless = "com.chuusai" %% "shapeless" % Versions.Shapeless
val SimpleJmx = "com.j256.simplejmx" % "simplejmx" % Versions.SimpleJmx
val Specs2Common = "org.specs2" %% "specs2-common" % Versions.Specs2
val Specs2Core = "org.specs2" %% "specs2-core" % Versions.Specs2
val Specs2JUnit = "org.specs2" %% "specs2-junit" % Versions.Specs2
val Specs2Matcher = "org.specs2" %% "specs2-matcher" % Versions.Specs2
val Specs2ScalaCheck = "org.specs2" %% "specs2-scalacheck" % Versions.Specs2
val Squants = "org.typelevel" %% "squants" % Versions.Squants
val SshJ = "com.hierynomus" % "sshj" % Versions.SshJ
val TypesafeConfig = "com.typesafe" % "config" % Versions.TypesafeConfig
val UnirestJava = "com.mashape.unirest" % "unirest-java" % Versions.UnirestJava
// scalafmt: { maxColumn = 120 }
}

0 comments on commit 6005faa

Please sign in to comment.