Skip to content

Commit

Permalink
Update scalafmt-core to 3.8.6 (#176)
Browse files Browse the repository at this point in the history
* Update scalafmt-core to 3.8.6

* Reformat with scalafmt 3.8.6

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.8.6' to .git-blame-ignore-revs
  • Loading branch information
scala-steward authored Jan 24, 2025
1 parent 8f7c202 commit be6f90f
Show file tree
Hide file tree
Showing 112 changed files with 2,203 additions and 1,672 deletions.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Scala Steward: Reformat with scalafmt 3.8.6
c5e0595371a88f64dd1f32c9f226cea46132ccc7
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
preset = defaultWithAlign
//@formatter on
version = 3.7.17
version = 3.8.6
runner.dialect = scala3
align.preset = more
danglingParentheses.preset = true
Expand Down
26 changes: 14 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ sonatypeProfileName := "com.crobox"

lazy val root = (project in file("."))
.settings(
publish := {},
publish := {},
publishArtifact := false,
inThisBuild(
List(
organization := "com.crobox.clickhouse",
homepage := Some(url("https://github.com/crobox/clickhouse-scala-client")),
licenses := List("The GNU Lesser General Public License, Version 3.0" -> url("http://www.gnu.org/licenses/lgpl-3.0.txt")),
homepage := Some(url("https://github.com/crobox/clickhouse-scala-client")),
licenses := List(
"The GNU Lesser General Public License, Version 3.0" -> url("http://www.gnu.org/licenses/lgpl-3.0.txt")
),
developers := List(
Developer(
"crobox",
Expand All @@ -21,10 +23,10 @@ lazy val root = (project in file("."))
url("https://crobox.com")
)
),
scalaVersion := "2.13.16",
scalaVersion := "2.13.16",
crossScalaVersions := List("2.13.16", "3.3.1"),
javacOptions ++= Seq("-g", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "11", "-target", "11"),
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-language:_", "-encoding", "UTF-8"),
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-language:_", "-encoding", "UTF-8")
)
),
name := "clickhouse"
Expand All @@ -35,36 +37,36 @@ lazy val client: Project = (project in file("client"))
.configs(Config.CustomIntegrationTest)
.settings(Config.testSettings: _*)
.settings(
name := "client",
name := "client",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq(
"io.spray" %% "spray-json" % "1.3.6",
"org.apache.pekko" %% "pekko-actor" % PekkoVersion,
"org.apache.pekko" %% "pekko-stream" % PekkoVersion,
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"joda-time" % "joda-time" % "2.13.0"
) ++ Seq("org.apache.pekko" %% "pekko-testkit" % PekkoVersion % Test) ++ Build.testDependencies.map(_ % Test)
"joda-time" % "joda-time" % "2.13.0"
) ++ Seq("org.apache.pekko" %% "pekko-testkit" % PekkoVersion % Test) ++ Build.testDependencies.map(_ % Test)
)

lazy val dsl = (project in file("dsl"))
.dependsOn(client, client % "test->test", testkit % Test)
.configs(Config.CustomIntegrationTest)
.settings(Config.testSettings: _*)
.settings(
name := "dsl",
name := "dsl",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "33.4.0-jre",
"com.typesafe" % "config" % "1.4.3"
"com.google.guava" % "guava" % "33.4.0-jre",
"com.typesafe" % "config" % "1.4.3"
)
)
// .settings(excludeDependencies ++= Seq(ExclusionRule("org.apache.pekko")))

lazy val testkit = (project in file("testkit"))
.dependsOn(client)
.settings(
name := "testkit",
name := "testkit",
sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction := PgpKeys.publishSigned.value,
libraryDependencies ++= Build.testDependencies
)
80 changes: 44 additions & 36 deletions client/src/main/scala/com/crobox/clickhouse/ClickhouseClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import scala.concurrent.{Await, ExecutionContext, Future}
/**
* Async clickhouse client using Pekko Http and Streams
*
* @author Sjoerd Mulder
* @author
* Sjoerd Mulder
* @since 31-03-17
*/
class ClickhouseClient(configuration: Option[Config] = None,
override val customConnectionContext: Option[HttpsConnectionContext] = None)
extends ClickHouseExecutor
class ClickhouseClient(
configuration: Option[Config] = None,
override val customConnectionContext: Option[HttpsConnectionContext] = None
) extends ClickHouseExecutor
with ClickhouseResponseParser
with ClickhouseQueryBuilder {

Expand All @@ -42,33 +44,38 @@ class ClickhouseClient(configuration: Option[Config] = None,
/**
* Execute a read-only query on Clickhouse
*
* @param sql a valid Clickhouse SQL string
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL string
* @return
* Future with the result that clickhouse returns
*/
def query(sql: String)(implicit settings: QuerySettings = QuerySettings(ReadQueries)): Future[String] =
executeRequest(sql, settings.copy(readOnly = ReadQueries, idempotent = settings.idempotent.orElse(Some(true))))

/**
* Execute a read-only query on Clickhouse
* Experimental api, may change in the future.
* Execute a read-only query on Clickhouse Experimental api, may change in the future.
*
* @param sql a valid Clickhouse SQL string
* @return stream with the query progress (started/rejected/finished/failed) which materializes with the query result
* @param sql
* a valid Clickhouse SQL string
* @return
* stream with the query progress (started/rejected/finished/failed) which materializes with the query result
*/
def queryWithProgress(sql: String)(
implicit settings: QuerySettings = QuerySettings(ReadQueries)
def queryWithProgress(sql: String)(implicit
settings: QuerySettings = QuerySettings(ReadQueries)
): Source[QueryProgress, Future[String]] =
executeRequestWithProgress(
sql,
settings.copy(readOnly = ReadQueries, idempotent = settings.idempotent.orElse(Some(true)))
)

/**
* Execute a query that is modifying the state of the database. e.g. INSERT, SET, CREATE TABLE.
* For security purposes SELECT and SHOW queries are not allowed, use the .query() method for those.
* Execute a query that is modifying the state of the database. e.g. INSERT, SET, CREATE TABLE. For security purposes
* SELECT and SHOW queries are not allowed, use the .query() method for those.
*
* @param sql a valid Clickhouse SQL string
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL string
* @return
* Future with the result that clickhouse returns
*/
def execute(sql: String)(implicit settings: QuerySettings = QuerySettings(AllQueries)): Future[String] =
Future {
Expand All @@ -87,16 +94,17 @@ class ClickhouseClient(configuration: Option[Config] = None,
/**
* Creates a stream of the SQL query that will delimit the result from Clickhouse on new-line
*
* @param sql a valid Clickhouse SQL string
* @param sql
* a valid Clickhouse SQL string
*/
def source(sql: String)(implicit settings: QuerySettings = QuerySettings(ReadQueries)): Source[String, NotUsed] =
sourceByteString(sql).via(Framing.delimiter(ByteString("\n"), MaximumFrameLength)).map(_.utf8String)

/**
* Creates a stream of the SQL query that will emit every result as a ByteString
* It will not retry the queries.
* Creates a stream of the SQL query that will emit every result as a ByteString It will not retry the queries.
*
* @param sql a valid Clickhouse SQL string
* @param sql
* a valid Clickhouse SQL string
*/
def sourceByteString(
sql: String
Expand All @@ -108,22 +116,24 @@ class ClickhouseClient(configuration: Option[Config] = None,
.flatMapConcat(response => response.entity.withoutSizeLimit().dataBytes)

/**
* Accepts a source of Strings that it will stream to Clickhouse
* It will not retry the query as this will run the source once for every retry and might have
* unexpected consequences.
* Accepts a source of Strings that it will stream to Clickhouse It will not retry the query as this will run the
* source once for every retry and might have unexpected consequences.
*
* @param sql a valid Clickhouse SQL INSERT statement
* @param source the Source with strings
* @return Future with the result that clickhouse returns
* @param sql
* a valid Clickhouse SQL INSERT statement
* @param source
* the Source with strings
* @return
* Future with the result that clickhouse returns
*/
def sink(sql: String, source: Source[ByteString, Any])(
implicit settings: QuerySettings = QuerySettings(AllQueries)
def sink(sql: String, source: Source[ByteString, Any])(implicit
settings: QuerySettings = QuerySettings(AllQueries)
): Future[String] = {
val entity = HttpEntity.apply(ContentTypes.`text/plain(UTF-8)`, source)
executeRequestInternal(hostBalancer.nextHost, sql, queryIdentifier, settings, Option(entity), None)
}

val serverVersion: ClickhouseServerVersion = {
val serverVersion: ClickhouseServerVersion =
try {
val path = "crobox.clickhouse.server.version"
val cfg = configuration.getOrElse(ConfigFactory.load())
Expand All @@ -132,12 +142,11 @@ class ClickhouseClient(configuration: Option[Config] = None,
} else {
Await.result(
query("select version()")(QuerySettings(ReadQueries).copy(retries = Option(0)))
.recover {
case x: ClickhouseException =>
val key = "(version "
val idx = x.getMessage.indexOf(key)
if (idx > 0) x.getMessage.substring(idx + key.length, x.getMessage.indexOf(")", idx + key.length))
else "Unknown"
.recover { case x: ClickhouseException =>
val key = "(version "
val idx = x.getMessage.indexOf(key)
if (idx > 0) x.getMessage.substring(idx + key.length, x.getMessage.indexOf(")", idx + key.length))
else "Unknown"
}
.map(ClickhouseServerVersion(_)),
5.seconds
Expand All @@ -151,5 +160,4 @@ class ClickhouseClient(configuration: Option[Config] = None,
logger.error(s"Can't determine Clickhouse Server Version. Falling back to: $latest. Error: ${x.getMessage}", x)
latest
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,31 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

/**
* Host balancer that does a round robin on all the entries found in the `system.clusters` table.
* It assumes that the service itself can access directly the clickhouse nodes and that the default port `8123` is used
* for every node.
**/
case class ClusterAwareHostBalancer(host: Uri,
cluster: String = "cluster",
manager: ActorRef,
scanningInterval: FiniteDuration)(
implicit system: ActorSystem,
* Host balancer that does a round robin on all the entries found in the `system.clusters` table. It assumes that the
* service itself can access directly the clickhouse nodes and that the default port `8123` is used for every node.
*/
case class ClusterAwareHostBalancer(
host: Uri,
cluster: String = "cluster",
manager: ActorRef,
scanningInterval: FiniteDuration
)(implicit
system: ActorSystem,
connectionRetrievalTimeout: Timeout,
ec: ExecutionContext
) extends HostBalancer {

ClusterConnectionFlow
.clusterConnectionsFlow(Future.successful(host), scanningInterval, cluster)
.withAttributes(
ActorAttributes.supervisionStrategy({
ActorAttributes.supervisionStrategy {
case ex: IllegalArgumentException =>
logger.error("Failed resolving hosts for cluster, stopping the flow.", ex)
Supervision.stop
case ex =>
logger.error("Failed resolving hosts for cluster, resuming.", ex)
Supervision.Resume
})
}
)
.runWith(Sink.actorRef(manager, LogDeadConnections, throwable => logger.error(throwable.getMessage, throwable)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ object HostBalancer extends ClickhouseHostBuilder {
case SingleHost => SingleHostBalancer(connectionHostFromConfig)
case BalancingHosts =>
val manager = system.actorOf(ConnectionManagerActor.props(ClickhouseHostHealth.healthFlow(_)))
MultiHostBalancer(connectionConfig
.getConfigList("hosts")
.asScala
.toSet
.map((config: Config) => extractHost(config)),
manager)
MultiHostBalancer(
connectionConfig
.getConfigList("hosts")
.asScala
.toSet
.map((config: Config) => extractHost(config)),
manager
)
case ClusterAware =>
val manager = system.actorOf(ConnectionManagerActor.props(ClickhouseHostHealth.healthFlow(_)))
ClusterAwareHostBalancer(
Expand All @@ -48,6 +50,8 @@ object HostBalancer extends ClickhouseHostBuilder {
}

def extractHost(connectionConfig: Config): Uri =
toHost(connectionConfig.getString("host"),
if (connectionConfig.hasPath("port")) Option(connectionConfig.getInt("port")) else None)
toHost(
connectionConfig.getString("host"),
if (connectionConfig.hasPath("port")) Option(connectionConfig.getInt("port")) else None
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.concurrent.Future

/**
* The default host balancer which always provides the same host.
**/
*/
case class SingleHostBalancer(host: Uri) extends HostBalancer {

override def nextHost: Future[Uri] = Future.successful(host)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import com.typesafe.config.Config
import scala.collection.mutable
import scala.concurrent.duration._

class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, Cancellable],
optionalConfig: Option[Config])(
implicit materializer: Materializer
class ConnectionManagerActor(
healthSource: Uri => Source[ClickhouseHostStatus, Cancellable],
optionalConfig: Option[Config]
)(implicit
materializer: Materializer
) extends Actor
with ActorLogging
with Stash {

import ConnectionManagerActor._

private val config = optionalConfig.getOrElse(context.system.settings.config).getConfig("connection")
private val config = optionalConfig.getOrElse(context.system.settings.config).getConfig("connection")
private val fallbackToConfigurationHost = config.getBoolean("fallback-to-config-host-during-initialization")

// state
Expand All @@ -38,7 +40,7 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
override def receive: Receive = {
case Connections(hosts) =>
hosts
.foreach(host => {
.foreach(host =>
if (!currentConfiguredHosts.contains(host)) {
log.info(s"Setting up host health checks for host $host")
hostHealthScheduler.put(
Expand All @@ -50,7 +52,7 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
.run()
)
}
})
)
currentConfiguredHosts = hosts

case GetConnection() =>
Expand Down Expand Up @@ -99,8 +101,8 @@ class ConnectionManagerActor(healthSource: Uri => Source[ClickhouseHostStatus, C
}

case LogDeadConnections =>
val deadHosts = hostsStatus.values.collect {
case Dead(host, _) => host
val deadHosts = hostsStatus.values.collect { case Dead(host, _) =>
host
}
if (deadHosts.nonEmpty)
log.error(s"Hosts ${deadHosts.mkString(" - ")} are still unreachable")
Expand Down
Loading

0 comments on commit be6f90f

Please sign in to comment.